Skip to content
Snippets Groups Projects
importer.py 15 KiB
Newer Older
  • Learn to ignore specific revisions
    import configparser
    import csv
    import json
    import os
    import readline
    import secrets

    from typing import Callable


    from django.contrib.auth.models import Group, User


    import util.convert
    import util.processing

    from core.models import (ExamType, Feedback, Student, Submission,
                             SubmissionType, Test)

    from util.processing import EmptyTest
    
    # Names of the auth groups users are put into (looked up by name when a
    # user is added to a group).
    STUDENTS  = 'Students'
    TUTORS    = 'Tutors'
    REVIEWERS = 'Reviewers'

    # HISTFILE is handed to readline for the prompt history; RECORDS lists the
    # names of the loaders that have already been run (see call_loader).
    HISTFILE  = '.importer_history'
    RECORDS   = '.importer'


    # Markers passed as `default` to i() to turn a prompt into a yes/no question;
    # the capital letter shows which answer an empty reply selects.
    YES = 'Y/n'
    NO  = 'y/N'

    # Accepted (lower-cased) replies to a yes/no question and their meaning.
    valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    
    
    # ORIGIN_ORDER and TEST_ORDER are zipped positionally below, so both must be
    # ordered sequences: entry n of ORIGIN_ORDER is the feedback origin recorded
    # when test n of TEST_ORDER fails. (A set here would pair them arbitrarily.)
    ORIGIN_ORDER = (
        Feedback.WAS_EMPTY,
        Feedback.DID_NOT_COMPILE,
        Feedback.COULD_NOT_LINK,
        Feedback.FAILED_UNIT_TESTS,
    )

    # Class names of the tests, in the order they are looked up per submission.
    TEST_ORDER = (
        util.processing.EmptyTest.__name__,
        util.processing.CompileTest.__name__,
        util.processing.LinkTest.__name__,
        util.processing.UnitTestTest.__name__,
    )

    # test class name -> feedback origin
    FEEDBACK_MAPPER = dict(zip(TEST_ORDER, ORIGIN_ORDER))
    
    
    
    class chdir_context(object):
        """
        Step into a directory temporarily.

        Used as a context manager: the working directory is switched to `path`
        on entry and switched back on exit, whether or not the body raised. The
        directory to return to is the working directory at construction time.
        """

        def __init__(self, path):
            self.old_dir = os.getcwd()
            self.new_dir = path

        def __enter__(self):
            info(f'Changing to {self.new_dir}')
            os.chdir(self.new_dir)

        def __exit__(self, *args):
            # Report the directory we actually go back to (was new_dir before).
            info(f'Returning to {self.old_dir}')
            os.chdir(self.old_dir)
    
    
    
    def get_xkcd_password(k=2, wordlist='/usr/share/dict/words'):
        """Return `k` randomly chosen dictionary words glued together.

        Args:
            k (int): number of words to concatenate.
            wordlist (str): path to a file with one word per line. Defaults to
                the system dictionary that was previously hard-coded.

        Returns:
            str: the lower-cased words joined without a separator.

        Raises:
            OSError: if the word list cannot be opened.
            IndexError: if no line qualifies and k > 0 (from secrets.choice).
        """
        with open(wordlist) as words:
            # The length test is applied to the raw line, i.e. including its
            # line terminator; duplicates collapse because a set is built first.
            choose_from = list({word.strip().lower()
                                for word in words if 5 < len(word) < 8})

        return ''.join(secrets.choice(choose_from) for _ in range(k))
    
    
    
    def i(prompt: str, default: str='', is_path: bool=False, is_file: bool=False):
        """Ask the user a question on stdin and return the answer.

        Args:
            prompt: the question to show.
            default: value used when the user just hits enter. Passing YES or
                NO turns the prompt into a yes/no question.
            is_path: the answer must be an existing path.
            is_file: the answer must be an existing regular file.

        Returns:
            bool for yes/no questions, otherwise the entered (or default) string.
            The question is repeated until the answer passes validation.
        """
        while True:
            if default in (YES, NO):
                reply = input(f'[Q] {prompt} ({default}): ').strip().lower() \
                    or ('y' if YES == default else 'n')
                if reply not in valid:
                    # Previously an unknown reply escaped as a KeyError.
                    warn('Please answer with yes or no.')
                    continue
                answer = valid[reply]
            elif default:
                answer = input(f'[Q] {prompt} ({default}): ') or default
            else:
                answer = input(f'[Q] {prompt}: ')

            missing = (is_path or is_file) and not os.path.exists(answer)
            not_a_file = is_file and not os.path.isfile(answer)
            if missing or not_a_file:
                warn(f'The {"path" if is_path else "file"} does not exist. Please try again.')
                continue

            # The callers use the result, so it has to be handed back.
            return answer
    
    def store_password(username, groupname, password):
        """Record `password` for `username` in the ini-style PASSWORDS file.

        The file is read first, so existing entries are kept; the user is filed
        under a section named after `groupname`, and the file is rewritten.
        """
        # NOTE(review): PASSWORDS is not defined in the visible part of this
        # file -- confirm where it comes from.
        passwords = configparser.ConfigParser()
        passwords.read(PASSWORDS)

        if groupname not in passwords:
            passwords[groupname] = {}
        section = passwords[groupname]
        section[username] = password

        with open(PASSWORDS, 'w') as out_file:
            passwords.write(out_file)
    
    
        def __init__(self, password_generator_func=get_xkcd_password, *args, **kwargs):
            """Remember the callable used to generate passwords for new users.

            Args:
                password_generator_func: called without arguments whenever a
                    newly created user needs a password.
                *args, **kwargs: accepted but not used by this method.
            """
            self.password_generator_func = password_generator_func
    
        @staticmethod
        def get_random_name(prefix='', suffix='', k=1):
            """Return `prefix`, `k` random words and `suffix` as one string."""
            random_part = get_xkcd_password(k)
            return prefix + random_part + suffix
    
        def make_default_user(self, username, **kwargs):
            """Create the user `username` or update it with `kwargs`.

            Returns:
                (User object, bool): the user and whether it was newly created.
            """
            user, created = User.objects.update_or_create(
                username=username,
                defaults=kwargs,
            )
            return user, created
    
        def make_user_in_group(self, username, groupname, store_pw=False, **kwargs):
            """ This is a specific wrapper for the django update_or_create method of
            objects.
                * A new user is created and password and group are set accordingly
                * If the user was there before password is NOT changed but group is.
                  A user must only have one group.

            Args:
                username (str): the username is the login name (surrounding
                    whitespace is stripped)
                groupname (str): name of the (only) group the user should
                    belong to; it is fetched with Group.objects.get, so it has
                    to exist already
                store_pw (bool): if True, the password generated for a newly
                    created user is handed to store_password
                **kwargs: more attributes for user creation

            Returns:
                User object: the user that was added to the group.
            """
            username = username.strip()

            user, created = self.make_default_user(
                username=username,
                **kwargs
            )

            if created:
                password = self.password_generator_func()
                user.set_password(password)
                user.save()

                if store_pw:
                    store_password(username, groupname, password)

            group = Group.objects.get(name=groupname)
            user.groups.clear()  # remove all other groups
            user.groups.add(group)

            # Callers (make_student, add_user) use the user that is handed back.
            return user
    
    
        def make_user_in_student_group(self, username, **kwargs):
            """Shortcut for make_user_in_group with the STUDENTS group."""
            return self.make_user_in_group(username, STUDENTS, **kwargs)
    
        def make_student(self, username=None, name='__name', matrikel_no=None, exam=None, **kwargs):
            """Create or update the Student called `name` together with its user.

            Without a `username` a random one prefixed with 'student_' is used.
            Remaining keyword arguments are passed on to the user creation.

            Returns:
                Student object: the created or updated student.
            """
            # NOTE(review): matrikel_no is accepted but not used below.
            if not username:
                username = self.get_random_name(prefix='student_')

            student_defaults = {
                'user': self.make_user_in_student_group(username, **kwargs),
                'exam': exam,
                # TODO: find an elegant way to include optionals iff they exist
            }
            student, _created = Student.objects.update_or_create(
                name=name,
                defaults=student_defaults,
            )

            return student
    
        def make_tutor(self, username=None, **kwargs):
            """Put `username` (or a random 'tutor_' name) into the TUTORS group."""
            login = username or self.get_random_name(prefix='tutor_')
            return self.make_user_in_group(login, TUTORS, **kwargs)
    
        def make_reviewer(self, username=None, **kwargs):
            """Put `username` (or a random 'reviewer_' name) into the REVIEWERS group."""
            login = username or self.get_random_name(prefix='reviewer_')
            return self.make_user_in_group(login, REVIEWERS, **kwargs)
    
    
    def add_user(username, group, **kwargs):
        """Create or update `username` and make `group` its only group.

        Args:
            username (str): login name of the user.
            group (str): name of the group, e.g. STUDENTS, TUTORS or REVIEWERS.
            **kwargs: passed on to GradyUserFactory.make_user_in_group.

        Returns:
            whatever make_user_in_group returns for that user.
        """
        # NOTE(review): the argument list of this call was cut off in the
        # source; it is restored from the signature. Confirm whether
        # store_pw=True was meant to be passed here as well.
        user = GradyUserFactory().make_user_in_group(username, group, **kwargs)

        return user
    
    def add_student(username, email, submissions, **kwargs):
        """Create or update a student and the user account behind it.

        Args:
            username (str): login name of the student's user.
            email (str): stored on the user.
            submissions: not used here; accepted so that callers can pass a
                whole student record as keyword arguments.
            **kwargs: additional Student fields, written via `defaults`.

        Returns:
            Student object: the created or updated student.
        """
        user = add_user(username, STUDENTS, email=email)
        student, _ = Student.objects.update_or_create(
            user=user,
            defaults={'user': user, **kwargs},
        )

        # do_load_submissions attaches the submissions to the returned student.
        return student
    
    def add_submission(student_obj, code, tests, type):
        """Store one submission of a student together with its test results.

        For every test in TEST_ORDER a Test row is created or updated. If a test
        other than the unit test carries its failure label and the submission
        has no feedback yet, feedback with score 0 is created on behalf of the
        inactive 'auto_correct' user.

        Args:
            student_obj: the Student the submission belongs to.
            code (str): the submitted text.
            tests (dict): test results keyed by the names in TEST_ORDER; each
                entry provides 'name', 'label' and 'annotation'.
            type (str): name of an existing SubmissionType.
        """
        submission_type = SubmissionType.objects.get(name=type)

        submission_obj, _ = Submission.objects.update_or_create(
            type=submission_type,
            student=student_obj,
            defaults={'text': code},
        )

        auto_correct, _ = User.objects.get_or_create(
            username='auto_correct',
            defaults={'is_active': False},
        )

        available_tests = util.processing.Test.available_tests()

        for name in TEST_ORDER:
            test_data = tests[name]

            test_obj, _ = Test.objects.update_or_create(
                name=test_data['name'],
                submission=submission_obj,
                defaults={
                    'label': test_data['label'],
                    'annotation': test_data['annotation'],
                },
            )

            failed = test_obj.label == available_tests[test_obj.name].label_failure
            is_unit_test = test_obj.name == util.processing.UnitTestTest.__name__

            # Parenthesised condition instead of the broken backslash
            # continuation; the order of the checks is unchanged.
            if (failed
                    and not hasattr(test_obj.submission, 'feedback')
                    and not is_unit_test):
                Feedback.objects.update_or_create(
                    of_submission=submission_obj,
                    defaults={
                        'of_tutor': auto_correct,
                        'score': 0,
                        'text': test_obj.label,
                        'origin': FEEDBACK_MAPPER[test_obj.name],
                        'status': Feedback.ACCEPTED if test_obj.name == EmptyTest.__name__ else Feedback.EDITABLE,
                    },
                )
    
    def add_user_list(lst, group, **kwargs):
        """Add every name in `lst` to `group`.

        Args:
            lst: iterable of user names, e.g. an open file with one name per line.
            group (str): name of the group the users are put into.
            **kwargs: passed on to add_user for every name.
        """
        for name in lst:
            # Blank lines would otherwise end up as a user with an empty name.
            if not name.strip():
                continue
            add_user(name, group, **kwargs)
    
    
    def call_loader(func: Callable) -> None:
        """Run a loader unless the user declines to repeat it.

        The RECORDS file holds the names of loaders that ran before. If the name
        of `func` is listed there the user is asked whether to proceed; when the
        answer is no, nothing happens. After a run the name is appended to
        RECORDS.

        Args:
            func (Callable): the loader specified below
        """
        loader_name = func.__name__

        if os.path.exists(RECORDS):
            with open(RECORDS, 'r') as records_f:
                already_run = [entry.strip() for entry in records_f]

            if loader_name in already_run:
                proceed = i(f'{loader_name} has already been processed once. Proceed anyway?', NO)
                if not proceed:
                    return

        func()  # This executes the specified loader

        with open(RECORDS, 'a') as records_f:
            records_f.write(loader_name + '\n')
    
    
    
    def do_convert_xls():
        """Optionally convert the ILIAS .xls export into a .json file.

        Asks whether to convert at all, then for the input file and the output
        location, and writes the converted data there.
        """
        ans = i('''Do you want to convert the ILIAS .xls output to .json?''', YES)
        if not ans:
            return

        infile  = i('Please provide the path to the .xls file', is_file=True)
        # Same default name that do_load_submissions offers for reading the
        # file back in (was misspelled 'submissons.json').
        outfile = i('Where should the output go?', 'submissions.json')

        json_dict = util.convert.converter(infile)
        util.convert.write_to_file(json_dict, outfile)
    
    def do_load_submission_types():
        """Import submission types from a CSV file plus solution/description files.

        Prints the expected file layout, asks where the files are, and creates
        or updates one SubmissionType per CSV row (columns: id, name, score).
        """
        print(
        '''For the following import you need three files:
    
        1) A .csv file where the columns are: id, name, score
        2) A path to a directory where I can find sample solutions named
            <id>-lsg.c
        3) A path to a directory where I can find HTML files with an accurate
            description of the task. File name pattern has to be: <id>.html
    
        Example:
            $ cat submission_types.csv
            a01, Alpha Team, 10
            a02, Beta Distribution, 10
            a03, Gamma Ray, 20
    
            $ tree -L 2
            .
            ├── code-lsg
            │   ├── a01-lsg.c
            │   ├── a02-lsg.c
            │   └── a03-lsg.c
            └── html
                ├── a01.html
                ├── a02.html
                └── a03.html
        ''')

        path = i('Where are your files located?', '.', is_path=True)

        # All following paths are relative to the directory the user chose.
        with chdir_context(path):
            submission_types_csv    = i('CSV file',         'submission_types.csv')
            lsg_dir                 = i('solution dir',     'code-lsg')
            desc_dir                = i('descriptions dir', 'html')

            with open(submission_types_csv, encoding='utf-8') as tfile:
                csv_rows = [row for row in csv.reader(tfile)]

            for row in csv_rows:
                if not row:
                    # csv.reader yields [] for blank lines; nothing to import.
                    continue

                tid, name, score = (col.strip() for col in row)
                with \
                        open(os.path.join(lsg_dir, tid + '-lsg.c'), encoding='utf-8') as lsg,\
                        open(os.path.join(desc_dir, tid + '.html'), encoding='utf-8') as desc:
                    data = {
                        'name'          : name,
                        'description'   : desc.read(),
                        'solution'      : lsg.read(),
                        'full_score'    : int(score),
                    }
                _, created = SubmissionType.objects.update_or_create(
                    name=name,
                    defaults=data
                )
                info(f'{"Created" if created else "Updated"} {name}')
    
    
    
    def do_load_module_descriptions():
        """Import exam/module descriptions from a CSV file.

        Every row (module_reference, total_score, pass_score, pass_only) creates
        or updates the ExamType with that module reference; pass_only is true
        exactly when the column reads 'yes'.
        """
        print('''
    
        This loader imports descriptions of modules in an exam. This step is purely
        optional -- Grady works just fine without these information. If you want to
        distinguish students within one instance or give information about the
        grading type you should provide this info.
    
    
        CSV file format: module_reference, total_score, pass_score, pass_only
    
        Example:
            B.Inf.1801,  90, 45, yes
            B.Mat.31415, 50, 10, no
        ''')

        module_description_csv = i(
            'Where is the file?', 'modules.csv', is_file=True)

        with open(module_description_csv, encoding='utf-8') as tfile:
            csv_rows = list(csv.reader(tfile))

        # Column names and the converter applied to the matching column.
        field_names = ('module_reference', 'total_score', 'pass_score', 'pass_only')
        converters = (str, int, int, lambda x: x == 'yes')

        for row in csv_rows:
            stripped_cols = (col.strip() for col in row)

            data = {}
            for field, kind, raw_value in zip(field_names, converters, stripped_cols):
                data[field] = kind(raw_value)

            module_reference = data['module_reference']
            _, created = ExamType.objects.update_or_create(
                module_reference=module_reference,
                defaults=data,
            )

            action = "Created" if created else "Updated"
            info(f'{action} ExamType {module_reference}')
    
    
    def do_preprocess_submissions():
        """List the available tests and ask which ones to run.

        An empty answer or 'q' does nothing; running a selection is not
        implemented yet and raises NotImplementedError.
        """
        print('''
        Preprocessing might take some time depending on the amount of data
        and the complexity of the programs and the corresponding unit tests. You can
        specify what test you want to run.
    
        Tests do depend on each other. Therefore specifying a test will also
        result in running all its dependencies\n''')

        # Number the tests before anything about them is printed.
        numbered_tests = list(enumerate(util.processing.Test.available_tests()))

        print('The following test are available:\n')
        print('\t[q] Do nothing')
        for number, test_name in numbered_tests:
            print(f'\t[{number}] {test_name}')
        print()

        answer = i('Which tests do you want to run?')

        if answer and answer != 'q':
            raise NotImplementedError
    
    def do_load_submissions():
        """Import students and their submissions from a JSON file.

        The JSON maps user names to student records; each record is passed to
        add_student as keyword arguments and its 'submissions' entries to
        add_submission. If exam types exist the user may pick one, which is then
        attached to every imported student.
        """
        file = i('Get me the file with all the submissions', 'submissions.json')

        exam_kwargs = {}
        if ExamType.objects.all() and i('Do you want to add module/exam information?', NO):
            exam_query_set = ExamType.objects.all()

            print('You have the following choices:\n')
            for number, exam_type in enumerate(exam_query_set):
                print(f'\t[{number}] {exam_type.module_reference}')
            print()

            choice = i('Choose wisely')
            exam_kwargs = {'exam': exam_query_set[int(choice)]}

        with open(file) as submission_file:
            submissions = json.load(submission_file)

        for username, data in submissions.items():
            student_obj = add_student(username, **exam_kwargs, **data)

            for submission_kwargs in data['submissions']:
                add_submission(student_obj, **submission_kwargs)
    
    
    def do_load_tutors():
        """Import tutor users from a file with one user name per line."""
        print('Please import tutor users by providing one name per line')
        tutors = i('List of tutors', 'tutors', is_file=True)

        with open(tutors) as tutors_f:
            add_user_list(tutors_f, TUTORS)


    def do_load_reviewer():
        """Import reviewer users from a file with one user name per line.

        Reviewers are created with is_staff=True. This is the loader that
        call_order refers to as do_load_reviewer; its code used to be part of
        do_load_tutors.
        """
        print('Please import reviewer users by providing one name per line')
        reviewers = i('List of reviewers', 'reviewers', is_file=True)

        with open(reviewers) as reviewers_f:
            add_user_list(reviewers_f, REVIEWERS, is_staff=True)
    
    
    
    # The loaders in the order in which they are listed to the user and run when
    # the user starts at the beginning.
    call_order = (
        do_convert_xls,
        do_load_submission_types,
        do_load_module_descriptions,
        do_preprocess_submissions,
        do_load_submissions,
        do_load_tutors,
        do_load_reviewer
    )
    
    
        # NOTE(review): this code is indented like a function body and uses
        # `return`, but its `def` line is not visible here -- confirm the header.

        # Restore the prompt history of earlier runs, if there is one.
        if os.path.exists(HISTFILE):
            readline.read_history_file(HISTFILE)

        print('''Welcome to the Grady importer!
    
        This script aims at making the setup of the database as easy as possible. It
        at the same time serves as a documentation on how data is imported in Grady.
        Let\'s dive right in.\n''')

        try:
            print('The following importers are available:\n')

            # Show every loader with the number the user can pick it by.
            for fid, func in enumerate(call_order):

                print(f'\t[{fid}] {func.__name__}')


            fid = i('Choose a number or hit enter to start at the beginning')

            if not fid:
                for func in call_order:
                    # NOTE(review): the loop body is missing here; presumably
                    # call_loader(func) -- confirm.

            elif not 0 <= int(fid) < len(call_order):

                warn('There is no loader with this number')

            # NOTE(review): no branch handles a valid number -- confirm what
            # selecting a loader is supposed to do.

        except (EOFError, KeyboardInterrupt) as err:
            # Ctrl-D / Ctrl-C at a prompt ends the importer quietly.

            return
        except Exception as err:
            # Anything else is reported with a traceback instead of propagating.
            import traceback
            traceback.print_exc()
        finally:
            # Always save the prompt history, however the run ended.
            readline.write_history_file(HISTFILE)