import json import os import readline import util from util.messages import warn from core.models import ExamType, Feedback, Submission, SubmissionType, Test from core.models import UserAccount as User from util.factories import GradyUserFactory import semver WELCOME = r''' ______ __ ____ __ / ____/________ _____/ /_ __ / _/___ ___ ____ ____ _____/ /____ _____ / / __/ ___/ __ `/ __ / / / / / // __ `__ \/ __ \/ __ \/ ___/ __/ _ \/ ___/ / /_/ / / / /_/ / /_/ / /_/ / _/ // / / / / / /_/ / /_/ / / / /_/ __/ / \____/_/ \__,_/\__,_/\__, / /___/_/ /_/ /_/ .___/\____/_/ \__/\___/_/ /____/ /_/ ''' HISTFILE = '.importer_history' PASSWORDS = '.importer_passwords' YES = 'Y/n' NO = 'y/N' RUSTY_HEKTOR_MIN_VER = ">=1.0.0" RUSTY_HEKTOR_MAX_VER = "<2.0.0" valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} ORIGIN_ORDER = { Feedback.WAS_EMPTY, Feedback.DID_NOT_COMPILE, Feedback.COULD_NOT_LINK, Feedback.FAILED_UNIT_TESTS, } TEST_ORDER = ( util.processing.EmptyTest.__name__, util.processing.CompileTest.__name__, util.processing.LinkTest.__name__, util.processing.UnitTestTest.__name__, ) FEEDBACK_MAPPER = dict(zip(TEST_ORDER, ORIGIN_ORDER)) user_factory = GradyUserFactory() def start(): if os.path.exists(HISTFILE): readline.read_history_file(HISTFILE) print(WELCOME + ''' Welcome to the Grady import script! This script aims at making the setup of the database as easy as possible. At the same time it serves as a documentation on how data is imported into Grady. Let\'s dive right in.\n''') try: print('The following sub importers are available:\n') for fid, func in enumerate(call_order): print(f'\t[{fid}] {func.__name__}') print('\t[q] exit') print() fid = i('Choose a number or hit enter to start at the beginning') if not fid: for func in call_order: func() elif fid in ('q', 'quit', 'exit'): return elif not 0 <= int(fid) < len(call_order): warn('There is no loader with this number') else: call_order[int(fid)]() except (EOFError, KeyboardInterrupt): print() return except FileNotFoundError: raise except Exception: import traceback traceback.print_exc() finally: readline.write_history_file(HISTFILE) def i(prompt: str, default: str = '', is_path: bool = False, is_file: bool = False): if default is YES or default is NO: answer = valid[input(f'[Q] {prompt} ({default}): ').lower() or ( 'y' if YES == default else 'n')] elif default: answer = input(f'[Q] {prompt} ({default}): ') or default else: answer = input(f'[Q] {prompt}: ') if (is_path or is_file) and \ not os.path.exists(answer) or is_file and \ not os.path.isfile(answer): path_or_type = "path" if is_path else "file" warn(f'The {path_or_type} does not exist. Please try again.') return i(prompt, default, is_path, is_file) return answer def load_hektor_json(): file = i('Get me the file with the output from rusty-hektor', 'submissions.json', is_file=True) with open(file, 'r') as f: exam_data = json.JSONDecoder().decode(f.read()) hektor_version = exam_data['meta']['version'] if not (semver.match(hektor_version, RUSTY_HEKTOR_MIN_VER) and semver.match(hektor_version, RUSTY_HEKTOR_MAX_VER)): warn(f'The data you\'re trying to import has the wrong version {hektor_version}\n' f'Requirements: {RUSTY_HEKTOR_MIN_VER}, {RUSTY_HEKTOR_MAX_VER}') exam, _ = ExamType.objects.get_or_create(**exam_data['module']) for submission_type in exam['submission_types']: SubmissionType.objects.get_or_create(**submission_type) for student in exam_data['students']: student_obj = user_factory.make_student(exam=exam, **student).student for submission_obj in student['submissions']: add_submission(student_obj, **submission_obj) def load_reviewers(): print('Please import reviewer users by providing one name per line') reviewers = i('List of reviewers', 'reviewers', is_file=True) with open(reviewers) as reviewers_f: for reviewer in reviewers_f: user_factory.make_reviewer(reviewer.strip(), is_staff=True, store_pw=True) def add_submission(student_obj, code, tests, type=None): submission_type_obj = SubmissionType.objects.get(name=type) submission_obj, _ = Submission.objects.update_or_create( type=submission_type_obj, student=student_obj, defaults={'text': code} ) if tests: add_tests(submission_obj, tests) def add_tests(submission_obj, tests): auto_correct, _ = User.objects.get_or_create( username='auto_correct', defaults={'is_active': False} ) for name in (name for name in TEST_ORDER if name in tests): test_data = tests[name] test_obj, created = Test.objects.update_or_create( name=test_data['name'], submission=submission_obj, defaults={ 'label': test_data['label'], 'annotation': test_data['annotation'], } ) add_feedback_if_test_recommends_it(test_obj) def add_feedback_if_test_recommends_it(test_obj): available_tests = util.processing.Test.available_tests() if test_obj.label == available_tests[test_obj.name].label_failure \ and not hasattr(test_obj.submission, 'feedback') \ and (test_obj.name == util.processing.EmptyTest.__name__ or test_obj.name == util.processing.CompileTest.__name__): return Feedback.objects.update_or_create( of_submission=test_obj.submission, defaults={ 'score': 0, 'origin': FEEDBACK_MAPPER[test_obj.name], 'is_final': True, } ) call_order = [ load_hektor_json, load_reviewers ]