"""Run a dependency-ordered chain of checks on C code submissions.

Each check is a :class:`Test` subclass (empty check, compilation, linking,
unit tests).  A test only runs when all tests it depends on succeeded, and
its outcome is stored inside the submission object under ``'tests'`` so that
it is not executed twice.

When run as a script the module reads a submissions JSON file, applies the
requested test (and, transitively, its dependencies) to every submission and
prints the annotated JSON to stdout.
"""
import abc
import hashlib
import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
from typing import Tuple

from tqdm import tqdm

try:
    import testcases
except ModuleNotFoundError:
    from util import testcases

log = logging.getLogger(__name__)


def run_cmd(cmd, stdin=None, check=False, timeout=1):
    """Run *cmd* through the shell and capture its output as text.

    The command line is always prefixed with ``timeout 1``; this prefix is
    fixed and independent of the *timeout* argument, which is only handed to
    :func:`subprocess.run`.

    Args:
        cmd: Shell command line.  It is interpolated into a shell string
            as-is, so callers are responsible for quoting.
        stdin: Optional text fed to the process on standard input.
        check: If true, a non-zero exit status raises
            ``subprocess.CalledProcessError``.
        timeout: Seconds after which ``subprocess.run`` raises
            ``subprocess.TimeoutExpired``.

    Returns:
        The ``subprocess.CompletedProcess`` with ``stdout`` and ``stderr``
        decoded as UTF-8.
    """
    return subprocess.run(
        'timeout 1 ' + cmd,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        input=stdin,
        shell=True,
        check=check,
        encoding='utf-8',
        timeout=timeout
    )


def all_subclasses(cls):
    """Return all direct and indirect subclasses of *cls* as a list."""
    return cls.__subclasses__() \
        + [g for s in cls.__subclasses__() for g in all_subclasses(s)]


def sha1(submission_obj):
    """Return the hexadecimal SHA-1 digest of the submission's code."""
    return hashlib.sha1(submission_obj['code'].encode()).hexdigest()


def get_submission_id(submission_obj):
    """Extract the task id (``a0`` followed by one digit) from the type.

    Args:
        submission_obj: Submission dict; its ``'type'`` string is searched.

    Returns:
        The first substring of ``submission_obj['type']`` matching
        ``a0\\d``.

    Raises:
        ValueError: If the type contains no such id.
    """
    submission_type = submission_obj['type']
    match = re.search(r'(a0\d)', submission_type)
    if match is None:
        raise ValueError(
            f"no submission id (a0<digit>) found in type {submission_type!r}")
    return match.group(0)


class Test(metaclass=abc.ABCMeta):
    """Base class of all checks that can be applied to a submission.

    Instantiating a test *is* running it: the constructor evaluates the
    dependencies, reuses a stored result if one exists, and otherwise calls
    :meth:`run_test`.  The outcome is available as ``result`` (truthiness of
    the instance) and ``annotation`` and is written back into
    ``submission_obj['tests']``.

    Concrete subclasses must define the class attributes ``depends``
    (iterable of test classes that have to succeed first), ``label_success``
    and ``label_failure`` (labels stored for the two possible outcomes).
    """

    @classmethod
    def available_tests(cls):
        """Return a dict mapping class names to all known subclasses."""
        return {sub.__name__: sub for sub in all_subclasses(cls)}

    def __new__(cls, *args, **kwargs):
        # Guard against subclasses that forgot a required class attribute.
        assert hasattr(cls, 'depends'), "depends not defined"
        assert hasattr(cls, 'label_success'), "label_success not defined"
        assert hasattr(cls, 'label_failure'), "label_failure not defined"
        return super().__new__(cls)

    def __init__(self, submission_obj, **kwargs):
        """Run the test on *submission_obj* or load its stored result.

        Args:
            submission_obj: Submission dict.  Results are read from and
                written to its ``'tests'`` entry, which is created when
                missing.
            **kwargs: Accepted and ignored.
        """
        # A submission without any recorded tests yet must not crash the
        # cache lookup or serialize().
        submission_obj.setdefault('tests', {})

        if not self.dependencies_satisfied(submission_obj):
            self.result = False
            self.annotation = "TEST DEPENDENCY NOT MET"
            self.serialize(submission_obj)
        elif str(self) in submission_obj['tests']:
            # Already evaluated earlier: reuse the stored outcome.
            self.deserialize(submission_obj['tests'][str(self)])
        else:
            self.result, self.annotation = self.run_test(submission_obj)
            self.serialize(submission_obj)

    def __bool__(self):
        return self.result

    def __str__(self):
        return self.__class__.__name__

    def dependencies_satisfied(self, submission_obj):
        """Run every dependency and return whether all of them succeeded."""
        return all(dep(submission_obj).result for dep in self.depends)

    def deserialize(self, test):
        """Restore ``result`` and ``annotation`` from a stored test dict."""
        self.result = test['label'] == self.label_success
        self.annotation = test['annotation']

    def serialize(self, submission_obj):
        """Store name, annotation and label in ``submission_obj['tests']``."""
        as_dict = {
            'name': str(self),
            'annotation': self.annotation
        }

        if self.result:
            as_dict['label'] = self.label_success
        else:
            as_dict['label'] = self.label_failure

        submission_obj['tests'][str(self)] = as_dict

    @abc.abstractmethod
    def run_test(self, submission_obj) -> Tuple[bool, str]:
        """Execute the check and return ``(success, annotation)``."""
        return NotImplemented


