Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
"""
This module handles all operations that deal with runs.
"""
import json
from datetime import datetime
from re import compile, Pattern
from bson import json_util
from pymongo import collection
from model import Model
from commons import remove_mongodb_id_from_result
from gt import get_all_gt
from workflows import get_all_workflows
def get_all_runs(coll: collection.Collection):
    """
    Returns all runs available in the DB, regardless of
    GT and workflow.

    A document counts as a run if it has an 'eval_workflow_id' field.
    The matching documents are passed through
    remove_mongodb_id_from_result before being returned.
    """
    run_filter = {'eval_workflow_id': {'$exists': True}}
    # Serialise with bson's json_util and parse again so the cursor's
    # documents are turned into plain JSON-style Python data.
    serialised = json_util.dumps(coll.find(run_filter))
    return remove_mongodb_id_from_result(json.loads(serialised), 'run')
def get_all_latest_runs(coll: collection.Collection):
    """
    Returns the latest runs available in the DB for every
    combination of GT and workflow.

    The result contains one element per (GT, workflow) pair; each
    element is whatever get_latest_runs returns for that pair.
    """
    gt_ids = extract_ids(get_all_gt(coll))
    workflow_ids = extract_ids(get_all_workflows(coll))
    # GTs are the outer loop, workflows the inner one.
    return [
        get_latest_runs(coll, workflow_id, gt_id)
        for gt_id in gt_ids
        for workflow_id in workflow_ids
    ]
def extract_ids(objects: list) -> list:
    """
    Collects the value stored under the 'id' key of every JSON
    object in the given list, keeping the input order.
    """
    return [item['id'] for item in objects]
def get_all_runs_by_gt(coll: collection.Collection,
                       gt_id: str,
                       start_date: str | None = None,
                       end_date: str | None = None) -> list:
    """
    Returns evaluation results for all Quiver workspaces with a
    given GT.

    The date filter is only applied when both start_date and end_date
    are given; otherwise every run matching the GT is returned.

    Args:
    - gt_id (str): The ID of the GT data used for a run; it is compiled
      to a regular expression before querying
    - start_date (str | None): lower bound handed to
      find_results_within_date_range
    - end_date (str | None): upper bound handed to
      find_results_within_date_range
    """
    gt_pattern = compile(gt_id)
    if not (start_date and end_date):
        matches = coll.find({'metadata.gt_workspace.id': gt_pattern})
        runs = json.loads(json_util.dumps(matches))
    else:
        runs = find_results_within_date_range(coll, gt_pattern, start_date, end_date)
    return remove_mongodb_id_from_result(runs, 'run')
def get_all_runs_by_gt_and_wf(coll: collection.Collection,
                              workflow_id: str,
                              gt_id: str,
                              start_date: str | None = None,
                              end_date: str | None = None) -> list:
    """
    Returns evaluation results for all Quiver workspaces with a
    given workflow and GT.

    The date filter is only applied when both start_date and end_date
    are given; otherwise every run matching workflow and GT is returned.

    Args:
    - workflow_id (str): The ID of the workflow used for a run
    - gt_id (str): The ID of the GT data used for a run
    - start_date (str | None): lower bound handed to
      find_results_within_date_range
    - end_date (str | None): upper bound handed to
      find_results_within_date_range

    Both IDs are compiled to regular expressions before querying.
    """
    gt_pattern = compile(gt_id)
    wf_pattern = compile(workflow_id)
    if not (start_date and end_date):
        query = {'$and': [{'metadata.gt_workspace.id': gt_pattern},
                          {'metadata.ocr_workflow.id': wf_pattern}]}
        runs = json.loads(json_util.dumps(coll.find(query)))
    else:
        runs = find_results_within_date_range(
            coll, gt_pattern, start_date, end_date, wf_pattern)
    return remove_mongodb_id_from_result(runs, 'run')
def get_latest_runs(coll: collection.Collection,
                    workflow_id: str,
                    gt_id: str) -> list:
    """
    Returns evaluation results for the latest Quiver workspace with a
    given workflow and GT.

    Both IDs are compiled to regular expressions before querying. If no
    run exists for the workflow/GT pair, the empty query result is
    passed through remove_mongodb_id_from_result and returned instead
    of raising.

    Args:
    - workflow_id (str): The ID of the workflow used for a run
    - gt_id (str): The ID of the GT data used for a run
    """
    wf_regex = compile(workflow_id)
    gt_regex = compile(gt_id)
    pair_filters = [{'metadata.gt_workspace.id': gt_regex},
                    {'metadata.ocr_workflow.id': wf_regex}]
    all_runs = coll.find({'$and': pair_filters})
    runs_json = json.loads(json_util.dumps(all_runs))
    if not runs_json:
        # Without this guard find_timestamp_closest_to_today would call
        # min() on an empty list and raise ValueError for every
        # workflow/GT pair that has not been run yet.
        return remove_mongodb_id_from_result(runs_json, 'run')
    closest_timestamp = find_timestamp_closest_to_today(runs_json)
    # Second query: keep only the runs whose timestamp contains the
    # 'YYYY-MM-DD' date that is closest to today.
    latest_run = coll.find({'$and': pair_filters + [
        {'metadata.timestamp': compile(closest_timestamp)}]})
    latest_run_json = json.loads(json_util.dumps(latest_run))
    return remove_mongodb_id_from_result(latest_run_json, 'run')
