"""
This module handles all operations that deal with runs.
"""

import json
from datetime import datetime
from re import compile, Pattern
from bson import json_util
from pymongo import collection
from model import Model
from commons import remove_mongodb_id_from_result, remove_key_from_dict
# Helpers from sibling modules, used below to enumerate GTs and workflows.

from gt import get_all_gt
from workflows import get_all_workflows

def get_all_runs(coll: collection.Collection):
    """
    Returns all runs available in the DB, regardless of
    GT and workflow.

    Args:
        - coll (Collection): the MongoDB collection to query

    Returns:
        - every document that has an 'eval_workflow_id' field, passed
          through remove_key_from_dict(..., 'cpu') and
          remove_mongodb_id_from_result(..., 'run')
    """
    # select every document that carries an 'eval_workflow_id' field
    cursor = coll.find({'eval_workflow_id': {'$exists': True}})
    # round-trip through json_util so BSON-specific values become plain JSON
    json_data = json.loads(json_util.dumps(cursor))
    purged = remove_key_from_dict(json_data, 'cpu')
    return remove_mongodb_id_from_result(purged, 'run')

def get_all_latest_runs(coll: collection.Collection):
    """
    Returns latest runs available in the DB, regardless of
    GT and workflow.

    Args:
        - coll (Collection): the MongoDB collection to query

    Returns:
        - a list with one entry per (GT, workflow) combination; each entry
          is whatever get_latest_runs returns for that combination
    """
    # get all gts
    all_gt_ids = extract_ids(get_all_gt(coll))
    # get all workflows
    all_workflow_ids = extract_ids(get_all_workflows(coll))
    # for each gt and each workflow, find the respective latest run
    # (GTs form the outer loop, workflows the inner one)
    return [get_latest_runs(coll, wf, gt)
            for gt in all_gt_ids
            for wf in all_workflow_ids]

def extract_ids(objects: list) -> list:
    """
    Returns the values of the resp. id properties of a list
    of JSON objects.

    Args:
        - objects (list): JSON objects (dicts) that each carry an id property

    Returns:
        - a list with the id value of every object, in input order
    """
    # NOTE(review): the loop body was missing in the reviewed source; the
    # property name 'id' is assumed from the docstring -- confirm against
    # the objects returned by get_all_gt / get_all_workflows.
    return [obj['id'] for obj in objects]

def get_all_runs_by_gt(coll: collection.Collection,
                       gt_id: str,
                       start_date: str | None = None,
                       end_date: str | None = None) -> list:
    """
    Returns evaluation results for all Quiver workspaces with a
    given GT

    Args:
        - coll (Collection): the MongoDB collection to query
        - gt_id (str): The ID of the GT data used for a run. It is compiled
          as a regular expression, so special characters are not escaped.
        - start_date (str | None): lower bound of the date filter, 'YYYY-MM-DD'
        - end_date (str | None): upper bound of the date filter, 'YYYY-MM-DD'

    Returns:
        - the matching runs, passed through remove_key_from_dict(..., 'cpu')
          and remove_mongodb_id_from_result(..., 'run')
    """
    gt_regex = compile(gt_id)
    # the date filter only applies when BOTH bounds are given
    if start_date and end_date:
        json_data = find_results_within_date_range(coll, gt_regex, start_date, end_date)
    else:
        # NOTE(review): the field name of this filter is an empty string in
        # the reviewed source -- confirm the intended document path.
        cursor = coll.find({'': gt_regex})
        json_data = json.loads(json_util.dumps(cursor))

    purged = remove_key_from_dict(json_data, 'cpu')

    return remove_mongodb_id_from_result(purged, 'run')

def get_all_runs_by_gt_and_wf(coll: collection.Collection,
                              workflow_id: str,
                              gt_id: str,
                              start_date: str | None = None,
                              end_date: str | None = None) -> list:
    """
    Returns evaluation results for all Quiver workspaces with a
    given workflow and GT

    Args:
        - coll (Collection): the MongoDB collection to query
        - workflow_id (str): The ID of the workflow used for a run
        - gt_id (str): The ID of the GT data used for a run
        - start_date (str | None): lower bound of the date filter, 'YYYY-MM-DD'
        - end_date (str | None): upper bound of the date filter, 'YYYY-MM-DD'

    Both IDs are compiled as regular expressions, so special characters
    are not escaped.

    Returns:
        - the matching runs, passed through remove_key_from_dict(..., 'cpu')
          and remove_mongodb_id_from_result(..., 'run')
    """
    gt_regex = compile(gt_id)
    wf_regex = compile(workflow_id)
    # the date filter only applies when BOTH bounds are given
    if start_date and end_date:
        json_data = find_results_within_date_range(coll, gt_regex, start_date, end_date, wf_regex)
    else:
        # NOTE(review): both field names of this filter are empty strings in
        # the reviewed source -- confirm the intended document paths.
        cursor = coll.find({'$and': [{'': gt_regex},
                                    {'': wf_regex}]})
        json_data = json.loads(json_util.dumps(cursor))
    purged = remove_key_from_dict(json_data, 'cpu')

    return remove_mongodb_id_from_result(purged, 'run')

