Commit 16e0d01e authored by Jan Maximilian Michal

Major refactor: added basic anonymisation

parent bfa055bc
.gitignore

@@ -7,3 +7,6 @@ build/
 dist/
 deploy.sh
+.DS_Store
+*.xls
+.venv/
hektor.py

import argparse
import functools
import json
import logging
import os
from typing import Any, Callable, Dict, Sequence

from xkcdpass import xkcd_password as xp

from lib import Converter

# ============================== =- Logging -= ============================== #
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

# create console handler and formatter
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
formatter = logging.Formatter('[%(levelname)s] %(message)s')

# add formatter to console handler
console.setFormatter(formatter)
log.addHandler(console)


# ============================= =- argparse -= ============================== #
def parseme():
    def file_exists(parser, filepath: str) -> str:
        if not os.path.isfile(filepath):
            parser.error('Not a file %s' % filepath)
        return filepath

    parser = argparse.ArgumentParser()
    parser.add_argument(
        'input',
        metavar='DATA',
        type=lambda f: file_exists(parser, f),
        help='A QTI-ZIP or an .xls Ilias export that contains course data')
    parser.add_argument(
        'output',
        metavar='OUTFILE',
        help='Where you want to put the output')
    parser.add_argument(
        '-a', '--anonymous',
        action='store_true',
        help='Strip any personal information and create a reversing table')
    parser.add_argument(
        '-t', '--personal-secret-table',
        help='Where to store personal information',
    )
    parser.add_argument(
        '-m', '--meta',
        action='store_true',
        help='If you want to add meta information (lecturer, course title)'
    )

    args = parser.parse_args()

    if args.anonymous != (args.personal_secret_table is not None):
        parser.error('Need an output for anonymous mode')

    return args
# ========================== =- General Purpose -= ========================== #
def compose(*functions: Sequence[Callable]) -> Callable:
    """ Standard function composition. Takes a sequence of functions [f, g, h, ...]
    and returns the composite function i(x) = f(g(h(x))). There are no checks
    that validate if domain and image of these functions are compatible."""
    return functools.reduce(lambda f, g: lambda x: f(g(x)),
                            functions,
                            lambda x: x)
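A quick illustration of the composition order (editorial example, not part of the commit): the right-most function is applied first.

    inc = lambda x: x + 1
    dbl = lambda x: x * 2
    assert compose(inc, dbl)(3) == 7  # inc(dbl(3)) = 6 + 1
    assert compose(dbl, inc)(3) == 8  # dbl(inc(3)) = 4 * 2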
# ========================== =- Post processors -= ========================== #
def anonymise(structured_data: Dict[str, Any]) -> Dict[str, Any]:
    DELIMITER = '-'
    wordfile = xp.locate_wordfile()
    words = xp.generate_wordlist(wordfile=wordfile,
                                 min_length=7,
                                 max_length=7)

    def get_identifier():
        return xp.generate_xkcdpassword(words, numwords=2, delimiter=DELIMITER)

    students = structured_data.pop('students')
    reverser = {get_identifier(): s for s in students.values()}
    students_anon = {r: {
        'fullname': ' '.join(w[0].capitalize() + w[1:]
                             for w in r.split(DELIMITER)),
        'identifier': r,
        'submissions': student['submissions']
    } for r, student in zip(reverser, students.values())}

    with open(args.personal_secret_table, 'w') as out:
        print('key, previous identifier, fullname', file=out)
        print('\n'.join(anon + '\t' + '\t'.join(v
                                                for v in data.values()
                                                if type(v) is str)
                        for anon, data in reverser.items()), file=out)

    return {
        **structured_data,
        'students': students_anon
    }
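The anonymous identifiers are plain xkcdpass output: two random seven-letter words joined by the delimiter, which anonymise then title-cases into a fake full name. A minimal sketch of what get_identifier produces (output is random; the shown value is illustrative):

    from xkcdpass import xkcd_password as xp

    words = xp.generate_wordlist(wordfile=xp.locate_wordfile(),
                                 min_length=7, max_length=7)
    identifier = xp.generate_xkcdpassword(words, numwords=2, delimiter='-')
    # e.g. 'halogen-bedtime' -> anonymised fullname 'Halogen Bedtime'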
def add_meta_information(structured_data: Dict[str, Any]) -> Dict[str, Any]:
    if args.meta:
        structured_data['author'] = input('[Q] author: ')
        structured_data['exam'] = input('[Q] course title: ')

    return structured_data
def assert_correct_format(structured_data: Dict[str, Any]) -> Dict[str, Any]:
    def assert_submission(submission):
        assert 'code' in submission, 'A submission needs code'
        assert 'type' in submission, 'A submission has to be of some type'
        assert 'tests' in submission, 'A tests dict has to be present.'

    def assert_student(student):
        log.debug('asserting %s (%d)' % (student['fullname'],
                                         len(student['submissions'])))
        assert 'fullname' in student, 'Student needs a name %s' % student
        assert 'identifier' in student, 'Student needs a unique identifier'

    def base_assert():
        assert 'students' in structured_data, 'No students found'
        assert 'tasks' in structured_data, 'No tasks found'

    try:
        base_assert()
        students = structured_data['students'].values()
        number_of_submissions = len(structured_data['tasks'])

        for student in students:
            try:
                assert_student(student)
                assert number_of_submissions == len(student['submissions']), \
                    '%s does not have enough submissions' % student['fullname']

                for submission in student['submissions']:
                    try:
                        assert_submission(submission)
                    except AssertionError as err:
                        log.warn(err)
            except AssertionError as err:
                log.warn(err)

    except AssertionError as err:
        log.warn(err)

    return structured_data
post_processors = [
    anonymise,
    # add_meta_information,
    # assert_correct_format
]
# ============================== =- Hektor -= =============================== #
def _preprocessing(filepath: str) -> str:
    return filepath


def _processing(filepath: str) -> Dict[str, Any]:
    try:
        return next(converter().convert(filepath)
                    for converter in Converter.implementations()
                    if converter.accept(filepath))
    except StopIteration:
        log.error('No suitable converter found. Accepting only %s' %
                  ', '.join(f
                            for c in Converter.implementations()
                            for f in c.accepted_files))


def _postprocessing(structured_data: Dict[str, Any]) -> Dict[str, Any]:
    return compose(*post_processors)(structured_data)


def main():
    processing = compose(_postprocessing, _processing, _preprocessing)
    data = processing(args.input)

    destination = args.output.split('.json')[0] + '.json'
    with open(destination, 'w') as output:
        json.dump(data, output, indent=2, sort_keys=True)
    log.info('Wrote exam data to %s', destination)


if __name__ == '__main__':
    args = parseme()
    main()
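Taken together, a typical anonymising run of the new entry point might look like this (the script name is assumed from the project, and the file names are invented for illustration):

    $ python3 hektor.py exam_export.zip exam -a -t secret_table.csv
    [INFO] Wrote exam data to exam.json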
lib/__init__.py

# New modules need to be registered here
from lib.generic import Converter
from lib.qti import QTIConverter
from lib.xls import XLSConverter
lib/generic.py

import abc


def all_subclasses(cls):
    return cls.__subclasses__() \
        + [g for s in cls.__subclasses__() for g in all_subclasses(s)]


class Converter(metaclass=abc.ABCMeta):
    """ A base class if we incorporate more converters in the future """

    @abc.abstractmethod
    def convert(self):
        pass

    @property
    @abc.abstractclassmethod
    def accepted_files(cls):
        pass

    @classmethod
    def implementations(cls):
        return all_subclasses(cls)

    @classmethod
    def accept(cls, filepath):
        return any(filepath.endswith(ending) for ending in cls.accepted_files)
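Since implementations() walks __subclasses__ recursively, adding a format means subclassing Converter and importing the module in lib/__init__.py. A hypothetical sketch (JSONConverter and its file ending are not part of the commit):

    import json

    import lib.generic


    class JSONConverter(lib.generic.Converter):
        """ Hypothetical pass-through for data already in the common format """

        accepted_files = ('.json',)

        def convert(self, filepath):
            # found via Converter.implementations(), chosen via accept()
            with open(filepath) as export:
                return json.load(export)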
lib/qti.py

import base64
import re
import zipfile

from lxml import etree

import lib.generic


class QTIConverter(lib.generic.Converter):
    """ Converts ILIAS QTI exports (zipped XML) into the common format """

    accepted_files = ('.zip', '.xml')

    def convert(self, filepath):
        with zipfile.ZipFile(filepath) as archive:
            data = dict(process_archive(archive))

        return give_me_structure(data)
file_regex = re.compile(
    r'(\d+)__(\d+)__(?P<data>results|qti|tst)_(?P<id>\d+).xml')
task_id_regex = re.compile(r'il_\d+_qst_(?P<task_id>\d+)')

tasks_path = './assessment/section'
users = './tst_active/row'
solutions = './tst_solutions/row[@question_fi="%s"]'
lecturer_xpath = ('./MetaData/Lifecycle/Contribute'
                  '[@Role="Author"]/Entity/text()')
types_xpath = ('./item/itemmetadata/qtimetadata/qtimetadatafield/'
               'fieldlabel[text()="QUESTIONTYPE"]/../fieldentry/text()')


def process_qti(tree, only_of_type=('assSourceCode',), **kwargs):
    tasks = tree.xpath(tasks_path)[0]

    titles = tasks.xpath('./item/@title')
    types = tasks.xpath(types_xpath)
    ids = [re.search(task_id_regex, ident).group('task_id')
           for ident in tasks.xpath('./item/@ident')]
    texts = ['\n'.join(flow.xpath('./material/mattext/text()'))
             for flow in tasks.xpath('./item/presentation/flow')]

    return {id: {'title': title, 'text': text, 'type': type}
            for id, type, title, text in zip(ids, types, titles, texts)
            if not only_of_type or type in only_of_type}


def process_users(results_tree):
    return {row.attrib['active_id']: dict(row.attrib)
            for row in results_tree.xpath(users)}


def convert_code(text):
    return base64.b64decode(text).decode('utf-8').split('\n')


def process_solutions(results_tree, task_id):
    return {row.attrib['active_fi']: convert_code(row.attrib['value1'])
            for row in results_tree.xpath(solutions % task_id)}


def process_results(tree, qti=(), **kwargs):
    questions = qti
    users = process_users(tree)
    for user in users.values():
        user['submissions'] = []

    for question in questions:
        solutions = process_solutions(tree, question)
        for user_id, solution in solutions.items():
            users[user_id]['submissions'].append({'type': question,
                                                  'code': solution,
                                                  'tests': {}})
    return users


def process_tst(tree):
    title = tree.xpath('./MetaData/General/Title/text()')
    lecturer = tree.xpath(lecturer_xpath)
    return {'exam': title[0], 'author': lecturer[0]}


def eval_file(archive, match, cache):
    funcname = 'process_' + match.group('data')
    with archive.open(match.string) as datafile:
        tree = etree.parse(datafile)

    return globals()[funcname](tree, **cache)


def process_archive(archive):
    files = {match.group('data'): match
             for match in (re.search(file_regex, name)
                           for name in archive.NameToInfo)
             if match}

    order = ('tst', 'qti', 'results')
    cache = {}
    for key in order:
        cache[key] = eval_file(archive, files[key], cache)

    return cache
def add_meta(base, data):
    base.update(data['tst'])


def add_tasks(base, data):
    base['tasks'] = list(data['qti'].values())


ignore_user_fields = ("user_fi",
                      "active_id",
                      "usr_id",
                      "anonymous_id",
                      "test_fi",
                      "lastindex",
                      "tries",
                      "submitted",
                      "submittimestamp",
                      "tstamp",
                      "user_criteria",)


def add_users(base, data):
    for userdata in data['results'].values():
        userdata['identifier'] = userdata['user_fi']
        for field in ignore_user_fields:
            userdata.pop(field)

    base['students'] = data['results']


def give_me_structure(data):
    base = {}

    add_meta(base, data)
    add_tasks(base, data)
    add_users(base, data)

    return base
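For orientation, this is roughly the shape give_me_structure produces and every converter is expected to return (keys taken from the code above; the values are invented for illustration):

    {
        'exam': 'Programming 101',      # from process_tst
        'author': 'Some Lecturer',
        'tasks': [                      # from process_qti
            {'title': 'Task 1', 'text': '...', 'type': 'assSourceCode'},
        ],
        'students': {                   # from process_results, keyed by active_id
            '123': {'identifier': '123',
                    'submissions': [{'type': '...', 'code': ['...'],
                                     'tests': {}}]},
        },
    }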
lib/xls.py

#!/usr/local/bin/python3

""" A simple script that converts ilias exam output to readable json

The json output will look like this:

{
    "max.mustermann": { <<--- OR all uppercase letters of the name + username/matrikel_no  # noqa: E501
        "matrikel_no": "12345678",
        "name": "Mustermann, Max",
        "task_list": {
            "[task_id_1]": "print Hello World!",
            ....,
            "[task_id_n]": "#include <stdio.h> etc."
        }
    },
    ... and so on
}

usage: convert.py [-h] [-u USERNAMES] [-n NUMBER_OF_TASKS] INFILE OUTFILE

positional arguments:
  INFILE                Ilias exam data
  OUTFILE               Where to write the final file

optional arguments:
  -h, --help            show this help message and exit
  -u USERNAMES, --usernames USERNAMES
                        a json dict matno -> email
  -n NUMBER_OF_TASKS, --NUMBER_OF_TASKS NUMBER_OF_TASKS
                        the number of tasks in the exam (sanity check)

Author: Jan Maximilian Michal
Date: 30 March 2017
"""
import json
import os
import re
import urllib.parse
from collections import defaultdict, namedtuple
from xlrd import open_workbook
import lib.generic
class XLSConverter(lib.generic.Converter):
    """ Converts ILIAS .xls exports into the common format """

    accepted_files = ('.xls',)

    def convert(self, filepath):
        return converter(filepath)
# one user has one submission (code) per task
# yes, I know it is possible to name match groups via (?P<name>) but
# I like this solution better since it gets the job done nicely
user_t = namedtuple('user_head', 'name matrikel_no')

# one task has a title, an id and (hopefully) code
task_head_re = re.compile(r'^Quellcode Frage (?P<title>.*?) ?(\d{8})?$')

# for parsing the weird matriculation number format
matno_re = re.compile(r'^(?P<matrikel_no>\d{8})-(\d+)-(\d+)$')

COLUMNS_BEFORE_TASKS = 19
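A quick check of matno_re against the format it expects, the eight-digit matriculation number being the first component of a dashed triple (the number is invented):

    match = re.search(matno_re, '12345678-1-1')
    print(match.group('matrikel_no'))  # -> '12345678'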
def converter(infile, usernames=None, number_of_tasks=0):

    # Modify these iterators in order to change extraction behaviour
    def sheet_iter_meta(sheet):
        """ yield first and second col entry as tuple of (name, matnr) """
        for row in (sheet.row(i) for i in range(1, sheet.nrows)):
            match = re.search(matno_re, row[1].value)
            if match:
                yield row[0].value, match.group('matrikel_no')

    def sheet_iter_data(sheet):
        """ yields all source code title and code tuples """
        def row(i):
            return sheet.row(i)

        for top, low in ((row(i), row(i + 1)) for i in range(sheet.nrows - 1)):
            if any(map(lambda c: c.ctype, top)) and 'Quell' in top[0].value:
                yield (' '.join(c.value for c in top),
                       ' '.join(c.value for c in low))

    # meta sheet contains ilias names, usernames etc. - data contains code
    meta, *data = open_workbook(infile, open(os.devnull, 'w')).sheets()

    # nice!
    name2mat = dict(sheet_iter_meta(meta))
    assert len(name2mat) == len(data), f'{len(name2mat)} names != {len(data)} sheets'  # noqa

    # from xls to lists and namedtuples
    # [ [user0, task0_h, code0, ..., taskn, coden ], ..., [...] ]
    root = []
    tasks = {}
    for user, sheet in zip(sheet_iter_meta(meta), data):
        root.append([user_t(*user)])
        for task, code in sheet_iter_data(sheet):
            task = re.search(task_head_re, task)
            task_title = task.group('title')
            tasks[task_title] = {
                'title': task_title,
                'type': 'SourceCode'
            }
            root[-1].append(task.group('title'))
            root[-1].append(urllib.parse.unquote(code).strip())

    if number_of_tasks:
        for (user, *task_list) in sorted(root, key=lambda u: u[0].name):
            assert len(task_list) == number_of_tasks * 2

    mat_to_email = defaultdict(str)
    if usernames:
        with open(usernames) as data:
            mat_to_email.update(json.JSONDecoder().decode(data.read()))

    def get_username(user):
        if name2mat[user.name] in mat_to_email:
            return mat_to_email[name2mat[user.name]].split('@')[0]
        return ''.join(filter(str.isupper, user.name)) + name2mat[user.name]

    usernames = {user.name: get_username(user) for (user, *_) in root}

    return {
        'students': {
            usernames[user.name]: {
                'fullname': user.name,
                'email': mat_to_email[name2mat[user.name]],
                'identifier': name2mat[user.name],
                'submissions': [
                    {
                        "type": task,
                        "code": code,
                        "tests": {},
                    } for task, code in zip(task_list[::2], task_list[1::2])
                ]
            } for (user, *task_list) in sorted(root, key=lambda u: u[0].name)
        },
        'tasks': list(tasks.values())
    }
def write_to_file(json_dict, outfile):
    # just encode python style
    with open(outfile, "w") as out:
        json.dump(json_dict, out, indent=2)
    print(f"Wrote data to {outfile}. Done.")
requirements.txt

lxml~=4.1.1
xlrd~=1.1.0
xkcdpass~=1.16.0