Skip to content
Snippets Groups Projects

Importer is no compatible with Rusty-Hektor 1.0.0

Merged robinwilliam.hundt requested to merge change-import-for-new-rusty-hektor-version into master
4 files
+ 107
717
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 106
377
import csv
import json
import json
import os
import os
import readline
import readline
from typing import Callable
import util
from django.db import transaction
from util.messages import warn
import util.processing
from core.models import ExamType, Feedback, Submission, SubmissionType, Test
from core.models import ExamType, Feedback, Submission, SubmissionType, Test
from core.models import UserAccount as User
from core.models import UserAccount as User
from util.factories import GradyUserFactory
from util.factories import GradyUserFactory
from util.messages import info, warn
 
import semver
 
WELCOME = r'''
WELCOME = r'''
______ __ ____ __
______ __ ____ __
@@ -22,14 +21,17 @@ WELCOME = r'''
@@ -22,14 +21,17 @@ WELCOME = r'''
'''
'''
HISTFILE = '.importer_history'
HISTFILE = '.importer_history'
RECORDS = '.importer'
PASSWORDS = '.importer_passwords'
PASSWORDS = '.importer_passwords'
YES = 'Y/n'
YES = 'Y/n'
NO = 'y/N'
NO = 'y/N'
 
RUSTY_HEKTOR_MIN_VER = ">=1.0.0"
 
RUSTY_HEKTOR_MAX_VER = "<2.0.0"
 
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
 
