Commit 2b845705 authored by robinwilliam.hundt's avatar robinwilliam.hundt

Importer is now compatible with Rusty-Hektor 1.0.0

Also removed a bunch of unnecessary code and extracted the test runner into https://gitlab.gwdg.de/grady-corp/grady-test-runner
parent e9dbd6b4
Pipeline #93765 passed
Requirements file (pip):

@@ -8,8 +8,8 @@ drf-yasg~=1.12.0
 gunicorn~=19.9.0
 psycopg2-binary~=2.7.0
 python-json-logger~=0.1.0
-tqdm~=4.28.0
 whitenoise~=4.1.0
 xlrd~=1.2.0
 xkcdpass==1.17.0
 django-constance[database]~=2.3.1
+semver~=2.8.1
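The new semver pin backs the version gate added to the importer below. A minimal sketch of how those range expressions behave (the version strings here are invented):

import semver  # semver~=2.8.1, as pinned above

# rusty-hektor exports are accepted in the half-open range [1.0.0, 2.0.0):
assert semver.match('1.4.2', '>=1.0.0') and semver.match('1.4.2', '<2.0.0')
assert not semver.match('0.9.0', '>=1.0.0')  # pre-1.0 export: rejected
assert not semver.match('2.0.0', '<2.0.0')   # next major version: rejected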
Importer script (filename not shown in this view):

-import csv
 import json
 import os
 import readline
-from typing import Callable
-from django.db import transaction
-import util.processing
+import util
+from util.messages import warn
 from core.models import ExamType, Feedback, Submission, SubmissionType, Test
 from core.models import UserAccount as User
 from util.factories import GradyUserFactory
-from util.messages import info, warn
+import semver

 WELCOME = r'''
  ______ __ ____ __
@@ -22,14 +21,17 @@ WELCOME = r'''
 '''

 HISTFILE = '.importer_history'
-RECORDS = '.importer'
 PASSWORDS = '.importer_passwords'
 YES = 'Y/n'
 NO = 'y/N'
+RUSTY_HEKTOR_MIN_VER = ">=1.0.0"
+RUSTY_HEKTOR_MAX_VER = "<2.0.0"

 valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}

 ORIGIN_ORDER = {
     Feedback.WAS_EMPTY,
     Feedback.DID_NOT_COMPILE,
@@ -49,22 +51,48 @@ FEEDBACK_MAPPER = dict(zip(TEST_ORDER, ORIGIN_ORDER))

 user_factory = GradyUserFactory()
-class chdir_context(object):
-    """
-    Step into a directory temporarily.
-    """
-    def __init__(self, path):
-        self.old_dir = os.getcwd()
-        self.new_dir = path
-
-    def __enter__(self):
-        info(f'Changing to {self.new_dir}')
-        os.chdir(self.new_dir)
-
-    def __exit__(self, *args):
-        os.chdir(self.old_dir)
-        info(f'Returned to {self.old_dir}')
+def start():
+    if os.path.exists(HISTFILE):
+        readline.read_history_file(HISTFILE)
+
+    print(WELCOME + '''
+Welcome to the Grady import script!
+
+This script aims at making the setup of the database as easy as possible.
+At the same time it serves as a documentation on how data is imported into
+Grady. Let\'s dive right in.\n''')
+
+    try:
+        print('The following sub importers are available:\n')
+        for fid, func in enumerate(call_order):
+            print(f'\t[{fid}] {func.__name__}')
+        print('\t[q] exit')
+        print()
+
+        fid = i('Choose a number or hit enter to start at the beginning')
+        if not fid:
+            for func in call_order:
+                func()
+        elif fid in ('q', 'quit', 'exit'):
+            return
+        elif not 0 <= int(fid) < len(call_order):
+            warn('There is no loader with this number')
+        else:
+            call_order[int(fid)]()
+    except (EOFError, KeyboardInterrupt):
+        print()
+        return
+    except FileNotFoundError:
+        raise
+    except Exception:
+        import traceback
+        traceback.print_exc()
+    finally:
+        readline.write_history_file(HISTFILE)
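For orientation, a hypothetical session with the rewritten start() menu; the two entries come from the new call_order at the bottom of the file, and the typed input is invented:

The following sub importers are available:

	[0] load_hektor_json
	[1] load_reviewers
	[q] exit

Choose a number or hit enter to start at the beginning: 0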
 def i(prompt: str, default: str = '', is_path: bool = False, is_file: bool = False):

@@ -86,332 +114,32 @@ def i(prompt: str, default: str = '', is_path: bool = False, is_file: bool = Fal
     return answer
-def add_feedback_if_test_recommends_it(test_obj):
-    available_tests = util.processing.Test.available_tests()
-
-    if test_obj.label == available_tests[test_obj.name].label_failure \
-            and not hasattr(test_obj.submission, 'feedback') \
-            and (test_obj.name == util.processing.EmptyTest.__name__ or
-                 test_obj.name == util.processing.CompileTest.__name__):
-        return Feedback.objects.update_or_create(
-            of_submission=test_obj.submission,
-            defaults={
-                'score': 0,
-                'origin': FEEDBACK_MAPPER[test_obj.name],
-                'is_final': True,
-            }
-        )
-
-def add_tests(submission_obj, tests):
-    auto_correct, _ = User.objects.get_or_create(
-        username='auto_correct',
-        defaults={'is_active': False}
-    )
-
-    for name in (name for name in TEST_ORDER if name in tests):
-        test_data = tests[name]
-        test_obj, created = Test.objects.update_or_create(
-            name=test_data['name'],
-            submission=submission_obj,
-            defaults={
-                'label': test_data['label'],
-                'annotation': test_data['annotation'],
-            }
-        )
-        add_feedback_if_test_recommends_it(test_obj)
-
-# submission_type is the name outputted by rust_hektor, type the one from hektor
-def add_submission(student_obj, code, tests, submission_type=None, type=None):
-    if submission_type is None and type is None:
-        raise Exception("Submission need to contain submission_type or type")
-    elif type is not None:
-        submission_type = type
-
-    submission_type_obj = SubmissionType.objects.get(name=submission_type)
-    submission_obj, _ = Submission.objects.update_or_create(
-        type=submission_type_obj,
-        student=student_obj,
-        defaults={'text': code}
-    )
-
-    if tests:
-        add_tests(submission_obj, tests)
-
-def call_loader(func: Callable) -> None:
-    """ This function handles if a function will be executed at all. Currently
-    it just checks in the RECORDS file for the name of the function. If it is
-    present the function will not be executed
-
-    Args:
-        func (Callable): the loader specified below
-    """
-    if os.path.exists(RECORDS):
-        with open(RECORDS, 'r') as records_f:
-            done = [line.strip() for line in records_f]
-
-        if func.__name__ in done:
-            warn(f'{func.__name__} has already been processed once.')
-            if not i('Proceed anyway?', NO):
-                return
-
-    with transaction.atomic():
-        func()  # This executes the specified loader
-
-        with open(RECORDS, 'a') as records_f:
-            records_f.write(func.__name__)
-            records_f.write('\n')
-
-    info(f'{func.__name__} is done.')
-
-def file_suffix_to_lang_name(suffix: str) -> str:
-    suffix2name = {
-        'hs': 'haskell',
-        's': 'mipsasm',
-        'asm': 'mipsasm'
-    }
-    if suffix not in suffix2name:
-        return suffix
-    return suffix2name[suffix]
-
-def do_load_submission_types():
-    print(
-        '''For the following import you need three files:
-
-    1) A .csv file where the columns are: id, name, score, (file suffix). No
-       suffix defaults to .c
-       Supported suffixes: .c, .java, .hs, .s (for mips)
-    2) A path to a directory where I can find sample solutions named
-       <id>-lsg.c
-    3) A path to a directory where I can find HTML files with an accurate
-       description of the task. File name pattern has to be: <id>.html
-
-    Example:
-        $ cat submission_types.csv
-        a01, Alpha Team, 10, .c
-        a02, Beta Distribution, 10, .java
-        a03, Gamma Ray, 20
-        $ tree -L 2
-        .
-        ├── code-lsg
-        │   ├── a01.c
-        │   ├── a02.java
-        │   └── a03.hs
-        └── html
-            ├── a01.html
-            ├── a02.html
-            └── a03.html
-    ''')
-    path = i('Where are your files located?', '.', is_path=True)
-
-    with chdir_context(path):
-        submission_types_csv = i('CSV file', 'submission_types.csv')
-        lsg_dir = i('solution dir', 'code-lsg')
-        desc_dir = i('descriptions dir', 'html')
-
-        with open(submission_types_csv, encoding='utf-8') as tfile:
-            csv_rows = [row for row in csv.reader(tfile) if len(row) > 0]
-
-        for row in csv_rows:
-            tid, name, score, *suffix = (col.strip() for col in row)
-            if not suffix:
-                suffix = '.c'
-            else:
-                suffix = suffix[0]
-            suffix = suffix.lower().strip('.')
-            lang_name = file_suffix_to_lang_name(suffix)
-
-            with \
-                    open(os.path.join(lsg_dir, tid + '.' + suffix),
-                         encoding='utf-8') as lsg, \
-                    open(os.path.join(desc_dir, tid + '.html'),
-                         encoding='utf-8') as desc:
-                data = {
-                    'name': name,
-                    'description': desc.read(),
-                    'solution': lsg.read(),
-                    'full_score': int(score),
-                    'programming_language': lang_name
-                }
-
-            _, created = SubmissionType.objects.update_or_create(
-                name=name,
-                defaults=data
-            )
-            info(f'{"Created" if created else "Updated"} {name}')
-
-def do_load_module_descriptions():
-    print('''
-    This loader imports descriptions of modules in an exam. This information
-    is used to distinguish students within one instance or give information
-    about the grading type.
-
-    CSV file format: module_reference, total_score, pass_score, pass_only
-
-    Example:
-        B.Inf.1801, 90, 45, yes
-        B.Mat.31415, 50, 10, no
-    ''')
-    module_description_csv = i(
-        'Where is the file?', 'modules.csv', is_file=True)
-
-    with open(module_description_csv, encoding='utf-8') as tfile:
-        csv_rows = [row for row in csv.reader(tfile) if len(row) > 0]
-
-    for row in csv_rows:
-        data = {
-            field: kind(data) for field, kind, data in zip(
-                ('module_reference', 'total_score', 'pass_score', 'pass_only'),
-                (str, int, int, lambda x: x == 'yes'),
-                (col.strip() for col in row)
-            )
-        }
-        _, created = ExamType.objects.update_or_create(
-            module_reference=data['module_reference'],
-            defaults=data,
-        )
-        modification = "Created" if created else "Updated"
-        info(f'{modification} ExamType {data["module_reference"]}')
-
-def _do_check_empty_submissions():
-    submissions = i(
-        'Please provide the student submissions', 'binf1601-anon.json',
-        is_file=True)
-
-    return (
-        util.processing.process('', '', '', submissions, '',
-                                util.processing.EmptyTest.__name__),
-        submissions)
-
-def _do_preprocess_c_submissions(test_to_run):
-    location = i('Where do you keep the specifications for the tests?',
-                 'anon-export', is_path=True)
-
-    with chdir_context(location):
-        descfile = i(
-            'Please provide usage for sample solution', 'descfile.txt',
-            is_file=True)
-        binaries = i(
-            'Please provide executable binaries of solution', 'bin',
-            is_path=True)
-        objects = i(
-            'Please provide object files of solution', 'objects',
-            is_path=True)
-        submissions = i(
-            'Please provide the student submissions', 'binf1601-anon.json',
-            is_file=True)
-        headers = i(
-            'Please provide header files if any', 'code-testing',
-            is_path=True)
-
-        info('Looks good. The tests might take some time.')
-        return util.processing.process(descfile,
-                                       binaries,
-                                       objects,
-                                       submissions,
-                                       headers,
-                                       test_to_run), submissions
-
-def do_preprocess_submissions():
-    print('''
-    Preprocessing might take some time depending on the amount of data
-    and the complexity of the programs and the corresponding unit tests. You
-    can specify what test you want to run.
-
-    Tests do depend on each other. Therefore specifying a test will also
-    result in running all its dependencies.
-
-    The EmptyTest can be run on all submission types. The other tests are
-    very specific to the c programming course.
-    \n''')
-    test_enum = dict(enumerate(util.processing.Test.available_tests()))
-
-    print('The following tests are available:\n')
-    print('\t[q] Do nothing')
-    for j, test in test_enum.items():
-        print(f'\t[{j}] {test}')
-    print()
-
-    test_index = i('Which tests do you want to run?')
-    if not test_index or test_index == 'q':
-        return
-    test_to_run = test_enum[int(test_index)]
-
-    # processed_submissions = None
-    if test_to_run == util.processing.EmptyTest.__name__:
-        processed_submissions, submissions = _do_check_empty_submissions()
-    else:
-        processed_submissions, submissions = \
-            _do_preprocess_c_submissions(test_to_run)
-
-    output_f = i('And everything is done. Where should I put the results?',
-                 f'{submissions.rsplit(".")[0]}.processed.json')
-
-    with open(output_f, 'w+') as outfile:
-        json.dump(processed_submissions, outfile,
-                  sort_keys=True, indent=4)
-
-    info('Wrote processed data to %s' % os.path.join(os.curdir, output_f))
-
-def do_load_submissions():
-    file = i('Get me the file with all the submissions',
-             'submissions.json', is_file=True)
-
-    if not ExamType.objects.all():
-        raise Exception('Modules need to be loaded before submissions.')
-    else:
-        exam_query_set = ExamType.objects.all()
-        print('Please select the corresponding module')
-        print('You have the following choices:\n')
-        for j, exam_type in enumerate(exam_query_set):
-            print(f'\t[{j}] {exam_type.module_reference}')
-        print()
-        exam_prompt_key = i('Choose wisely')
-        exam_obj = {'exam': exam_query_set[int(exam_prompt_key)]}
-
-    with open(file) as exam_data_file:
-        exam_data = json.JSONDecoder().decode(exam_data_file.read())
-
-    for student in exam_data['students']:
-        student_obj = user_factory.make_student(**exam_obj,
-                                                **student).student
-        for submission_obj in student['submissions']:
-            add_submission(student_obj, **submission_obj)
-
-
-def do_load_tutors():
-    print('Please import tutor users by providing one name per line')
-    tutors = i('List of tutors', 'tutors', is_file=True)
-
-    with open(tutors) as tutors_f:
-        for tutor in tutors_f:
-            if len(tutor.strip()) > 0:
-                user_factory.make_tutor(tutor.strip(), store_pw=True)
-
-
-def do_load_reviewer():
+def load_hektor_json():
+    file = i('Get me the file with the output from rusty-hektor',
+             'submissions.json', is_file=True)
+    with open(file, 'r') as f:
+        exam_data = json.JSONDecoder().decode(f.read())
+
+    hektor_version = exam_data['meta']['version']
+    if not (semver.match(hektor_version, RUSTY_HEKTOR_MIN_VER) and
+            semver.match(hektor_version, RUSTY_HEKTOR_MAX_VER)):
+        warn(f'The data you\'re trying to import has the wrong version {hektor_version}\n'
+             f'Requirements: {RUSTY_HEKTOR_MIN_VER}, {RUSTY_HEKTOR_MAX_VER}')
+
+    exam, _ = ExamType.objects.get_or_create(**exam_data['module'])
+    for submission_type in exam['submission_types']:
+        SubmissionType.objects.get_or_create(**submission_type)
+
+    for student in exam_data['students']:
+        student_obj = user_factory.make_student(exam=exam,
+                                                **student).student
+        for submission_obj in student['submissions']:
+            add_submission(student_obj, **submission_obj)
+
+
+def load_reviewers():
     print('Please import reviewer users by providing one name per line')
     reviewers = i('List of reviewers', 'reviewers', is_file=True)

@@ -422,55 +150,56 @@ def do_load_reviewer():
         store_pw=True)
-call_order = (
-    do_load_submission_types,
-    do_load_module_descriptions,
-    do_preprocess_submissions,
-    do_load_submissions,
-    do_load_tutors,
-    do_load_reviewer
-)
-
-
-def start():
-    if os.path.exists(HISTFILE):
-        readline.read_history_file(HISTFILE)
-
-    print(WELCOME + '''
-Welcome to the Grady import script!
-
-This script aims at making the setup of the database as easy as possible.
-At the same time it serves as a documentation on how data is imported into
-Grady. Let\'s dive right in.\n''')
-
-    try:
-        print('The following sub importers are available:\n')
-        for fid, func in enumerate(call_order):
-            print(f'\t[{fid}] {func.__name__}')
-        print('\t[q] exit')
-        print()
-
-        fid = i('Choose a number or hit enter to start at the beginning')
-        if not fid:
-            for func in call_order:
-                call_loader(func)
-        elif fid in ('q', 'quit', 'exit'):
-            return
-        elif not 0 <= int(fid) < len(call_order):
-            warn('There is no loader with this number')
-        else:
-            call_loader(call_order[int(fid)])
-    except (EOFError, KeyboardInterrupt):
-        print()
-        return
-    except FileNotFoundError:
-        raise
-    except Exception:
-        import traceback
-        traceback.print_exc()
-    finally:
-        readline.write_history_file(HISTFILE)
+def add_submission(student_obj, code, tests, type=None):
+    submission_type_obj = SubmissionType.objects.get(name=type)
+
+    submission_obj, _ = Submission.objects.update_or_create(
+        type=submission_type_obj,
+        student=student_obj,
+        defaults={'text': code}
+    )
+
+    if tests:
+        add_tests(submission_obj, tests)
+
+
+def add_tests(submission_obj, tests):
+    auto_correct, _ = User.objects.get_or_create(
+        username='auto_correct',
+        defaults={'is_active': False}
+    )
+
+    for name in (name for name in TEST_ORDER if name in tests):
+        test_data = tests[name]
+        test_obj, created = Test.objects.update_or_create(
+            name=test_data['name'],
+            submission=submission_obj,
+            defaults={
+                'label': test_data['label'],
+                'annotation': test_data['annotation'],
+            }
+        )
+        add_feedback_if_test_recommends_it(test_obj)
+
+
+def add_feedback_if_test_recommends_it(test_obj):
+    available_tests = util.processing.Test.available_tests()
+
+    if test_obj.label == available_tests[test_obj.name].label_failure \
+            and not hasattr(test_obj.submission, 'feedback') \
+            and (test_obj.name == util.processing.EmptyTest.__name__ or
+                 test_obj.name == util.processing.CompileTest.__name__):
+        return Feedback.objects.update_or_create(
+            of_submission=test_obj.submission,
+            defaults={
+                'score': 0,
+                'origin': FEEDBACK_MAPPER[test_obj.name],
+                'is_final': True,
+            }
+        )
+
+
+call_order = [
+    load_hektor_json,
+    load_reviewers
+]
Deleted file (presumably util/processing.py); the test runner was extracted to https://gitlab.gwdg.de/grady-corp/grady-test-runner:

import abc
import hashlib
import json
import logging
import os
import re
import shutil
import subprocess
import tempfile

from tqdm import tqdm

try:
    import testcases
except ModuleNotFoundError:
    from util import testcases

log = logging.getLogger(__name__)


def run_cmd(cmd, stdin=None, check=False, timeout=1):
    # Note: the shell-level `timeout 1` caps every command at one second,
    # independent of the `timeout` argument subprocess.run() enforces on top.
    return subprocess.run(
        'timeout 1 ' + cmd,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        input=stdin,
        shell=True,
        check=check,
        encoding='utf-8',
        timeout=timeout
    )


def all_subclasses(cls):
    return cls.__subclasses__() \
        + [g for s in cls.__subclasses__() for g in all_subclasses(s)]


def sha1(submission_obj):
    return hashlib.sha1(submission_obj['code'].encode()).hexdigest()


def get_submission_id(submission_obj):
    t = submission_obj['type']
    m = re.search(r'(a0\d)', t)
    return m.group(0)
class Test(metaclass=abc.ABCMeta):
    """Base class for checks run against a submission. Subclasses declare
    `depends`, `label_success` and `label_failure` and implement run_test."""

    @classmethod
    def available_tests(cls):
        return {sub.__name__: sub for sub in all_subclasses(cls)}

    def __new__(cls, *args, **kwargs):
        assert hasattr(cls, 'depends'), "depends not defined"
        assert hasattr(cls, 'label_success'), "label_success not defined"
        assert hasattr(cls, 'label_failure'), "label_failure not defined"
        return super().__new__(cls)

    def __init__(self, submission_obj, **kwargs):
        if not self.dependencies_satisfied(submission_obj):
            self.result = False
            self.annotation = "TEST DEPENDENCY NOT MET"
            self.serialize(submission_obj)
        elif str(self) in submission_obj['tests']:
            self.deserialize(submission_obj['tests'][str(self)])
        else:
            self.result, self.annotation = self.run_test(submission_obj)
            self.serialize(submission_obj)

    def __bool__(self):
        return self.result

    def __str__(self):
        return self.__class__.__name__

    def dependencies_satisfied(self, submission_obj):
        return all(dep(submission_obj).result for dep in self.depends)

    def deserialize(self, test):
        self.result = test['label'] == self.label_success
        self.annotation = test['annotation']

    def serialize(self, submission_obj):
        as_dict = {
            'name': str(self),
            'annotation': self.annotation
        }
        if self.result:
            as_dict['label'] = self.label_success
        else:
            as_dict['label'] = self.label_failure
        submission_obj['tests'][str(self)] = as_dict

    @abc.abstractmethod
    def run_test(self, submission_obj) -> (bool, str):
        return NotImplemented


class EmptyTest(Test):
    """Fails if the submitted code is effectively empty."""

    depends = ()
    label_success = 'NOT_EMPTY'
    label_failure = 'EMPTY'

    def run_test(self, submission_obj):
        return bool(submission_obj['code'].strip()), ""
class CompileTest(Test):

    depends = (EmptyTest, )
    label_success = 'COMPILATION_SUCCESSFUL'
    label_failure = 'COMPILATION_FAILED'

    def run_test(self, submission_obj):
        ret = run_cmd(
            "gcc -Wall -c -x c -std=c11 -Icode-testing -o code.o -",
            submission_obj['code'])
        return not ret.returncode, ret.stderr


class LinkTest(Test):

    depends = (CompileTest, )
    label_success = 'LINKING_SUCCESSFUL'
    label_failure = 'LINKING_FAILED'

    def run_test(self, submission_obj):
        if submission_obj['type'] not in testcases_dict:
            return False, 'This program was not required to be executable.'

        cid = get_submission_id(submission_obj)
        ret = run_cmd(f"gcc-7 -o ./bin/{cid} objects/{cid}-testing.o code.o")
        return not ret.returncode, ret.stderr
class UnitTestTest(Test):
    """Runs the generated test cases against the linked binary."""

    depends = (LinkTest, )
    label_success = 'UNITTEST_SUCCSESSFUL'
    label_failure = 'UNITTEST_FAILED'

    @staticmethod
    def testcase(i, args, stdout, cid):
        try:
            ret = run_cmd("./bin/%s %s" % (cid, args), check=True, timeout=0.1)
            assert ret.stdout == stdout
        except AssertionError:
            return False, "Case #{}: [ASSERT FAIL] ./prog {:>2} WAS '{}' SHOULD '{}'".format(  # noqa: E501
                i, args, ret.stdout.strip(), stdout.strip())
        except subprocess.CalledProcessError as err:
            return False, "Case #{:>2}: [FAILED] ./prog {} ERROR '{}'".format(
                i, args, err.stderr.strip())
        except subprocess.TimeoutExpired:
            return False, "Case #{:>2}: [TIMEOUT] ./prog {}".format(i, args)
        else:
            return True, "Case #{:>2}: [SUCCESS] ./prog {}".format(i, args)

    def run_test(self, submission_obj):
        task = testcases_dict[submission_obj['type']]
        cid = get_submission_id(submission_obj)

        return_data = [self.testcase(i, case, result, cid)
                       for i, (case, result) in enumerate(zip(task['cases'],
                                                              task['results']))
                       ]
        results, messages = zip(*return_data)
        return all(results), '\n'.join(messages)
def process(descfile, binaries, objects, submissions, header, highest_test):
    if isinstance(highest_test, str):
        highest_test_class = Test.available_tests()[highest_test]

    if highest_test != EmptyTest.__name__:  # not needed for EmptyTest
        global testcases_dict
        testcases_dict = testcases.evaluated_testcases(descfile, binaries)

    with open(submissions) as submission_file:
        submissions_json = json.JSONDecoder().decode(
            submission_file.read())

    # Get something disposable
    if highest_test != EmptyTest.__name__:
        path = tempfile.mkdtemp()
        run_cmd(f'cp -r {objects} {path}')
        run_cmd(f'cp -r {binaries} {path}')
        run_cmd(f'cp -r {header} {path}')
        os.chdir(path)
        os.makedirs('bin')

    def iterate_submissions():
        yield from (obj
                    for student in tqdm(submissions_json['students'])
                    for obj in student['submissions'])

    for submission_obj in tqdm(iterate_submissions()):
        highest_test_class(submission_obj)
        if highest_test != EmptyTest.__name__:
            run_cmd('rm code*')
    print()  # line after progress bar

    if highest_test != EmptyTest.__name__:
        shutil.rmtree(path)

    return submissions_json
def parseme():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('descfile')
    parser.add_argument('binaries')
    parser.add_argument('objects')
    parser.add_argument('submissions')
    parser.add_argument('header')
    parser.add_argument('test')
    return parser.parse_args()


if __name__ == '__main__':
    args = parseme()
    testcases_dict = testcases.evaluated_testcases(args.descfile,
                                                   args.binaries)
    print(json.dumps(process(args.descfile,
                             args.binaries,
                             args.objects,
                             args.submissions,
                             args.header,
                             args.test),
                     sort_keys=True,
                     indent=4))
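Not part of the file above: since available_tests() discovers checks through all_subclasses(Test), a new check only needs to subclass Test and supply `depends`, the two labels, and run_test. A hypothetical example, assuming the same submission_obj dict shape ({'code': ..., 'tests': {...}}):

class TrailingWhitespaceTest(Test):
    """Hypothetical check: flags lines that end in spaces or tabs."""

    depends = (EmptyTest, )  # only runs once the submission is non-empty
    label_success = 'NO_TRAILING_WHITESPACE'
    label_failure = 'TRAILING_WHITESPACE'

    def run_test(self, submission_obj):
        bad = [str(no)
               for no, line in enumerate(submission_obj['code'].splitlines(), 1)
               if line != line.rstrip()]
        if bad:
            return False, 'trailing whitespace on line(s): ' + ', '.join(bad)
        return True, ''

Because it is a subclass, it would appear automatically in the available_tests() mapping and thus in the importer's (pre-extraction) test menu.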
Deleted file (presumably util/testcases.py):

import os
import random
import re
from string import ascii_letters, digits

try:
    import processing
except ModuleNotFoundError:
    from util import processing

types = ('integer', 'unsigned_integer', 'character', 'string')
list_sep = '...'

re_task = re.compile(r'^-- (?P<title>.*)\n(USAGE: (?P<cmd>[\./\w]+) (?P<syntax>.*)|NO EXECUTABLE)', re.MULTILINE)  # noqa: E501
re_args = re.compile(rf"<({'|'.join(types)}|{'|'.join(t + '_list' for t in types)})>")  # noqa: E501
def call_function(name: str, *args, **kwargs):
    return globals()[name](*args, **kwargs)


def integer(bounds=50):
    return random.randint(-bounds, bounds)


def unsigned_integer(upper=50):
    return random.randint(0, upper)


def character():
    return random.choice(10 * ascii_letters + 2 * digits + '%*+,-./:?@[]^_{}~')


def string(length=31):
    return ''.join(character() for i in range(2, 2 + unsigned_integer(length)))


def type_list(_type):
    def generic_list():
        return ' '.join(str(
            call_function(_type)) for i in range(2, unsigned_integer(6) * 2))
    return generic_list


def rubbish():
    return str(call_function(
        random.choice(tuple(t + '_list' for t in types) + types)))
def argument_generator(syntax):
    # Collapse '<x> <x> ... <x> <x>' into '<x_list>' and '<two word>' into
    # '<two_word>' so every placeholder names a generator function above.
    syntax, _ = re.subn(
        r'<([\w\s]+)> <\1> \.\.\. <\1> <\1>', r'<\1_list>', syntax)
    syntax, _ = re.subn(r'<(\w+)\s(\w+)>', r'<\1_\2>', syntax)
    return ' '.join(
        str(call_function(arg)) for arg in re.findall(re_args, syntax))


def testcases_generator(task, n=10):
    syntax = task.group('syntax')
    if not syntax:
        return

    if syntax == 'NO INPUT':
        yield 'NO INPUT'
        return

    for i in range(n):
        yield argument_generator(syntax)
def testcases(description_path):
    for t in types:
        globals()[t + '_list'] = type_list(t)  # I fucking love it

    with open(description_path) as description_file:
        description = description_file.read()

    return {
        task['title']: {
            'cmd': task['cmd'],
            'cases': [t for t in testcases_generator(task)]
        } for task in re.finditer(re_task, description)
    }


def evaluated_testcases(description_path, binaries):
    task_testcases = testcases(description_path)

    for task in filter(lambda t: t['cmd'], task_testcases.values()):
        path_to_binary = os.path.join(os.path.join(
            binaries, os.path.basename(task['cmd'])))
        task['results'] = [processing.run_cmd(
            f"{path_to_binary} {case}").stdout for case in task['cases']]

    return task_testcases
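Also not part of the file: a hypothetical descfile fragment in the shape re_task expects, and what the regex captures. The repeated placeholder on the first USAGE line is exactly the pattern argument_generator rewrites to <integer_list> before drawing random values:

desc = '''-- a01 Alpha Team
USAGE: ./bin/a01 <integer> <integer> ... <integer> <integer>
-- a02 Beta Distribution
NO EXECUTABLE
'''

for m in re_task.finditer(desc):
    print(m.group('title'), m.group('cmd'), m.group('syntax'))
# a01 Alpha Team ./bin/a01 <integer> <integer> ... <integer> <integer>
# a02 Beta Distribution None None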