Skip to content
Snippets Groups Projects
importer.py 10.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • import csv
    import os
    import readline
    import secrets
    
    import sys
    import json
    from typing import Callable
    
    
    from django.contrib.auth.models import Group, User
    
    
    import util.convert
    import util.processing
    from core.models import Feedback, Student, Submission, SubmissionType, Test
    from util.messages import *
    from util.processing import EmptyTest
    
    
    # Django auth groups that imported accounts are assigned to. They are
    # fetched once, when this module is imported.
    STUDENTS, TUTORS, REVIEWERS = (
        Group.objects.get(name=group_name)
        for group_name in ('Students', 'Tutors', 'Reviewers')
    )


    # Files kept in the working directory: the readline history of the prompts
    # and the record of loaders that have already been executed.
    HISTFILE = '.importer_history'
    RECORDS = '.importer'

    # Hints shown in yes/no prompts; they double as the markers that tell the
    # prompt helper which answer is the default.
    YES = 'Y/n'
    NO = 'y/N'

    # Accepted spellings of a yes/no reply and the boolean each one maps to.
    valid = dict(yes=True, y=True, ye=True, no=False, n=False)
    
    
    class chdir_context(object):
        """
        Step into a directory temporarily.

        Entering the ``with`` block changes the working directory to ``path``;
        leaving it (normally or through an exception) changes back to the
        directory that was current when the object was constructed.
        """

        def __init__(self, path):
            # The directory to return to is captured at construction time.
            self.old_dir = os.getcwd()
            self.new_dir = path

        def __enter__(self):
            info(f'Changing to {self.new_dir}')
            os.chdir(self.new_dir)

        def __exit__(self, *args):
            # Report the directory we actually go back to (the previous message
            # named the directory being left). Returning None keeps any
            # exception raised inside the block propagating.
            info(f'Returning to {self.old_dir}')
            os.chdir(self.old_dir)
    
    
    
    def get_xkcd_password(k=2):
        """Return a password made of ``k`` random dictionary words glued together.

        Candidate words are read from ``/usr/share/dict/words``, lower-cased and
        de-duplicated. The words are picked with ``secrets.choice`` and joined
        without a separator.

        Args:
            k (int): number of words to concatenate

        Returns:
            str: the generated password
        """
        candidates = set()
        with open('/usr/share/dict/words') as dictionary:
            for line in dictionary:
                # The length filter is applied to the raw line, i.e. before
                # the line terminator is stripped off.
                if 5 < len(line) < 8:
                    candidates.add(line.strip().lower())

        pool = list(candidates)
        return ''.join(secrets.choice(pool) for _ in range(k))
    
    
    
    def i(prompt: str, default: str='', is_path: bool=False, is_file: bool=False):
        """Ask the user a question on stdin and return the answer.

        The kind of question depends on ``default``:
            * ``YES`` or ``NO``: a yes/no question. An empty reply selects the
              default, an unrecognised reply asks again. A bool is returned.
            * any other non-empty string: an empty reply yields ``default``.
            * empty string: the raw reply is returned, possibly empty.

        Args:
            prompt (str): the question shown to the user
            default (str): default answer, or one of the YES / NO markers
            is_path (bool): require the answer to be an existing path
            is_file (bool): require the answer to be an existing regular file

        Returns:
            bool for yes/no questions, otherwise the answer as str
        """
        # Loop instead of recursing so repeated bad input cannot exhaust the stack.
        while True:
            if default == YES or default == NO:
                reply = input(f'[Q] {prompt} ({default}): ').strip().lower()
                if not reply:
                    reply = 'y' if default == YES else 'n'
                if reply not in valid:
                    # Previously an unknown reply raised KeyError.
                    warn('Please answer with yes or no.')
                    continue
                answer = valid[reply]
            elif default:
                answer = input(f'[Q] {prompt} ({default}): ') or default
            else:
                answer = input(f'[Q] {prompt}: ')

            # A regular file always exists, so is_file only needs isfile().
            if is_file:
                missing = not os.path.isfile(answer)
            elif is_path:
                missing = not os.path.exists(answer)
            else:
                missing = False

            if missing:
                warn(f'The {"path" if is_path else "file"} does not exist. Please try again.')
                continue

            # The original fell off the end here and handed None to every caller.
            return answer
    
    def add_user(username: str, group: 'Group', **kwargs):
        """Create or update a Django user and make ``group`` its only group.

        This is a specific wrapper for the django update_or_create method of
        objects.
            * A new user is created and password and group are set accordingly
            * If the user was there before, the password is NOT changed but the
              group is. A user must only have one group.

        Args:
            username (str): the login name; surrounding whitespace is stripped
            group (Group): the (only) group the user should belong to
            **kwargs: more attributes for the user, passed on as ``defaults``

        Returns:
            User: the created or updated user object
        """
        user, created = User.objects.update_or_create(
            username=username.strip(),
            defaults=kwargs
        )

        if created:
            # NOTE(review): the generated password is neither returned nor
            # logged by this function.
            password = get_xkcd_password()
            user.set_password(password)
            user.save()

        user.groups.clear()  # remove all other groups

        group.user_set.add(user)

        return user
    
    
    def add_student(username, name, matrikel_no, email, **kwargs):
        """Create or update a student record together with its login account.

        The account is created through ``add_user`` in the students group. The
        student row is looked up by ``name`` and its ``matrikel_no`` and ``user``
        fields are set.

        Args:
            username: login name for the account
            name: the student's name, used as lookup key
            matrikel_no: stored on the student record
            email: stored on the account
            **kwargs: accepted and ignored

        Returns:
            the created or updated Student object
        """
        account = add_user(username, STUDENTS, email=email)

        student_fields = {'matrikel_no': matrikel_no, 'user': account}
        student, _created = Student.objects.update_or_create(
            name=name,
            defaults=student_fields,
        )

        return student
    
    def add_submission(student_obj, code, tests, type):
        """Store one submission of a student together with its test results.

        The submission is looked up by submission type and student and its text
        is set to ``code``. Every entry of ``tests`` becomes a Test row. When a
        test is the failed ``EmptyTest``, a zero-score feedback attributed to
        the inactive ``auto_correct`` user is stored for the submission.

        Args:
            student_obj: the Student the submission belongs to
            code: the submission text
            tests (dict): test results; each value needs the keys ``name``,
                ``label`` and ``annotation``
            type: name of the SubmissionType
        """
        submission_obj, _ = Submission.objects.update_or_create(
            type=SubmissionType.objects.get(name=type),
            student=student_obj,
            defaults={'text': code},
        )

        # Only the values are needed; the keys of ``tests`` are not used.
        for test_data in tests.values():
            test_defaults = {
                'label': test_data['label'],
                'annotation': test_data['annotation'],
            }
            test_obj, _ = Test.objects.update_or_create(
                name=test_data['name'],
                submission=submission_obj,
                defaults=test_defaults,
            )

            failed_empty_test = (
                test_obj.name == EmptyTest.__name__
                and test_obj.label == EmptyTest.label_failure
            )
            if not failed_empty_test:
                continue

            auto_correct, _ = User.objects.update_or_create(
                username='auto_correct',
                defaults={'is_active': False},
            )
            feedback_defaults = {
                'of_tutor': auto_correct,
                'score': 0,
                'text': test_obj.label,
                'origin': Feedback.WAS_EMPTY,
                'status': Feedback.ACCEPTED,
            }
            Feedback.objects.update_or_create(
                of_submission=submission_obj,
                defaults=feedback_defaults,
            )
    
    def add_user_list(lst, group, **kwargs):
        """Create or update one user per entry of ``lst`` in the given group.

        Args:
            lst: iterable of usernames, e.g. an open file with one name per line
            group: the group every user is put into
            **kwargs: more user attributes, passed on to ``add_user``
        """
        # The loop was missing, leaving ``name`` undefined in the call below.
        for name in lst:
            # Skip blank lines so no user with an empty username is created.
            if not name.strip():
                continue
            add_user(name, group, **kwargs)
    
    
    def call_loader(func: Callable) -> None:
        """Run a loader unless it has run before and the user declines a re-run.

        The names of executed loaders are kept one per line in the RECORDS file.
        If the loader's name is listed there, the user is asked whether to
        proceed anyway. After the loader ran, its name is appended to RECORDS.

        Args:
            func (Callable): the loader specified below
        """
        if os.path.exists(RECORDS):
            with open(RECORDS, 'r') as records_f:
                finished = {line.strip() for line in records_f}

            if func.__name__ in finished:
                proceed = i(f'{func.__name__} has already been processed once. Proceed anyway?', NO)
                if not proceed:
                    return

        # This executes the specified loader
        func()

        with open(RECORDS, 'a') as records_f:
            records_f.write(func.__name__ + '\n')
    
    
    def do_convert_xls():
        """Optionally convert the ILIAS .xls export into a .json file.

        Asks whether to convert at all, then for the input file and the output
        location, and hands both to the ``util.convert`` helpers.
        """
        info('[Executing]', sys._getframe().f_code.co_name)

        ans = i('''Do you want to convert the ILIAS .xls output to .json?''', YES)
        if not ans:
            return

        infile  = i('Please provide the path to the .xls file', is_file=True)
        # Default fixed from the misspelled 'submissons.json' so that it matches
        # the default file name the submissions loader asks for.
        outfile = i('Where should the output go?', 'submissions.json')

        json_dict = util.convert.converter(infile)
        util.convert.write_to_file(json_dict, outfile)
    
    def do_load_submission_types():
        """Create or update the submission types from a CSV file.

        Each CSV row holds id, name and score. For every row the sample solution
        ``<id>-lsg.c`` and the description ``<id>.html`` are read from the given
        directories and stored on the SubmissionType with that name.
        """
        info('[Executing] ', sys._getframe().f_code.co_name)

        print('''For the following import you need three files:
    
            1) A .csv file where the columns are: id, name, score
            2) A path to a directory where I can find sample solutions named
                <id>-lsg.c
            3) A path to a directory where I can find HTML files with an accurate
                description of the task. File name pattern has to be: <id>.html
        ''')

        path = i('Where are your files located?', '.', is_path=True)

        with chdir_context(path):
            submission_types_csv    = i('CSV file',         'submission_types.csv')
            lsg_dir                 = i('solution dir',     'code/code-lsg')
            desc_dir                = i('descriptions dir', 'html')

            # newline='' is what the csv module asks for when reading files.
            with open(submission_types_csv, encoding='utf-8', newline='') as tfile:
                # Blank lines would otherwise break the three-way unpack below.
                csv_rows = [
                    row for row in csv.reader(tfile)
                    if any(col.strip() for col in row)
                ]

            for row in csv_rows:
                tid, name, score = (col.strip() for col in row)
                with \
                        open(os.path.join(lsg_dir, tid + '-lsg.c'), encoding='utf-8') as lsg,\
                        open(os.path.join(desc_dir, tid + '.html'), encoding='utf-8') as desc:
                    data = {
                        'name'          : name,
                        'description'   : desc.read(),
                        'solution'      : lsg.read(),
                        'full_score'    : int(score),
                    }
                _, created = SubmissionType.objects.update_or_create(
                    name=name,
                    defaults=data
                )
                info(f'{"Created" if created else "Updated"} {name}')
    
    
    def do_preprocess_submissions():
        """List the available tests and ask which of them should be run.

        An empty answer or ``q`` returns without doing anything. Any other
        answer raises NotImplementedError.
        """
        info('[Executing] ', sys._getframe().f_code.co_name)

        print('''
        Preprocessing might take some time depending on the amount of data
        and the complexity of the programs and the corresponding unit tests. You can
        specify what test you want to run.
    
        Tests do depend on each other. Therefore specifying a test will also
        result in running all its dependencies\n''')

        # Collect the numbered tests up front, before anything is printed.
        numbered_tests = list(enumerate(util.processing.Test.available_tests()))

        print('The following test are available:\n')
        print('\t[q] Do nothing')
        for number, test_name in numbered_tests:
            print(f'\t[{number}] {test_name}')
        print()

        selection = i('Which tests do you want to run?')
        nothing_selected = not selection or selection == 'q'
        if nothing_selected:
            return

        raise NotImplementedError
    
    def do_load_submissions():
        """Import students and their submissions from a JSON file.

        The file maps usernames to student data. Each student is created or
        updated first, then every entry of its ``submissions`` list is stored.
        """
        info('[Executing] ', sys._getframe().f_code.co_name)

        submissions_path = i('Get me the file with all the submissions', 'submissions.json')
        with open(submissions_path) as submission_file:
            students = json.load(submission_file)

        for username, data in students.items():
            # ``data`` is expanded into keyword arguments of add_student.
            student_obj = add_student(username, **data)

            for submission_kwargs in data['submissions']:
                add_submission(student_obj, **submission_kwargs)
    
    
    def do_load_tutors():
        """Ask for a file with one tutor name per line and import those users."""
        info('[Executing] ', sys._getframe().f_code.co_name)
        print('Please import tutor users by providing one name per line')

        tutor_list_path = i('List of tutors', 'tutors', is_file=True)
        with open(tutor_list_path) as name_lines:
            # The open file is handed over as the list of names.
            add_user_list(name_lines, TUTORS)
    
    
    
    def do_load_reviewer():
        """Ask for a file with one reviewer name per line and import those users.

        Reviewers are created with ``is_staff=True``.
        """
        info('[Executing] ', sys._getframe().f_code.co_name)
        print('Please import reviewer users by providing one name per line')

        reviewer_list_path = i('List of reviewers', 'reviewers', is_file=True)
        with open(reviewer_list_path) as name_lines:
            # The open file is handed over as the list of names.
            add_user_list(name_lines, REVIEWERS, is_staff=True)
    
    
    # ``collections`` is not among the imports visible at the top of this file,
    # so it is imported here to keep the module loadable.
    import collections

    # Menu of loaders: the key is the number the user types, and the order of
    # the entries is the order in which "run all" executes them. Built from
    # pairs so that the order does not depend on dict-literal ordering.
    call_order = collections.OrderedDict([
        (0, do_convert_xls),
        (1, do_load_submission_types),
        (2, do_preprocess_submissions),
        (3, do_load_submissions),
        (4, do_load_tutors),
        (5, do_load_reviewer),
    ])
    
    
    
    def start():
        """Entry point of the interactive importer.

        Refuses to run when the database already holds non-superuser accounts.
        Otherwise it lists the available importers and runs either the chosen
        one or all of them in order. The readline history is loaded at the
        start and written back at the end, whatever happens in between.
        """
        # exists() asks the database for a yes/no answer instead of evaluating
        # the whole queryset just to test its truthiness.
        if User.objects.filter(is_superuser=False).exists():
            warn('Warning database is not clean. Aborting.')
            # Actually abort, as the message announces.
            return

        if os.path.exists(HISTFILE):
            readline.read_history_file(HISTFILE)

        print('''Welcome to the Grady importer!
    
        This script aims at making the setup of the database as easy as possible. It
        at the same time serves as a documentation on how data is imported in Grady.
        Let\'s dive right in.\n''')

        try:
            print('The following importers are available:\n')
            for fid, func in call_order.items():
                print(f'\t[{fid}] {func.__name__}')

            print()
            fid = i('Press enter for all in given order or choose a number')
            if fid:
                call_loader(call_order[int(fid)])
            else:
                for func in call_order.values():
                    call_loader(func)
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C: leave quietly; the history is still saved below.
            return
        except Exception:
            # Top-level boundary: report the failure without losing the history.
            import traceback
            traceback.print_exc()
        finally:
            readline.write_history_file(HISTFILE)