Commit c6488195 authored by parciak
Browse files

Added check endpoint including testing and a whole workflow test.


Signed-off-by: Marcel Parciak <marcel.parciak@gmail.com>
parent 275618dd
......@@ -49,10 +49,7 @@ def aw_endpoint(payload: awmodels.RequestCommon, background_tasks: BackgroundTas
)
elif payload.method is not None and payload.method == "check":
return check(
payload=awmodels.RequestCheck.parse_obj(payload.dict()),
background_tasks=background_tasks,
)
return check(payload=awmodels.RequestCheck.parse_obj(payload.dict()),)
raise errors.ActiveWorkflowException(
name="Method Error",
......@@ -178,10 +175,10 @@ def run_annotation(
if not stores.upload_files(jsonld_files):
errmsg = f"Could not upload file annotations for files of archive {archive_id} to CouchDB."
error_log.error(errmsg)
utils.write_state_to_metafile("incomplete", metafile)
utils.write_state_to_metafile(utils.AnnotationState.incomplete, metafile)
return
utils.write_state_to_metafile("completed", metafile)
utils.write_state_to_metafile(utils.AnnotationState.completed, metafile)
@app.post(
......@@ -190,14 +187,46 @@ def run_annotation(
response_model_exclude_none=True,
status_code=fastapi.status.HTTP_200_OK,
)
def check(payload: awmodels.RequestCheck, background_tasks: BackgroundTasks):
def check(payload: awmodels.RequestCheck):
""" Checks on a running TOS Job or retrieves the output of a completed Job.
"""
access_log.info(f"{datetime.datetime.now().isoformat()} - Check: {payload.dict()}")
# TODO: implement check here!
pass
resp = awmodels.ResponseCheck()
for archive_id in payload.params.memory.archives:
state = utils.get_state_from_metafile(utils.get_path_for_archive(archive_id))
if not state:
error_log.warning(
f"An archive with id {archive_id} has not been found. Erasing from memory."
)
elif state["state"] == utils.AnnotationState.started:
resp.result.memory.archives.append(archive_id)
resp.result.logs.append(
f"Archive annotation ({archive_id}) in progress since {state['started_at']}."
)
elif state["state"] == utils.AnnotationState.completed:
msg = awmodels.MessageOutput(
archive_id=archive_id, couch_uri=f"", stats=state
)
resp.result.messages.append(msg)
utils.remove_metafile(utils.get_path_for_archive(archive_id))
elif state["state"] == utils.AnnotationState.incomplete:
resp.result.errors.append(
f"There was an error annotating archive {archive_id}. Please check the logs of the agent."
)
utils.remove_metafile(utils.get_path_for_archive(archive_id))
else:
error_log.warning(
f"Orphan annotation archive id ({archive_id}) found and removed."
)
return resp
@app.exception_handler(errors.ActiveWorkflowException)
......
......@@ -103,12 +103,23 @@ class MessageOutput(BaseModel):
archive_id: str = Field(None, example="a1b2c3d4e5f6")
couch_uri: str = Field(None, example="http://couch.example.org/database/something")
stats: Dict[str, Any] = Field(
{},
example={
"started_at": "2020-07-22T12:34:56",
"completed_at": "2020-07-23T23:45:01",
},
)
@staticmethod
def example() -> dict:
return {
"archive_id": "a1b2c3d4e5f6",
"couch_uri": "http://couch.example.org/database/something",
"stats": {
"started_at": "2020-07-22T12:34:56",
"completed_at": "2020-07-23T23:45:01",
},
}
......
from annotator.stores import cdstar
import io
import json
import os
import time
from typing import Any, Dict
import cloudant
import cloudant.database as CloudantDatabase
import cloudant.document as CloudantDocument
import pycdstar3
import pytest
from fastapi.testclient import TestClient
from annotator.main import app, endpoint_name
......@@ -9,6 +19,50 @@ from annotator import test_utils
client = TestClient(app)
@pytest.fixture(scope="function")
def cdstar_archive():
    """
    Creates a cdstar Archive consisting of dummy data and uploads it to CDSTAR for testing.

    Yields the id of the new archive. After the test has run, the metadata
    created for the archive is removed via ``delete_metadata`` and the archive
    itself is deleted.
    """
    # read the settings once instead of re-creating them for every attribute
    settings = config.BasicSettings()
    # not called ``cdstar`` so the ``cdstar`` module imported at file level is not shadowed
    cdstar_client = pycdstar3.CDStar(
        settings.cdstar_uri, auth=(settings.cdstar_user, settings.cdstar_pass),
    )
    with cdstar_client.begin(autocommit=True):
        vault = pycdstar3.CDStarVault(cdstar_client, settings.cdstar_vault)
        archive = vault.new_archive()
        for i in range(250):
            dummy_file = {"some": "thing", "entity": i}
            with io.StringIO() as dummy:
                dummy.write(json.dumps(dummy_file, indent=2))
                # NOTE(review): write() leaves the stream position at the end of
                # the buffer; confirm that put() still uploads the full content
                # (a dummy.seek(0) may be needed).
                archive.file(f"file_{i}.json").put(source=dummy)
    # NOTE(review): the yield is placed after the transaction block so the
    # archive is committed before the test uses it; confirm against the
    # original indentation.
    yield archive.id
    delete_metadata(archive.id)
    archive.delete()
def delete_metadata(archive_id: str):
    """
    Deletes metadata from CouchDB that has been created using the agent.

    Polls the database a bounded number of times for the archive document.
    Once it is present, every document referenced in its ``hasPart`` list is
    deleted, followed by the archive document itself. If the document never
    shows up, nothing is deleted and the function returns.
    """
    settings = config.BasicSettings()
    # NOTE(review): python-cloudant documents this option as ``use_basic_auth``;
    # the keyword below is kept unchanged, confirm whether it has any effect.
    with cloudant.couchdb(
        settings.couch_user,
        settings.couch_pass,
        url=settings.couch_uri,
        user_basic_auth=True,
    ) as couch_client:
        db: CloudantDatabase.CouchDatabase = couch_client[settings.couch_db]
        wait_iterations = 3
        while wait_iterations > 0 and archive_id not in db:
            # wait for receive to finish in order to ensure no dead docs are created.
            time.sleep(3)
            # count down, otherwise a missing document makes this loop spin forever
            wait_iterations -= 1
        if archive_id not in db:
            # the document never appeared, so there is nothing to clean up
            return
        archive: CloudantDocument.Document = db[archive_id]
        for part in archive["hasPart"]:
            # the document id is the last path segment of the part's "@id"
            f: CloudantDocument.Document = db[part["@id"].split("/")[-1]]
            f.delete()
        archive.delete()
def test_register_compliance_01():
"""
Tests if a register request yields a valid AW response.
......@@ -103,3 +157,49 @@ def disabled_test_check_compliance_02():
response_data = response.json()
assert response_data
assert test_utils.is_valid_response(response_data)
def test_complete_workflow(cdstar_archive):
    """
    Perform a receive -> check cycle with some time in-between to test the whole workflow.
    """
    receive_request = {
        "method": "receive",
        "params": {
            "message": {"payload": {"archive_id": cdstar_archive}},
            "options": {"annotation": {}},
            "memory": {},
            "credentials": [],
        },
    }
    response = client.post(f"/{endpoint_name}", json=receive_request)
    assert 200 <= response.status_code <= 299
    # named receive_json so the imported ``json`` module is not shadowed
    receive_json = response.json()
    assert receive_json
    assert test_utils.is_valid_response(receive_json)
    assert receive_json["result"]["memory"]["archives"]
    assert receive_json["result"]["memory"]["archives"][0] == cdstar_archive

    # let some time go by for receive to finish
    time.sleep(15)

    check_request = {
        "method": "check",
        "params": {
            "message": None,
            "options": {},
            "memory": {"archives": [cdstar_archive]},
            "credentials": [],
        },
    }
    response = client.post(f"/{endpoint_name}", json=check_request)
    assert 200 <= response.status_code <= 299
    response_json = response.json()
    assert response_json
    assert test_utils.is_valid_response(response_json)
    # on success, respond with any kind of message to indicate an annotation run is done
    assert len(response_json["result"]["messages"]) > 0
import datetime
import json
import os
import random
import string
......@@ -10,19 +12,25 @@ from fastapi.testclient import TestClient
from annotator.main import app, endpoint_name
from annotator import test_utils
from annotator import utils
client = TestClient(app)
@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def cdstar_archive():
"""
Fakes an annotated archive id, yielding the archive_id and cleaning it up afterwards if necessary.
"""
archive_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=12))
filepath = os.path.join(tempfile.gettempdir(), f"meta_{archive_id}")
archive_id = "".join(random.choices("abcdef" + string.digits, k=12))
filepath = os.path.join(tempfile.gettempdir(), f"meta_{archive_id}.json")
with open(filepath, "w") as metafile:
metafile.write("NONCE")
mocked_state = {
"state": utils.AnnotationState.completed,
"started_at": datetime.datetime.now().isoformat(),
"completed_at": datetime.datetime.now().isoformat(),
}
json.dump(mocked_state, metafile)
yield archive_id
# check should delete the filepath usually. This is a failure cleanup
......@@ -40,7 +48,7 @@ def test_check_01(cdstar_archive):
"params": {
"message": None,
"options": {},
"memory": {"annotations": [cdstar_archive]},
"memory": {"archives": [cdstar_archive]},
"credentials": [],
},
}
......@@ -51,7 +59,7 @@ def test_check_01(cdstar_archive):
assert response_json
assert test_utils.is_valid_response(response_json)
# on success, respond with any kind of message to indicate a annotation run is done
# on success, respond with any kind of message to indicate an annotation run is done
assert len(response_json["result"]["messages"]) > 0
# make sure that the written state file is erased
......@@ -69,7 +77,7 @@ def test_check_02():
"params": {
"message": None,
"options": {},
"memory": {"annotations": []},
"memory": {"archives": []},
"credentials": [],
},
}
......@@ -92,7 +100,7 @@ def test_check_03(cdstar_archive):
"params": {
"message": None,
"options": {},
"memory": {"annotations": [cdstar_archive]},
"memory": {"archives": [cdstar_archive]},
"credentials": [],
},
}
......@@ -120,7 +128,7 @@ def test_check_04():
"params": {
"message": None,
"options": {},
"memory": {"annotations": ["someReallyCoolStuffInHere"]},
"memory": {"archives": ["someReallyCoolStuffInHere"]},
"credentials": [],
},
}
......
......@@ -3,6 +3,7 @@ import io
import json
import os
import tempfile
import time
from typing import Any, Dict
import cloudant
......@@ -22,7 +23,7 @@ generated_files_count = 250
file_list_limit = 50
@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def cdstar_archive():
"""
Creates a cdstar Archive consisting of dummy data and uploads it to CDSTAR for testing.
......@@ -55,6 +56,10 @@ def delete_metadata(archive_id: str):
user_basic_auth=True,
) as client:
db: CloudantDatabase.CouchDatabase = client[config.BasicSettings().couch_db]
wait_iterations = 3
while wait_iterations > 0 and archive_id not in db:
# wait for receive to finish in order to ensure no dead docs are created.
time.sleep(3)
archive: CloudantDocument.Document = db[archive_id]
for part in archive["hasPart"]:
f: CloudantDocument.Document = db[part["@id"].split("/")[-1]]
......
import datetime
import enum
import json
import os
import sys
import tempfile
import traceback
from typing import Any, Dict, Optional
from annotator import error_log
class AnnotationState(str, enum.Enum):
    """States of an archive annotation run, as written to the metafile."""

    started = "started"
    completed = "completed"
    incomplete = "incomplete"

    def __str__(self) -> str:
        # Since Python 3.11, format() and f-strings of a mixed-in enum follow
        # __str__, which would render "AnnotationState.started". Returning the
        # plain value keeps keys built as f"{state}_at" stable on every version.
        return str(self.value)
def create_file_for_archive(archive_id: str) -> Optional[str]:
if annotation_in_progress(archive_id):
error_log.error(f"Archive {archive_id} is processing.")
return None
path = get_path_for_archive(archive_id)
if not write_state_to_metafile("started", path):
if not write_state_to_metafile(AnnotationState.started, path):
return None
return path
......@@ -65,7 +75,7 @@ def get_jsonld_reference(jsonld: Dict[str, Any]) -> Optional[Dict[str, Any]]:
return reference
def write_state_to_metafile(new_state: str, path: str) -> bool:
def write_state_to_metafile(new_state: AnnotationState, path: str) -> bool:
try:
with open(path, "r") as m_f:
state = json.load(m_f)
......@@ -73,13 +83,45 @@ def write_state_to_metafile(new_state: str, path: str) -> bool:
state = {}
state["state"] = new_state
if new_state in ["started", "completed"]:
if new_state in [AnnotationState.started, AnnotationState.completed]:
state[f"{new_state}_at"] = datetime.datetime.now().isoformat()
with open(path, "w+") as m_f:
json.dump(state, m_f)
"""
try:
with open(path, "w") as m_f:
with open(path, "w+") as m_f:
json.dump(state, m_f)
except:
error_log.error(f"Could not create or write to file: {path}.")
return False
"""
return True
def get_state_from_metafile(path: str) -> Optional[Dict[str, Any]]:
    """
    Read the state stored in the metafile at ``path``.

    Returns the decoded JSON content of the file, or None if the file does
    not exist.
    """
    try:
        with open(path, "r") as m_f:
            return json.load(m_f)
    except FileNotFoundError:
        return None
def remove_metafile(path: str) -> bool:
    """
    Delete the metafile at ``path`` unless its annotation is still running.

    Returns False and keeps the file when the stored state is
    ``AnnotationState.started``. Returns True when the file was deleted, and
    also when it did not exist (a warning is logged in that case).
    """
    try:
        with open(path, "r") as m_f:
            state = json.load(m_f)
        if state["state"] == AnnotationState.started:
            return False
        # unlink stays inside the try so a file that vanished in the meantime
        # is handled like one that was never there
        os.unlink(path)
    except FileNotFoundError:
        error_log.warning(f"File {path} could not be removed as it was not found.")
    return True
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment