From 828ac4f81f1d8504c3af678f9cadeba757ebef50 Mon Sep 17 00:00:00 2001 From: janmax <j.michal@stud.uni-goettingen.de> Date: Sun, 11 Mar 2018 20:45:08 +0100 Subject: [PATCH] Added encryption/decryption capabilities * Also the command line options should now work as expected * Added an identity converter that just outputs .json output. This allows post-processing files that have already been created. * Breaking changes to the data format: students is now a list * Bump python version to 3.5 to support typing --- .gitignore | 1 + .gitlab-ci.yml | 17 ++- Makefile | 2 +- bin/hektor | 5 +- hektor.py | 268 +++++++++++++++++++++++++++++++++++++----------- lib/__init__.py | 1 + lib/generic.py | 3 +- lib/identity.py | 15 +++ lib/qti.py | 14 +-- lib/xls.py | 43 +++----- setup.py | 6 +- 11 files changed, 261 insertions(+), 114 deletions(-) create mode 100644 lib/identity.py diff --git a/.gitignore b/.gitignore index f7e0d45..b130d9c 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ deploy.sh .DS_Store *.xls .venv/ +testall.sh diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 6afa155..c1d1777 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,14 +1,8 @@ -image: python:3.4 +image: python:3.5 before_script: - python -V - -variables: - PIP_CACHE_DIR: "$CI_PROJECT_DIR/pip-cache" - -cache: - paths: - - "$CI_PROJECT_DIR/pip-cache" + - pip install -e . variables: PIP_CACHE_DIR: "$CI_PROJECT_DIR/pip-cache" @@ -17,8 +11,11 @@ cache: paths: - "$CI_PROJECT_DIR/pip-cache" -test: +flake8: script: - - pip install -e .
- pip install flake8 - flake8 hektor.py bin lib + +test: + script: + - hektor -h diff --git a/Makefile b/Makefile index b39174c..bfffa42 100644 --- a/Makefile +++ b/Makefile @@ -14,4 +14,4 @@ upload: dist twine upload dist/* tag: - git tag $(python setup.py --version) + git tag `python setup.py --version` diff --git a/bin/hektor b/bin/hektor index 8d68445..6e708f8 100755 --- a/bin/hektor +++ b/bin/hektor @@ -1,9 +1,10 @@ #!/usr/bin/env python3 -import hektor import sys +import hektor + if __name__ == '__main__': - if sys.version_info < (3, 4): + if sys.version_info < (3, 5): sys.exit("At least Python 3.4 is required.") hektor.main() diff --git a/hektor.py b/hektor.py index d84da1b..82faed7 100644 --- a/hektor.py +++ b/hektor.py @@ -1,38 +1,57 @@ import argparse +import base64 import functools +import getpass import json import logging import os -from typing import Any, Callable, Dict, Sequence +from typing import Any, Callable, Dict, List, Sequence, Union +from cryptography.fernet import Fernet from xkcdpass import xkcd_password as xp - from lib import Converter # ============================== =- Logging -= ============================== # -log = logging.getLogger(__name__) -log.setLevel(logging.DEBUG) +def setup_logging(): + ''' Make the logger globally available by hide intermediate handler, + filters and formatter variables ''' + global log + + level = logging.DEBUG if args.verbose else logging.INFO + + log = logging.getLogger(__name__) + log.setLevel(level) -# create console handler and formatter -console = logging.StreamHandler() -console.setLevel(logging.DEBUG) -formatter = logging.Formatter('[%(levelname)s] %(message)s') + # create console handler and formatter + console = logging.StreamHandler() + console.setLevel(level) + formatter = logging.Formatter('[%(levelname)s] %(message)s') -# add formatter to console handler -console.setFormatter(formatter) -log.addHandler(console) + # add formatter to console handler + console.setFormatter(formatter) + 
log.addHandler(console) # ============================= =- argparse -= ============================== # -def parseme(): +def setup_argparse(): + global args + def file_exists(parser, filepath: str) -> str: if not os.path.isfile(filepath): parser.error('Not a file %s' % filepath) return filepath parser = argparse.ArgumentParser() + + # General purpose arguments + parser.add_argument( + '-v', '--verbose', + action='store_true', + help='enable verbose logging (Level: DEBUG)') + + # Input output files parser.add_argument( 'input', metavar='DATA', @@ -42,7 +61,21 @@ def parseme(): 'output', metavar='OUTFILE', help='destination of converter output (JSON)') - parser.add_argument( + + # Post-processor flags + remove_personal = parser.add_mutually_exclusive_group() + remove_personal.add_argument( + '-e', '--encrypt', + action='store_true', + help='''strip all personal information and provide decryption key + (AES 128-bit, CBC mode, PKCS7 for padding, HMAC with SHA-256 + for integrity)''' + ) + remove_personal.add_argument( + '-d', '--decrypt', + action='store_true', + help='Reverse previous AES encryption.') + remove_personal.add_argument( '-a', '--anonymous', action='store_true', help='replace personal information and create a reversing table') @@ -51,17 +84,21 @@ def parseme(): help='where to store personal information (CSV)', ) parser.add_argument( - '-m', '--meta', + '-m', '--add-meta', action='store_true', help='add meta information (lecturer, course title)' ) + parser.add_argument( + '--verify', + action='store_true', + default=True, + help='asserts that output data will be in a certain format' + ) args = parser.parse_args() if args.anonymous != (args.personal_secret_table is not None): - parser.error('Need an output for anonymous mode') - - return args + parser.error('Please specify where to write the mapping (see -t)') # ========================== =- General Purpose -= ========================== # @@ -75,45 +112,24 @@ def compose(*functions: Sequence[Callable]) -> 
Callable: lambda x: x) -# ========================== =- Post processors -= ========================== # -def anonymise(structured_data: Dict[str, Any]) -> Dict[str, Any]: - DELIMITER = '-' - wordfile = xp.locate_wordfile() - words = xp.generate_wordlist(wordfile=wordfile, - min_length=7, - max_length=7) - - def get_identifier(): - return xp.generate_xkcdpassword(words, numwords=2, delimiter=DELIMITER) - - students = structured_data.pop('students') - reverser = {get_identifier(): s for s in students.values()} - students_anon = {r: { - 'fullname': ' '.join(w[0].capitalize() + w[1:] - for w in r.split(DELIMITER)), - 'identifier': r, - 'submissions': student['submissions'] - } for r, student in zip(reverser, students.values())} - - with open(args.personal_secret_table, 'w') as out: - print('key, previous identifier, fullname', file=out) - print('\n'.join(anon + '\t' + '\t'.join(v - for v in data.values() - if type(v) is str) - for anon, data in reverser.items()), file=out) - - structured_data.update({'students': students_anon}) - return structured_data +def abort(message='Bye.'): + ''' In case anything goes wrong. Basically a dump wrapper around exit ''' + log.info(message) + exit(1) -def add_meta_information(structured_data: Dict[str, Any]) -> Dict[str, Any]: - if args.meta: - structured_data['author'] = input('[Q] author: ') - structured_data['exam'] = input('[Q] course title: ') +# ========================== =- Post processors -= ========================== # +def do_add_meta(structured_data: Dict[str, Any]) -> Dict[str, Any]: + ''' Asks the user for metadata about the exam ''' + structured_data['author'] = input('[Q] author: ') + structured_data['exam'] = input('[Q] course title: ') return structured_data -def assert_correct_format(structured_data: Dict[str, Any]) -> Dict[str, Any]: +def do_verify(structured_data: Dict[str, Any]) -> Dict[str, Any]: + ''' The is the testable specification of the format that is output by + hector. 
Since multiple formats are compiled into this one verification is + on by default. The impact on performance is neglectable. ''' def assert_submission(submission): assert 'code' in submission, 'A submission needs code' assert 'type' in submission, 'A submission has to be of some type' @@ -124,6 +140,7 @@ def assert_correct_format(structured_data: Dict[str, Any]) -> Dict[str, Any]: len(student['submissions']))) assert 'fullname' in student, 'Student needs a name %s' % student assert 'identifier' in student, 'Student needs a unique identifier' + assert 'username' in student, 'Student needs a unique username' def base_assert(): assert 'students' in structured_data, 'No students found' @@ -131,14 +148,14 @@ def assert_correct_format(structured_data: Dict[str, Any]) -> Dict[str, Any]: try: base_assert() - students = structured_data['students'].values() + students = structured_data['students'] number_of_submissions = len(structured_data['tasks']) for student in students: try: assert_student(student) assert number_of_submissions == len(student['submissions']), \ - '%s does not have enough submissoins' % student['fullname'] + '%s does not have enough submissions' % student['fullname'] for submission in student['submissions']: try: @@ -155,11 +172,135 @@ def assert_correct_format(structured_data: Dict[str, Any]) -> Dict[str, Any]: return structured_data -post_processors = [ - anonymise, - # add_meta_information, - # assert_correct_format -] +def student_replacer(processor): + ''' A simple decorator that is used to remove students and put them back in + when the preprocessor is dome with them''' + + @functools.wraps(processor) + def processor_closure(structured_data: Dict[str, Any]) -> Dict[str, Any]: + students = structured_data.pop('students') + students_replacement = processor(students) + structured_data['students'] = students_replacement + return structured_data + + return processor_closure + + +@student_replacer +def do_anonymous(students: Dict[str, Union[str, List]]): 
+ ''' Recreates most of the data and includes fields over a whitelist + therefore ensuring that no personal information remains in the data ''' + DELIMITER = '-' + wordfile = xp.locate_wordfile() + words = xp.generate_wordlist(wordfile=wordfile, + min_length=7, + max_length=7) + + def get_random_xkcd_identifier(): + return xp.generate_xkcdpassword(words, numwords=2, delimiter=DELIMITER) + + reverser = {get_random_xkcd_identifier(): s for s in students} + students_anonymous = [{ + 'fullname': ' '.join(w[0].capitalize() + w[1:] + for w in r.split(DELIMITER)), + 'identifier': r, + 'username': r, + 'submissions': student['submissions'] + } for r, student in zip(reverser, students)] + + with open(args.personal_secret_table, 'w') as out: + print('key, previous identifier, fullname', file=out) + print('\n'.join('%s %s %s' % (anonymous_key, + data['identifier'], + data['fullname']) + for anonymous_key, data in reverser.items()), file=out) + + return students_anonymous + + +@student_replacer +def do_encrypt(students): + + # Init Crypto. See the module documentation on what actually happens here, + # then read all about those methods and then go study number theory. Never + # roll your own custom crypto ;-) + key = Fernet.generate_key() + aes = Fernet(key) + + def encrypt(clear: str) -> str: + return base64.b64encode(aes.encrypt(clear.encode())).decode('utf-8') + + output_the_key_to_the_user(key) + return transform(students, encrypt) + + +@student_replacer +def do_decrypt(students): + + def decrypt(cipher: str) -> str: + return aes.decrypt(base64.b64decode(cipher.encode())).decode('utf-8') + + try: + key = getpass.getpass('[Q] Give me the decryption key: ') + aes = Fernet(key) + + return transform(students, decrypt) + except Exception as err: + abort('Your key is bad (%s).' 
% err) + + +def transform(students, function): + return [ + {'fullname': function(student['fullname']), + 'identifier': function(student['identifier']), + 'username': function(student['username']), + 'submissions': student['submissions']} for student in students + ] + + +def output_the_key_to_the_user(key: bytes): + + def to_file(filepath: str): + with open(filepath, 'wb') as file: + file.write(key) + log.info('Key written to %s. Keep it safe.', filepath) + + def to_stdout(): + print('Encrypted and signed. Key this key safe or bad things happen') + print(' --------->> %s <<--------- ' % key.decode('latin-1')) + + output = input('[Q] The data has been encrypted. ' + + 'Where should I put the key? (stdout) ') or 'stdout' + + if output == 'stdout': + to_stdout() + + elif not os.path.exists(output): + to_file(output) + + elif os.path.isfile(output): + confirm = input('[Q] File exists. Want to override? (Y/n)') or 'y' + if confirm.lower().startswith('y'): + to_file(output) + else: + abort('No data was written. Bye.') + + else: + log.error('I cannot write to %s.', output) + abort() + + +def get_active_postprocessors(): + postprocessor_order = ( + do_add_meta, + do_verify, + do_anonymous, + do_encrypt, + do_decrypt + ) + + return (p for p in postprocessor_order + if getattr(args, p.__name__.split('do_')[1])) # ============================== =- Hektor -= =============================== # @@ -177,21 +318,26 @@ def _processing(filepath: str) -> Dict[str, Any]: ', '.join(f for c in Converter.implementations() for f in c.accepted_files)) + abort('Program stopped prematurely. No data was written. 
Bye.') def _postprocessing(structured_data: Dict[str, Any]) -> Dict[str, Any]: - return compose(*post_processors)(structured_data) + return compose(*get_active_postprocessors())(structured_data) def main(): - global args - args = parseme() + setup_argparse() + setup_logging() + + log.debug('Active post processors %s', list(get_active_postprocessors())) processing = compose(_postprocessing, _processing, _preprocessing) data = processing(args.input) + destination = args.output.split('.json')[0] + '.json' with open(destination, 'w') as output: json.dump(data, output, indent=2, sort_keys=True) + log.info('Wrote exam data to %s', destination) diff --git a/lib/__init__.py b/lib/__init__.py index f7c0d5e..9cc9e90 100644 --- a/lib/__init__.py +++ b/lib/__init__.py @@ -3,3 +3,4 @@ from lib.generic import Converter # noqa from lib.qti import QTIConverter # noqa from lib.xls import XLSConverter # noqa +from lib.identity import JSONIdentityConverter # noqa diff --git a/lib/generic.py b/lib/generic.py index 55279c5..008199c 100644 --- a/lib/generic.py +++ b/lib/generic.py @@ -7,7 +7,8 @@ def all_subclasses(cls): class Converter(metaclass=abc.ABCMeta): - """ A base class if we incorporate more converters in the future """ + """ A base class if we incorporate more converters in the future. New + implementations need to be registered in this modules __init__.py """ @abc.abstractmethod def convert(self): diff --git a/lib/identity.py b/lib/identity.py new file mode 100644 index 0000000..bf6afcf --- /dev/null +++ b/lib/identity.py @@ -0,0 +1,15 @@ +import json + +import lib.generic + + +class JSONIdentityConverter(lib.generic.Converter): + """ This serves as an identity if you wish to import a json file + that you generated earlier with hektor and you now want to run a + preprocessor on it. 
""" + + accepted_files = ('.json',) + + def convert(self, filepath): + with open(filepath) as json_input: + return json.load(json_input) diff --git a/lib/qti.py b/lib/qti.py index bfffd95..8441fb3 100644 --- a/lib/qti.py +++ b/lib/qti.py @@ -8,7 +8,7 @@ import lib.generic class QTIConverter(lib.generic.Converter): - """docstring for XLSConverter""" + """ XLSConverter class (Currently raw xml input is not supported) """ accepted_files = ('.zip', '.xml') @@ -51,8 +51,7 @@ def process_qti(tree, only_of_type=('assSourceCode',), **kwargs): def process_users(results_tree): - return {row.attrib['active_id']: dict(row.attrib) - for row in results_tree.xpath(users)} + return [dict(row.attrib) for row in results_tree.xpath(users)] def convert_code(text): @@ -67,14 +66,15 @@ def process_solutions(results_tree, task_id): def process_results(tree, qti=(), **kwargs): questions = qti users = process_users(tree) - for user in users.values(): + id2user = {user['active_id']: user for user in users} + for user in users: user['submissions'] = [] for question in questions: solutions = process_solutions(tree, question) for user_id, solution in solutions.items(): - users[user_id]['submissions'].append({'type': question, - 'code': solution, - 'tests': {}}) + id2user[user_id]['submissions'].append({'type': question, + 'code': solution, + 'tests': {}}) return users diff --git a/lib/xls.py b/lib/xls.py index d4d2917..a625ce8 100755 --- a/lib/xls.py +++ b/lib/xls.py @@ -1,20 +1,6 @@ #!/usr/local/bin/python3 """ a simple script that converts ilias exam output to readable json -The json output will look like this: -{ - "max.mustermann": { <<--- OR all uppercase letter of the name + username/matrikel_no # noqa: E501 - "matrikel_no": "12345678", - "name": "Mustermann, Max", - "task_list": { - "[task_id_1]": "print Hello World!", - ...., - "[task_id_n]": "#include <stdio.h> etc." - } - }, - ... 
ans so on -} - usage: convert.py [-h] [-u USERNAMES] [-n NUMBER_OF_TASKS] INFILE OUTFILE positional arguments: @@ -92,7 +78,7 @@ def converter(infile, usernames=None, number_of_tasks=0,): # nice! name2mat = dict(sheet_iter_meta(meta)) - assert len(name2mat) == len(data), f'{len(name2mat)} names != {len(data)} sheets' # noqa + assert len(name2mat) == len(data), '{} names != {} sheets'.format(len(name2mat), len(data)) # noqa # from xls to lists and namedtuples # [ [user0, task0_h, code0, ..., taskn, coden ], ..., [...] ] @@ -127,20 +113,17 @@ def converter(infile, usernames=None, number_of_tasks=0,): usernames = {user.name: get_username(user) for (user, *_) in root} return { - 'students': { - usernames[user.name]: { - 'fullname': user.name, - 'email': mat_to_email[name2mat[user.name]], - 'identifier': name2mat[user.name], - 'submissions': [ - { - "type": task, - "code": code, - "tests": {}, - } for task, code in zip(task_list[::2], task_list[1::2]) - ] - } for (user, *task_list) in sorted(root, key=lambda u: u[0].name) - }, + 'students': [{ + 'fullname': user.name, + 'username': usernames[user.name], + 'email': mat_to_email[name2mat[user.name]], + 'identifier': name2mat[user.name], + 'submissions': [{ + "type": task, + "code": code, + "tests": {}, + } for task, code in zip(task_list[::2], task_list[1::2])] + } for (user, *task_list) in sorted(root, key=lambda u: u[0].name)], 'tasks': list(tasks.values()) } @@ -150,4 +133,4 @@ def write_to_file(json_dict, outfile): with open(outfile, "w") as out: json.dump(json_dict, out, indent=2) - print(f"Wrote data to {outfile}. Done.") + print("Wrote data to %s. Done." 
% outfile) diff --git a/setup.py b/setup.py index 537679f..2740835 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ from setuptools import setup setup( name='hektor', - version='0.2.2', + version='0.3', description='A QTI-XML/XLS to JSON converter for humans', author='Jan Maximilian Michal', author_email='mail@janmax.org', @@ -13,6 +13,8 @@ setup( scripts=['bin/hektor'], install_requires=["lxml~=4.1.1", "xlrd~=1.1.0", + "cryptography~=2.1.4", "xkcdpass~=1.16.0"], - py_modules=['hektor', 'lib'] + py_modules=['hektor'], + packages=['lib'] ) -- GitLab