class EmptyTest(Test):
    """Succeeds when the submitted code contains non-whitespace text."""

    depends = ()
    label_success = 'NOT_EMPTY'
    label_failure = 'EMPTY'

    def run_test(self, submission_obj):
        return bool(submission_obj['code'].strip()), ""


class CompileTest(Test):
    """Compiles the submitted code as C11 into ``code.o`` using gcc.

    The source is passed on standard input; the object file is written to
    the current working directory.  gcc's stderr becomes the annotation.
    """

    depends = (EmptyTest, )
    label_success = 'COMPILATION_SUCCESSFUL'
    label_failure = 'COMPILATION_FAILED'

    def run_test(self, submission_obj):
        ret = run_cmd(
            "gcc -Wall -c -x c -std=c11 -Icode-testing -o code.o -",
            submission_obj['code'])
        return not ret.returncode, ret.stderr


class LinkTest(Test):
    """Links ``code.o`` with the task's testing object into ``./bin/<id>``.

    Relies on the module-level ``testcases_dict``, which :func:`process`
    (or the script entry point) populates before tests are run.
    """

    depends = (CompileTest, )
    label_success = 'LINKING_SUCCESSFUL'
    label_failure = 'LINKING_FAILED'

    def run_test(self, submission_obj):
        if submission_obj['type'] not in testcases_dict:
            return False, 'This program was not required to be executable.'

        cid = get_submission_id(submission_obj)
        # NOTE(review): CompileTest invokes `gcc`, this invokes `gcc-7` --
        # confirm that the difference is intended.
        ret = run_cmd(f"gcc-7 -o ./bin/{cid} objects/{cid}-testing.o code.o")
        return not ret.returncode, ret.stderr


class UnitTestTest(Test):
    """Runs the linked program on every test case of its task.

    The test succeeds when each case exits with status zero within the time
    limit and prints exactly the expected output.
    """

    depends = (LinkTest, )
    # (sic) The spelling is kept on purpose: deserialize() compares stored
    # labels against this value, so changing it would flip cached results.
    label_success = 'UNITTEST_SUCCSESSFUL'
    label_failure = 'UNITTEST_FAILED'

    @staticmethod
    def testcase(i, args, stdout, cid):
        """Run a single case and compare the program's output.

        Args:
            i: Index of the case, used in the message only.
            args: Command line arguments appended to the program call.
            stdout: Expected standard output.
            cid: Task id; the program executed is ``./bin/<cid>``.

        Returns:
            ``(success, message)`` where *message* describes the outcome.
        """
        try:
            ret = run_cmd("./bin/%s %s" % (cid, args), check=True, timeout=0.1)
        except subprocess.CalledProcessError as err:
            return False, "Case #{:>2}: [FAILED] ./prog {} ERROR '{}'".format(
                i, args, err.stderr.strip())
        except subprocess.TimeoutExpired:
            return False, "Case #{:>2}: [TIMEOUT] ./prog {}".format(i, args)

        # Explicit comparison instead of `assert`: assertions are removed
        # under `python -O`, which would let every case pass.
        if ret.stdout != stdout:
            return False, "Case #{}: [ASSERT FAIL] ./prog {:>2} WAS '{}' SHOULD '{}'".format(  # noqa: E501
                i, args, ret.stdout.strip(), stdout.strip())

        return True, "Case #{:>2}: [SUCCESS] ./prog {}".format(i, args)

    def run_test(self, submission_obj):
        task = testcases_dict[submission_obj['type']]
        cid = get_submission_id(submission_obj)
        return_data = [
            self.testcase(i, case, result, cid)
            for i, (case, result)
            in enumerate(zip(task['cases'], task['results']))
        ]
        # Split without `zip(*...)` so that a task without any cases yields
        # (True, '') instead of failing to unpack.
        results = [success for success, _ in return_data]
        messages = [message for _, message in return_data]
        return all(results), '\n'.join(messages)


