import collections
import csv
import json
import os
import readline
import secrets
import sys
import traceback
from typing import Callable, Iterable

from django.contrib.auth.models import Group, User

import util.convert
import util.processing
from core.models import Feedback, Student, Submission, SubmissionType, Test
from util.messages import *
from util.processing import EmptyTest

# The three groups every imported user is assigned to. They are looked up at
# import time, so the groups have to exist before this module is loaded.
STUDENTS = Group.objects.get(name='Students')
TUTORS = Group.objects.get(name='Tutors')
REVIEWERS = Group.objects.get(name='Reviewers')

HISTFILE = '.importer_history'  # readline history of the interactive session
RECORDS = '.importer'           # names of loaders that already ran, one per line

# Sentinels passed as ``default`` to ``i`` to request a yes/no question.
YES = 'Y/n'
NO = 'y/N'

# Accepted spellings of a yes/no answer (input is lower-cased before lookup).
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}


class chdir_context(object):
    """
    Step into a directory temporarily.

    The working directory that was active when the object was constructed is
    restored on exit, also when the body raised.
    """

    def __init__(self, path):
        self.old_dir = os.getcwd()
        self.new_dir = path

    def __enter__(self):
        info(f'Changing to {self.new_dir}')
        os.chdir(self.new_dir)

    def __exit__(self, *args):
        # Report the directory we are going back to, not the one we leave.
        info(f'Returning to {self.old_dir}')
        os.chdir(self.old_dir)


def get_xkcd_password(k=2):
    """
    Build a password by concatenating ``k`` random dictionary words.

    Words are read from ``/usr/share/dict/words``. The length filter is
    applied to the raw line, i.e. before the trailing newline is stripped.

    Args:
        k (int): number of words to concatenate

    Returns:
        str: the lower-cased words joined without a separator
    """
    with open('/usr/share/dict/words') as words:
        choose_from = list({word.strip().lower() for word in words
                            if 5 < len(word) < 8})

    return ''.join(secrets.choice(choose_from) for _ in range(k))


def i(prompt: str, default: str = '', is_path: bool = False,
      is_file: bool = False):
    """
    Ask the user a question on stdin and return the answer.

    The question is repeated until the answer is acceptable: a yes/no
    question needs one of the spellings in ``valid`` (or an empty answer for
    the default), a path question needs an existing path and a file question
    an existing regular file.

    Args:
        prompt (str): the question shown to the user
        default (str): value used when the user just presses enter. Passing
            ``YES`` or ``NO`` turns the question into a yes/no question.
        is_path (bool): the answer has to be an existing path
        is_file (bool): the answer has to be an existing regular file

    Returns:
        bool for yes/no questions, otherwise the answer (or default) as str
    """
    while True:
        if default in (YES, NO):
            reply = input(f'[Q] {prompt} ({default}): ').strip().lower()
            if not reply:
                return default == YES
            if reply in valid:
                return valid[reply]
            # An unknown answer used to raise KeyError; ask again instead.
            warn('Please answer with yes or no.')
            continue

        if default:
            answer = input(f'[Q] {prompt} ({default}): ') or default
        else:
            answer = input(f'[Q] {prompt}: ')

        missing = (is_path and not os.path.exists(answer)) \
            or (is_file and not os.path.isfile(answer))
        if not missing:
            return answer

        warn(f'The {"path" if is_path else "file"} does not exist. Please try again.')


def add_user(username: str, group: Group, **kwargs):
    """
    This is a specific wrapper for the django update_or_create method of
    objects.

    * A new user is created and password and group are set accordingly
    * If the user was there before the password is NOT changed but the
      group is. A user must only have one group.

    The random password of a new user is neither returned nor displayed by
    this function.

    Args:
        username (str): the username is the login name (whitespace around
            it is stripped)
        group (Group): the (only) group the user should belong to
        **kwargs: more attributes for user creation

    Returns:
        User: the created or updated user
    """
    user, created = User.objects.update_or_create(
        username=username.strip(),
        defaults=kwargs
    )

    if created:
        password = get_xkcd_password()
        user.set_password(password)
        user.save()

    user.groups.clear()  # remove all other groups
    group.user_set.add(user)

    return user