ORIGIN_ORDER = {
ORIGIN_ORDER = {
Feedback.WAS_EMPTY,
Feedback.WAS_EMPTY,
Feedback.DID_NOT_COMPILE,
Feedback.DID_NOT_COMPILE,
@@ -49,22 +51,48 @@ FEEDBACK_MAPPER = dict(zip(TEST_ORDER, ORIGIN_ORDER))
@@ -49,22 +51,48 @@ FEEDBACK_MAPPER = dict(zip(TEST_ORDER, ORIGIN_ORDER))
user_factory = GradyUserFactory()
user_factory = GradyUserFactory()
class chdir_context(object):
def start():
"""
Step into a directory temporarily.
"""
def __init__(self, path):
if os.path.exists(HISTFILE):
self.old_dir = os.getcwd()
readline.read_history_file(HISTFILE)
self.new_dir = path
def __enter__(self):
print(WELCOME + '''
info(f'Changing to {self.new_dir}')
os.chdir(self.new_dir)
def __exit__(self, *args):
Welcome to the Grady import script!
os.chdir(self.old_dir)
info(f'Returned to {self.old_dir}')
This script aims at making the setup of the database as easy as possible.
 
At the same time it serves as a documentation on how data is imported into
 
Grady. Let\'s dive right in.\n''')
 
 
try:
 
print('The following sub importers are available:\n')
 
for fid, func in enumerate(call_order):
 
print(f'\t[{fid}] {func.__name__}')
 
print('\t[q] exit')
 
print()
 
 
fid = i('Choose a number or hit enter to start at the beginning')
 
 
if not fid:
 
for func in call_order:
 
func()
 
elif fid in ('q', 'quit', 'exit'):
 
return
 
elif not 0 <= int(fid) < len(call_order):
 
warn('There is no loader with this number')
 
else:
 
call_order[int(fid)]()
 
 
except (EOFError, KeyboardInterrupt):
 
print()
 
return
 
except FileNotFoundError:
 
raise
 
except Exception:
 
import traceback
 
traceback.print_exc()
 
finally:
 
readline.write_history_file(HISTFILE)
def i(prompt: str, default: str = '', is_path: bool = False, is_file: bool = False):
def i(prompt: str, default: str = '', is_path: bool = False, is_file: bool = False):
@@ -86,332 +114,32 @@ def i(prompt: str, default: str = '', is_path: bool = False, is_file: bool = Fal
@@ -86,332 +114,32 @@ def i(prompt: str, default: str = '', is_path: bool = False, is_file: bool = Fal
return answer
return answer
def add_feedback_if_test_recommends_it(test_obj):
def load_hektor_json():
available_tests = util.processing.Test.available_tests()
file = i('Get me the file with the output from rusty-hektor',
if test_obj.label == available_tests[test_obj.name].label_failure \
and not hasattr(test_obj.submission, 'feedback') \
and (test_obj.name == util.processing.EmptyTest.__name__ or
test_obj.name == util.processing.CompileTest.__name__):
return Feedback.objects.update_or_create(
of_submission=test_obj.submission,
defaults={
'score': 0,
'origin': FEEDBACK_MAPPER[test_obj.name],
'is_final': True,
}
)
def add_tests(submission_obj, tests):
auto_correct, _ = User.objects.get_or_create(
username='auto_correct',
defaults={'is_active': False}
)
for name in (name for name in TEST_ORDER if name in tests):
test_data = tests[name]
test_obj, created = Test.objects.update_or_create(
name=test_data['name'],
submission=submission_obj,
defaults={
'label': test_data['label'],
'annotation': test_data['annotation'],
}
)
add_feedback_if_test_recommends_it(test_obj)
# submission_type is the name outputted by rust_hektor, type the one from hektor
def add_submission(student_obj, code, tests, submission_type=None, type=None):
if submission_type is None and type is None:
raise Exception("Submission need to contain submission_type or type")
elif type is not None:
submission_type = type
submission_type_obj = SubmissionType.objects.get(name=submission_type)
submission_obj, _ = Submission.objects.update_or_create(
type=submission_type_obj,
student=student_obj,
defaults={'text': code}
)
if tests:
add_tests(submission_obj, tests)
def call_loader(func: Callable) -> None:
""" This function handles if a function will be executed at all. Currently
it just checks in the RECORDS file for the name of the function. If it is
present the function will not be executed
Args:
func (Callable): the loader specified below
"""
if os.path.exists(RECORDS):
with open(RECORDS, 'r') as records_f:
done = [line.strip() for line in records_f]
if func.__name__ in done:
warn(f'{func.__name__} has already been processed once.')
if not i('Proceed anyway?', NO):
return
with transaction.atomic():
func() # This executes the specified loader
with open(RECORDS, 'a') as records_f:
records_f.write(func.__name__)
records_f.write('\n')
info(f'{func.__name__} is done.')
def file_suffix_to_lang_name(suffix: str) -> str:
suffix2name = {
'hs': 'haskell',
's': 'mipsasm',
'asm': 'mipsasm'
}
if suffix not in suffix2name:
return suffix
return suffix2name[suffix]
def do_load_submission_types():
print(
'''For the following import you need three files:
1) A .csv file where the columns are: id, name, score, (file suffix). No
suffix defaults to .c
Supported suffixes: .c , .java , .hs , .s (for mips)
2) A path to a directory where I can find sample solutions named
<id>-lsg.c
3) A path to a directory where I can find HTML files with an accurate
description of the task. File name pattern has to be: <id>.html
Example:
$ cat submission_types.csv
a01, Alpha Team, 10, .c
a02, Beta Distribution, 10, .java
a03, Gamma Ray, 20
$ tree -L 2
.
├── code-lsg
│ ├── a01.c
│ ├── a02.java
│ └── a03.hs
└── html
├── a01.html
├── a02.html
└── a03.html
''')
path = i('Where are your files located?', '.', is_path=True)
with chdir_context(path):
submission_types_csv = i('CSV file', 'submission_types.csv')
lsg_dir = i('solution dir', 'code-lsg')
desc_dir = i('descriptions dir', 'html')
with open(submission_types_csv, encoding='utf-8') as tfile:
csv_rows = [row for row in csv.reader(tfile) if len(row) > 0]
for row in csv_rows:
tid, name, score, *suffix = (col.strip() for col in row)
if not suffix:
suffix = '.c'
else:
suffix = suffix[0]
suffix = suffix.lower().strip('.')
lang_name = file_suffix_to_lang_name(suffix)
with \
open(os.path.join(lsg_dir, tid + '.' + suffix),
encoding='utf-8') as lsg, \
open(os.path.join(desc_dir, tid + '.html'),
encoding='utf-8') as desc:
data = {
'name': name,
'description': desc.read(),
'solution': lsg.read(),
'full_score': int(score),
'programming_language': lang_name
}
_, created = SubmissionType.objects.update_or_create(
name=name,
defaults=data
)
info(f'{"Created" if created else "Updated"} {name}')
def do_load_module_descriptions():
print('''
This loader imports descriptions of modules in an exam. This information
is used to distinguish students within one instance or give information
about the grading type.
CSV file format: module_reference, total_score, pass_score, pass_only
Example:
B.Inf.1801, 90, 45, yes
B.Mat.31415, 50, 10, no
''')
module_description_csv = i(
'Where is the file?', 'modules.csv', is_file=True)
with open(module_description_csv, encoding='utf-8') as tfile:
csv_rows = [row for row in csv.reader(tfile) if len(row) > 0]
for row in csv_rows:
data = {
field: kind(data) for field, kind, data in zip(
('module_reference', 'total_score', 'pass_score', 'pass_only'),
(str, int, int, lambda x: x == 'yes'),
(col.strip() for col in row)
)
}
_, created = ExamType.objects.update_or_create(
module_reference=data['module_reference'],
defaults=data,
)
modification = "Created" if created else "Updated"
info(f'{modification} ExamType {data["module_reference"]}')
def _do_check_empty_submissions():
submissions = i(
'Please provide the student submissions', 'binf1601-anon.json',
is_file=True)
return (
util.processing.process('', '', '', submissions, '', util.processing.EmptyTest.__name__),
submissions)
def _do_preprocess_c_submissions(test_to_run):
location = i('Where do you keep the specifications for the tests?',
'anon-export', is_path=True)
with chdir_context(location):
descfile = i(
'Please provide usage for sample solution', 'descfile.txt',
is_file=True)
binaries = i(
'Please provide executable binaries of solution', 'bin',
is_path=True)
objects = i(
'Please provide object files of solution', 'objects',
is_path=True)
submissions = i(
'Please provide the student submissions', 'binf1601-anon.json',
is_file=True)
headers = i(
'Please provide header files if any', 'code-testing',
is_path=True)
info('Looks good. The tests mights take some time.')
return util.processing.process(descfile,
binaries,
objects,
submissions,
headers,
test_to_run), submissions
def do_preprocess_submissions():
print('''
Preprocessing might take some time depending on the amount of data
and the complexity of the programs and the corresponding unit tests. You
can specify what test you want to run.
Tests do depend on each other. Therefore specifying a test will also
result in running all its dependencies.
The EmptyTest can be run on all submission types. The other tests are very specific
to the c programming course.
\n''')
test_enum = dict(enumerate(util.processing.Test.available_tests()))
print('The following test are available:\n')
print('\t[q] Do nothing')
for j, test in test_enum.items():
print(f'\t[{j}] {test}')
print()
test_index = i('Which tests do you want to run?')
if not test_index or test_index == 'q':
return
test_to_run = test_enum[int(test_index)]
# processed_submissions = None
if test_to_run == util.processing.EmptyTest.__name__:
processed_submissions, submissions = _do_check_empty_submissions()
else:
processed_submissions, submissions = _do_preprocess_c_submissions(test_to_run)
output_f = i('And everything is done. Where should I put the results?',
f'{submissions.rsplit(".")[0]}.processed.json')
with open(output_f, 'w+') as outfile:
json.dump(processed_submissions, outfile,
sort_keys=True, indent=4)
info('Wrote processed data to %s' % os.path.join(os.curdir, output_f))
def do_load_submissions():
file = i('Get me the file with all the submissions',
'submissions.json', is_file=True)
'submissions.json', is_file=True)
if not ExamType.objects.all():
with open(file, 'r') as f:
raise Exception('Modules need to be loaded before submissions.')
exam_data = json.JSONDecoder().decode(f.read())
else:
exam_query_set = ExamType.objects.all()
hektor_version = exam_data['meta']['version']
print('Please select the corresponding module')
if not (semver.match(hektor_version, RUSTY_HEKTOR_MIN_VER) and
print('You have the following choices:\n')
semver.match(hektor_version, RUSTY_HEKTOR_MAX_VER)):
for j, exam_type in enumerate(exam_query_set):
warn(f'The data you\'re trying to import has the wrong version {hektor_version}\n'
print(f'\t[{j}] {exam_type.module_reference}')
f'Requirements: {RUSTY_HEKTOR_MIN_VER}, {RUSTY_HEKTOR_MAX_VER}')
print()
exam_prompt_key = i('Choose wisely')
exam, _ = ExamType.objects.get_or_create(**exam_data['module'])
exam_obj = {'exam': exam_query_set[int(exam_prompt_key)]}
with open(file) as exam_data_file:
for submission_type in exam['submission_types']:
exam_data = json.JSONDecoder().decode(exam_data_file.read())
SubmissionType.objects.get_or_create(**submission_type)
for student in exam_data['students']:
for student in exam_data['students']:
student_obj = user_factory.make_student(**exam_obj,
student_obj = user_factory.make_student(exam=exam,
**student).student
**student).student
for submission_obj in student['submissions']:
for submission_obj in student['submissions']:
add_submission(student_obj, **submission_obj)
add_submission(student_obj, **submission_obj)
def do_load_tutors():
def load_reviewers():
print('Please import tutor users by providing one name per line')
tutors = i('List of tutors', 'tutors', is_file=True)
with open(tutors) as tutors_f:
for tutor in tutors_f:
if len(tutor.strip()) > 0:
user_factory.make_tutor(tutor.strip(), store_pw=True)
def do_load_reviewer():
print('Please import reviewer users by providing one name per line')
print('Please import reviewer users by providing one name per line')
reviewers = i('List of reviewers', 'reviewers', is_file=True)
reviewers = i('List of reviewers', 'reviewers', is_file=True)
@@ -422,55 +150,56 @@ def do_load_reviewer():
@@ -422,55 +150,56 @@ def do_load_reviewer():
store_pw=True)
store_pw=True)
call_order = (
def add_submission(student_obj, code, tests, type=None):
do_load_submission_types,
submission_type_obj = SubmissionType.objects.get(name=type)
do_load_module_descriptions,
do_preprocess_submissions,
do_load_submissions,
do_load_tutors,
do_load_reviewer
)
 
submission_obj, _ = Submission.objects.update_or_create(
 
type=submission_type_obj,
 
student=student_obj,
 
defaults={'text': code}
 
)
def start():
if tests:
 
add_tests(submission_obj, tests)
if os.path.exists(HISTFILE):
readline.read_history_file(HISTFILE)
print(WELCOME + '''
def add_tests(submission_obj, tests):
 
auto_correct, _ = User.objects.get_or_create(
 
username='auto_correct',
 
defaults={'is_active': False}
 
)
Welcome to the Grady import script!
for name in (name for name in TEST_ORDER if name in tests):
 
test_data = tests[name]
 
test_obj, created = Test.objects.update_or_create(
 
name=test_data['name'],
 
submission=submission_obj,
 
defaults={
 
'label': test_data['label'],
 
'annotation': test_data['annotation'],
 
}
 
)
 
add_feedback_if_test_recommends_it(test_obj)
This script aims at making the setup of the database as easy as possible.
At the same time it serves as a documentation on how data is imported into
Grady. Let\'s dive right in.\n''')
try:
def add_feedback_if_test_recommends_it(test_obj):
print('The following sub importers are available:\n')
available_tests = util.processing.Test.available_tests()
for fid, func in enumerate(call_order):
print(f'\t[{fid}] {func.__name__}')
print('\t[q] exit')
print()
fid = i('Choose a number or hit enter to start at the beginning')
if test_obj.label == available_tests[test_obj.name].label_failure \
 
and not hasattr(test_obj.submission, 'feedback') \
 
and (test_obj.name == util.processing.EmptyTest.__name__ or
 
test_obj.name == util.processing.CompileTest.__name__):
 
return Feedback.objects.update_or_create(
 
of_submission=test_obj.submission,
 
defaults={
 
'score': 0,
 
'origin': FEEDBACK_MAPPER[test_obj.name],
 
'is_final': True,
 
}
 
)
if not fid:
for func in call_order:
call_loader(func)
elif fid in ('q', 'quit', 'exit'):
return
elif not 0 <= int(fid) < len(call_order):
warn('There is no loader with this number')
else:
call_loader(call_order[int(fid)])
except (EOFError, KeyboardInterrupt):
call_order = [
print()
load_hektor_json,
return
load_reviewers
except FileNotFoundError:
]
raise
except Exception:
import traceback
traceback.print_exc()
finally:
readline.write_history_file(HISTFILE)
Loading