Commit 845df5b1 authored by parciak
Browse files

Added a modifiable CDSTAR vault id as well as a CouchDB database name, configurable via options or the receive payload.


Signed-off-by: Marcel Parciak <marcel.parciak@gmail.com>
parent b85f288f
......@@ -11,7 +11,7 @@ class BasicSettings(BaseSettings):
cdstar_uri: str = "http://localhost:8082/v3"
cdstar_user: str = "medic1"
cdstar_pass: str = "medIC!umG!9"
cdstar_vault: str = "medic"
vault_id: str = "medic"
couch_uri: str = "http://localhost:5984"
couch_user: str = "medic"
couch_pass: str = "medic2020"
......
class ActiveWorkflowError(Exception):
    """Error reported back to ActiveWorkflow, carrying a short name and a message."""

    def __init__(self, name: str, message: str):
        # Forward the message to Exception so str(exc) and tracebacks show it.
        super().__init__(message)
        self.name = name
        self.message = message
class ConfigurationError(Exception):
    """Raised when the agent's configuration (e.g. CDSTAR vault or CouchDB database) is invalid or missing."""

    def __init__(self, message: str):
        # Forward the message to Exception so str(exc) and tracebacks show it.
        super().__init__(message)
        self.name = "Agent Configuration Error"  # fixed typo: was "Configuation"
        self.message = message