def get_latest_runs(coll: collection.Collection,
                    workflow_id: str,
                    gt_id: str) -> list:
    """
    Returns evaluation results for the latest Quiver workspace with a
    given workflow and GT

    Args:
        - coll (Collection): the MongoDB collection to query
        - workflow_id (str): The ID of the workflow used for a run
        - gt_id (str): The ID of the GT data used for a run

    Both IDs are compiled as regular expressions, so special characters
    are not escaped.

    Returns:
        - the runs whose 'metadata.timestamp' matches the date closest to
          today, passed through remove_key_from_dict(..., 'cpu') and
          remove_mongodb_id_from_result(..., 'run'). If no run matches the
          workflow/GT combination, an empty result is passed through the
          same post-processing.
    """
    wf_regex = compile(workflow_id)
    gt_regex = compile(gt_id)
    # NOTE(review): the field names of these filters are empty strings in
    # the reviewed source -- confirm the intended document paths.
    all_runs = coll.find({'$and': [{'': gt_regex},
                                 {'': wf_regex}]})
    runs_json = json.loads(json_util.dumps(all_runs))
    if runs_json:
        closest_timestamp = find_timestamp_closest_to_today(runs_json)
        # the date string is used as a regex, so it matches any timestamp
        # that contains that day
        latest_run = coll.find({'$and': [{'': gt_regex},
                                     {'': wf_regex},
                                     {'metadata.timestamp': compile(closest_timestamp)}]})
        latest_run_json = json.loads(json_util.dumps(latest_run))
    else:
        # nothing to pick a timestamp from; skip the lookup instead of
        # letting min() fail on an empty sequence
        latest_run_json = []
    purged = remove_key_from_dict(latest_run_json, 'cpu')

    return remove_mongodb_id_from_result(purged, 'run')

def get_latest_runs_per_gt(coll: collection.Collection,
                           gt_id: str) -> list:
    """
    Returns evaluation results for the latest Quiver workspace with a
    given GT

    Args:
        - coll (Collection): the MongoDB collection to query
        - gt_id (str): The ID of the GT data used for a run. It is compiled
          as a regular expression, so special characters are not escaped.

    Returns:
        - the runs whose 'metadata.timestamp' matches the date closest to
          today, passed through remove_mongodb_id_from_result(..., 'run').
          If no run matches the GT, an empty result is passed through the
          same post-processing.
    """
    gt_regex = compile(gt_id)
    # NOTE(review): the field name of these filters is an empty string in
    # the reviewed source -- confirm the intended document path.
    all_runs = coll.find({'': gt_regex})
    runs_json = json.loads(json_util.dumps(all_runs))
    if runs_json:
        closest_timestamp = find_timestamp_closest_to_today(runs_json)
        latest_run = coll.find({'$and': [{'': gt_regex},
                                     {'metadata.timestamp': compile(closest_timestamp)}]})
        latest_run_json = json.loads(json_util.dumps(latest_run))
    else:
        # nothing to pick a timestamp from; skip the lookup instead of
        # letting min() fail on an empty sequence
        latest_run_json = []
    # NOTE(review): unlike the other getters, the 'cpu' key is not removed
    # here -- confirm whether that is intentional.
    return remove_mongodb_id_from_result(latest_run_json, 'run')

def post_new_result(coll: collection.Collection,
                    data: Model):
    """
    Posts information about a new evaluation workspace to the database.

    Args:
        - coll (Collection): the MongoDB collection to insert into
        - data (Model): information about the evaluation workspace

    Returns:
        - str: the string form of the insert's `acknowledged` flag,
          i.e. 'True' if the POST was successful
    """
    # the model is serialised via model_dump() and stored as one document
    result = coll.insert_one(data.model_dump())
    return str(result.acknowledged)

def find_timestamp_closest_to_today(run_json: list) -> str:
    """
    Finds the timestamp in a list of runs that is closest to today's date.

    Args:
        - run_json (list): runs as returned by a MongoDB query; each entry
          needs a ['metadata']['timestamp'] value

    Returns:
        - str: the closest date, formatted as 'YYYY-MM-DD'

    Raises:
        - ValueError: if run_json contains no runs
    """
    # get all the dates
    timestamps_dates = get_timestamps(run_json)
    if not timestamps_dates:
        raise ValueError('cannot determine the closest timestamp of an empty list of runs')
    # find out which is the latest one; now() stays naive because the
    # parsed timestamps are naive datetimes as well
    current_date = datetime.now()
    closest_date = min(timestamps_dates, key=lambda d: abs(d - current_date))
    return closest_date.strftime('%Y-%m-%d')

def get_timestamps(run_json: list) -> list:
    """
    Returns a list with all timestamps for a list of MongoDB
    query results.

    Args:
        - run_json (list): query results; each entry needs a
          ['metadata']['timestamp'] string that starts with 'YYYY-MM-DD'

    Returns:
        - a list of datetime objects (date part only, time set to midnight),
          one per entry and in input order
    """
    # only the part before the 'T' separator (the date) is parsed
    return [datetime.strptime(entry['metadata']['timestamp'].split('T')[0], '%Y-%m-%d')
            for entry in run_json]

def find_dates_within_range(timestamps: list,
                            start_datetime: datetime,
                            end_datetime: datetime) -> list:
    """
    Returns a list of all timestamps that are within a given range.

    Args:
        - timestamps (list): a list of timestamps of the queried runs
        - start_datetime (datetime): the lower bound of the range
        - end_datetime (datetime): the upper bound of the range

    Returns:
        - the timestamps that lie within the range, in input order;
          both bounds are inclusive
    """
    return [stamp for stamp in timestamps
            if start_datetime <= stamp <= end_datetime]

def get_results_within_date_range(json_data_tmp, relevant_dates: list) -> list:
    """
    Returns all runs that have a timestamp that is within a given range.

    Args:
        - json_data_tmp: the queried runs; each entry needs a
          ['metadata']['timestamp'] string that starts with 'YYYY-MM-DD'
        - relevant_dates (list): the datetime values that count as in range

    Returns:
        - the runs whose date is one of relevant_dates, in input order
    """
    # a set makes the per-run membership test O(1)
    wanted = set(relevant_dates)
    json_data = []
    for entry in json_data_tmp:
        # compare on the date part only (everything before the 'T')
        stamp = entry['metadata']['timestamp'].split('T')[0]
        if datetime.strptime(stamp, '%Y-%m-%d') in wanted:
            json_data.append(entry)
    return json_data

def find_results_within_date_range(coll: collection.Collection,
                                   gt_regex: Pattern,
                                   start_date: str,
                                   end_date: str,
                                   wf_regex: Pattern | None = None) -> list:
    """
    Returns all runs that match a given GT, a given workflow (optional)
    and a given date filter.

    Args:
        - coll (Collection): the MongoDB collection to query
        - gt_regex (Pattern): compiled pattern for the GT ID
        - start_date (str): lower bound of the range, 'YYYY-MM-DD'
        - end_date (str): upper bound of the range, 'YYYY-MM-DD'
        - wf_regex (Pattern | None): compiled pattern for the workflow ID;
          if omitted, runs are filtered by GT only

    Returns:
        - the matching runs whose date lies within the (inclusive) range

    Raises:
        - ValueError: if start_date or end_date is not in 'YYYY-MM-DD' format
    """
    start_datetime = datetime.strptime(start_date, '%Y-%m-%d')
    end_datetime = datetime.strptime(end_date, '%Y-%m-%d')

    # NOTE(review): the field names of these filters are empty strings in
    # the reviewed source -- confirm the intended document paths.
    if wf_regex:
        cursor = coll.find({'$and': [{'': gt_regex},
                                    {'': wf_regex}]})
    else:
        cursor = coll.find({'': gt_regex})
    json_data_tmp = json.loads(json_util.dumps(cursor))
    timestamps_dates = get_timestamps(json_data_tmp)

    relevant_dates = find_dates_within_range(timestamps_dates, start_datetime, end_datetime)

    return get_results_within_date_range(json_data_tmp, relevant_dates)