def get_latest_runs_per_gt(coll: collection.Collection,
                           gt_id: str) -> list:
    """
    Returns evaluation results for the latest Quiver workspace with a
    given GT.

    The ID is compiled to a regular expression before querying. If no
    run exists for the GT, the empty query result is passed through
    remove_mongodb_id_from_result and returned instead of raising.

    Args:
    - gt_id (str): The ID of the GT data used for a run
    """
    gt_regex = compile(gt_id)
    gt_filter = {'metadata.gt_workspace.id': gt_regex}
    all_runs = coll.find(gt_filter)
    runs_json = json.loads(json_util.dumps(all_runs))
    if not runs_json:
        # Without this guard find_timestamp_closest_to_today would call
        # min() on an empty list and raise ValueError for a GT that has
        # no runs yet.
        return remove_mongodb_id_from_result(runs_json, 'run')
    closest_timestamp = find_timestamp_closest_to_today(runs_json)
    # Second query: keep only the runs whose timestamp contains the
    # 'YYYY-MM-DD' date that is closest to today.
    latest_run = coll.find({'$and': [gt_filter,
                                     {'metadata.timestamp': compile(closest_timestamp)}]})
    latest_run_json = json.loads(json_util.dumps(latest_run))
    return remove_mongodb_id_from_result(latest_run_json, 'run')
def post_new_result(coll: collection.Collection,
                    data: Model):
    """
    Posts information about a new evaluation workspace to the database.

    Args:
    - data (Model): information about the evaluation workspace; its
      model_dump() output is the document that gets inserted
    Return
    - str: the insert result's 'acknowledged' flag converted to a
      string (note: a string, not a bool)
    """
    document = data.model_dump()
    insert_result = coll.insert_one(document)
    return str(insert_result.acknowledged)
def find_timestamp_closest_to_today(run_json: list) -> str:
    """
    Finds the run date that is closest to the current date and time
    and returns it formatted as 'YYYY-MM-DD'.

    Closeness is the absolute difference to datetime.today(), so the
    direction (past or future) does not matter.
    """
    run_dates = get_timestamps(run_json)
    now = datetime.today()

    def distance_to_now(run_date):
        return abs(run_date - now)

    nearest = min(run_dates, key=distance_to_now)
    return nearest.strftime('%Y-%m-%d')
def get_timestamps(run_json: list) -> list:
    """
    Returns the dates of a list of MongoDB query results as datetime
    objects, in input order.

    For each entry only the part of 'metadata.timestamp' before the
    first 'T' is used, parsed as 'YYYY-MM-DD'.
    """
    return [
        datetime.strptime(entry['metadata']['timestamp'].split('T')[0], '%Y-%m-%d')
        for entry in run_json
    ]
def find_dates_within_range(timestamps: list,
                            start_datetime: datetime,
                            end_datetime: datetime) -> list:
    """
    Returns a list of all timestamps that are within a given range.
    Both bounds are inclusive and the input order is kept.

    Args:
    - timestamps (list): a list of timestamps of the queried runs
    - start_datetime (datetime): the lower bound of the range
    - end_datetime (datetime): the upper bound of the range
    """
    return [
        moment for moment in timestamps
        if start_datetime <= moment <= end_datetime
    ]
def get_results_within_date_range(json_data_tmp: list, relevant_dates: list) -> list:
    """
    Returns all runs whose date is contained in relevant_dates,
    in input order.

    Args:
    - json_data_tmp (list): the queried runs; for each one the part of
      'metadata.timestamp' before the first 'T' is parsed as 'YYYY-MM-DD'
    - relevant_dates (list): datetime objects of the dates to keep
    """
    # Build the lookup set once so that each membership test is O(1)
    # instead of scanning the whole list for every run.
    wanted_dates = set(relevant_dates)
    return [
        entry for entry in json_data_tmp
        if datetime.strptime(entry['metadata']['timestamp'].split('T')[0],
                             '%Y-%m-%d') in wanted_dates
    ]
def find_results_within_date_range(coll: collection.Collection,
                                   gt_regex: Pattern,
                                   start_date: str,
                                   end_date: str,
                                   wf_regex: Pattern | None = None) -> list:
    """
    Returns all runs that match a given GT, a given workflow (optional)
    and a given date filter.

    Args:
    - gt_regex (Pattern): pattern matched against 'metadata.gt_workspace.id'
    - start_date (str): inclusive lower bound, formatted 'YYYY-MM-DD'
    - end_date (str): inclusive upper bound, formatted 'YYYY-MM-DD'
    - wf_regex (Pattern | None): if given, additionally matched against
      'metadata.ocr_workflow.id'
    """
    # Parse the bounds first so malformed dates fail before the DB is queried.
    lower_bound = datetime.strptime(start_date, '%Y-%m-%d')
    upper_bound = datetime.strptime(end_date, '%Y-%m-%d')

    query = {'metadata.gt_workspace.id': gt_regex}
    if wf_regex:
        query = {'$and': [query, {'metadata.ocr_workflow.id': wf_regex}]}

    candidates = json.loads(json_util.dumps(coll.find(query)))
    dates_in_range = find_dates_within_range(
        get_timestamps(candidates), lower_bound, upper_bound)
    return get_results_within_date_range(candidates, dates_in_range)