def add_student(username, name, matrikel_no, email, **kwargs):
    """
    Create or update a student together with the user account it logs in with.

    Args:
        username (str): login name of the student
        name (str): name used to look up the Student record
        matrikel_no: value stored in the student's ``matrikel_no`` field
        email (str): email address stored on the user
        **kwargs: ignored; lets callers pass a whole record with extra keys

    Returns:
        Student: the created or updated student
    """
    user = add_user(username, STUDENTS, email=email)
    student, _ = Student.objects.update_or_create(
        name=name,
        defaults={'matrikel_no': matrikel_no, 'user': user}
    )

    return student


def add_submission(student_obj, code, tests, type):
    """
    Create or update one submission of a student including its test results.

    If a test is the ``EmptyTest`` with its failure label, a zero score
    feedback authored by the inactive ``auto_correct`` user is stored too.

    Args:
        student_obj (Student): the author of the submission
        code (str): the submitted text
        tests (dict): test records; each value needs the keys ``name``,
            ``label`` and ``annotation``
        type (str): name of an existing SubmissionType
    """
    submission_type = SubmissionType.objects.get(name=type)

    submission_obj, _ = Submission.objects.update_or_create(
        type=submission_type,
        student=student_obj,
        defaults={'text': code}
    )

    for test_data in tests.values():
        test_obj, _ = Test.objects.update_or_create(
            name=test_data['name'],
            submission=submission_obj,
            defaults={
                'label': test_data['label'],
                'annotation': test_data['annotation'],
            }
        )

        if test_obj.name == EmptyTest.__name__ and test_obj.label == EmptyTest.label_failure:
            auto_correct, _ = User.objects.update_or_create(
                username='auto_correct',
                defaults={'is_active': False}
            )
            Feedback.objects.update_or_create(
                of_submission=submission_obj,
                defaults={
                    'of_tutor': auto_correct,
                    'score': 0,
                    'text': test_obj.label,
                    'origin': Feedback.WAS_EMPTY,
                    'status': Feedback.ACCEPTED,
                }
            )


def add_user_list(lst: Iterable[str], group: Group, **kwargs):
    """
    Add every name of an iterable (e.g. an open text file) as user.

    Entries consisting only of whitespace are skipped so that blank lines
    in the input do not produce a user with an empty username.

    Args:
        lst (Iterable[str]): the usernames
        group (Group): the group all of these users should belong to
        **kwargs: more attributes for user creation
    """
    for name in lst:
        if name.strip():
            add_user(name, group, **kwargs)


def call_loader(func: Callable) -> None:
    """
    This function handles if a function will be executed at all. Currently it
    just checks in the RECORDS file for the name of the function. If it is
    present the user is asked whether the function should run again.

    After the loader returned, its name is appended to the RECORDS file.

    Args:
        func (Callable): the loader specified below
    """
    if os.path.exists(RECORDS):
        with open(RECORDS, 'r') as records_f:
            done = {line.strip() for line in records_f}

        if func.__name__ in done and not \
                i(f'{func.__name__} has already been processed once. Proceed anyway?', NO):
            return

    func()  # This executes the specified loader

    with open(RECORDS, 'a') as records_f:
        records_f.write(func.__name__)
        records_f.write('\n')


def do_convert_xls():
    """Optionally convert the ILIAS .xls export to a .json file."""
    info('[Executing]', sys._getframe().f_code.co_name)

    ans = i('''Do you want to convert the ILIAS .xls output to .json?''', YES)
    if not ans:
        return

    infile = i('Please provide the path to the .xls file', is_file=True)
    # Same default name that do_load_submissions offers for reading.
    outfile = i('Where should the output go?', 'submissions.json')

    json_dict = util.convert.converter(infile)
    util.convert.write_to_file(json_dict, outfile)


def do_load_submission_types():
    """
    Create or update the submission types from a csv file plus one sample
    solution and one HTML description per type.
    """
    info('[Executing] ', sys._getframe().f_code.co_name)

    print('''For the following import you need three files:

    1) A .csv file where the columns are: id, name, score
    2) A path to a directory where I can find sample solutions named
        <id>-lsg.c
    3) A path to a directory where I can find HTML files with an accurate
        description of the task. File name pattern has to be: <id>.html
    ''')

    path = i('Where are your files located?', '.', is_path=True)

    # All following paths are interpreted relative to the chosen directory.
    with chdir_context(path):
        submission_types_csv = i('CSV file', 'submission_types.csv')
        lsg_dir = i('solution dir', 'code/code-lsg')
        desc_dir = i('descriptions dir', 'html')

        with open(submission_types_csv, encoding='utf-8') as tfile:
            csv_rows = list(csv.reader(tfile))

        for row in csv_rows:
            tid, name, score = (col.strip() for col in row)
            with \
                    open(os.path.join(lsg_dir, tid + '-lsg.c'), encoding='utf-8') as lsg, \
                    open(os.path.join(desc_dir, tid + '.html'), encoding='utf-8') as desc:
                data = {
                    'name': name,
                    'description': desc.read(),
                    'solution': lsg.read(),
                    'full_score': int(score),
                }
            _, created = SubmissionType.objects.update_or_create(
                name=name,
                defaults=data
            )
            info(f'{"Created" if created else "Updated"} {name}')


