Deprecated old mastodon toot sync script

This commit is contained in:
Brandon Rozek 2022-10-08 01:08:15 -04:00
parent a32e0e61ae
commit 2ed857eccc
2 changed files with 187 additions and 297 deletions

View file

@ -1,110 +1,214 @@
#!/usr/bin/env python
"""
Script to update a stored
copy of Mastodon toots
Script to create Hugo markdown
files from Mastodon Toots
"""
from urllib import request
from glob import glob
from http.client import HTTPResponse
from typing import Optional
from pathlib import Path
from typing import Any, Dict, Optional
import json
import os
import math
import sys
TOOT_SAVE_FILE = "static/data/toots.json"
BACKUP_LOCATION = "static/data/toots.backup.json"
TOOT_CONTENT_LOCATION = "content/toots"
SERVER="https://fosstodon.org"
# Quick way to find user id: https://prouser123.me/mastodon-userid-lookup/
MUID=108219415927856966
# Server default (when < 0) is 20
RETRIEVE_NUM_TOOTS=-1
toots_data = []
# Read in former toot data
has_save = False
try:
with open(TOOT_SAVE_FILE, "r", encoding="UTF-8") as f:
toots_data = json.load(f)
has_save = True
print("Successfully read saved toot data")
except OSError:
print("Unable to read saved toot data")
except Exception:
print("Unable to parse saved toot data")
# Check JSON format...
if not isinstance(toots_data, list):
print("Unexpected JSON format in saved toot data, should be of type list.")
else:
has_save = True
RETRIEVE_NUM_TOOTS=1000
MAX_TOOTS_PER_QUERY=40 # Cannot change (server default)
MAX_TOOT_ID=-1
# Present the user the ability to continue without save data
if not has_save:
user_input = input("Continue without saved data? (y/n) ")
if user_input != "y":
sys.exit(-1)
def retrieve_toots_from_server():
"""
Grabs toots from Mastodon server
"""
global MAX_TOOT_ID
server_data = []
for _ in range(math.ceil(RETRIEVE_NUM_TOOTS // MAX_TOOTS_PER_QUERY)):
# Grab toots from Mastodon
limit_param = "?limit=" + str(RETRIEVE_NUM_TOOTS) \
if RETRIEVE_NUM_TOOTS > 0 else "?"
max_id = "&max_id=" + str(MAX_TOOT_ID) \
if MAX_TOOT_ID > 0 else ""
url = SERVER + "/api/v1/accounts/" + str(MUID) + "/statuses" + limit_param + max_id
response: Optional[HTTPResponse] = None
try:
response = request.urlopen(url)
except Exception:
print("Unable to grab toots from Mastodon.")
if response is None:
sys.exit(-1)
# Parse server response
server_data_part: Optional[list] = None
try:
server_data_part = json.loads(response.read())
except Exception:
print("Malformed JSON response from server.")
if server_data is None:
sys.exit(-1)
if not isinstance(server_data_part, list):
print("Unexpected JSON response, should be of form list.")
sys.exit(-1)
# No more to retrieve
if len(server_data_part) == 0:
break
print(f"Retrieved {len(server_data_part)} toots from server")
server_data.extend(server_data_part)
MAX_TOOT_ID = int(min(server_data_part, key=lambda p: int(p['id']))['id'])
print(f"Successfully grabbed a total of {len(server_data)} toots from server")
return server_data
# Parse former toot ids
saved_toot_ids = set()
for toot in toots_data:
if 'id' in toot:
saved_toot_ids.add(toot['id'])
def findall(p, s):
"""
Yields all the positions of
the pattern p in the string s.
Source: https://stackoverflow.com/a/34445090
"""
i = s.find(p)
while i != -1:
yield i
i = s.find(p, i+1)
# Grab toots from Mastodon
limit_param = "?limit=" + str(RETRIEVE_NUM_TOOTS) \
if RETRIEVE_NUM_TOOTS > 0 else ""
url = SERVER + "/api/v1/accounts/" + str(MUID) + "/statuses" + limit_param
response: Optional[HTTPResponse] = None
def read_json_frontmatter(markdown_contents) -> Optional[Dict[Any, Any]]:
"""
Take the contents from a Hugo markdown
file and read the JSON frontmatter if it
exists.
"""
front_matter_indices = list(findall('---', markdown_contents))
if len(front_matter_indices) < 2:
return None
front_matter = markdown_contents[(front_matter_indices[0] + 3):front_matter_indices[1]]
front_matter_json = None
try:
front_matter_json = json.loads(front_matter)
except Exception:
pass
if not isinstance(front_matter_json, dict):
front_matter_json = None
front_matter_json['content'] = markdown_contents[front_matter_indices[1] + 19:-17]
return front_matter_json
try:
response = request.urlopen(url)
except Exception:
print("Unable to grab toots from Mastodon.")
if response is None:
sys.exit(-1)
# Parse server response
server_data: Optional[list] = None
try:
server_data = json.loads(response.read())
except Exception:
print("Malformed JSON response from server.")
if server_data is None:
sys.exit(-1)
if not isinstance(server_data, list):
print("Unexpected JSON response, should be of form list.")
sys.exit(-1)
def reformat_toot(toot_json):
"""
Takes a toot_json and
slightly modifies it to match
some of the fields Hugo expects.
"""
# Turn URL -> Syndication
toot_url = toot_json['url']
del toot_json['uri']
del toot_json['url']
toot_json['syndication'] = toot_url
# Turn Created At -> Date
toot_date = toot_json['created_at']
del toot_json['created_at']
toot_json['date'] = toot_date
# Strip out highly dynamic account information
del toot_json['account']['locked']
del toot_json['account']['bot']
del toot_json['account']['discoverable']
del toot_json['account']['group']
del toot_json['account']['created_at']
del toot_json['account']['note']
del toot_json['account']['followers_count']
del toot_json['account']['following_count']
del toot_json['account']['statuses_count']
del toot_json['account']['last_status_at']
print("Successfully grabbed toots from server")
def create_toot(toot_json):
"""
Takes a JSON toot from Mastodon
and creates a string representing
the contents of a Hugo markdown
file.
"""
toot_content = toot_json['content']
del toot_json['content']
return "---\n" + \
f"{json.dumps(toot_json)}\n" +\
"---\n" +\
"{{< unsafe >}}\n" +\
f"{toot_content}\n" +\
"{{< /unsafe >}}\n"
def toot_file_from_id(tootid):
"""Returns toot filename from id"""
return f"{TOOT_CONTENT_LOCATION}/{tootid}.md"
def read_toot(tootid) -> Optional[Dict[Any, Any]]:
"""
Given a toot id, return
the markdown file contents
of the toot stored in Hugo
if it exists.
"""
try:
with open(toot_file_from_id(tootid), "r", encoding="UTF-8") as toot_file:
toot_data = read_json_frontmatter(toot_file.read())
return toot_data
except Exception:
return None
def write_toot(toot):
"""
Takes a toot json
and writes it to a hugo
markdown content file.
"""
toot_id = toot['id']
try:
with open(toot_file_from_id(toot_id), "w", encoding="UTF-8") as toot_file:
toot_file.write(create_toot(toot))
except Exception as e:
print("Failed to write toot", toot_id)
# Read in saved toot data
toot_filenames = glob(TOOT_CONTENT_LOCATION + "/*.md")
toot_ids = { Path(fname).stem for fname in toot_filenames }
server_toots = retrieve_toots_from_server()
for stoot in server_toots:
# Skip boosts for now
if stoot['content'] == '':
continue
reformat_toot(stoot)
stoot_id = stoot['id']
# Add new toots to saved toots
for toot in server_data:
if 'id' in toot and toot['id'] not in saved_toot_ids:
toots_data.append(toot)
# If the toot already exists
if stoot_id in toot_ids:
saved_tootdata = read_toot(stoot_id)
if saved_tootdata is None:
print("Unable to read saved toot id", stoot_id)
# Create a backup of the old toots
try:
os.rename(TOOT_SAVE_FILE, BACKUP_LOCATION)
except:
print("Unable to create backup of last toot file")
sys.exit(-1)
# Only update if toot has changed
elif saved_tootdata != stoot:
print("Updating toot id", stoot_id)
write_toot(stoot)
# Write toots_data to the disk
try:
with open(TOOT_SAVE_FILE, "w", encoding="UTF-8") as f:
json.dump(toots_data, f)
except:
print("Unable to write to save location.")
print("Grab backup at", BACKUP_LOCATION)
# New toot found
else:
print("Creating toot id", stoot_id)
write_toot(stoot)
print("Saved toot data to", TOOT_SAVE_FILE)
print("Completed")

View file

@ -1,214 +0,0 @@
#!/usr/bin/env python
"""
Script to create Hugo markdown
files from Mastodon Toots
"""
from urllib import request
from glob import glob
from http.client import HTTPResponse
from pathlib import Path
from typing import Any, Dict, Optional
import json
import math
import sys
TOOT_CONTENT_LOCATION = "content/toots"
SERVER="https://fosstodon.org"
# Quick way to find user id: https://prouser123.me/mastodon-userid-lookup/
MUID=108219415927856966
# Server default (when < 0) is 20
RETRIEVE_NUM_TOOTS=1000
MAX_TOOTS_PER_QUERY=40 # Cannot change (server default)
MAX_TOOT_ID=-1
def retrieve_toots_from_server():
"""
Grabs toots from Mastodon server
"""
global MAX_TOOT_ID
server_data = []
for _ in range(math.ceil(RETRIEVE_NUM_TOOTS // MAX_TOOTS_PER_QUERY)):
# Grab toots from Mastodon
limit_param = "?limit=" + str(RETRIEVE_NUM_TOOTS) \
if RETRIEVE_NUM_TOOTS > 0 else "?"
max_id = "&max_id=" + str(MAX_TOOT_ID) \
if MAX_TOOT_ID > 0 else ""
url = SERVER + "/api/v1/accounts/" + str(MUID) + "/statuses" + limit_param + max_id
response: Optional[HTTPResponse] = None
try:
response = request.urlopen(url)
except Exception:
print("Unable to grab toots from Mastodon.")
if response is None:
sys.exit(-1)
# Parse server response
server_data_part: Optional[list] = None
try:
server_data_part = json.loads(response.read())
except Exception:
print("Malformed JSON response from server.")
if server_data is None:
sys.exit(-1)
if not isinstance(server_data_part, list):
print("Unexpected JSON response, should be of form list.")
sys.exit(-1)
# No more to retrieve
if len(server_data_part) == 0:
break
print(f"Retrieved {len(server_data_part)} toots from server")
server_data.extend(server_data_part)
MAX_TOOT_ID = int(min(server_data_part, key=lambda p: int(p['id']))['id'])
print(f"Successfully grabbed a total of {len(server_data)} toots from server")
return server_data
def findall(p, s):
"""
Yields all the positions of
the pattern p in the string s.
Source: https://stackoverflow.com/a/34445090
"""
i = s.find(p)
while i != -1:
yield i
i = s.find(p, i+1)
def read_json_frontmatter(markdown_contents) -> Optional[Dict[Any, Any]]:
"""
Take the contents from a Hugo markdown
file and read the JSON frontmatter if it
exists.
"""
front_matter_indices = list(findall('---', markdown_contents))
if len(front_matter_indices) < 2:
return None
front_matter = markdown_contents[(front_matter_indices[0] + 3):front_matter_indices[1]]
front_matter_json = None
try:
front_matter_json = json.loads(front_matter)
except Exception:
pass
if not isinstance(front_matter_json, dict):
front_matter_json = None
front_matter_json['content'] = markdown_contents[front_matter_indices[1] + 19:-17]
return front_matter_json
def reformat_toot(toot_json):
"""
Takes a toot_json and
slightly modifies it to match
some of the fields Hugo expects.
"""
# Turn URL -> Syndication
toot_url = toot_json['url']
del toot_json['uri']
del toot_json['url']
toot_json['syndication'] = toot_url
# Turn Created At -> Date
toot_date = toot_json['created_at']
del toot_json['created_at']
toot_json['date'] = toot_date
# Strip out highly dynamic account information
del toot_json['account']['locked']
del toot_json['account']['bot']
del toot_json['account']['discoverable']
del toot_json['account']['group']
del toot_json['account']['created_at']
del toot_json['account']['note']
del toot_json['account']['followers_count']
del toot_json['account']['following_count']
del toot_json['account']['statuses_count']
del toot_json['account']['last_status_at']
def create_toot(toot_json):
"""
Takes a JSON toot from Mastodon
and creates a string representing
the contents of a Hugo markdown
file.
"""
toot_content = toot_json['content']
del toot_json['content']
return "---\n" + \
f"{json.dumps(toot_json)}\n" +\
"---\n" +\
"{{< unsafe >}}\n" +\
f"{toot_content}\n" +\
"{{< /unsafe >}}\n"
def toot_file_from_id(tootid):
"""Returns toot filename from id"""
return f"{TOOT_CONTENT_LOCATION}/{tootid}.md"
def read_toot(tootid) -> Optional[Dict[Any, Any]]:
"""
Given a toot id, return
the markdown file contents
of the toot stored in Hugo
if it exists.
"""
try:
with open(toot_file_from_id(tootid), "r", encoding="UTF-8") as toot_file:
toot_data = read_json_frontmatter(toot_file.read())
return toot_data
except Exception:
return None
def write_toot(toot):
"""
Takes a toot json
and writes it to a hugo
markdown content file.
"""
toot_id = toot['id']
try:
with open(toot_file_from_id(toot_id), "w", encoding="UTF-8") as toot_file:
toot_file.write(create_toot(toot))
except Exception as e:
print("Failed to write toot", toot_id)
# Read in saved toot data
toot_filenames = glob(TOOT_CONTENT_LOCATION + "/*.md")
toot_ids = { Path(fname).stem for fname in toot_filenames }
server_toots = retrieve_toots_from_server()
for stoot in server_toots:
# Skip boosts for now
if stoot['content'] == '':
continue
reformat_toot(stoot)
stoot_id = stoot['id']
# If the toot already exists
if stoot_id in toot_ids:
saved_tootdata = read_toot(stoot_id)
if saved_tootdata is None:
print("Unable to read saved toot id", stoot_id)
# Only update if toot has changed
elif saved_tootdata != stoot:
print("Updating toot id", stoot_id)
write_toot(stoot)
# New toot found
else:
print("Creating toot id", stoot_id)
write_toot(stoot)
print("Completed")