from annotator.errors import ActiveWorkflowError, ConfigurationError
import datetime
import json
import os
import time
from typing import Any, Dict
from typing import Any, Dict, Optional, Union
from annotator import access_log, error_log
from annotator import config
......@@ -51,7 +50,7 @@ def aw_endpoint(payload: awmodels.RequestCommon, background_tasks: BackgroundTas
elif payload.method is not None and payload.method == "check":
return check(payload=awmodels.RequestCheck.parse_obj(payload.dict()),)
raise errors.ActiveWorkflowException(
raise errors.ActiveWorkflowError(
name="Method Error",
message=f"You did not supply a valid method. Valid methods are: register, receive, check",
)
......@@ -98,13 +97,26 @@ def receive(payload: awmodels.RequestReceive, background_tasks: BackgroundTasks)
response.result.memory.archives += payload.params.memory.archives
archive_id = payload.params.message.payload.archive_id
if not stores.is_valid_archive(archive_id):
vault_id = get_setting_from_payload(payload, "vault_id")
if not vault_id:
# real error please
raise errors.ConfigurationError(
"There is an configuration error regarding the chosen CDSTAR vault. Please check the configuration of the agent."
)
couch_db = get_setting_from_payload(payload, "couch_db")
if not couch_db:
# real error please
raise errors.ConfigurationError(
"There is an configuration error regarding the chosen CouchDB database. Please check the configuraton of the agent."
)
if not stores.is_valid_archive(vault_id, archive_id):
response.result.errors.append(
f"Archive {archive_id} is not available in CDSTAR."
f"Archive {archive_id} is not available in CDSTAR vault {vault_id}."
)
return response
if utils.annotation_in_progress(archive_id):
if utils.annotation_in_progress(vault_id, archive_id):
response.result.logs.append(
f"Archive {archive_id} is currently being processed."
)
......@@ -118,34 +130,40 @@ def receive(payload: awmodels.RequestReceive, background_tasks: BackgroundTasks)
for k, v in payload.params.message.payload.annotations_file.items():
annotations_file[k] = v
metafile = utils.create_file_for_archive(archive_id)
metafile = utils.create_file_for_archive(vault_id, archive_id)
if not metafile:
# TODO: real error please
raise errors.ActiveWorkflowException(
"Another Error needed here", "Something does not work"
raise errors.ConfigurationError(
"There is something wrong with the configured temporary directory to save the annotation state. Please check the configuration of the agent."
)
if payload.params.message.payload.couch_db:
couch_db = payload.params.message.payload.couch_db
background_tasks.add_task(
run_annotation,
archive_id=archive_id,
vault_id=vault_id,
annotations_archive=annotations_archive,
annotations_file=annotations_file,
couch_db=couch_db,
metafile=metafile,
)
response.result.logs.append(f"Starting annotation of CDSTAR archive {archive_id}")
response.result.memory.archives.append(archive_id)
response.result.memory.archives.append((vault_id, archive_id))
return response
def run_annotation(
archive_id: str,
vault_id: str,
annotations_archive: Dict[str, Any],
annotations_file: Dict[str, Any],
couch_db: str,
metafile: str,
) -> None:
archive, filelist = stores.get_cdstar_metadata(archive_id)
archive, filelist = stores.get_cdstar_metadata(vault_id, archive_id)
if not archive:
errmsg = f"Not able to retrieve archive {archive_id} in BackgroundTask. How is that possible?"
error_log.error(errmsg)
......@@ -164,7 +182,7 @@ def run_annotation(
jsonld_archive["hasPart"].append(utils.get_jsonld_reference(meta_file))
meta_file["isPartOf"] = utils.get_jsonld_reference(jsonld_archive)
if not stores.upload_archive(jsonld_archive):
if not stores.upload_archive(jsonld_archive, couch_db):
errmsg = (
f"Could not upload archive annotations for archive {archive_id} to CouchDB."
)
......@@ -172,7 +190,7 @@ def run_annotation(
os.unlink(metafile)
return
if not stores.upload_files(jsonld_files):
if not stores.upload_files(jsonld_files, couch_db):
errmsg = f"Could not upload file annotations for files of archive {archive_id} to CouchDB."
error_log.error(errmsg)
utils.write_state_to_metafile(utils.AnnotationState.incomplete, metafile)
......@@ -195,15 +213,16 @@ def check(payload: awmodels.RequestCheck):
resp = awmodels.ResponseCheck()
for archive_id in payload.params.memory.archives:
state = utils.get_state_from_metafile(utils.get_path_for_archive(archive_id))
for vault_id, archive_id in payload.params.memory.archives:
metafile = utils.get_path_for_archive(vault_id, archive_id)
state = utils.get_state_from_metafile(metafile)
if not state:
error_log.warning(
f"An archive with id {archive_id} has not been found. Erasing from memory."
)
elif state["state"] == utils.AnnotationState.started:
resp.result.memory.archives.append(archive_id)
resp.result.memory.archives.append((vault_id, archive_id))
resp.result.logs.append(
f"Archive annotation ({archive_id}) in progress since {state['started_at']}."
)
......@@ -213,13 +232,13 @@ def check(payload: awmodels.RequestCheck):
archive_id=archive_id, couch_uri=f"", stats=state
)
resp.result.messages.append(msg)
utils.remove_metafile(utils.get_path_for_archive(archive_id))
utils.remove_metafile(metafile)
elif state["state"] == utils.AnnotationState.incomplete:
resp.result.errors.append(
f"There was an error annotating archive {archive_id}. Please check the logs of the agent."
)
utils.remove_metafile(utils.get_path_for_archive(archive_id))
utils.remove_metafile(metafile)
else:
error_log.warning(
......@@ -229,8 +248,35 @@ def check(payload: awmodels.RequestCheck):
return resp
@app.exception_handler(errors.ActiveWorkflowException)
async def aw_exception_handler(request: Request, exc: errors.ActiveWorkflowException):
def get_setting_from_payload(
    payload: Union[awmodels.RequestReceive, awmodels.RequestCheck], key: str
) -> Optional[str]:
    """Resolve a setting value by precedence: message payload > options > agent config.

    Each candidate source is converted to a dict and the first truthy value
    found for `key` wins; falsy/unset values fall through to the next source.

    Returns:
        The resolved value, or None when `key` is unset (or falsy) everywhere.
    """
    sources = (
        payload.params.message.payload.dict(),
        payload.params.options.dict(),
        config.BasicSettings().dict(),
    )
    for source in sources:
        # dict.get avoids the double lookup of `key in d and d[key]`.
        value = source.get(key)
        if value:
            return value
    return None
@app.exception_handler(errors.ConfigurationError)
async def config_exception_handler(request: Request, exc: errors.ConfigurationError):
    """Translate a ConfigurationError into a 500 JSON response carrying the error text."""
    message = f"{exc.name}: {exc.message}"
    error_log.error(message)
    response = awmodels.ResponseCheck()
    response.result.errors.append(message)
    return fastapi.responses.JSONResponse(
        content=response.dict(exclude_none=True),
        status_code=fastapi.status.HTTP_500_INTERNAL_SERVER_ERROR,
    )
@app.exception_handler(errors.ActiveWorkflowError)
async def aw_exception_handler(request: Request, exc: errors.ConfigurationError):
error_log.error(f"{exc.name}: {exc.message}")
resp = awmodels.ResponseCheck()
resp.result.errors.append(f"{exc.name}: {exc.message}")
......
import abc
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Tuple
from pydantic import BaseModel, Field, validator
......@@ -75,6 +75,8 @@ class PayloadInput(BaseModel):
"""
archive_id: str = Field(..., example="a1b2c3d4e5f6")
vault_id: str = Field(None, example="medic")
couch_db: str = Field(None, example="medic")
annotations_archive: Dict[str, Any] = Field(
{}, example={"id": "something", "name": "Some Thing"}
)
......@@ -82,11 +84,6 @@ class PayloadInput(BaseModel):
{}, example={"id": "something", "name": "Some Thing"}
)
# TODO: for now, I will use dict as the annotation basis
# annotations: MetadataBase = Field(
# None, example=MetadataBase(id="something", name="Some Thing")
# )
@staticmethod
def example() -> Dict[str, Any]:
return {
......@@ -102,6 +99,7 @@ class MessageOutput(BaseModel):
"""
archive_id: str = Field(None, example="a1b2c3d4e5f6")
vault_id: str = Field(None, example="medic")
couch_uri: str = Field(None, example="http://couch.example.org/database/something")
stats: Dict[str, Any] = Field(
{},
......@@ -128,6 +126,8 @@ class OptionsCommon(BaseModel):
This model represents expected options to configure this agent (or rather a call to this agent).
"""
vault_id: str = Field(None, example="medic")
couch_db: str = Field(None, example="medic")
annotations_archive: Dict[str, Any] = Field(
{}, example={"id": "something", "name": "Some Thing"}
)
......@@ -135,11 +135,6 @@ class OptionsCommon(BaseModel):
{}, example={"id": "something", "name": "Some Thing"}
)
# TODO: for now, I will use dicts as metadata
# annotations: MetadataBase = Field(
# None, example=MetadataBase(id="something", name="Some Thing")
# )
@staticmethod
def example() -> Dict[str, Any]:
return {
......@@ -153,11 +148,11 @@ class MemoryCommon(BaseModel):
This model represents the expected memory content to communicate state of the agent.
"""
archives: List[str] = Field([], example=["a1b2c3d4e5f6"])
archives: List[Tuple[str, str]] = Field([], example=[("medic", "a1b2c3d4e5f6")])
@staticmethod
def example() -> dict:
return {"archives": ["a1b2c3d4e5f6"]}
return {"archives": [("medic", "a1b2c3d4e5f6")]}
class CredentialsCommon(BaseModel):
......
......@@ -17,14 +17,14 @@ cdstar = CDStar(
)
def is_valid_archive(archive_id: str) -> bool:
def is_valid_archive(vault_id: str, archive_id: str) -> bool:
cdstar = CDStar(
config.BasicSettings().cdstar_uri,
auth=(config.BasicSettings().cdstar_user, config.BasicSettings().cdstar_pass),
)
try:
vault = CDStarVault(cdstar, config.BasicSettings().cdstar_vault)
vault = CDStarVault(cdstar, vault_id)
archive = vault.archive(archive_id)
return archive.exists()
except:
......@@ -34,7 +34,7 @@ def is_valid_archive(archive_id: str) -> bool:
def get_cdstar_metadata(
archive_id: str,
vault_id: str, archive_id: str,
) -> Tuple[Optional[cdmodels.ArchiveInfo], List[cdmodels.FileInfo]]:
cdstar = CDStar(
config.BasicSettings().cdstar_uri,
......@@ -42,7 +42,7 @@ def get_cdstar_metadata(
)
try:
vault = CDStarVault(cdstar, config.BasicSettings().cdstar_vault)
vault = CDStarVault(cdstar, vault_id)
archive = vault.archive(archive_id)
meta_archive = cdmodels.ArchiveInfo(
**cdstar.archive_info(vault.name, archive.id)
......@@ -58,18 +58,18 @@ def get_cdstar_metadata(
return None, []
def upload_archive(archive_meta: Dict[str, Any]) -> bool:
return upload_jsonld(archive_meta)
def upload_archive(archive_meta: Dict[str, Any], couch_db: str) -> bool:
    """Store an archive's JSON-LD metadata document in the given CouchDB database.

    Thin wrapper around upload_jsonld; returns True on success, False otherwise.
    """
    return upload_jsonld(archive_meta, couch_db)
def upload_files(files_metalist: List[Dict[str, Any]], couch_db: str) -> bool:
    """Upload each file's JSON-LD metadata document to `couch_db`.

    Stops at the first failed upload; documents already uploaded before the
    failure are not rolled back.

    Returns:
        True when every document was uploaded, False on the first failure.
    """
    # all() short-circuits on the first False, matching the original early return.
    return all(upload_jsonld(file_meta, couch_db) for file_meta in files_metalist)
def upload_jsonld(jsonld: Dict[str, Any]) -> bool:
def upload_jsonld(jsonld: Dict[str, Any], couch_db: str) -> bool:
if "identifier" not in jsonld.keys():
error_log.error(f"Supplied jsonld has no id for `upload_jsonld`: {jsonld}")
return False
......@@ -80,9 +80,7 @@ def upload_jsonld(jsonld: Dict[str, Any]) -> bool:
url=config.BasicSettings().couch_uri,
use_basic_auth=True,
) as client:
database: CloudantDatabase.CouchDatabase = client[
config.BasicSettings().couch_db
]
database: CloudantDatabase.CouchDatabase = client[couch_db]
if not database.exists():
return False
jsonld["_id"] = jsonld["identifier"]
......@@ -93,196 +91,3 @@ def upload_jsonld(jsonld: Dict[str, Any]) -> bool:
error_log.error("Could not connect to CouchDB.")
error_log.error(f"{t}: {v}\n{traceback.print_tb(tb)}")
return False
"""
def upload_files(
filelist: List[str], annotation: dict = {}
) -> models_metadata.MetadataArchive:
Uploads a list of files to CDSTAR.
For each filepath in filelist, a call to `upload_file_to_archive` is made.
Parameters
----------
filelist : List[str]
A list of strings, where each sting is a full path to the file which shall be uploaded to CDSTAR.
annotations : dict
A JSON with the annotations that shall be made according to schema.org. This does not use JSON-LD as is,
but is a pretty close representation of it. If available, the encodingFormat will be used as a MIME-type
for uploading the data to CDSTAR.
Returns
-------
List[models.MetadataArchive]
A list of Archives modelled according to schema.org with full File-metadata instead of references.
if "encodingFormat" in annotation.keys():
mime_type = annotation["encodingFormat"]
else:
mime_type = None
with cdstar.begin(autocommit=True):
vault = CDStarVault(cdstar, config.BasicSettings().cdstar_vault_target)
archive = vault.new_archive()
response = models_metadata.MetadataArchive(
id=archive.id,
name=archive.id,
url=os.path.join(
config.BasicSettings().cdstar_uri, archive.vault.name, archive.id
),
accessibilityAPI=config.BasicSettings().metadata_accessibility_api,
dateCreated=datetime.datetime.now().isoformat(),
)
for k, v in annotation.items():
response.__setattr__(k, v)
for file in filelist:
uploaded = upload_file_to_archive(file, archive, mime_type)
for k, v in annotation.items():
uploaded.__setattr__(k, v)
response.hasPart.append(uploaded)
return response
def upload_file_to_archive(
file: str, archive: CDStarArchive, mime_type: Optional[str]
) -> models_metadata.MetadataFile:
Uploads a file to a CDStarArchive.
If not CDSTAR archive is supplied, a new archive will be created prior to uploading a file.
Parameters
----------
file : str
A full path to a file that shall be uploaded to the CDSTAR archive.
archive : CDStarArchive
An instance of a CDStarArchive. If None, it will be created prior to uploading file.
mime_type : str
The MIME-Type of the to-be-uploaded file. If None, the automatic detection of CDSTAR will be used.
Returns
-------
models.MetadataFile
The result of the upload procedure modelled after [schema.org/DataDownload](http://schema.org/DataDownload).
On error, the ID of the annotation will be an empty String, name is set to the filename but the rest of the
attributes are set to None.
if os.path.isfile(file) and os.path.exists(file) and os.access(file, os.R_OK):
filename = os.path.basename(file)
cdstar_file = archive.file(filename)
kargs = {"type": None, "source": file}
if mime_type is not None:
kargs["type"] = mime_type
response = cdstar_file.put(**kargs)
if (
"id" not in response.keys()
or "name" not in response.keys()
or "type" not in response.keys()
):
return models_metadata.MetadataFile(
id=None,
name=filename,
description=f"The CDSTAR response is invalid: {json.dumps(response)}",
)
return models_metadata.MetadataFile(
id=response["id"],
name=response["name"],
encodingFormat=response["type"],
contentSize=response["size"],
dateCreated=response["created"],
uploadDate=response["modified"],
accessibilityAPI=config.BasicSettings().metadata_accessibility_api,
contentUrl=os.path.join(
config.BasicSettings().cdstar_uri,
archive.vault.name,
archive.id,
cdstar_file.name,
),
)
else:
return models_metadata.MetadataFile(
id=None, name=file, description=f"File not found or not readable: {file}",
)
Annotates an Archive.
def annotate_archive(
archive: models_metadata.MetadataArchive,
) -> models_metadata.MetadataArchive:
Will create metadata for an CDSTAR Archive and upload it to the CouchDB Metadata store.
It will combine responses from CDSTAR as well as user / active_workflow supplied metadata.
Parameters
----------
archive : models_metadata.MetadataArchive
Metadata about a CDSTAR archive that will be stored.
Returns
-------
models_metadata.Metadata Archive
Returns the uploaded metadata archive. Most of the time, this should be equal to the `archive` parameter.
couchdb = pycouchdb.Server(config.BasicSettings().couchdb_uri, authmethod="basic")
try:
db = couchdb.database(config.BasicSettings().couchdb_db)
except pycouchdb.exceptions.NotFound:
db = couchdb.create(config.BasicSettings().couchdb_db)
archive_doc = schema_org_to_couchdb(archive.dict(exclude_none=True))
archive_doc = db.save(archive_doc)
archive_doc["hasPart"] = []
for dist in archive.hasPart:
if dist.id is not None:
new_dist = annotate_file(dist, archive)
archive_doc["hasPart"].append(new_dist.schema_org_ref())
db.save(archive_doc)
return archive
def annotate_file(
upload: models_metadata.MetadataFile, partOf: models_metadata.MetadataArchive
) -> models_metadata.MetadataFile:
Annotate a File.
Creates and uploads metadata of a CDSTAR File to a CouchDB Metadata store. It will combine
responses from CDSTAR as well as user / active_workflow supplied metadata.
Parameters
----------
upload : models_metadata.MetadataFile
Metadata about a CDSTAR File that will be stored.
partOf : models_metadata.MetadataArchive
Metadata about the CDSTAR Archive that contains this file in order to annotate the fact
that this file is a part of the archive.
Results
-------
models_metadata.MetadataFile
Returns the uploaded metadata file. Most of the time, this should be equal to the `upload`parameter
with the part of information added.
couchdb = pycouchdb.Server(config.BasicSettings().couchdb_uri, authmethod="basic")
try:
db = couchdb.database(config.BasicSettings().couchdb_db)
except pycouchdb.exceptions.NotFound:
db = couchdb.create(config.BasicSettings().couchdb_db)
meta_json = schema_org_to_couchdb(upload.dict(exclude_none=True))
meta_json["isPartOf"] = partOf.schema_org_ref()
file_doc = db.save(meta_json)
upload.isPartOf = partOf
return upload
def schema_org_to_couchdb(in_dict: dict) -> dict:
in_dict["_id"] = in_dict["id"]
in_dict["@id"] = in_dict.pop("id", None)
in_dict["@type"] = in_dict.pop("type", None)
in_dict["@context"] = in_dict.pop("context", None)
return in_dict
"""
......@@ -17,25 +17,25 @@ class AnnotationState(str, enum.Enum):
incomplete = "incomplete"
def create_file_for_archive(vault_id: str, archive_id: str) -> Optional[str]:
    """Create the state metafile marking (vault_id, archive_id) as started.

    Returns:
        The metafile path on success, or None when the archive is already
        being processed or the initial state could not be written.
    """
    if annotation_in_progress(vault_id, archive_id):
        error_log.error(f"Archive {archive_id} is processing.")
        return None
    path = get_path_for_archive(vault_id, archive_id)
    if not write_state_to_metafile(AnnotationState.started, path):
        return None
    return path
def annotation_in_progress(vault_id: str, archive_id: str) -> bool:
    """Report whether a state metafile exists for (vault_id, archive_id).

    Presence of the metafile is the marker that an annotation run is active.
    """
    # os.path.isfile already implies existence, so the extra exists() check is redundant.
    return os.path.isfile(get_path_for_archive(vault_id, archive_id))
def get_path_for_archive(vault_id: str, archive_id: str) -> str:
    """Return the temp-directory path of the state metafile for this vault/archive pair.

    Including the vault id in the filename keeps archives with the same id
    from different vaults from colliding.
    """
    return os.path.join(tempfile.gettempdir(), f"meta_{vault_id}_{archive_id}.json")
def merge_jsonld(jsonld: Dict[str, Any], annotations: Dict[str, Any]) -> Dict[str, Any]:
......@@ -120,4 +120,3 @@ def remove_metafile(path: str) -> bool:
error_log.warning(f"File {path} could not be removed as it was not found.")
return True
return False
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment