__author__ = "Peter Molnar" __copyright__ = "Copyright 2017, Peter Molnar" __license__ = "apache-2.0" __maintainer__ = "Peter Molnar" __email__ = "mail@petermolnar.net" import glob import os from datetime import datetime, timezone, UTC from collections import namedtuple from time import time from shutil import copyfileobj import re import json import subprocess import logging from shutil import copy2 as cp from copy import deepcopy import sqlite3 import urllib.request import urllib.parse import hashlib from lxml import etree import wand.image import wand.drawing import jinja2 import requests # pyyaml import yaml # git+https://github.com/eentzel/htmltruncate.py from htmltruncate import truncate, UnbalancedError # python-frontmatter import frontmatter # from weasyprint import HTML logging.getLogger("").setLevel(logging.INFO) console_handler = logging.StreamHandler() formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") console_handler.setFormatter(formatter) logging.getLogger("").addHandler(console_handler) logging.getLogger("asyncio").setLevel(logging.INFO) MarkdownImage = namedtuple("MarkdownImage", ["match", "alt", "fname", "title", "css"]) BASEPATH = os.path.dirname(os.path.realpath(__file__)) TMPPATH = os.path.join(BASEPATH, ".tmp") if not os.path.exists(TMPPATH): os.makedirs(TMPPATH) SITEVARS = { "domain": "petermolnar.net", "name": "petermolnar.net", "url": "https://petermolnar.net", "silos": { "twitter": ["https://twitter/petermolnar"], "flick": [ "https://www.flickr.com/photos/petermolnareu", "https://www.flickr.com/photos/36003160@N08", "https://www.flickr.com/people/petermolnareu", "https://www.flickr.com/people/36003160@N08", ], "mastodon": [ "https://indieweb.social/@pmlnr", ], "tumblr": [ "https://pmlnr.tumblr.com", ], }, "link": { "hub": "https://petermolnar.superfeedr.com/", "webmention": "https://webmention.petermolnar.net/webmention", # "micropub": "https://petermolnar.net/webhook.php", # "microsub": "https://aperture.p3k.io/microsub/83", # "authorization_endpoint": "https://indieauth.com/auth", # "token_endpoint": "https://tokens.indieauth.com/token", }, } WATERMARK = os.path.join(BASEPATH, ".templates", "watermark.png") IMGSIZES = { "src": {"size": 720, "suffix": ""}, "href": {"size": 1280, "suffix": "_large"}, "huge": {"size": 1920, "suffix": "_huge"}, "small": {"size": 480, "suffix": "_small"}, } RE_CODE = re.compile(r"^[~`]{3,4}.+$", re.MULTILINE) RE_PRECODE = re.compile(r'
')
RE_MYURL = re.compile(r'(^(%s[^"]+)$|"(%s[^"]+)")' % (SITEVARS["url"], SITEVARS["url"]))
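# markdown image syntax, e.g. ![alt](image.jpg "title"){.css-class};
# the capture groups line up with the MarkdownImage namedtuple fields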
RE_MDIMG = re.compile(
    r"(?P<match>!\[(?P<alt>[^]]+)?]\((?P<fname>[^\s\]]+)"
    r"(?:\s[\'\"](?P<title>[^\"\']+)[\'\"])?\)(?:{(?P<css>[^}]+)})?)",
    re.IGNORECASE,
)
RE_AUTHOR = re.compile(r"P[eé]ter Moln[aá]r|Moln[aá]r P[eé]ter|petermolnar\.(?:eu|net)")
RE_TIMEZONEFIX = re.compile(r"([0-9-]+T[0-9:]+[+-][0-9]{2}):?([0-9]{2})")
# https://www.peterbe.com/plog/fastest-python-function-to-slugify-a-string
NON_URL_SAFE = [
'"',
"#",
"$",
"%",
"&",
"+",
",",
"/",
":",
";",
"=",
"?",
"@",
"[",
"]",
"^",
"`",
"{",
"|",
"}",
"~",
"'",
".",
"\\",
]
# TRANSLATE_TABLE = {ord(char): "" for char in NON_URL_SAFE}
RE_NON_URL_SAFE = re.compile(
r"[{}]".format("".join(re.escape(x) for x in NON_URL_SAFE))
)
RE_REMOVESCHEME = re.compile(r"^https?://(?:www)?")
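# e.g. slugify("https://petermolnar.net/article-x/") -> "petermolnarnetarticle-x"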
def slugify(text):
text = RE_REMOVESCHEME.sub("", text).strip()
text = RE_NON_URL_SAFE.sub("", text).strip()
text = text.lower()
text = "_".join(re.split(r"\s+", text))
return text
J2 = jinja2.Environment(
loader=jinja2.FileSystemLoader(searchpath=os.path.join(BASEPATH, ".templates")),
lstrip_blocks=True,
trim_blocks=True,
)
J2.globals["year"] = datetime.now(UTC).strftime("%Y")
with open(os.path.join(BASEPATH, ".templates", "lens.json"), "rt") as __f:
LENS = json.loads(__f.read())
with open(os.path.join(BASEPATH, ".templates", "manuallens.json"), "rt") as ___f:
MANUALLENS = json.loads(___f.read())
def unix_timestamp():
return int(datetime.now(UTC).timestamp())
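# rewrite absolute site URLs as paths relative to baseurl; with the default
# baseurl, relurl('href="https://petermolnar.net/article/"') returns
# 'href="article/index.html"'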
def relurl(text, baseurl=SITEVARS["url"]):
for match, standalone, href in RE_MYURL.findall(str(text)):
needsquotes = False
url = href
if len(href):
needsquotes = True
else:
url = standalone
r = os.path.relpath(url, baseurl)
if url.endswith("/") and not r.endswith("/"):
r = f"{r}/index.html"
if needsquotes:
r = f'"{r}"'
text = text.replace(match, r)
return text
J2.filters["relurl"] = relurl
def printdate(rfc3339):
dt = datetime.fromisoformat(rfc3339)
return str(dt.strftime("%d %B, %Y"))
J2.filters["printdate"] = printdate
RE_BASEEXTDOMAIN = re.compile(r"^(?:www|web|pmlnr)?\.?(?P .*)$")
def extractdomain(url):
url = urllib.parse.urlparse(url)
return RE_BASEEXTDOMAIN.sub(r"\1", url.hostname)
def syndicationlink(url):
return extractdomain(url)
J2.filters["syndicationlink"] = syndicationlink
J2.filters["extractdomain"] = extractdomain
def insertfile(fpath):
fpath = os.path.join(os.path.join(BASEPATH, ".templates"), fpath)
if not os.path.exists(fpath):
return ""
with open(os.path.join(os.path.join(BASEPATH, ".templates"), fpath), "rt") as f:
return f.read()
J2.filters["insertfile"] = insertfile
def cachefile(source, target, mtime=None):
content = None
if not mtime:
if os.path.islink(source):
mtimeof = os.path.realpath(source)
else:
mtimeof = source
mtime = os.path.getmtime(mtimeof)
if os.path.exists(target):
if mtime <= os.path.getmtime(target):
# logging.info(f"cache file {target} age > {source}")
with open(target, "rt") as f:
content = f.read()
else:
logging.debug(f"cache file {target} is too old")
else:
logging.debug(f"no cache found under {target}")
return content
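# non-data descriptor: the first access runs the wrapped method and stores the
# result as an instance attribute, so subsequent lookups skip the descriptor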
class cached_property(object):
def __init__(self, method, name=None):
self.method = method
self.name = name or method.__name__
def __get__(self, inst, cls):
if inst is None:
return self
result = self.method(inst)
setattr(inst, self.name, result)
return result
class WebImage(object):
@property
def imgsizes(self):
r = deepcopy(IMGSIZES)
for name, details in r.items():
r[name]["fpath"] = os.path.join(
self.dirname,
"%s%s%s" % (self.name, details["suffix"], self.fext),
)
return r
def make_map(self):
if "MAPBOX_TOKEN" not in os.environ or not os.environ["MAPBOX_TOKEN"]:
return
token = os.environ["MAPBOX_TOKEN"]
mapfpath = os.path.join(self.dirname, "map.png")
if (
os.path.exists(mapfpath)
and os.path.exists(self.original)
and os.path.getmtime(mapfpath) >= os.path.getmtime(self.original)
):
return
if "GPSLatitude" not in self.exif or "GPSLongitude" not in self.exif:
logging.debug("gps info missing from exif at: %s", self.fpath)
return
lat = round(float(self.exif["GPSLatitude"]), 3)
lon = round(float(self.exif["GPSLongitude"]), 3)
url = (
"https://api.mapbox.com/styles/v1/mapbox/"
f"outdoors-v11/static/pin-s({lon},{lat})/{lon},{lat},11,20/"
f"720x480?access_token={token}"
)
logging.info("requesting map for %s with URL %s", self.fpath, url)
req = urllib.request.Request(url, method="GET")
response = urllib.request.urlopen(req)
logging.info("saving map file to %s", mapfpath)
with open(mapfpath, "wb") as f:
copyfileobj(response, f)
t = time()
os.utime(self.parent.fpath, (int(t), int(t)))
def linktoreal(self, source):
realtarget = source.replace(
self.dirname, os.path.dirname(os.path.realpath(self.fpath))
)
if not os.path.exists(realtarget):
logging.warning(
f"missing realtarget {realtarget} - can't symlink {source} yet"
)
return
target = os.path.relpath(realtarget, os.path.dirname(source))
if os.path.exists(source) and os.path.islink(source):
return
if os.path.exists(source) and not os.path.islink(source):
logging.warning(f"replacing file {source} with symlink to {target}")
os.unlink(source)
logging.debug(f"creating symlink from {source} to {target}")
os.symlink(target, source)
# this is to set the mtime of the symlink itself
ts = str(
datetime.fromtimestamp(int(os.path.getmtime(realtarget)))
.replace(tzinfo=timezone.utc)
.strftime("%Y%m%d%H%M")
)
os.system(f"touch -h -t {ts} {source}")
def __init__(self, fpath, mdimg, parent):
self.fpath = fpath
self.mdimg = mdimg
self.parent = parent
self.fname = os.path.basename(self.fpath)
self.dirname = os.path.dirname(self.fpath)
self.name, self.fext = os.path.splitext(self.fname)
self.original = self.fpath.replace(self.fname, f".{self.name}.orig{self.fext}")
self.is_featured = False
if os.path.exists(self.fpath):
self.is_link = os.path.islink(self.fpath)
else:
self.is_link = False
if self.is_link:
self.linktoreal(self.original)
elif not os.path.exists(self.original):
cp(self.fpath, self.original)
self.size = max(self.exif["ImageHeight"], self.exif["ImageWidth"])
img = None
for name, details in self.imgsizes.items():
# special case of items symlinked to other posts' images
if self.is_link:
self.linktoreal(details["fpath"])
continue
# image is too small for this size
if details["size"] >= self.size:
continue
            # the resized image already exists and is newer than the original
if os.path.exists(details["fpath"]) and (
(
os.path.getmtime(details["fpath"])
>= os.path.getmtime(self.original)
and os.path.getsize(self.original)
!= os.path.getsize(details["fpath"])
)
or (
os.path.getmtime(details["fpath"]) > os.path.getmtime(self.original)
and os.path.getsize(self.original)
== os.path.getsize(details["fpath"])
)
):
logging.debug(
"resized image %s for %s already exists", name, self.fpath
)
continue
if not img:
img = wand.image.Image(filename=self.original)
img.auto_orient()
if self.is_my_photo:
logging.info(f"{self.fpath} needs watermarking")
with wand.image.Image(filename=WATERMARK) as wmark:
with wand.drawing.Drawing():
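                        # size the watermark to ~20% of the image height, keep
                        # its aspect ratio, and place it in the bottom-right
                        # corner with a 1% margin; portrait images get it
                        # rotated by 90 degrees first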
w = img.height * 0.2
h = wmark.height * (w / wmark.width)
if img.width > img.height:
x = img.width - w - (img.width * 0.01)
y = img.height - h - (img.height * 0.01)
else:
x = img.width - h - (img.width * 0.01)
y = img.height - w - (img.height * 0.01)
w = round(w)
h = round(h)
x = round(x)
y = round(y)
wmark.resize(w, h)
if img.width <= img.height:
wmark.rotate(-90)
img.composite(image=wmark, left=x, top=y)
crop = details.get("crop", False)
ratio = max(img.width, img.height) / min(img.width, img.height)
horizontal = True if (img.width / img.height) >= 1 else False
with img.clone() as thumb:
# panorama: reverse "horizontal" because the limit
# should be on the shorter side, not the longer, and
# make it a bit smaller, than the actual limit
# 2.39 is the wide angle cinematic view: anything
# wider, than that is panorama land
# this is to maintain a viewable panorama
if ratio > 2.39 and not crop:
details["size"] = int(details["size"] * 0.6)
horizontal = not horizontal
# w = img.width
# h = img.height
if horizontal != crop:
w = details["size"]
h = int(float(details["size"] / img.width) * img.height)
else:
h = details["size"]
w = int(float(details["size"] / img.height) * img.width)
thumb.resize(w, h)
if crop:
thumb.liquid_rescale(details["size"], details["size"], 1, 1)
if self.exif.get("FileType", "").lower() == "jpeg":
if "small" == name:
thumb.compression_quality = 70
else:
thumb.compression_quality = 86
thumb.unsharp_mask(radius=1, sigma=0.5, amount=0.7, threshold=0.5)
thumb.format = "pjpeg"
# this is to make sure pjpeg happens
output = details["fpath"]
with open(output, "wb") as o:
wmarkmsg = " "
if self.is_my_photo:
wmarkmsg = " watermarked "
logging.info(f"saving{wmarkmsg}image ({w}x{h}) to {output}")
thumb.save(file=o)
if self.exif.get("FileType", "").lower() == "jpeg":
cmd = (
"exiftool",
f"-XMP:Source={self.parent.url}",
"-overwrite_original",
output,
)
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = p.communicate()
if stderr:
raise OSError(f"Error writing EXIF to {output}: {stderr}")
@cached_property
def is_my_photo(self):
for candidate in ["Artist", "Copyright"]:
if candidate in self.exif:
if RE_AUTHOR.search(self.exif[candidate]):
return True
return False
@cached_property
def src(self):
return self.fpath.replace(BASEPATH, SITEVARS["url"])
@cached_property
def tmplvars(self):
if len(self.mdimg.alt):
alt = self.mdimg.alt
else:
alt = self.exif.get("Description", "")
if len(self.mdimg.title):
title = self.mdimg.title
else:
title = self.exif.get("Headline", self.fname)
# width = IMGSIZES["src"]["size"]
# height = IMGSIZES["src"]["size"]
with wand.image.Image(filename=self.fpath) as img:
width = img.width
height = img.height
tmplvars = {
"src": self.src,
"alt": alt,
"caption": alt,
"title": title,
"featured": self.is_featured,
"width": width,
"height": height,
"orientation": "horizontal" if width > height else "vertical",
"mime": self.exif.get("MIMEType", "image/jpeg"),
"bytesize": os.path.getsize(self.fpath),
"licensor": SITEVARS["url"],
"name": self.name,
}
for s in ["huge", "href", "small"]:
maybe = os.path.join(
self.dirname,
"%s%s%s" % (self.name, IMGSIZES[s]["suffix"], self.fext),
)
if os.path.exists(maybe):
tmplvars[s] = maybe.replace(BASEPATH, SITEVARS["url"])
# if "href" not in tmplvars:
# raise KeyError(f"failed to locate 'href' for {self.fpath} used in {self.parent.fpath}")
if self.is_my_photo:
tmplvars["license"] = "CC-BY-NC-ND-4.0"
tmplvars["exif"] = {}
mapping = {
"camera": ["Model"],
"aperture": ["FNumber", "Aperture"],
"shutter": ["ExposureTime"],
"focallength": ["FocalLength", "FocalLengthIn35mmFormat"],
"iso": ["ISO"],
"lens": ["LensID", "LensSpec", "Lens"],
"created": ["CreateDate", "DateTimeOriginal"],
"latitude": ["GPSLatitude"],
"longitude": ["GPSLongitude"],
}
for k, candidates in mapping.items():
for candidate in candidates:
maybe = self.exif.get(candidate, None)
if maybe:
tmplvars["exif"][k] = maybe
break
# lens info is a bit fragmented, so let's try to identify the
# real lens, plus add the URL for it
if "lens" in tmplvars["exif"] and tmplvars["exif"]["lens"] in LENS:
tmplvars["exif"]["lens"] = LENS[tmplvars["exif"]["lens"]]
elif (
"focallength" in tmplvars["exif"]
and "camera" in tmplvars["exif"]
and "created" in tmplvars["exif"]
and tmplvars["exif"]["focallength"] in MANUALLENS
):
epoch = int(
datetime.fromisoformat(
tmplvars["exif"]["created"].replace('"', "")
).timestamp()
)
e = tmplvars["exif"]
for lens in MANUALLENS[tmplvars["exif"]["focallength"]]:
if tmplvars["exif"]["camera"] not in lens["camera"]:
continue
if "maxepoch" in lens and epoch > int(lens["maxepoch"]):
continue
if "minepoch" in lens and epoch < int(lens["minepoch"]):
continue
tmplvars["exif"]["lens"] = lens
break
if (
"lens" in tmplvars["exif"]
and "name" not in tmplvars["exif"]["lens"]
):
logging.error(
f"failed to identify manual lens at {self.fpath}, exif is {e}"
)
del tmplvars["exif"]["lens"]
elif "lens" in tmplvars["exif"]:
tmplvars["exif"]["lens"] = {
"name": tmplvars["exif"]["lens"],
"url": "",
}
for e in ["latitude", "longitude"]:
if e in tmplvars["exif"]:
tmplvars["exif"][e] = round(float(tmplvars["exif"][e]), 3)
return tmplvars
@property
def printhtml(self):
if len(self.mdimg.css):
return self.mdimg.match
v = deepcopy(self.tmplvars)
for s in ["huge", "href"]:
if s in v:
v["src"] = v[s]
break
tmpl = J2.get_template("Figure.j2.html")
r = tmpl.render(v)
del v
return r
def __str__(self):
if len(self.mdimg.css):
return self.mdimg.match
tmpl = J2.get_template("Figure.j2.html")
# v = self.tmplvars
r = tmpl.render(self.tmplvars)
return r
@cached_property
def exif(self):
if self.is_link:
cachepath = os.path.join(
os.path.dirname(os.path.realpath(self.fpath)),
self.dirname,
f".{self.fname}.exif.json",
)
else:
cachepath = os.path.join(self.dirname, f".{self.fname}.exif.json")
content = cachefile(self.original, cachepath)
if content:
return json.loads(content)
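        # exiftool notes: a trailing "#" requests the raw numeric value of a tag,
        # -json returns a one-element array (hence the .pop() below), and the
        # quoted dateFormat leaves literal quotes that are stripped later when
        # the dates are parsed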
cmd = (
"exiftool",
"-sort",
"-json",
"-dateFormat",
'"%Y-%m-%dT%H:%M:%S+00:00"',
"-MIMEType",
"-FileType",
"-FileName",
"-FileSize#",
"-ModifyDate",
"-CreateDate",
"-DateTimeOriginal",
"-ImageHeight",
"-ImageWidth",
"-Aperture",
"-FOV",
"-ISO",
"-FocalLength",
"-FNumber",
"-FocalLengthIn35mmFormat",
"-ExposureTime",
"-Model",
"-GPSLongitude#",
"-GPSLatitude#",
"-LensID",
"-LensSpec",
"-Lens",
"-ReleaseDate",
"-Description",
"-Headline",
"-HierarchicalSubject",
"-Copyright",
"-Artist",
"-By-line",
"-CopyrightNotice",
"-CopyrightOwnerID",
"-CopyrightOwnerName",
"-Creator",
"-Rights",
"-plus:Licensor",
"-xmpRights:WebStatement",
self.original,
)
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = p.communicate()
if stderr:
raise OSError(f"Error reading EXIF from {self.original}: {stderr}")
exif = json.loads(stdout.decode("utf-8").strip()).pop()
with open(cachepath, "wt") as f:
logging.info(f"updating EXIF for {self.original} at {cachepath}")
f.write(json.dumps(exif, indent=4, sort_keys=True))
return exif
class MarkdownDoc(object):
def __init__(self, fpath):
self.fpath = fpath
self.fname = os.path.basename(self.fpath)
self.dirname = os.path.dirname(fpath)
self.name, self.fext = os.path.splitext(self.fname)
@property
def mtime(self):
return int(os.path.getmtime(self.fpath))
@cached_property
def parsed(self):
with open(self.fpath, mode="rt") as f:
meta, txt = frontmatter.parse(f.read())
if "author" not in meta:
raise LookupError(f"Missing author on {self.fpath}")
return meta, txt.strip()
@property
def meta(self) -> dict:
return self.parsed[0]
@property
def txt(self) -> str:
if not self.parsed[1] or not len(self.parsed[1]):
return str("")
else:
return self.parsed[1]
@property
def author(self):
return self.meta["author"]
def save(self):
m = deepcopy(self.meta)
t = pandoc_formattedmarkdown(self.txt)
logging.info(f"=> WRITING MARKDOWN FILE <= {self.fpath}")
with open(self.fpath, "wt") as f:
f.write(
"---\n%s\n---\n\n%s"
% (
yaml.dump(
m,
default_flow_style=False,
indent=4,
allow_unicode=True,
),
t,
)
)
class Comment(MarkdownDoc):
@cached_property
def parsed(self):
meta, txt = super().parsed
if "source" not in meta:
raise LookupError(f"Missing 'source' on {self.fpath}")
if "target" not in meta:
raise LookupError(f"Missing 'target' on {self.fpath}")
if "type" not in meta:
raise LookupError(f"Missing 'type' on {self.fpath}")
return meta, txt
@property
def dt(self):
try:
dt = datetime.fromisoformat(self.meta["date"])
except TypeError as err:
raise TypeError(f"failed 'date' parsing on {self.fpath}: {err}")
if self.mtime != int(dt.timestamp()):
os.utime(self.fpath, (int(dt.timestamp()), int(dt.timestamp())))
return dt
@property
def tmplvars(self):
return self.meta
class Entry(MarkdownDoc):
def __init__(self, fpath):
super().__init__(fpath)
self.subentries = {}
@cached_property
def parsed(self):
meta, txt = super().parsed
if "published" not in meta:
raise LookupError(f"Missing 'published' on {self.fpath}")
if "copies" not in meta:
meta["copies"] = []
return meta, txt
@property
def syndicate(self):
s = ["http://web.archive.org/web/"]
if "syndicate" in self.meta:
s.extend(self.meta["syndicate"])
if "photo" == self.category:
s.append("https://brid.gy/publish/flickr")
s.append("https://brid.gy/publish/tumblr")
return list(set(s))
@property
def dt(self):
try:
dt = datetime.fromisoformat(self.meta["published"])
except TypeError as err:
raise ValueError(f"failed 'published' parsing on {self.fpath}: {err}")
return dt
@property
def is_live(self):
r = requests.get(self.url)
if r.status_code != requests.codes.ok:
return False
else:
return True
@property
def is_future(self):
my_ts = datetime.fromisoformat(self.meta["published"]).timestamp()
unix_ts = unix_timestamp()
if my_ts > unix_ts:
return True
else:
return False
@property
def title(self):
if "title" in self.meta and len(self.meta["title"]) > 0:
return self.meta["title"]
else:
return printdate(self.dt.isoformat())
@property
def updated(self):
return (
datetime.fromtimestamp(self.mtime).replace(tzinfo=timezone.utc).isoformat()
)
@property
def mtime(self):
mtime = int(os.path.getmtime(self.fpath))
if not self.is_future:
mtime = max(mtime, int(self.dt.timestamp()))
if len(self.subentries):
mtime = max(mtime, max([v.mtime for v in self.subentries.values()]))
if len(self.comments):
mtime = max(mtime, max([v.mtime for v in self.comments.values()]))
return mtime
# everything second level, e.g. article/entry/index.md has a category
@cached_property
def category(self):
pathdiff = os.path.relpath(self.fpath, BASEPATH)
if 2 == pathdiff.count("/"):
return pathdiff.split("/")[0]
else:
return None
    # the entry slug: the name of the containing directory for index.md files
@cached_property
def entry(self):
if "index.md" == self.fname:
return os.path.basename(self.dirname)
else:
return self.dirname
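    # path depth relative to BASEPATH decides the type:
    # category/entry/index.md -> post; category/index.md -> category (if it has
    # sub-entries) or page; index.md in the root -> home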
@cached_property
def type(self):
pathdiff = os.path.relpath(self.fpath, BASEPATH)
if pathdiff.count("/") >= 2:
return "post"
elif pathdiff.count("/") == 1:
subentries = glob.glob(
os.path.join(self.dirname, "**", "index.md"), recursive=True
)
if len(subentries) > 1:
return "category"
else:
return "page"
else:
return "home"
@cached_property
def comments(self):
urls = []
comments = {}
for candidate in glob.glob(os.path.join(self.dirname, "*.md")):
if candidate.endswith("index.md") or candidate.endswith("README.md"):
continue
comment = Comment(candidate)
skip = False
if "type" in comment.meta and comment.meta["type"] in [
"like",
"bookmark",
"repost",
]:
skip = True
for silo, silourls in SITEVARS["silos"].items():
for silourl in silourls:
if silourl in comment.meta["source"]:
skip = True
if comment.meta["source"] not in self.meta["copies"]:
logging.info(
f"found and adding a new syndication URL in webmentions: {silourl} for {self.fpath}"
)
self.meta["copies"].append(comment.meta["source"])
self.save()
if skip:
continue
if comment.meta["source"] in urls:
raise LookupError(
"duplicate comments? Check:\n\t%s\n\t%s"
% (comment.meta["source"], candidate)
)
else:
urls.append(comment.meta["source"])
comments[int(comment.dt.timestamp())] = comment
return comments
def images(self):
images = {}
for match, alt, fname, title, css in RE_MDIMG.findall(self.txt):
mdimg = MarkdownImage(match, alt, fname, title, css)
imgpath = os.path.join(self.dirname, fname)
if not os.path.exists(imgpath):
raise OSError(f"{imgpath} is missing from {self.fpath}")
else:
webimg = WebImage(imgpath, mdimg, self)
if webimg.name == self.entry and webimg.is_my_photo is True:
webimg.is_featured = True
webimg.make_map()
images.update({match: webimg})
return images
@cached_property
def featured_image(self):
images = self.images()
if len(images):
for match, webimg in images.items():
if webimg.is_featured:
return match, webimg
return None, None
@cached_property
def html(self):
if not len(self.txt):
return ""
txt = self.txt
images = self.images()
if len(images):
# remove the featured image from the content, that will
            # be added separately
# replace all the others with their HTML version
for match, webimg in images.items():
if webimg.is_featured:
txt = txt.replace(match, "")
else:
txt = txt.replace(match, str(webimg))
c = pandoc(txt)
        # move the language class from pandoc's <pre> onto <code>
        # (assumed prism.js-style "language-*" naming)
        c = RE_PRECODE.sub(r'<pre><code class="language-\1">', c)
return c
@cached_property
def description(self):
if "summary" in self.meta and self.meta["summary"]:
return self.meta["summary"].strip()
# return ""
try:
t = truncate(self.html, 255, "…")
return t
except UnbalancedError as e:
logging.info(e)
logging.info(self.html)
return ""
@property
def url(self):
return "%s/" % (self.dirname.replace(BASEPATH, SITEVARS["url"]))
@cached_property
def tmplvars(self):
post = deepcopy(self.meta)
post.update(
{
"title": self.title,
"html": self.html,
# "gmi": md2gemini(self.txt),
"description": self.description,
"entry": self.entry,
"category": self.category,
"url": self.url,
"updated": self.updated,
"year": self.dt.strftime("%Y"),
"type": self.type,
"has_code": RE_CODE.search(self.txt),
"has_map": os.path.exists(os.path.join(self.dirname, "map.png")),
"syndicate": self.syndicate,
}
)
webimg = self.featured_image[1]
if webimg:
post.update({"image": webimg.tmplvars})
post["image"].update({"html": str(webimg), "print": webimg.printhtml})
if "license" not in post:
if webimg:
post.update({"license": "CC-BY-NC-ND-4.0"})
else:
post.update({"license": "CC-BY-4.0"})
if len(self.comments):
post["comments"] = [
self.comments[k].tmplvars
for k in sorted(self.comments.keys(), reverse=True)
]
headerimg = os.path.join(self.dirname, "h1.svg")
if os.path.exists(headerimg):
post.update({"headerimg": headerimg})
return post
def write_gopher(self):
gopherpath = os.path.join(self.dirname, "gophermap")
gopher = cachefile(self.fpath, gopherpath, self.mtime)
if "category" == self.type and not gopher:
logging.info(f"saving gophermap {gopherpath}")
with open(gopherpath, "wt") as f:
lines = [
"%s - %s" % (self.title, SITEVARS["name"]),
"",
"",
]
for subentry in [
self.subentries[k]
for k in sorted(self.subentries.keys(), reverse=True)
]:
line = "0%s\t/%s\t%s\t70" % (
subentry.title,
os.path.relpath(subentry.fpath, BASEPATH),
SITEVARS["domain"],
)
lines.append(line)
if "summary" in subentry.meta and len(subentry.meta["summary"]):
lines.extend(
pandoc_formattedtext(subentry.meta["summary"]).split("\n")
)
for img in subentry.images().values():
line = "I%s\t/%s\t%s\t70" % (
img.fname,
os.path.relpath(img.fpath, BASEPATH),
SITEVARS["domain"],
)
lines.append(line)
lines.append("")
f.write("\r\n".join(lines))
def write_html(self):
htmlpath = os.path.join(self.dirname, f"{self.name}.html")
html = cachefile(self.fpath, htmlpath, self.mtime)
if not html:
logging.info(f"saving {htmlpath}")
with open(htmlpath, "wt") as f:
if "category" == self.type:
tmpl = J2.get_template("Category.j2.html")
else:
tmpl = J2.get_template("Singular.j2.html")
tmplvars = {
"baseurl": self.url,
"site": SITEVARS,
"post": self.tmplvars,
}
if len(self.subentries):
tmplvars["subentries"] = [
self.subentries[k].tmplvars
for k in sorted(self.subentries.keys(), reverse=True)
]
html = tmpl.render(tmplvars)
f.write(html)
del tmpl
del tmplvars
return html
# def write_pdf(self):
# htmlpath = os.path.join(self.dirname, f"{self.name}.html")
# pdfpath = os.path.join(self.dirname, f"{self.name}.pdf")
# pdf = False
# if os.path.exists(pdfpath):
# # not self.mtime because the pdf only contains the
# # post, not the comments or webmentions
# if os.path.getmtime(pdfpath) >= os.path.getmtime(self.fpath):
# pdf = True
# if not pdf:
# logging.info(f"saving {pdfpath}")
# HTML(htmlpath).write_pdf(pdfpath)
def __str__(self):
self.write_gopher()
r = self.write_html()
# if self.category:
# self.write_pdf()
return r
class SearchDB(object):
def __init__(self):
self.is_changed = False
self.fpath = os.path.join(BASEPATH, "search.sqlite")
self.db = sqlite3.connect(self.fpath)
self.db.execute("PRAGMA auto_vacuum = INCREMENTAL;")
self.db.execute("PRAGMA journal_mode = MEMORY;")
self.db.execute("PRAGMA temp_store = MEMORY;")
self.db.execute("PRAGMA locking_mode = NORMAL;")
self.db.execute("PRAGMA synchronous = FULL;")
self.db.execute('PRAGMA encoding = "UTF-8";')
self.db.execute(
"""
CREATE VIRTUAL TABLE IF NOT EXISTS data USING fts4(
url,
mtime,
title,
content,
summary,
featuredimg,
category,
author,
published,
notindexed=url,
notindexed=mtime,
notindexed=title,
notindexed=featuredimg,
notindexed=category,
notindexed=author,
notindexed=published,
tokenize=porter
)"""
)
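        # a consumer outside this script (assumed: a small search endpoint) can
        # query this table with FTS MATCH, e.g.
        # SELECT url, title FROM data WHERE data MATCH ?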
@property
def mtime(self):
if os.path.exists(self.fpath):
mtime = int(os.path.getmtime(self.fpath))
else:
mtime = 0
return mtime
def __exit__(self):
self.db.commit()
self.db.execute("PRAGMA auto_vacuum;")
self.db.close()
def append(self, post):
logging.info(f"adding {post.dirname} to search")
# existing_mtime = 0
exists = False
maybe = self.db.execute(
"SELECT mtime FROM data WHERE url = ?", (post.url,)
).fetchone()
if maybe and int(maybe[0]) < post.mtime:
logging.info(
f"{post.url} needs updating in search, deleting previous entry"
)
self.db.execute("DELETE FROM data WHERE url=?", (post.url,))
elif maybe and int(maybe[0]) >= post.mtime:
exists = True
if post.featured_image[1]:
featuredimg = post.featured_image[1].src
else:
featuredimg = ""
corpus = "\n".join(
[post.title, post.url, post.description, post.txt, featuredimg]
)
if not exists:
logging.info(f"updating search with {post.url}")
self.db.execute(
"""
INSERT INTO data (url, mtime, title, content, summary, featuredimg, category, author, published)
VALUES (?,?,?,?,?,?,?,?,?);
""",
(
post.url,
post.mtime,
post.title,
corpus,
post.description,
featuredimg,
post.category,
post.meta["author"]["name"],
post.dt.timestamp(),
),
)
self.is_changed = True
class Bookmarks(object):
def __init__(self):
if "BOOKMARKS_ARCHIVE" not in os.environ:
self.fpath = None
else:
self.fpath = os.environ["BOOKMARKS_ARCHIVE"]
self.dirpath = os.path.join(BASEPATH, "bookmarks")
if not os.path.isdir(self.dirpath):
os.makedirs(self.dirpath)
@property
def mtime(self):
mtime = 0
if self.fpath:
mtime = int(os.path.getmtime(self.fpath))
return mtime
@property
def links(self):
links = []
if not self.fpath:
return links
with open(self.fpath, "rt") as f:
raw = json.loads(f.read())
for e in raw:
dt = RE_TIMEZONEFIX.sub(r"\1:\2", e["created_at"])
eid = int(e["id"])
url = e["url"]
link = {
"url": url,
"title": e["title"],
"slug": slugify(url),
"published": datetime.fromisoformat(dt).isoformat(),
"id": eid,
}
links.append(link)
return links
def __str__(self):
if not self.fpath:
return ""
htmlpath = os.path.join(self.dirpath, "index.html")
html = cachefile(self.fpath, htmlpath, self.mtime)
if not html:
logging.info(f"saving {htmlpath}")
with open(htmlpath, "wt") as f:
tmpl = J2.get_template("Bookmarks.j2.html")
tmplvars = {
"baseurl": htmlpath.replace(BASEPATH, SITEVARS["url"]),
"site": SITEVARS,
"post": self.tmplvars,
}
html = tmpl.render(tmplvars)
f.write(html)
del tmpl
del tmplvars
return html
@property
def tmplvars(self):
v = {"subentries": self.links}
return v
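# pandoc calls are slow, so their output is cached in TMPPATH under a key of
# <prefix>_<md5 of the input text>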
def maybe_hash_cache(prefix, txt):
_h = hashlib.md5(txt.encode())
_md5 = _h.hexdigest()
_hf = os.path.join(TMPPATH, f"{prefix}_{_md5}")
if not os.path.exists(_hf):
return None
with open(_hf, "rt") as f:
return f.read()
def write_hash_cache(prefix, txt, content):
_h = hashlib.md5(txt.encode())
_md5 = _h.hexdigest()
_hf = os.path.join(TMPPATH, f"{prefix}_{_md5}")
with open(_hf, "wt") as f:
f.write(content)
def write_mdfile(fpath, meta, txt):
meta = yaml.dump(
meta, default_flow_style=False, indent=4, allow_unicode=True, width=72
)
r = f"---\n{meta}\n---\n\n{txt}\n"
with open(fpath, "wt") as f:
logging.info(f"saving markdown file {fpath}")
f.write(r)
def pandoc_formattedmarkdown(txt):
_h = maybe_hash_cache("fmarkdown", txt)
if _h:
return _h
mdoptions = [
"+footnotes",
"+pipe_tables",
"+strikeout",
"+superscript",
"+subscript",
"+raw_html",
"+definition_lists",
"+backtick_code_blocks",
"+fenced_code_attributes",
"+shortcut_reference_links",
"+lists_without_preceding_blankline",
"-smart",
"-markdown_in_html_blocks",
"-simple_tables",
"-multiline_tables",
"-grid_tables",
]
mdoptions = "".join(mdoptions)
f = f"--from=markdown{mdoptions}"
t = f"--to=markdown{mdoptions}"
cmd = (
"pandoc",
"-o-",
f,
t,
"--quiet",
"--markdown-headings=atx",
"--wrap=auto",
"--columns=72",
)
pandocprocess = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = pandocprocess.communicate(input=txt.encode())
if stderr:
raise OSError(f"Error during pandoc call of `{cmd}`: {stderr}")
r = stdout.decode("utf-8").strip()
write_hash_cache("fmarkdown", txt, str(r))
return str(r)
def pandoc_formattedtext(txt):
_h = maybe_hash_cache("ftext", txt)
if _h:
return _h
f = f"--from=markdown"
t = f"--to=plain"
cmd = (
"pandoc",
"-o-",
f,
t,
"--quiet",
"--wrap=auto",
"--columns=72",
)
pandocprocess = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = pandocprocess.communicate(input=txt.encode())
if stderr:
raise OSError(f"Error during pandoc call of `{cmd}`: {stderr}")
r = stdout.decode("utf-8").strip()
write_hash_cache("ftext", txt, str(r))
return str(r)
def pandoc(txt):
_h = maybe_hash_cache("html", txt)
if _h:
return _h
mdoptions = [
"+footnotes",
"+pipe_tables",
"+strikeout",
"+superscript",
"+subscript",
"+raw_html",
"+definition_lists",
"+backtick_code_blocks",
"+fenced_code_attributes",
"+shortcut_reference_links",
"+lists_without_preceding_blankline",
"+autolink_bare_uris",
"+auto_identifiers",
"+space_in_atx_header",
"-smart",
]
mdoptions = "".join(mdoptions)
f = f"--from=markdown{mdoptions}"
t = "--to=html5"
cmd = (
"pandoc",
"-o-",
f,
t,
"--no-highlight",
"--quiet",
"--wrap=auto",
"--columns=72"
)
pandocprocess = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = pandocprocess.communicate(input=txt.encode())
if stderr:
raise OSError(f"Error during pandoc call of `{cmd}`: {stderr}")
r = stdout.decode("utf-8").strip()
write_hash_cache("html", txt, str(r))
return str(r)
def mkfeed(entries):
xmlfeedfile = os.path.join(BASEPATH, "feed", "index.atom")
# rssfeedfile = os.path.join(BASEPATH, "feed", "index.rss")
# atom2rss = os.path.join(BASEPATH, "atom2rss.xsl")
htmlfeedfile = os.path.join(BASEPATH, "feed", "hfeed.html")
sitemapfile = os.path.join(BASEPATH, "sitemap.txt")
if not os.path.isdir(os.path.dirname(xmlfeedfile)):
os.makedirs(os.path.dirname(xmlfeedfile))
firstentry = entries[0]
for e in entries:
if not e.is_future:
firstentry = e
break
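    # the feed files only need regenerating if any of them is older than the
    # newest already-published (non-future) entry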
up_to_date = True
for f in [xmlfeedfile, htmlfeedfile, sitemapfile]:
if not os.path.exists(f) or os.path.getmtime(f) < firstentry.mtime:
up_to_date = False
if up_to_date:
return
logging.info("making feeds")
feed = etree.Element(
"feed",
nsmap={
None: "http://www.w3.org/2005/Atom",
"xlink": "https://www.w3.org/1999/xlink",
},
)
xmldoc = etree.ElementTree(feed)
feed.addprevious(
etree.ProcessingInstruction(
"xml-stylesheet",
'type="text/xsl" href="//petermolnar.net/feed/atom.xsl"',
)
)
feedid = etree.SubElement(feed, "id")
feedid.text = "%s/" % (SITEVARS["url"].strip("/"))
feedtitle = etree.SubElement(feed, "title")
feedtitle.text = "Latest entries from %s" % (SITEVARS["name"])
feedupdated = etree.SubElement(feed, "updated")
feedupdated.text = firstentry.dt.isoformat()
etree.SubElement(
feed,
"link",
attrib={
"href": "%s/feed/" % (SITEVARS["url"]),
"rel": "self",
"type": "application/rss+atom",
},
)
etree.SubElement(
feed,
"link",
attrib={
"href": SITEVARS["link"]["hub"],
"rel": "hub",
},
)
etree.SubElement(
feed,
"link",
attrib={
"href": SITEVARS["url"],
"rel": "alternate",
"type": "text/html",
},
)
icon = etree.SubElement(feed, "icon")
icon.text = "%s/favicon.png" % (SITEVARS["url"])
htmlentries = []
sitemapentries = []
rss_cntr = 0
# small_feed = None
for entry in entries:
if entry.is_future:
continue
if "post" != entry.type:
continue
sitemapentries.append(entry.url)
xmlentry = etree.SubElement(feed, "entry")
eid = etree.SubElement(xmlentry, "id")
eid.text = entry.url
etitle = etree.SubElement(xmlentry, "title")
etitle.text = entry.title
eupdated = etree.SubElement(xmlentry, "updated")
eupdated.text = entry.updated
epublished = etree.SubElement(xmlentry, "published")
epublished.text = entry.dt.isoformat()
atomauthor = etree.SubElement(xmlentry, "author")
atomauthor_name = etree.SubElement(atomauthor, "name")
atomauthor_name.text = entry.meta["author"]["name"]
etree.SubElement(
xmlentry,
"link",
attrib={
"href": entry.tmplvars["url"],
"rel": "alternate",
"type": "text/html",
},
)
ecategory = etree.SubElement(
xmlentry,
"category",
)
ecategory.text = entry.category
atomsummary = etree.SubElement(
xmlentry,
"summary",
attrib={"type": "html"},
)
atomsummary.text = entry.description
if "summary" in entry.meta:
cdata = "%s\n\n%s" % (entry.description, entry.html)
else:
cdata = "%s" % entry.html
if "in-reply-to" in entry.meta:
            # assumed minimal markup for the reply-context line
            cdata = '<p>This post is a reply to: <a href="%s">%s</a></p>\n%s' % (
                entry.meta["in-reply-to"],
                entry.meta["in-reply-to"],
                cdata,
            )
if "image" in entry.tmplvars:
etree.SubElement(
xmlentry,
"link",
attrib={
"rel": "enclosure",
"href": entry.tmplvars["image"]["src"],
"type": entry.tmplvars["image"]["mime"],
"length": str(entry.tmplvars["image"]["bytesize"]),
},
)
                # assumed minimal markup to prepend the featured image to the
                # feed entry content
                imgdata = '<img src="%s" alt="%s" />' % (
                    entry.tmplvars["image"]["src"],
                    entry.title,
                )
cdata = "%s\n%s" % (imgdata, cdata)
atomcontent = etree.SubElement(
xmlentry,
"content",
attrib={"type": "html"},
)
atomcontent.text = cdata
rss_cntr = rss_cntr + 1
if rss_cntr < 12:
htmlentries.append(entry.tmplvars)
if rss_cntr == 12:
# small_feed = deepcopy(xmldoc)
break
logging.info("saving normal ATOM feed")
with open(xmlfeedfile, "wb") as f:
f.write(
etree.tostring(
xmldoc,
encoding="utf-8",
xml_declaration=True,
pretty_print=True,
)
)
logging.info("saving HTML")
with open(htmlfeedfile, "wt") as f:
tmpl = J2.get_template("hfeed.j2.html")
tmplvars = {"feed": SITEVARS, "entries": htmlentries}
content = tmpl.render(tmplvars)
f.write(content)
logging.info("saving sitemap")
with open(sitemapfile, "wt") as f:
f.write("\n".join(sitemapentries))
return
class Webmentions(object):
def __init__(self):
self.cleanups = ["https://href.li/?"]
@property
def mtime(self):
mtime = 0
for md in sorted(
glob.glob(os.path.join(BASEPATH, "**", "*.md"), recursive=True)
):
if md.endswith("index.md"):
continue
maybe = os.path.basename(md).split("-")[0]
fmtime = int(os.path.getmtime(md))
if maybe.isnumeric():
fnamemtime = int(maybe)
fmtime = min(fnamemtime, fmtime)
mtime = max(mtime, fmtime)
return mtime
def new_webmention(self, webmention):
if "source" not in webmention:
logging.error(f"empty 'source' for: {webmention}")
return
if "target" not in webmention:
logging.error(f"empty 'source' for: {webmention}")
return
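        # derive the target entry's slug from the webmention's target URL and
        # locate its directory; the mention is then saved there as a markdown file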
target = webmention.get("target")
for cleanup in self.cleanups:
target = target.replace(cleanup, "")
slug = os.path.split(urllib.parse.urlparse(target).path.lstrip("/"))[0]
# ignore selfpings
if slug == SITEVARS["domain"]:
logging.warning(f"selfping found: {webmention}")
return
if not len(slug):
logging.error(f"empty target in: {webmention}")
return
fdir = glob.glob(os.path.join(BASEPATH, "**", slug), recursive=True)
if not len(fdir):
logging.error(f"no target found for: {webmention}")
return
elif len(fdir) > 1:
logging.error(f"multiple targets found for: {webmention}")
return
fdir = fdir.pop()
parsed_url = urllib.parse.urlparse(webmention["source"])
author = {
"name": f"{parsed_url.hostname}",
"url": f"{parsed_url.scheme}://{parsed_url.hostname}",
}
for k, v in webmention["author"].items():
if v:
author[k] = v
dt = datetime.now(UTC)
try:
dt = datetime.fromisoformat(webmention["published"])
        except (KeyError, TypeError, ValueError):
            logging.error("failed to parse dt in webmention, using 'now' as timestamp")
timestamp = int(dt.timestamp())
url = slugify(webmention["source"])
slugfname = url[:200]
fpath = os.path.join(fdir, f"{timestamp}-{slugfname}.md")
meta = {
"author": author,
"date": dt.isoformat(),
"source": webmention["source"],
"target": webmention["target"],
"type": webmention.get("activity", {}).get("type", "webmention"),
}
try:
txt = webmention.get("content", "").strip()
        except Exception:
            txt = ""
logging.info(f"saving webmention into {fpath}")
write_mdfile(fpath, meta, txt)
def run(self):
if "WEBMENTIONIO_TOKEN" not in os.environ:
return
# params = {
# "token": os.environ["WEBMENTIONIO_TOKEN"],
# "since": datetime.fromtimestamp(self.mtime)
# .replace(tzinfo=timezone.utc)
# .isoformat(),
# }
logging.info(f"requesting webmentions")
# wio = requests.get("https://webmention.io/api/mentions", params=params)
wio = requests.get(
f"https://webmention.petermolnar.net/webmention/petermolnar.net/{os.environ['WEBMENTIONIO_TOKEN']}"
)
if wio.status_code != requests.codes.ok:
raise Exception(
f"failed to query webmention.io: {wio.status_code} {wio.text}"
)
mentions = wio.json()
for webmention in mentions.get("json"):
self.new_webmention(webmention)
def run():
webmentions = Webmentions()
webmentions.run()
freshest_mtime = 0
everything = {
# unix timestamp: Entry object
}
categories = {
# category name string: Entry object
}
feed = {
# unix timestamp: Entry object
}
# collect data first
for e in sorted(
glob.glob(os.path.join(BASEPATH, "**", "index.md"), recursive=True)
):
logging.info(f"reading {e}")
doc = Entry(e)
logging.info(f"parsed {doc.type} :: {doc.category} :: {doc.entry}")
ts = int(doc.dt.timestamp())
everything[ts] = doc
freshest_mtime = max(doc.mtime, freshest_mtime)
if "category" == doc.type and doc.entry not in categories:
categories[doc.entry] = doc
# sort out categories and their posts
# select which posts can go into the feed(s)
# populate search, if needed
search = SearchDB()
for mtime, post in everything.items():
if "post" != post.type:
continue
if post.category not in categories:
continue
if post.is_future:
logging.warning(
f"skipping future entry {post.category} :: {post.entry} (sheduled for {post.dt})"
)
continue
post_ts = int(post.dt.timestamp())
if post_ts in categories[post.category].subentries:
maybe_problem = categories[post.category].subentries[post_ts]
logging.warning(
f"TIMESTAMP COLLISION IN CATEGORY {post.category}: {post.fpath} vs {maybe_problem.fpath}"
)
else:
categories[post.category].subentries[post_ts] = post
if post_ts in feed:
maybe_problem = feed[post_ts]
logging.warning(
f"TIMESTAMP COLLISION IN FEED: {post.fpath} vs {maybe_problem.fpath}"
)
else:
feed[post_ts] = post
search.append(post)
lang = post.meta.get("lang", "en")
if lang == "en" and post.category == "journal":
with open(f"/tmp/corpus_{lang}.txt", "at", encoding="utf-8") as c:
c.write(pandoc_formattedtext(post.txt))
search.__exit__()
# render
for post in everything.values():
try:
post.images()
str(post)
except NotImplementedError:
logging.error(f"{post.fpath} needs to wait")
# create feeds
mkfeed([feed[k] for k in sorted(feed.keys(), reverse=True)])
# create bookmarks html
# bookmarks = Bookmarks()
# str(bookmarks)
if __name__ == "__main__":
run()