diff --git a/.gitignore b/.gitignore
index db13625127cecd935b214ac1d47863022fa9bb40..f7e0d45a9334785951d16cfd4716725f9e3fdc34 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,6 @@
 build/
 dist/
 deploy.sh
+.DS_Store
+*.xls
+.venv/
diff --git a/hektor.py b/hektor.py
index 058f9fe5cca315f527c6e5edddd0523dc2524019..ced0418c80c71a86418d5300a02810c44b111bb9 100644
--- a/hektor.py
+++ b/hektor.py
@@ -1,161 +1,199 @@
 import argparse
-import base64
+import functools
 import json
-import re
-import sys
-import zipfile
+import logging
+import os
+from typing import Any, Callable, Dict, Sequence
 
-from lxml import etree
+from xkcdpass import xkcd_password as xp
 
-file_regex = re.compile(
-    r'(\d+)__(\d+)__(?P<data>results|qti|tst)_(?P<id>\d+).xml')
-task_id_regex = re.compile(r'il_\d+_qst_(?P<task_id>\d+)')
-tasks_path = ('./assessment/section')
+from lib import Converter
 
-users = './tst_active/row'
-solutions = './tst_solutions/row[@question_fi="%s"]'
-lecturer_xpath = ('./MetaData/Lifecycle/Contribute'
-                  '[@Role="Author"]/Entity/text()')
+# ============================== =- Logging -= ============================== #
+log = logging.getLogger(__name__)
+log.setLevel(logging.DEBUG)
+# create console handler and formatter
+console = logging.StreamHandler()
+console.setLevel(logging.DEBUG)
+formatter = logging.Formatter('[%(levelname)s] %(message)s')
 
-def eat_qti(tree, only_of_type=('assSourceCode',), **kwargs):
-    tasks = tree.xpath(tasks_path)[0]
+# add formatter to console handler
+console.setFormatter(formatter)
+log.addHandler(console)
 
-    titles = tasks.xpath('./item/@title')
-    types = tasks.xpath(
-        './item/itemmetadata/qtimetadata/qtimetadatafield/'
-        'fieldlabel[text()="QUESTIONTYPE"]/../fieldentry/text()')
-    ids = [re.search(task_id_regex, ident).group('task_id')
-           for ident in tasks.xpath('./item/@ident')]
-    texts = ['\n'.join(flow.xpath('./material/mattext/text()'))
-             for flow in tasks.xpath('./item/presentation/flow')]
-    return {id: {'title': title, 'text': text, 'type': type}
-            for id, type, title, text in zip(ids, types, titles, texts)
-            if not only_of_type or type in only_of_type}
-
-
-def eat_users(results_tree):
-    return {row.attrib['active_id']: dict(row.attrib)
-            for row in results_tree.xpath(users)}
-
-
-def convert_code(text):
-    return base64.b64decode(text).decode('utf-8').split('\n')
-
-
-def eat_solutions(results_tree, task_id):
-    return {row.attrib['active_fi']: convert_code(row.attrib['value1'])
-            for row in results_tree.xpath(solutions % task_id)}
-
-
-def eat_results(tree, qti=(), **kwargs):
-    questions = qti
-    users = eat_users(tree)
-    for user in users.values():
-        user['submissions'] = {}
-    for question in questions:
-        solutions = eat_solutions(tree, question)
-        for user_id, solution in solutions.items():
-            users[user_id]['submissions'][question] = solution
-    return users
-
-
-def eat_tst(tree):
-    title = tree.xpath('./MetaData/General/Title/text()')
-    lecturer = tree.xpath(lecturer_xpath)
-    return {'exam': title[0], 'author': lecturer[0]}
-
-
-def eval_file(archive, match, cache):
-    funcname = 'eat_' + match.group('data')
-    with archive.open(match.string) as datafile:
-        tree = etree.parse(datafile)
-    return globals()[funcname](tree, **cache)
-
-
-def eat_archive(archive):
-    files = {match.group('data'): match
-             for match in (re.search(file_regex, name)
-                           for name in archive.NameToInfo)
-             if match}
+# ============================= =- argparse -= ============================== #
+def parseme():
+    def file_exists(parser, filepath: str) -> str:
+        if not os.path.isfile(filepath):
+            parser.error('Not a file: %s' % filepath)
+        return filepath
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        'input',
+        metavar='DATA',
+        type=lambda f: file_exists(parser, f),
+        help='A QTI-ZIP or a .xls Ilias export that contains course data')
+    parser.add_argument(
+        'output',
+        metavar='OUTFILE',
+        help='Where you want to put the output')
+    parser.add_argument(
+        '-a', '--anonymous',
+        action='store_true',
+        help='Strip any personal information and create a reversing table')
+    parser.add_argument(
+        '-t', '--personal-secret-table',
+        help='Where to store personal information',
+    )
+    parser.add_argument(
+        '-m', '--meta',
+        action='store_true',
+        help='If you want to add meta information (lecturer, course title)'
+    )
+
+    args = parser.parse_args()
+
+    if args.anonymous != (args.personal_secret_table is not None):
+        parser.error('Need an output for anonymous mode')
+
+    return args
+
+
+# ========================== =- General Purpose -= ========================== #
+def compose(*functions: Sequence[Callable]) -> Callable:
+    """ Standard function composition. Takes a Sequence of functions [f, g, h, ...]
+    and returns the composite function i(x) = f(g(h(x))). There are no checks
+    that validate whether domain and image of these functions are compatible."""
+    return functools.reduce(lambda f,
+                            g: lambda x: f(g(x)),
+                            functions,
+                            lambda x: x)
+
+
+# ========================== =- Post processors -= ========================== #
+def anonymise(structured_data: Dict[str, Any]) -> Dict[str, Any]:
+    # nothing to do unless -a/--anonymous was requested
+    if not args.anonymous:
+        return structured_data
+
+    DELIMITER = '-'
+    wordfile = xp.locate_wordfile()
+    words = xp.generate_wordlist(wordfile=wordfile,
+                                 min_length=7,
+                                 max_length=7)
+
+    def get_identifier():
+        return xp.generate_xkcdpassword(words, numwords=2, delimiter=DELIMITER)
+
+    students = structured_data.pop('students')
+    reverser = {get_identifier(): s for s in students.values()}
+    students_anon = {r: {
+        'fullname': ' '.join(w[0].capitalize() + w[1:]
+                             for w in r.split(DELIMITER)),
+        'identifier': r,
+        'submissions': student['submissions']
+    } for r, student in zip(reverser, students.values())}
+
+    with open(args.personal_secret_table, 'w') as out:
+        print('key\tprevious identifier\tfullname', file=out)
+        print('\n'.join(anon + '\t' + '\t'.join(v
+                                                for v in data.values()
+                                                if type(v) is str)
+                        for anon, data in reverser.items()), file=out)
+
+    return {
+        **structured_data,
+        'students': students_anon
+    }
 
 
-def add_meta(base, data):
-    base.update(data['tst'])
+def add_meta_information(structured_data: Dict[str, Any]) -> Dict[str, Any]:
+    if args.meta:
+        structured_data['author'] = input('[Q] author: ')
+        structured_data['exam'] = input('[Q] course title: ')
+    return structured_data
 
 
-def add_tasks(base, data):
-    base['tasks'] = data['qti']
+def assert_correct_format(structured_data: Dict[str, Any]) -> Dict[str, Any]:
+    def assert_submission(submission):
+        assert 'code' in submission, 'A submission needs code'
+        assert 'type' in submission, 'A submission has to be of some type'
+        assert 'tests' in submission, 'A tests dict has to be present.'
-ignore_user_fields = ("user_fi",
-                      "anonymous_id",
-                      "test_fi",
-                      "lastindex",
-                      "tries",
-                      "submitted",
-                      "submittimestamp",
-                      "tstamp",
-                      "user_criteria",)
+
+    def assert_student(student):
+        log.debug('asserting %s (%d)' % (student['fullname'],
+                                         len(student['submissions'])))
+        assert 'fullname' in student, 'Student needs a name %s' % student
+        assert 'identifier' in student, 'Student needs a unique identifier'
+
+    def base_assert():
+        assert 'students' in structured_data, 'No students found'
+        assert 'tasks' in structured_data, 'No tasks found'
 
-def add_users(base, data):
-    for userdata in data['results'].values():
-        for field in ignore_user_fields:
-            userdata.pop(field)
-    base['students'] = data['results']
+    try:
+        base_assert()
+        students = structured_data['students'].values()
+        number_of_submissions = len(structured_data['tasks'])
+        for student in students:
+            try:
+                assert_student(student)
+                assert number_of_submissions == len(student['submissions']), \
+                    '%s does not have enough submissions' % student['fullname']
+                for submission in student['submissions']:
 
-def give_me_structure(data):
-    base = {}
+                    try:
+                        assert_submission(submission)
+                    except AssertionError as err:
+                        log.warning(err)
 
-    add_meta(base, data)
-    add_tasks(base, data)
-    add_users(base, data)
+            except AssertionError as err:
+                log.warning(err)
 
-    return base
+    except AssertionError as err:
+        log.warning(err)
+
+    return structured_data
 
 
-def eat_zipfile(input_file, output):
-    with zipfile.ZipFile(input_file) as archive:
-        data = dict(eat_archive(archive))
-        structured_data = give_me_structure(data)
+post_processors = [
+    anonymise,
+    # add_meta_information,
+    # assert_correct_format
+]
 
-    json.dump(structured_data, output, indent=2, sort_keys=True)
+
+# ============================== =- Hektor -= =============================== #
+def _preprocessing(filepath: str) -> str:
+    return filepath
+
+
+def _processing(filepath: str) -> Dict[str, Any]:
+    try:
+        return next(converter().convert(filepath)
+                    for converter in Converter.implementations()
+                    if converter.accept(filepath))
+    except StopIteration:
+        log.error('No suitable converter found. Accepting only %s' %
+                  ', '.join(f
+                            for c in Converter.implementations()
+                            for f in c.accepted_files))
 
 
-def parseme():
-    parser = argparse.ArgumentParser()
-    parser.add_argument(
-        'input',
-        metavar='FILE',
-        help='A ZIP file that contains a qit course')
-    parser.add_argument(
-        '-o',
-        '--output',
-        default=sys.stdout,
-        type=argparse.FileType('w'),
-        metavar='FILE',
-        help='Where you want to put the output')
-    return parser.parse_args()
+def _postprocessing(structured_data: Dict[str, Any]) -> Dict[str, Any]:
+    return compose(*post_processors)(structured_data)
 
 
 def main():
-    args = parseme()
-    eat_zipfile(args.input, args.output)
+    processing = compose(_postprocessing, _processing, _preprocessing)
+    data = processing(args.input)
+    destination = args.output.split('.json')[0] + '.json'
+    with open(destination, 'w') as output:
+        json.dump(data, output, indent=2, sort_keys=True)
+    log.info('Wrote exam data to %s', destination)
 
 
 if __name__ == '__main__':
+    args = parseme()
     main()
diff --git a/lib/__init__.py b/lib/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..02d4a05ecb3f05a6031b621a53d865f6c609fc38
--- /dev/null
+++ b/lib/__init__.py
@@ -0,0 +1,5 @@
+# New modules need to be registered here
+
+from lib.generic import Converter
+from lib.qti import QTIConverter
+from lib.xls import XLSConverter
diff --git a/lib/generic.py b/lib/generic.py
new file mode 100644
index 0000000000000000000000000000000000000000..55279c50a9223b533bb9e8800dec5bdd2aea2313
--- /dev/null
+++ b/lib/generic.py
@@ -0,0 +1,27 @@
+import abc
+
+
+def all_subclasses(cls):
+    return cls.__subclasses__() \
+        + [g for s in cls.__subclasses__() for g in all_subclasses(s)]
+
+
+class Converter(metaclass=abc.ABCMeta):
+    """ A base class so that we can incorporate more converters in the future """
+
+    @abc.abstractmethod
+    def convert(self):
+        pass
+
+    @property
+    @abc.abstractmethod
+    def accepted_files(cls):
+        pass
+
+    @classmethod
+    def implementations(cls):
+        return all_subclasses(cls)
+
+    @classmethod
+    def accept(cls, filepath):
+        return any(filepath.endswith(ending) for ending in cls.accepted_files)
diff --git a/lib/qti.py b/lib/qti.py
new file mode 100644
index 0000000000000000000000000000000000000000..bfffd95640e49d7acd1d351acfeefbf264176719
--- /dev/null
+++ b/lib/qti.py
@@ -0,0 +1,145 @@
+import base64
+import re
+import zipfile
+
+from lxml import etree
+
+import lib.generic
+
+
+class QTIConverter(lib.generic.Converter):
+    """ Converts QTI ZIP archives exported from Ilias """
+
+    accepted_files = ('.zip', '.xml')
+
+    def convert(self, filepath):
+        with zipfile.ZipFile(filepath) as archive:
+            data = dict(process_archive(archive))
+
+        return give_me_structure(data)
+
+
+file_regex = re.compile(
+    r'(\d+)__(\d+)__(?P<data>results|qti|tst)_(?P<id>\d+).xml')
+task_id_regex = re.compile(r'il_\d+_qst_(?P<task_id>\d+)')
+
+tasks_path = ('./assessment/section')
+
+users = './tst_active/row'
+solutions = './tst_solutions/row[@question_fi="%s"]'
+
+lecturer_xpath = ('./MetaData/Lifecycle/Contribute'
+                  '[@Role="Author"]/Entity/text()')
+
+types_xpath = ('./item/itemmetadata/qtimetadata/qtimetadatafield/'
+               'fieldlabel[text()="QUESTIONTYPE"]/../fieldentry/text()')
+
+
+def process_qti(tree, only_of_type=('assSourceCode',), **kwargs):
+    tasks = tree.xpath(tasks_path)[0]
+
+    titles = tasks.xpath('./item/@title')
+    types = tasks.xpath(types_xpath)
+    ids = [re.search(task_id_regex, ident).group('task_id')
+           for ident in tasks.xpath('./item/@ident')]
+    texts = ['\n'.join(flow.xpath('./material/mattext/text()'))
+             for flow in tasks.xpath('./item/presentation/flow')]
+
+    return {id: {'title': title, 'text': text, 'type': type}
+            for id, type, title, text in zip(ids, types, titles, texts)
+            if not only_of_type or type in only_of_type}
+
+
+def process_users(results_tree):
+    return {row.attrib['active_id']: dict(row.attrib)
+            for row in results_tree.xpath(users)}
+
+
+def convert_code(text):
+    return base64.b64decode(text).decode('utf-8').split('\n')
+
+
+def process_solutions(results_tree, task_id):
+    return {row.attrib['active_fi']: convert_code(row.attrib['value1'])
+            for row in results_tree.xpath(solutions % task_id)}
+
+
+def process_results(tree, qti=(), **kwargs):
+    questions = qti
+    users = process_users(tree)
+    for user in users.values():
+        user['submissions'] = []
+    for question in questions:
+        solutions = process_solutions(tree, question)
+        for user_id, solution in solutions.items():
+            users[user_id]['submissions'].append({'type': question,
+                                                  'code': solution,
+                                                  'tests': {}})
+    return users
+
+
+def process_tst(tree):
+    title = tree.xpath('./MetaData/General/Title/text()')
+    lecturer = tree.xpath(lecturer_xpath)
+    return {'exam': title[0], 'author': lecturer[0]}
+
+
+def eval_file(archive, match, cache):
+    funcname = 'process_' + match.group('data')
+    with archive.open(match.string) as datafile:
+        tree = etree.parse(datafile)
+    return globals()[funcname](tree, **cache)
+
+
+def process_archive(archive):
+    files = {match.group('data'): match
+             for match in (re.search(file_regex, name)
+                           for name in archive.NameToInfo)
+             if match}
+
+    order = ('tst', 'qti', 'results')
+    cache = {}
+
+    for key in order:
+        cache[key] = eval_file(archive, files[key], cache)
+
+    return cache
+
+
+def add_meta(base, data):
+    base.update(data['tst'])
+
+
+def add_tasks(base, data):
+    base['tasks'] = list(data['qti'].values())
+
+
+ignore_user_fields = ("user_fi",
+                      "active_id",
+                      "usr_id",
+                      "anonymous_id",
+                      "test_fi",
+                      "lastindex",
+                      "tries",
+                      "submitted",
+                      "submittimestamp",
+                      "tstamp",
+                      "user_criteria",)
+
+
+def add_users(base, data):
+    for userdata in data['results'].values():
+        userdata['identifier'] = userdata['user_fi']
+        for field in ignore_user_fields:
+            userdata.pop(field)
+    base['students'] = data['results']
+
+
+def give_me_structure(data):
+    base = {}
+
+    add_meta(base, data)
+    add_tasks(base, data)
+    add_users(base, data)
+
+    return base
diff --git a/lib/xls.py b/lib/xls.py
new file mode 100755
index 0000000000000000000000000000000000000000..d4d29177142551e4712115a2115f3993246bfd85
--- /dev/null
+++ b/lib/xls.py
@@ -0,0 +1,153 @@
+#!/usr/local/bin/python3
+""" a simple script that converts ilias exam output to readable json
+
+The json output will look like this:
+{
+    "max.mustermann": {  <<--- OR all uppercase letters of the name + matrikel_no  # noqa: E501
+        "matrikel_no": "12345678",
+        "name": "Mustermann, Max",
+        "task_list": {
+            "[task_id_1]": "print Hello World!",
+            ....,
+            "[task_id_n]": "#include <stdio.h> etc."
+        }
+    },
+    ... and so on
+}
+
+usage: convert.py [-h] [-u USERNAMES] [-n NUMBER_OF_TASKS] INFILE OUTFILE
+
+positional arguments:
+  INFILE                Ilias exam data
+  OUTFILE               Where to write the final file
+
+optional arguments:
+  -h, --help            show this help message and exit
+  -u USERNAMES, --usernames USERNAMES
+                        a json dict matno -> email
+  -n NUMBER_OF_TASKS, --NUMBER_OF_TASKS NUMBER_OF_TASKS
+                        the expected number of tasks per student
+
+
+Author: Jan Maximilian Michal
+Date: 30 March 2017
+"""
+
+import json
+import os
+import re
+import urllib.parse
+from collections import defaultdict, namedtuple
+
+from xlrd import open_workbook
+
+import lib.generic
+
+
+class XLSConverter(lib.generic.Converter):
+    """ Converts Ilias .xls exam exports """
+
+    accepted_files = ('.xls',)
+
+    def convert(self, filepath):
+        return converter(filepath)
+
+
+# one user has one submission (code) per task
+# yes, I know it is possible to name match groups via (?P<name>) but
+# I like this solution better since it gets the job done nicely
+user_t = namedtuple('user_head', 'name matrikel_no')
+
+# one task has a title and id and hopefully code
+task_head_re = re.compile(r'^Quellcode Frage (?P<title>.*?) ?(\d{8})?$')
+
+# for parsing the weird mat no
+matno_re = re.compile(r'^(?P<matrikel_no>\d{8})-(\d+)-(\d+)$')
+
+COLUMNS_BEFORE_TASKS = 19
+
+
+def converter(infile, usernames=None, number_of_tasks=0,):
+
+    # Modify these iterators in order to change extraction behaviour
+
+    def sheet_iter_meta(sheet):
+        """ yield first and second col entry as tuple of (name, matnr) """
+        for row in (sheet.row(i) for i in range(1, sheet.nrows)):
+            match = re.search(matno_re, row[1].value)
+            if match:
+                yield row[0].value, match.group('matrikel_no')
+
+    def sheet_iter_data(sheet):
+        """ yields all source code title and code tuples """
+        def row(i):
+            return sheet.row(i)
+        for top, low in ((row(i), row(i + 1)) for i in range(sheet.nrows - 1)):
+            if any(map(lambda c: c.ctype, top)) and 'Quell' in top[0].value:
+                yield (' '.join(c.value for c in top),
+                       ' '.join(c.value for c in low))
+
+    # meta sheet contains ilias names usernames etc - data contains code
+    meta, *data = open_workbook(infile, open(os.devnull, 'w')).sheets()
+
+    # nice!
+    name2mat = dict(sheet_iter_meta(meta))
+    assert len(name2mat) == len(data), f'{len(name2mat)} names != {len(data)} sheets'  # noqa
+
+    # from xls to lists and namedtuples
+    # [ [user0, task0_h, code0, ..., taskn, coden ], ..., [...] ]
+    root = []
+    tasks = {}
+    for user, sheet in zip(sheet_iter_meta(meta), data):
+        root.append([user_t(*user)])
+        for task, code in sheet_iter_data(sheet):
+            task = re.search(task_head_re, task)
+            task_title = task.group('title')
+            tasks[task_title] = {
+                'title': task_title,
+                'type': 'SourceCode'
+            }
+            root[-1].append(task.group('title'))
+            root[-1].append(urllib.parse.unquote(code).strip())
+
+    if number_of_tasks:
+        for (user, *task_list) in sorted(root, key=lambda u: u[0].name):
+            assert len(task_list) == number_of_tasks * 2
+
+    mat_to_email = defaultdict(str)
+    if usernames:
+        with open(usernames) as data:
+            mat_to_email.update(json.JSONDecoder().decode(data.read()))
+
+    def get_username(user):
+        if name2mat[user.name] in mat_to_email:
+            return mat_to_email[name2mat[user.name]].split('@')[0]
+        return ''.join(filter(str.isupper, user.name)) + name2mat[user.name]
+
+    usernames = {user.name: get_username(user) for (user, *_) in root}
+
+    return {
+        'students': {
+            usernames[user.name]: {
+                'fullname': user.name,
+                'email': mat_to_email[name2mat[user.name]],
+                'identifier': name2mat[user.name],
+                'submissions': [
+                    {
+                        "type": task,
+                        "code": code,
+                        "tests": {},
+                    } for task, code in zip(task_list[::2], task_list[1::2])
+                ]
+            } for (user, *task_list) in sorted(root, key=lambda u: u[0].name)
+        },
+        'tasks': list(tasks.values())
+    }
+
+
+def write_to_file(json_dict, outfile):
+    # just encode python style
+    with open(outfile, "w") as out:
+        json.dump(json_dict, out, indent=2)
+
+    print(f"Wrote data to {outfile}. Done.")
diff --git a/requirements.txt b/requirements.txt
index 91d90eb3ef3c2cc53e17dc89c1da38dc71347fcb..0853005a985601f9e58239a48f3e0b67887b2c1b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,3 @@
 lxml~=4.1.1
+xlrd~=1.1.0
+xkcdpass~=1.16.0
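
Note on compose() in hektor.py: functools.reduce folds the functions from right to left, so compose(f, g, h)(x) evaluates f(g(h(x))). In main() this means _preprocessing runs first and _postprocessing last, even though the arguments read the other way around. A quick sanity check with made-up lambdas:

    >>> compose(lambda x: x + 1, lambda x: x * 2)(10)  # f(g(x)) = (10 * 2) + 1
    21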
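Note on anonymise() in hektor.py: each student key is replaced by a two-word xkcdpass identifier, the displayed name is rebuilt from that identifier, and the real name survives only in the -t/--personal-secret-table file, so the mapping stays reversible. Assuming the generator happens to yield battery-staples (both words satisfy the min_length=7/max_length=7 settings; the value is purely illustrative), a student record would come out roughly as:

    "battery-staples": {
        "fullname": "Battery Staples",
        "identifier": "battery-staples",
        "submissions": [...]
    }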
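Note on the converter registry in lib/generic.py: Converter.implementations() collects subclasses recursively via __subclasses__(), so _processing() in hektor.py discovers a new format as soon as its class is imported in lib/__init__.py. A minimal sketch of such a plug-in; the module lib/json_passthrough.py and its passthrough behaviour are hypothetical, not part of this patch:

    # lib/json_passthrough.py -- hypothetical plug-in, for illustration only
    import json

    import lib.generic


    class JSONConverter(lib.generic.Converter):
        """ Accepts already-structured JSON files and returns them as-is """

        accepted_files = ('.json',)

        def convert(self, filepath):
            # hektor's post processors still run on the returned dict
            with open(filepath) as infile:
                return json.load(infile)

With `from lib.json_passthrough import JSONConverter` added to lib/__init__.py, Converter.accept() matches on the file ending and the pipeline routes .json inputs to this class automatically.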
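For reference, both converters now return the same top-level shape — a 'students' mapping plus a 'tasks' list — which is what the post processors and assert_correct_format() expect. Sketched from the return value in lib/xls.py with illustrative values (the QTI converter fills slightly different per-student fields, e.g. no email):

    {
      "students": {
        "MM12345678": {
          "fullname": "Mustermann, Max",
          "email": "",
          "identifier": "12345678",
          "submissions": [
            {"type": "Task 1", "code": "print('Hello World')", "tests": {}}
          ]
        }
      },
      "tasks": [
        {"title": "Task 1", "type": "SourceCode"}
      ]
    }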