From 340123149f36e2ef2e2edb09064e974a7a080108 Mon Sep 17 00:00:00 2001 From: "robinwilliam.hundt" <robinwilliam.hundt@stud.uni-goettingen.de> Date: Fri, 29 Mar 2019 20:32:56 +0100 Subject: [PATCH] Importer is no compatible with Rusty-Hektor 1.0.0 Also removed a bunch of unnecessary code and extracted the test runner into https://gitlab.gwdg.de/grady-corp/grady-test-runner --- requirements.txt | 2 +- util/importer.py | 483 ++++++++++----------------------------------- util/processing.py | 242 ----------------------- util/testcases.py | 97 --------- 4 files changed, 107 insertions(+), 717 deletions(-) delete mode 100644 util/processing.py delete mode 100644 util/testcases.py diff --git a/requirements.txt b/requirements.txt index 5f8f6778..39aaeba4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,8 +8,8 @@ drf-yasg~=1.12.0 gunicorn~=19.9.0 psycopg2-binary~=2.7.0 python-json-logger~=0.1.0 -tqdm~=4.28.0 whitenoise~=4.1.0 xlrd~=1.2.0 xkcdpass==1.17.0 django-constance[database]~=2.3.1 +semver~=2.8.1 diff --git a/util/importer.py b/util/importer.py index eb5c1703..d72f4878 100644 --- a/util/importer.py +++ b/util/importer.py @@ -1,16 +1,15 @@ -import csv import json import os import readline -from typing import Callable +import util -from django.db import transaction - -import util.processing +from util.messages import warn from core.models import ExamType, Feedback, Submission, SubmissionType, Test from core.models import UserAccount as User from util.factories import GradyUserFactory -from util.messages import info, warn + +import semver + WELCOME = r''' ______ __ ____ __ @@ -22,14 +21,17 @@ WELCOME = r''' ''' HISTFILE = '.importer_history' -RECORDS = '.importer' PASSWORDS = '.importer_passwords' YES = 'Y/n' NO = 'y/N' +RUSTY_HEKTOR_MIN_VER = ">=1.0.0" +RUSTY_HEKTOR_MAX_VER = "<2.0.0" + valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} + ORIGIN_ORDER = { Feedback.WAS_EMPTY, Feedback.DID_NOT_COMPILE, @@ -49,22 +51,48 @@ FEEDBACK_MAPPER = dict(zip(TEST_ORDER, ORIGIN_ORDER)) user_factory = GradyUserFactory() -class chdir_context(object): - """ - Step into a directory temporarily. - """ +def start(): - def __init__(self, path): - self.old_dir = os.getcwd() - self.new_dir = path + if os.path.exists(HISTFILE): + readline.read_history_file(HISTFILE) - def __enter__(self): - info(f'Changing to {self.new_dir}') - os.chdir(self.new_dir) + print(WELCOME + ''' - def __exit__(self, *args): - os.chdir(self.old_dir) - info(f'Returned to {self.old_dir}') + Welcome to the Grady import script! + + This script aims at making the setup of the database as easy as possible. + At the same time it serves as a documentation on how data is imported into + Grady. Let\'s dive right in.\n''') + + try: + print('The following sub importers are available:\n') + for fid, func in enumerate(call_order): + print(f'\t[{fid}] {func.__name__}') + print('\t[q] exit') + print() + + fid = i('Choose a number or hit enter to start at the beginning') + + if not fid: + for func in call_order: + func() + elif fid in ('q', 'quit', 'exit'): + return + elif not 0 <= int(fid) < len(call_order): + warn('There is no loader with this number') + else: + call_order[int(fid)]() + + except (EOFError, KeyboardInterrupt): + print() + return + except FileNotFoundError: + raise + except Exception: + import traceback + traceback.print_exc() + finally: + readline.write_history_file(HISTFILE) def i(prompt: str, default: str = '', is_path: bool = False, is_file: bool = False): @@ -86,332 +114,32 @@ def i(prompt: str, default: str = '', is_path: bool = False, is_file: bool = Fal return answer -def add_feedback_if_test_recommends_it(test_obj): - available_tests = util.processing.Test.available_tests() - - if test_obj.label == available_tests[test_obj.name].label_failure \ - and not hasattr(test_obj.submission, 'feedback') \ - and (test_obj.name == util.processing.EmptyTest.__name__ or - test_obj.name == util.processing.CompileTest.__name__): - return Feedback.objects.update_or_create( - of_submission=test_obj.submission, - defaults={ - 'score': 0, - 'origin': FEEDBACK_MAPPER[test_obj.name], - 'is_final': True, - } - ) - - -def add_tests(submission_obj, tests): - auto_correct, _ = User.objects.get_or_create( - username='auto_correct', - defaults={'is_active': False} - ) - - for name in (name for name in TEST_ORDER if name in tests): - test_data = tests[name] - test_obj, created = Test.objects.update_or_create( - name=test_data['name'], - submission=submission_obj, - defaults={ - 'label': test_data['label'], - 'annotation': test_data['annotation'], - } - ) - add_feedback_if_test_recommends_it(test_obj) - - -# submission_type is the name outputted by rust_hektor, type the one from hektor -def add_submission(student_obj, code, tests, submission_type=None, type=None): - if submission_type is None and type is None: - raise Exception("Submission need to contain submission_type or type") - elif type is not None: - submission_type = type - - submission_type_obj = SubmissionType.objects.get(name=submission_type) - - submission_obj, _ = Submission.objects.update_or_create( - type=submission_type_obj, - student=student_obj, - defaults={'text': code} - ) - - if tests: - add_tests(submission_obj, tests) - - -def call_loader(func: Callable) -> None: - """ This function handles if a function will be executed at all. Currently - it just checks in the RECORDS file for the name of the function. If it is - present the function will not be executed - - Args: - func (Callable): the loader specified below - """ - if os.path.exists(RECORDS): - with open(RECORDS, 'r') as records_f: - done = [line.strip() for line in records_f] - - if func.__name__ in done: - warn(f'{func.__name__} has already been processed once.') - if not i('Proceed anyway?', NO): - return - - with transaction.atomic(): - func() # This executes the specified loader - - with open(RECORDS, 'a') as records_f: - records_f.write(func.__name__) - records_f.write('\n') - - info(f'{func.__name__} is done.') - - -def file_suffix_to_lang_name(suffix: str) -> str: - suffix2name = { - 'hs': 'haskell', - 's': 'mipsasm', - 'asm': 'mipsasm' - } - if suffix not in suffix2name: - return suffix - return suffix2name[suffix] - - -def do_load_submission_types(): - - print( - '''For the following import you need three files: - - 1) A .csv file where the columns are: id, name, score, (file suffix). No - suffix defaults to .c - Supported suffixes: .c , .java , .hs , .s (for mips) - 2) A path to a directory where I can find sample solutions named - <id>-lsg.c - 3) A path to a directory where I can find HTML files with an accurate - description of the task. File name pattern has to be: <id>.html - - Example: - $ cat submission_types.csv - a01, Alpha Team, 10, .c - a02, Beta Distribution, 10, .java - a03, Gamma Ray, 20 - - $ tree -L 2 - . - ├── code-lsg - │  ├── a01.c - │  ├── a02.java - │  └── a03.hs - └── html -   ├── a01.html -   ├── a02.html -   └── a03.html - ''') - - path = i('Where are your files located?', '.', is_path=True) - - with chdir_context(path): - submission_types_csv = i('CSV file', 'submission_types.csv') - lsg_dir = i('solution dir', 'code-lsg') - desc_dir = i('descriptions dir', 'html') - - with open(submission_types_csv, encoding='utf-8') as tfile: - csv_rows = [row for row in csv.reader(tfile) if len(row) > 0] - - for row in csv_rows: - tid, name, score, *suffix = (col.strip() for col in row) - - if not suffix: - suffix = '.c' - else: - suffix = suffix[0] - - suffix = suffix.lower().strip('.') - lang_name = file_suffix_to_lang_name(suffix) - - with \ - open(os.path.join(lsg_dir, tid + '.' + suffix), - encoding='utf-8') as lsg, \ - open(os.path.join(desc_dir, tid + '.html'), - encoding='utf-8') as desc: - data = { - 'name': name, - 'description': desc.read(), - 'solution': lsg.read(), - 'full_score': int(score), - 'programming_language': lang_name - } - _, created = SubmissionType.objects.update_or_create( - name=name, - defaults=data - ) - info(f'{"Created" if created else "Updated"} {name}') - - -def do_load_module_descriptions(): - - print(''' - This loader imports descriptions of modules in an exam. This information - is used to distinguish students within one instance or give information - about the grading type. - - CSV file format: module_reference, total_score, pass_score, pass_only - - Example: - B.Inf.1801, 90, 45, yes - B.Mat.31415, 50, 10, no - ''') - - module_description_csv = i( - 'Where is the file?', 'modules.csv', is_file=True) - - with open(module_description_csv, encoding='utf-8') as tfile: - csv_rows = [row for row in csv.reader(tfile) if len(row) > 0] - - for row in csv_rows: - data = { - field: kind(data) for field, kind, data in zip( - ('module_reference', 'total_score', 'pass_score', 'pass_only'), - (str, int, int, lambda x: x == 'yes'), - (col.strip() for col in row) - ) - } - - _, created = ExamType.objects.update_or_create( - module_reference=data['module_reference'], - defaults=data, - ) - - modification = "Created" if created else "Updated" - info(f'{modification} ExamType {data["module_reference"]}') - - -def _do_check_empty_submissions(): - submissions = i( - 'Please provide the student submissions', 'binf1601-anon.json', - is_file=True) - return ( - util.processing.process('', '', '', submissions, '', util.processing.EmptyTest.__name__), - submissions) - - -def _do_preprocess_c_submissions(test_to_run): - location = i('Where do you keep the specifications for the tests?', - 'anon-export', is_path=True) - - with chdir_context(location): - descfile = i( - 'Please provide usage for sample solution', 'descfile.txt', - is_file=True) - binaries = i( - 'Please provide executable binaries of solution', 'bin', - is_path=True) - objects = i( - 'Please provide object files of solution', 'objects', - is_path=True) - submissions = i( - 'Please provide the student submissions', 'binf1601-anon.json', - is_file=True) - headers = i( - 'Please provide header files if any', 'code-testing', - is_path=True) - - info('Looks good. The tests mights take some time.') - return util.processing.process(descfile, - binaries, - objects, - submissions, - headers, - test_to_run), submissions - - -def do_preprocess_submissions(): - - print(''' - Preprocessing might take some time depending on the amount of data - and the complexity of the programs and the corresponding unit tests. You - can specify what test you want to run. - - Tests do depend on each other. Therefore specifying a test will also - result in running all its dependencies. - - The EmptyTest can be run on all submission types. The other tests are very specific - to the c programming course. - \n''') - - test_enum = dict(enumerate(util.processing.Test.available_tests())) - - print('The following test are available:\n') - print('\t[q] Do nothing') - for j, test in test_enum.items(): - print(f'\t[{j}] {test}') - print() - - test_index = i('Which tests do you want to run?') - - if not test_index or test_index == 'q': - return - - test_to_run = test_enum[int(test_index)] - - # processed_submissions = None - if test_to_run == util.processing.EmptyTest.__name__: - processed_submissions, submissions = _do_check_empty_submissions() - else: - processed_submissions, submissions = _do_preprocess_c_submissions(test_to_run) - - output_f = i('And everything is done. Where should I put the results?', - f'{submissions.rsplit(".")[0]}.processed.json') - - with open(output_f, 'w+') as outfile: - json.dump(processed_submissions, outfile, - sort_keys=True, indent=4) - info('Wrote processed data to %s' % os.path.join(os.curdir, output_f)) - - -def do_load_submissions(): - - file = i('Get me the file with all the submissions', +def load_hektor_json(): + file = i('Get me the file with the output from rusty-hektor', 'submissions.json', is_file=True) - if not ExamType.objects.all(): - raise Exception('Modules need to be loaded before submissions.') - else: - exam_query_set = ExamType.objects.all() - print('Please select the corresponding module') - print('You have the following choices:\n') - for j, exam_type in enumerate(exam_query_set): - print(f'\t[{j}] {exam_type.module_reference}') - print() + with open(file, 'r') as f: + exam_data = json.JSONDecoder().decode(f.read()) + + hektor_version = exam_data['meta']['version'] + if not (semver.match(hektor_version, RUSTY_HEKTOR_MIN_VER) and + semver.match(hektor_version, RUSTY_HEKTOR_MAX_VER)): + warn(f'The data you\'re trying to import has the wrong version {hektor_version}\n' + f'Requirements: {RUSTY_HEKTOR_MIN_VER}, {RUSTY_HEKTOR_MAX_VER}') - exam_prompt_key = i('Choose wisely') - exam_obj = {'exam': exam_query_set[int(exam_prompt_key)]} + exam, _ = ExamType.objects.get_or_create(**exam_data['module']) - with open(file) as exam_data_file: - exam_data = json.JSONDecoder().decode(exam_data_file.read()) + for submission_type in exam['submission_types']: + SubmissionType.objects.get_or_create(**submission_type) for student in exam_data['students']: - student_obj = user_factory.make_student(**exam_obj, + student_obj = user_factory.make_student(exam=exam, **student).student for submission_obj in student['submissions']: add_submission(student_obj, **submission_obj) -def do_load_tutors(): - - print('Please import tutor users by providing one name per line') - tutors = i('List of tutors', 'tutors', is_file=True) - - with open(tutors) as tutors_f: - for tutor in tutors_f: - if len(tutor.strip()) > 0: - user_factory.make_tutor(tutor.strip(), store_pw=True) - - -def do_load_reviewer(): - +def load_reviewers(): print('Please import reviewer users by providing one name per line') reviewers = i('List of reviewers', 'reviewers', is_file=True) @@ -422,55 +150,56 @@ def do_load_reviewer(): store_pw=True) -call_order = ( - do_load_submission_types, - do_load_module_descriptions, - do_preprocess_submissions, - do_load_submissions, - do_load_tutors, - do_load_reviewer -) +def add_submission(student_obj, code, tests, type=None): + submission_type_obj = SubmissionType.objects.get(name=type) + submission_obj, _ = Submission.objects.update_or_create( + type=submission_type_obj, + student=student_obj, + defaults={'text': code} + ) -def start(): + if tests: + add_tests(submission_obj, tests) - if os.path.exists(HISTFILE): - readline.read_history_file(HISTFILE) - print(WELCOME + ''' +def add_tests(submission_obj, tests): + auto_correct, _ = User.objects.get_or_create( + username='auto_correct', + defaults={'is_active': False} + ) - Welcome to the Grady import script! + for name in (name for name in TEST_ORDER if name in tests): + test_data = tests[name] + test_obj, created = Test.objects.update_or_create( + name=test_data['name'], + submission=submission_obj, + defaults={ + 'label': test_data['label'], + 'annotation': test_data['annotation'], + } + ) + add_feedback_if_test_recommends_it(test_obj) - This script aims at making the setup of the database as easy as possible. - At the same time it serves as a documentation on how data is imported into - Grady. Let\'s dive right in.\n''') - try: - print('The following sub importers are available:\n') - for fid, func in enumerate(call_order): - print(f'\t[{fid}] {func.__name__}') - print('\t[q] exit') - print() +def add_feedback_if_test_recommends_it(test_obj): + available_tests = util.processing.Test.available_tests() - fid = i('Choose a number or hit enter to start at the beginning') + if test_obj.label == available_tests[test_obj.name].label_failure \ + and not hasattr(test_obj.submission, 'feedback') \ + and (test_obj.name == util.processing.EmptyTest.__name__ or + test_obj.name == util.processing.CompileTest.__name__): + return Feedback.objects.update_or_create( + of_submission=test_obj.submission, + defaults={ + 'score': 0, + 'origin': FEEDBACK_MAPPER[test_obj.name], + 'is_final': True, + } + ) - if not fid: - for func in call_order: - call_loader(func) - elif fid in ('q', 'quit', 'exit'): - return - elif not 0 <= int(fid) < len(call_order): - warn('There is no loader with this number') - else: - call_loader(call_order[int(fid)]) - except (EOFError, KeyboardInterrupt): - print() - return - except FileNotFoundError: - raise - except Exception: - import traceback - traceback.print_exc() - finally: - readline.write_history_file(HISTFILE) +call_order = [ + load_hektor_json, + load_reviewers +] diff --git a/util/processing.py b/util/processing.py deleted file mode 100644 index 6063600e..00000000 --- a/util/processing.py +++ /dev/null @@ -1,242 +0,0 @@ -import abc -import hashlib -import json -import logging -import os -import re -import shutil -import subprocess -import tempfile - -from tqdm import tqdm - -try: - import testcases -except ModuleNotFoundError: - from util import testcases - -log = logging.getLogger(__name__) - - -def run_cmd(cmd, stdin=None, check=False, timeout=1): - return subprocess.run( - 'timeout 1 ' + cmd, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, - input=stdin, - shell=True, - check=check, - encoding='utf-8', - timeout=timeout - ) - - -def all_subclasses(cls): - return cls.__subclasses__() \ - + [g for s in cls.__subclasses__() for g in all_subclasses(s)] - - -def sha1(submission_obj): - return hashlib.sha1(submission_obj['code'].encode()).hexdigest() - - -def get_submission_id(submission_obj): - t = submission_obj['type'] - m = re.search(r'(a0\d)', t) - return m.group(0) - - -class Test(metaclass=abc.ABCMeta): - """docstring for IliasQuestion""" - - @classmethod - def available_tests(cls): - return {sub.__name__: sub for sub in all_subclasses(cls)} - - def __new__(cls, *args, **kwargs): - assert hasattr(cls, 'depends'), "depends not defined" - assert hasattr(cls, 'label_success'), "label_success not defined" - assert hasattr(cls, 'label_failure'), "label_failure not defined" - return super().__new__(cls) - - def __init__(self, submission_obj, **kwargs): - if not self.dependencies_satisfied(submission_obj): - self.result = False - self.annotation = "TEST DEPENDENCY NOT MET" - self.serialize(submission_obj) - - elif str(self) in submission_obj['tests']: - self.deserialize(submission_obj['tests'][str(self)]) - - else: - self.result, self.annotation = self.run_test(submission_obj) - self.serialize(submission_obj) - - def __bool__(self): - return self.result - - def __str__(self): - return self.__class__.__name__ - - def dependencies_satisfied(self, submission_obj): - return all(dep(submission_obj).result for dep in self.depends) - - def deserialize(self, test): - self.result = test['label'] == self.label_success - self.annotation = test['annotation'] - - def serialize(self, submission_obj): - as_dict = { - 'name': str(self), - 'annotation': self.annotation - } - - if self.result: - as_dict['label'] = self.label_success - else: - as_dict['label'] = self.label_failure - - submission_obj['tests'][str(self)] = as_dict - - @abc.abstractmethod - def run_test(self, submission_obj) -> (bool, str): - return NotImplemented - - -class EmptyTest(Test): - """docstring for EmptyTest""" - - depends = () - label_success = 'NOT_EMPTY' - label_failure = 'EMPTY' - - def run_test(self, submission_obj): - return bool(submission_obj['code'].strip()), "" - - -class CompileTest(Test): - - depends = (EmptyTest, ) - label_success = 'COMPILATION_SUCCESSFUL' - label_failure = 'COMPILATION_FAILED' - - def run_test(self, submission_obj): - - ret = run_cmd( - "gcc -Wall -c -x c -std=c11 -Icode-testing -o code.o -", - submission_obj['code']) - return not ret.returncode, ret.stderr - - -class LinkTest(Test): - - depends = (CompileTest, ) - label_success = 'LINKING_SUCCESSFUL' - label_failure = 'LINKING_FAILED' - - def run_test(self, submission_obj): - - if submission_obj['type'] not in testcases_dict: - return False, 'This program was not required to be executable.' - - cid = get_submission_id(submission_obj) - ret = run_cmd(f"gcc-7 -o ./bin/{cid} objects/{cid}-testing.o code.o") - return not ret.returncode, ret.stderr - - -class UnitTestTest(Test): - """docstring for UnitTestTest""" - - depends = (LinkTest, ) - label_success = 'UNITTEST_SUCCSESSFUL' - label_failure = 'UNITTEST_FAILED' - - @staticmethod - def testcase(i, args, stdout, cid): - try: - ret = run_cmd("./bin/%s %s" % (cid, args), check=True, timeout=0.1) - assert ret.stdout == stdout - except AssertionError: - return False, "Case #{}: [ASSERT FAIL] ./prog {:>2} WAS '{}' SHOULD '{}'".format( # noqa: E501 - i, args, ret.stdout.strip(), stdout.strip()) - except subprocess.CalledProcessError as err: - return False, "Case #{:>2}: [FAILED] ./prog {} ERROR '{}'".format( - i, args, err.stderr.strip()) - except subprocess.TimeoutExpired: - return False, "Case #{:>2}: [TIMEOUT] ./prog {}".format(i, args) - else: - return True, "Case #{:>2}: [SUCCESS] ./prog {}".format(i, args) - - def run_test(self, submission_obj): - - task = testcases_dict[submission_obj['type']] - cid = get_submission_id(submission_obj) - return_data = [self.testcase(i, case, result, cid) - for i, (case, result) in enumerate(zip(task['cases'], - task['results'])) - ] - results, messages = zip(*return_data) - - return all(results), '\n'.join(messages) - - -def process(descfile, binaries, objects, submissions, header, highest_test): - if isinstance(highest_test, str): - highest_test_class = Test.available_tests()[highest_test] - if highest_test != EmptyTest.__name__: # not needed for EmptyTest - global testcases_dict - testcases_dict = testcases.evaluated_testcases(descfile, binaries) - - with open(submissions) as submission_file: - submissions_json = json.JSONDecoder().decode( - submission_file.read()) - - # Get something disposable - if highest_test != EmptyTest.__name__: - path = tempfile.mkdtemp() - run_cmd(f'cp -r {objects} {path}') - run_cmd(f'cp -r {binaries} {path}') - run_cmd(f'cp -r {header} {path}') - os.chdir(path) - os.makedirs('bin') - - def iterate_submissions(): - yield from (obj - for student in tqdm(submissions_json['students']) - for obj in student['submissions']) - - for submission_obj in tqdm(iterate_submissions()): - highest_test_class(submission_obj) - if highest_test != EmptyTest.__name__: - run_cmd('rm code*') - print() # line after progress bar - if highest_test != EmptyTest.__name__: - shutil.rmtree(path) - return submissions_json - - -def parseme(): - import argparse - parser = argparse.ArgumentParser() - parser.add_argument('descfile') - parser.add_argument('binaries') - parser.add_argument('objects') - parser.add_argument('submissions') - parser.add_argument('header') - parser.add_argument('test') - return parser.parse_args() - - -if __name__ == '__main__': - args = parseme() - testcases_dict = testcases.evaluated_testcases(args.descfile, - args.binaries) - - print(json.dumps(process(args.descfile, - args.binaries, - args.objects, - args.submissions, - args.header, - args.test), - sort_keys=True, - indent=4)) diff --git a/util/testcases.py b/util/testcases.py deleted file mode 100644 index 819d871a..00000000 --- a/util/testcases.py +++ /dev/null @@ -1,97 +0,0 @@ -import os -import random -import re -from string import ascii_letters, digits - -try: - import processing -except ModuleNotFoundError: - from util import processing - -types = ('integer', 'unsigned_integer', 'character', 'string') -list_sep = '...' - -re_task = re.compile(r'^-- (?P<title>.*)\n(USAGE: (?P<cmd>[\./\w]+) (?P<syntax>.*)|NO EXECUTABLE)', re.MULTILINE) # noqa: E501 -re_args = re.compile(rf"<({'|'.join(types)}|{'|'.join(t + '_list' for t in types)})>") # noqa: E501 - - -def call_function(name: str, *args, **kwargs): - return globals()[name](*args, **kwargs) - - -def integer(bounds=50): - return random.randint(-bounds, bounds) - - -def unsigned_integer(upper=50): - return random.randint(0, upper) - - -def character(): - return random.choice(10 * ascii_letters + 2 * digits + '%*+,-./:?@[]^_{}~') - - -def string(lenght=31): - return ''.join(character() for i in range(2, 2 + unsigned_integer(lenght))) - - -def type_list(_type): - def generic_list(): - return ' '.join(str( - call_function(_type)) for i in range(2, unsigned_integer(6) * 2)) - return generic_list - - -def rubbish(): - return str(call_function( - random.choice(tuple(t + '_list' for t in types) + types))) - - -def argument_generator(syntax): - syntax, _ = re.subn( - r'<([\w\s]+)> <\1> \.\.\. <\1> <\1>', r'<\1_list>', syntax) - syntax, _ = re.subn(r'<(\w+)\s(\w+)>', r'<\1_\2>', syntax) - - return ' '.join( - str(call_function(arg)) for arg in re.findall(re_args, syntax)) - - -def testcases_generator(task, n=10): - syntax = task.group('syntax') - - if not syntax: - return - - if syntax == 'NO INPUT': - yield 'NO INPUT' - return - - for i in range(n): - yield argument_generator(syntax) - - -def testcases(description_path): - for t in types: - globals()[t + '_list'] = type_list(t) # I fucking love it - - with open(description_path) as description_file: - description = description_file.read() - - return { - task['title']: { - 'cmd': task['cmd'], - 'cases': [t for t in testcases_generator(task)] - } for task in re.finditer(re_task, description) - } - - -def evaluated_testcases(description_path, binaries): - task_testcases = testcases(description_path) - - for task in filter(lambda t: t['cmd'], task_testcases.values()): - path_to_binary = os.path.join(os.path.join( - binaries, os.path.basename(task['cmd']))) - task['results'] = [processing.run_cmd( - f"{path_to_binary} {case}").stdout for case in task['cases']] - - return task_testcases -- GitLab