Skip to content
Snippets Groups Projects
reader.py 7.05 KiB
import datatypes
import pandoc

import frontmatter

from datetime import datetime

import subprocess
import os


class Reader:
    """Base class for file readers.

    A reader stores the shared ``config`` and ``factories`` objects and is
    expected to override ``can_read_file`` and ``read_file``. Both methods
    raise in this base class.
    """

    def __init__(self, config, factories):
        """Store the configuration and the factories for later use."""
        self.config = config
        self.factories = factories

    def can_read_file(self, mimetype, mimeencoding):
        """Tell whether this reader handles the given mimetype/encoding.

        Raises:
            NotImplementedError: always; subclasses must override this.
        """
        # NotImplementedError is a subclass of Exception, so callers that
        # catch Exception keep working.
        raise NotImplementedError("Function not implemented.")

    def read_file(self, path, subpath, mimetype=None, mimeencoding=None):
        """Read the file at ``path``.

        ``mimetype`` and ``mimeencoding`` are accepted (with defaults) so the
        base signature agrees with the four-argument overrides in the
        subclasses while the old two-argument call form stays valid.

        Raises:
            NotImplementedError: always; subclasses must override this.
        """
        raise NotImplementedError("Function not implemented.")


class RawFileReader(Reader):

    def can_read_file(self, mimetype, mimeencoding):
        """Accept every file, whatever its mimetype or encoding.

        NOTE: an earlier, now disabled, check rejected files whose
        ``mimeencoding`` was not None.
        """
        return True

    def read_file(self, path, subpath, mimetype, mimeencoding):
        f = open(path, 'rb')
        rawcontents = f.read()
        f.close()
        pathlist = path.split('/')
        subpathlist = pathlist[:-1]
        filename = pathlist[-1]

        self.factories['file'].get(filename).init(rawcontents, subpathlist)