def process(descfile, binaries, objects, submissions, header, highest_test):
    """Apply a test and all its dependencies to every submission.

    Unless only :class:`EmptyTest` is requested, the test cases are
    evaluated and *objects*, *binaries* and *header* are copied into a
    temporary directory that serves as working directory while the tests
    run.  The previous working directory is restored and the temporary
    directory removed afterwards, even if a test raises.

    Args:
        descfile: Passed to ``testcases.evaluated_testcases``.
        binaries: Path copied into the temporary directory; also passed to
            ``testcases.evaluated_testcases``.
        objects: Path copied into the temporary directory.
        submissions: Path of the UTF-8 JSON file.  It must contain a
            ``'students'`` list whose items each have a ``'submissions'``
            list.
        header: Path copied into the temporary directory.
        highest_test: Name of a :class:`Test` subclass, or the subclass
            itself.

    Returns:
        The parsed submissions structure with test results added to every
        submission.

    Raises:
        KeyError: If *highest_test* is a name that matches no test class.
    """
    global testcases_dict

    if isinstance(highest_test, str):
        highest_test_class = Test.available_tests()[highest_test]
    else:
        highest_test_class = highest_test

    # EmptyTest neither compiles nor executes anything, so it needs neither
    # the evaluated test cases nor a scratch directory.
    needs_sandbox = highest_test_class.__name__ != EmptyTest.__name__

    if needs_sandbox:
        testcases_dict = testcases.evaluated_testcases(descfile, binaries)

    with open(submissions, encoding='utf-8') as submission_file:
        submissions_json = json.load(submission_file)

    def iterate_submissions():
        for student in tqdm(submissions_json['students']):
            yield from student['submissions']

    path = None
    original_cwd = None
    try:
        if needs_sandbox:
            # Get something disposable
            original_cwd = os.getcwd()
            path = tempfile.mkdtemp()
            # NOTE: the paths are interpolated into the shell command
            # unquoted.
            run_cmd(f'cp -r {objects} {path}')
            run_cmd(f'cp -r {binaries} {path}')
            run_cmd(f'cp -r {header} {path}')
            os.chdir(path)
            os.makedirs('bin')

        for submission_obj in tqdm(iterate_submissions()):
            highest_test_class(submission_obj)
            if needs_sandbox:
                # Drop the object file so that it cannot leak into the
                # next submission.
                run_cmd('rm code*')
        print()  # line after progress bar
    finally:
        if path is not None:
            os.chdir(original_cwd)
            shutil.rmtree(path)

    return submissions_json


def parseme():
    """Parse the six positional command line arguments of the script."""
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('descfile')
    parser.add_argument('binaries')
    parser.add_argument('objects')
    parser.add_argument('submissions')
    parser.add_argument('header')
    parser.add_argument('test')
    return parser.parse_args()


if __name__ == '__main__':
    args = parseme()
    # NOTE(review): process() evaluates the test cases again unless only
    # EmptyTest is requested -- confirm whether this call is still needed.
    testcases_dict = testcases.evaluated_testcases(args.descfile,
                                                   args.binaries)

    print(json.dumps(process(args.descfile,
                             args.binaries,
                             args.objects,
                             args.submissions,
                             args.header,
                             args.test),
                     sort_keys=True,
                     indent=4))