def do_preprocess_submissions():
    """
    List the available tests and ask which ones to run.

    Running a selection is not implemented yet: anything but an empty
    answer or ``q`` raises NotImplementedError.
    """
    info('[Executing] ', sys._getframe().f_code.co_name)

    print('''
    Preprocessing might take some time depending on the amount of data
    and the complexity of the programs and the corresponding unit tests. You can
    specify what test you want to run.

    Tests do depend on each other. Therefore specifying a test will also
    result in running all its dependencies\n''')

    test_enum = dict(enumerate(util.processing.Test.available_tests()))

    print('The following test are available:\n')
    print('\t[q] Do nothing')
    for j, test in test_enum.items():
        print(f'\t[{j}] {test}')
    print()

    answer = i('Which tests do you want to run?')

    if not answer or answer == 'q':
        return

    raise NotImplementedError


def do_load_submissions():
    """
    Import students and their submissions from a .json file.

    The file maps usernames to records; each record is passed to
    ``add_student`` and each entry of its ``submissions`` list to
    ``add_submission``.
    """
    info('[Executing] ', sys._getframe().f_code.co_name)

    file = i('Get me the file with all the submissions', 'submissions.json')
    with open(file) as submission_file:
        submissions = json.load(submission_file)

    for username, data in submissions.items():
        student_obj = add_student(username, **data)

        for submission_obj in data['submissions']:
            add_submission(student_obj, **submission_obj)


def do_load_tutors():
    """Import tutor accounts from a file with one username per line."""
    info('[Executing] ', sys._getframe().f_code.co_name)
    print('Please import tutor users by providing one name per line')
    tutors = i('List of tutors', 'tutors', is_file=True)

    with open(tutors) as tutors_f:
        add_user_list(tutors_f, TUTORS)


def do_load_reviewer():
    """Import reviewer (staff) accounts from a file with one username per line."""
    info('[Executing] ', sys._getframe().f_code.co_name)
    print('Please import reviewer users by providing one name per line')
    reviewers = i('List of reviewers', 'reviewers', is_file=True)

    with open(reviewers) as reviewers_f:
        add_user_list(reviewers_f, REVIEWERS, is_staff=True)


# The loaders in the order they are run when the user selects "all".
call_order = collections.OrderedDict({
    0: do_convert_xls,
    1: do_load_submission_types,
    2: do_preprocess_submissions,
    3: do_load_submissions,
    4: do_load_tutors,
    5: do_load_reviewer
})


def start():
    """
    Interactive entry point: show the available loaders and run either the
    selected one or all of them in ``call_order``.

    EOF and Ctrl-C end the session silently, any other exception is printed
    as traceback. The readline history is written back in every case.
    """
    # NOTE(review): the message announces an abort, but execution has always
    # continued from here. Re-running single loaders (see RECORDS) relies on
    # that, so the behaviour is kept - confirm which one is intended.
    if User.objects.filter(is_superuser=False).exists():
        warn('Warning database is not clean. Aborting.')

    if os.path.exists(HISTFILE):
        readline.read_history_file(HISTFILE)

    print('''Welcome to the Grady importer!

    This script aims at making the setup of the database as easy as possible. It
    at the same time serves as a documentation on how data is imported in Grady.
    Let\'s dive right in.\n''')

    try:
        print('The following importers are available:\n')
        for fid, func in call_order.items():
            print(f'\t[{fid}] {func.__name__}')
        print()

        fid = i('Press enter for all in given order or choose a number')
        if fid:
            call_loader(call_order[int(fid)])
        else:
            for func in call_order.values():
                call_loader(func)
    except (EOFError, KeyboardInterrupt):
        return
    except Exception:
        # Top-level boundary of the interactive tool: show what went wrong
        # but still fall through to saving the history.
        traceback.print_exc()
    finally:
        readline.write_history_file(HISTFILE)