class PandocReader(Reader):

    def __init__(self, config, factories, mimetype, mimeencoding, base, extensions):
        """Create a reader bound to one mimetype/encoding pair.

        ``mimetype`` and ``mimeencoding`` are what ``can_read_file`` compares
        against; ``base`` and ``extensions`` are forwarded to
        ``pandoc.run_pandoc`` when a file is read.
        """
        super().__init__(config, factories)
        self.mimetype, self.mimeencoding = mimetype, mimeencoding
        self.base, self.extensions = base, extensions

    def can_read_file(self, mimetype, mimeencoding):
        if self.mimetype != mimetype:
            return False
        if self.mimeencoding != mimeencoding:
            return False
        return True


    def read_file(self, path, subpath, mimetype, mimeencoding):
        """Read one source file and turn it into an initialised page object.

        The front matter is split off with ``frontmatter.parse``, the rest is
        converted with ``pandoc.run_pandoc`` and the metadata returned by
        pandoc is merged over the front matter. Slug, language, dates and
        status start from defaults (file name, git history, config) and are
        overridden by metadata entries of the same purpose.

        Args:
            path: '/'-separated path of the file to read.
            subpath: sequence with at most one element; a single element is
                the fallback category name.
            mimetype: not used by this method.
            mimeencoding: not used by this method.

        Returns:
            The object obtained from ``self.factories['page'].get(slug, lang)``
            after its ``init`` has been called.

        Raises:
            Exception: if ``subpath`` has more than one element, the file name
                has no '.'-separated extension, the language is not
                supported, the slug is unusable, the title is missing or the
                status is invalid.
        """
        if len(subpath) > 1:
            raise Exception("file is too deep in directory structure: ", path, subpath)

        # Context manager: the handle is closed even if read() raises.
        with open(path) as f:
            rawfile = f.read()
        metadata, rawcontent = frontmatter.parse(rawfile)

        category_name = self.get_category_name(metadata, subpath)

        # slug and lang: taken from "<slug>.<ext>" or "<slug>.<lang>.<ext>";
        # any other file name shape leaves both unset at this point.
        filename = path.split('/')[-1]
        name_parts = filename.split('.')[:-1]  # drop the file extension
        if not name_parts:
            raise Exception("filename is empty?", path, subpath)
        slug = None
        lang = None
        if len(name_parts) == 1:
            slug = name_parts[0]
        elif len(name_parts) == 2:
            slug, lang = name_parts
        if 'slug' in metadata:
            slug = metadata['slug']
        if 'lang' in metadata:
            lang = metadata['lang']
        if lang is None:
            lang = self.config['lang']['default']

        if not self.is_supported_lang(lang):
            raise Exception("language is not supported: ", lang)
        slug = self.secure_slug(slug)

        # content
        content, contentmetadata = pandoc.run_pandoc(
            factories=self.factories,
            lang=lang,
            source=rawcontent,
            base=self.base,
            extensions=self.extensions,
        )
        # merge content specific metadata into metadata
        metadata.update(contentmetadata)

        # title
        if 'title' not in metadata:
            raise Exception("File is missing title in metadata: ", path, subpath)
        title = metadata['title']

        # date_created and date_modified: "now" unless git knows the file;
        # explicit metadata entries win over both.
        date_modified = datetime.now()
        date_created = datetime.now()
        date_changes = self.run_git(
            path, "log", ["--follow", "--format=%ad", "--date", "iso-strict"]
        ).splitlines()
        if date_changes:
            # The first line is used as the modification date, the last line
            # as the creation date.
            date_modified = datetime.fromisoformat(date_changes[0])
            date_created = datetime.fromisoformat(date_changes[-1])
        if 'date' in metadata:
            date_created = metadata['date']
        if 'modified' in metadata:
            date_modified = metadata['modified']

        # author
        # TODO author from metadata
        authors_raw = self.run_git(
            path, "log", ["--follow", "--format=%aE@%aN", "--use-mailmap"]
        ).splitlines()
        # dict.fromkeys de-duplicates in linear time and keeps the order of
        # first occurrence.
        authors = [self.extract_author(raw) for raw in dict.fromkeys(authors_raw)]
        if authors_raw:
            last_modification_author = self.extract_author(authors_raw[0])
        else:
            last_modification_author = None

        # status
        status = self.config['default_status']
        if 'status' in metadata:
            status = metadata['status']
        valid_status = ["published", "draft", "hidden"]
        if status not in valid_status:
            raise Exception("invalid status '", status, "' must be one of ", valid_status)

        # TODO summary

        p = self.factories['page'].get(slug, lang)
        p.init(
                filename,
                subpath,
                rawfile,
                metadata,
                content,
                title,
                category_name,
                date_created,
                date_modified,
                authors,
                last_modification_author,
                status)
        return p

    def extract_author(self, raw):
        """Split an '@'-joined author string into its three parts.

        ``raw`` is expected in the ``%aE@%aN`` shape requested from git in
        ``read_file``. The first two '@'-separated fields become local part
        and domain; everything after the second '@' is the name, which may
        itself contain '@'.

        Returns:
            Tuple ``(local_part, domain, name)``; ``name`` is '' when there is
            no third field.

        Raises:
            IndexError: if ``raw`` contains no '@' at all.
        """
        pieces = raw.split('@', 2)
        local_part = pieces[0]
        domain = pieces[1]
        name = pieces[2] if len(pieces) > 2 else ''
        return (local_part, domain, name)

    def is_supported_lang(self, lang):
        if not isinstance(lang, str):
            return False
        return (lang in self.config['lang']['supported'])


    def secure_slug(self, slug):
        """Lower-case ``slug`` and drop every character outside the whitelist.

        Only ASCII letters, digits and '-' survive.

        Raises:
            Exception: if ``slug`` is not a string, or nothing is left after
                filtering.
        """
        if not isinstance(slug, str):
            raise Exception("slug is not a string: '", slug, "'")
        whitelist = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-"
        cleaned = "".join(ch for ch in slug.lower() if ch in whitelist)
        if not cleaned:
            raise Exception("slug is empty")
        return cleaned


    def get_category_name(self, metadata, subpath):
        """Pick the category name for a page.

        An explicit 'category' entry in ``metadata`` wins; otherwise the
        single element of ``subpath`` is used; otherwise 'misc'.
        """
        if 'category' in metadata:
            return metadata['category']
        return subpath[0] if len(subpath) == 1 else 'misc'

    def run_git(self, path, subcmd, extra_args):
        """Run ``git <subcmd> <extra_args> -- <file>`` and return its stdout.

        git is started in the directory of the symlink-resolved ``path`` and
        is given only the file's base name after the "--" separator. An empty
        stdin is supplied. The exit status is not checked and stderr is not
        captured.

        Returns:
            The command's standard output decoded as UTF-8.
        """
        resolved = os.path.realpath(path)
        workdir, basename = os.path.split(resolved)
        command = ["git", subcmd] + extra_args + ["--", basename]
        completed = subprocess.run(
            command,
            input=b"",
            stdout=subprocess.PIPE,
            cwd=workdir,
        )
        return completed.stdout.decode('utf-8')