Commit 9f65075a authored by parciak's avatar parciak
Browse files

End of day commit. Working on the Background Task method to annotate a CDSTAR Archive


Signed-off-by: parciak's avatarMarcel Parciak <marcel.parciak@gmail.com>
parent 90f9636e
......@@ -7,6 +7,7 @@ verify_ssl = true
fastapi = "*"
uvicorn = "*"
black = "*"
pipenv-to-requirements = "*"
[packages]
cloudant = "*"
......
......@@ -7,6 +7,7 @@ from annotator import __version__
class BasicSettings(BaseSettings):
application_name: str = "CouchDB Annotation Agent"
application_version: str = __version__.__version__
tmp_directory: str = "/tmp"
cdstar_uri: str = "http://localhost:8082/v3"
cdstar_user: str = "medic1"
cdstar_pass: str = "medIC!umG!9"
......
import datetime
import os
from typing import Any, Dict
from annotator import access_log, error_log
from annotator import stores
from annotator import config
from annotator import errors
from annotator import models_activeworkflow as awmodels
from annotator.config import BasicSettings
from annotator.errors import ActiveWorkflowException
from annotator import stores
from annotator import utils
from fastapi import BackgroundTasks, FastAPI, Request
import fastapi
......@@ -20,7 +23,7 @@ endpoint_name = "annotator"
def appinfo():
""" Prints the configuration of this instance.
"""
return BasicSettings().dict()
return config.BasicSettings().dict()
@app.post(
......@@ -49,7 +52,7 @@ def aw_endpoint(payload: awmodels.RequestCommon, background_tasks: BackgroundTas
background_tasks=background_tasks,
)
raise ActiveWorkflowException(
raise errors.ActiveWorkflowException(
name="Method Error",
message=f"You did not supply a valid method. Valid methods are: register, receive, check",
)
......@@ -82,6 +85,7 @@ def register(payload: awmodels.RequestRegister):
f"/{endpoint_name}/receive",
response_model=awmodels.ResponseReceive,
response_model_exclude_none=True,
status_code=fastapi.status.HTTP_202_ACCEPTED,
)
def receive(payload: awmodels.RequestReceive, background_tasks: BackgroundTasks):
""" Receives a message that will annotate a CDSTAR archive.
......@@ -92,17 +96,76 @@ def receive(payload: awmodels.RequestReceive, background_tasks: BackgroundTasks)
)
response = awmodels.ResponseReceive()
response.result.memory.archives += payload.params.memory.archives
archive_id = payload.params.message.payload.archive_id
if not stores.is_valid_archive(archive_id):
raise ActiveWorkflowException(
raise errors.ActiveWorkflowException(
"Invalid value for `archive_id`",
"The `archive_id` you supplied is not available in CDSTAR.",
)
if utils.annotation_in_progress(archive_id):
response.result.logs.append(
f"Archive {archive_id} is currently being processed."
)
return response
annotations_archive = payload.params.options.annotations_archive
for k, v in payload.params.message.payload.annotations_archive.items():
annotations_archive[k] = v
annotations_file = payload.params.options.annotations_file
for k, v in payload.params.message.payload.annotations_file.items():
annotations_file[k] = v
metafile = utils.create_file_for_archive(archive_id)
background_tasks.add_task(
run_annotation,
archive_id=archive_id,
annotations_archive=annotations_archive,
annotations_file=annotations_file,
metafile=metafile,
)
response.result.logs.append(f"Starting annotation of CDSTAR archive {archive_id}")
response.result.memory.archives.append(archive_id)
return response
def run_annotation(
archive_id: str,
annotations_archive: Dict[str, Any],
annotations_file: Dict[str, Any],
metafile: str,
) -> None:
archive, filelist = stores.get_cdstar_metadata(archive_id)
if not archive:
errmsg = f"Not able to retrieve archive {archive_id} in BackgroundTask. How is that possible?"
error_log.error(errmsg)
os.unlink(metafile)
return
jsonld_archive = utils.merge_jsonld(archive.to_jsonld(), annotations_archive)
jsonld_files = [
utils.merge_jsonld(f.to_jsonld(), annotations_file) for f in filelist
]
if "hasPart" not in jsonld_archive or type(jsonld_archive["hasPart"]) is not list:
jsonld_archive["hasPart"] = []
for meta_file in jsonld_files:
jsonld_archive["hasPart"].append(utils.get_jsonld_reference(meta_file))
meta_file["isPartOf"] = utils.get_jsonld_reference(meta_file)
# TODO: in sync or async?...
# historically, couchdb was rather slow (lib fault?)
# if sync: what do I need check for?
try:
stores.upload_archive(jsonld_archive)
stores.upload_files(jsonld_files)
except:
errmsg = f"Could not upload annotations for archive {archive_id} to CouchDB"
error_log.error(errmsg)
os.unlink(metafile)
return
@app.post(
......@@ -121,8 +184,8 @@ def check(payload: awmodels.RequestCheck, background_tasks: BackgroundTasks):
pass
@app.exception_handler(ActiveWorkflowException)
async def aw_exception_handler(request: Request, exc: ActiveWorkflowException):
@app.exception_handler(errors.ActiveWorkflowException)
async def aw_exception_handler(request: Request, exc: errors.ActiveWorkflowException):
error_log.error(f"{exc.name}: {exc.message}")
resp = awmodels.ResponseCheck()
resp.result.errors.append(f"{exc.name}: {exc.message}")
......
......@@ -4,7 +4,9 @@ from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field, validator
from annotator import config
from annotator.models_metadata import MetadataBase
# TODO: for now, I use dict for annotations
# from annotator.models_metadata import MetadataBase
#
# Common models
......@@ -73,15 +75,24 @@ class PayloadInput(BaseModel):
"""
archive_id: str = Field(..., example="abcdef123456")
annotations: MetadataBase = Field(
None, example=MetadataBase(id="something", name="Some Thing")
annotations_archive: Dict[str, Any] = Field(
{}, example={"id": "something", "name": "Some Thing"}
)
annotations_file: Dict[str, Any] = Field(
{}, example={"id": "something", "name": "Some Thing"}
)
# TODO: for now, I will use dict as the annotation basis
# annotations: MetadataBase = Field(
# None, example=MetadataBase(id="something", name="Some Thing")
# )
@staticmethod
def example() -> Dict[str, Any]:
return {
"archive_id": "abcdef123456",
"annotations": MetadataBase(id="something", name="Some Thing").dict(),
"annotations_archive": {"id": "something", "name": "Some Thing"},
"annotations_file": {"id": "something", "name": "Some Thing"},
}
......@@ -106,13 +117,24 @@ class OptionsCommon(BaseModel):
This model represents expected options to configure this agent (or rather a call to this agent).
"""
annotations: MetadataBase = Field(
None, example=MetadataBase(id="something", name="Some Thing")
annotations_archive: Dict[str, Any] = Field(
{}, example={"id": "something", "name": "Some Thing"}
)
annotations_file: Dict[str, Any] = Field(
{}, example={"id": "something", "name": "Some Thing"}
)
# TODO: for now, I will use dicts as metadata
# annotations: MetadataBase = Field(
# None, example=MetadataBase(id="something", name="Some Thing")
# )
@staticmethod
def example() -> Dict[str, Any]:
return {"annotations": MetadataBase(id="something", name="Some Thing").dict()}
return {
"annotations_archive": {"id": "something", "name": "Some Thing"},
"annotations_file": {"id": "something", "name": "Some Thing"},
}
class MemoryCommon(BaseModel):
......@@ -120,9 +142,11 @@ class MemoryCommon(BaseModel):
This model represents the expected memory content to communicate state of the agent.
"""
archives: List[str] = Field([], example=["abcdef123456"])
@staticmethod
def example() -> dict:
return {}
return {"archives": ["abcdef123456"]}
class CredentialsCommon(BaseModel):
......
import datetime
from enum import Enum
from typing import Any, Dict
from pydantic import BaseModel, Field
from annotator import config
class ArchiveState(Enum):
open = "open"
locked = "locked"
archived = "archived"
pending_recover = "pending-recover"
pending_archive = "pending-archive"
class ArchiveInfo(BaseModel):
"""
This model represents [ArchiveInfo](https://cdstar.gwdg.de/docs/dev/#ArchiveInfo) of CDSTAR.
"""
id: str = Field(..., example="abcdef123456")
vault: str = Field(..., example="medic")
revision: str = Field(None, example="1")
profile: str = Field(None, example="default")
state: ArchiveState = Field(None, example=ArchiveState.open)
created: datetime.date = Field(None, example="2020-07-24T09:40:10.548+0000")
modified: datetime.date = Field(None, example="2020-07-24T09:41:05.881+0000")
file_count: int = Field(None, example=1)
def to_jsonld(self) -> Dict[str, Any]:
jsonld: Dict[str, Any] = {
"@context": "http://schema.org/",
"@type": "Dataset",
"alternateName": self.id,
"identifier": self.id,
}
if self.file_count:
jsonld["size"] = {
"minValue": 0,
"unitText": "file_count",
"value": self.file_count,
}
if self.revision:
jsonld["version"] = self.revision
if self.created:
jsonld["dateCreated"] = self.created
if self.modified:
jsonld["dateModified"] = self.modified
return jsonld
class FileInfo(BaseModel):
"""
This model represents [FileInfo](https://cdstar.gwdg.de/docs/dev/#FileInfo) of CDSTAR.
"""
id: str = Field(..., example="abcd1234")
name: str = Field(None, example="example.csv")
type: str = Field(None, example="application/csv")
size: int = Field(None, example=250)
created: datetime.date = Field(None, example="2020-07-24T09:40:10.848+0000")
modified: datetime.date = Field(None, example="2020-07-24T09:41:05.882+0000")
digests: Dict[str, str] = Field(
None,
example={
"md5": "446464a97a5a63b081a66e137f648584",
"sha1": "5f1ff6fb342dbb028d0eaf098735711a9886c93e",
"sha256": "2a57d4a5d06599f9ea4df8bfdaaa7c57b3a6755ba8c445147f8911972cd58441",
},
)
def to_jsonld(self) -> Dict[str, Any]:
jsonld: Dict[str, Any] = {
"@context": "http://schema.org/",
"@type": "DataDownload",
"identifier": self.id,
}
if self.name:
jsonld["name"] = self.name
if self.type:
jsonld["encodingFormat"] = self.type
if self.size:
jsonld["contentSize"] = self.size
if self.created:
jsonld["dateCreated"] = self.created
if self.modified:
jsonld["dateModified"] = self.modified
return jsonld
......@@ -2,13 +2,14 @@ import datetime
import json
import os
import sys
from typing import List, Optional
from typing import List, Optional, Tuple
from pycdstar3 import CDStar, CDStarVault
from pycdstar3.api import CDStarArchive
from annotator import config, models_metadata
from annotator import config
from annotator import error_log
from annotator import models_cdstar as cdmodels
cdstar = CDStar(
config.BasicSettings().cdstar_uri,
......@@ -32,10 +33,36 @@ def is_valid_archive(archive_id: str) -> bool:
return False
def get_cdstar_metadata(
archive_id: str,
) -> Tuple[Optional[cdmodels.ArchiveInfo], List[cdmodels.FileInfo]]:
cdstar = CDStar(
config.BasicSettings().cdstar_uri,
auth=(config.BasicSettings().cdstar_user, config.BasicSettings().cdstar_pass),
)
try:
vault = CDStarVault(cdstar, config.BasicSettings().cdstar_vault)
archive = vault.archive(archive_id)
meta_archive = cdmodels.ArchiveInfo(
**cdstar.archive_info(vault.name, archive.id)
)
meta_files = [
cdmodels.FileInfo(**cd_file)
for cd_file in cdstar.iter_files(vault.name, archive.id)
]
return meta_archive, meta_files
except:
t, v, tb = sys.exc_info()
error_log.error(f"{t}: {v}\n{tb}")
return None, []
"""
def upload_files(
filelist: List[str], annotation: dict = {}
) -> models_metadata.MetadataArchive:
""" Uploads a list of files to CDSTAR.
Uploads a list of files to CDSTAR.
For each filepath in filelist, a call to `upload_file_to_archive` is made.
......@@ -52,7 +79,6 @@ def upload_files(
-------
List[models.MetadataArchive]
A list of Archives modelled according to schema.org with full File-metadata instead of references.
"""
if "encodingFormat" in annotation.keys():
mime_type = annotation["encodingFormat"]
......@@ -85,7 +111,7 @@ def upload_files(
def upload_file_to_archive(
file: str, archive: CDStarArchive, mime_type: Optional[str]
) -> models_metadata.MetadataFile:
""" Uploads a file to a CDStarArchive.
Uploads a file to a CDStarArchive.
If not CDSTAR archive is supplied, a new archive will be created prior to uploading a file.
......@@ -104,7 +130,6 @@ def upload_file_to_archive(
The result of the upload procedure modelled after [schema.org/DataDownload](http://schema.org/DataDownload).
On error, the ID of the annotation will be an empty String, name is set to the filename but the rest of the
attributes are set to None.
"""
if os.path.isfile(file) and os.path.exists(file) and os.access(file, os.R_OK):
filename = os.path.basename(file)
......@@ -144,7 +169,7 @@ def upload_file_to_archive(
)
""" Annotates an Archive.
Annotates an Archive.
def annotate_archive(
archive: models_metadata.MetadataArchive,
) -> models_metadata.MetadataArchive:
......
import os
import tempfile
from typing import Any, Dict, Optional
from annotator import error_log
def create_file_for_archive(archive_id: str) -> Optional[str]:
if annotation_in_progress(archive_id):
error_log.error(f"Archive {archive_id} is processing.")
return None
path = get_path_for_archive(archive_id)
try:
with open(path, "w") as metafile:
metafile.write("{}")
except:
error_log.error(f"Could not create or write to file: {path}.")
return None
return path
def annotation_in_progress(archive_id: str) -> bool:
path = get_path_for_archive(archive_id)
if os.path.exists(path) and os.path.isfile(path):
return True
return False
def get_path_for_archive(archive_id: str) -> str:
return os.path.join(tempfile.gettempdir(), f"meta_{archive_id}.json")
def merge_jsonld(jsonld: Dict[str, Any], annotations: Dict[str, Any]) -> Dict[str, Any]:
# Do not allow to override these keys
blacklist = ["@type", "@context", "id"]
for k, v in annotations:
if v is not None and k not in blacklist:
jsonld[k] = v
return jsonld
def get_jsonld_reference(jsonld: Dict[str, Any]) -> Optional[Dict[str, Any]]:
if "@type" in jsonld.keys():
if "id" in jsonld.keys():
return {"@type": jsonld["@type"], "id": jsonld["id"]}
return None
################################################################################
# This requirements file has been automatically generated from `Pipfile` with
# `pipenv-to-requirements`
#
#
# This has been done to maintain backward compatibility with tools and services
# that do not support `Pipfile` yet.
#
# Do NOT edit it directly, use `pipenv install [-d]` to modify `Pipfile` and
# `Pipfile.lock` and then regenerate `requirements*.txt`.
################################################################################
appdirs==1.4.4
attrs==19.3.0
black==19.10b0
click==7.1.2
fastapi==0.60.1
h11==0.9.0
httptools==0.1.1 ; sys_platform != 'win32' and sys_platform != 'cygwin' and platform_python_implementation != 'PyPy'
pathspec==0.8.0
pydantic==1.6.1
regex==2020.7.14
starlette==0.13.6
toml==0.10.1
typed-ast==1.4.1
uvicorn==0.11.6
uvloop==0.14.0 ; sys_platform != 'win32' and sys_platform != 'cygwin' and platform_python_implementation != 'PyPy'
websockets==8.1
################################################################################
# This requirements file has been automatically generated from `Pipfile` with
# `pipenv-to-requirements`
#
#
# This has been done to maintain backward compatibility with tools and services
# that do not support `Pipfile` yet.
#
# Do NOT edit it directly, use `pipenv install [-d]` to modify `Pipfile` and
# `Pipfile.lock` and then regenerate `requirements*.txt`.
################################################################################
alembic==1.4.2
attrs==19.3.0
certifi==2020.6.20
chardet==3.0.4
cloudant==2.13.0
idna==2.10
iniconfig==1.0.0
mako==1.1.3
markupsafe==2.0.0a1
more-itertools==8.4.0
packaging==20.4
pluggy==0.13.1
py==1.9.0
pycdstar3 @ git+https://gitlab.gwdg.de/cdstar/pycdstar3@bde5c7ee604a74e8b20d770716a59310b8f19166#egg=pycdstar3
pyparsing==3.0.0a2
pytest==6.0.0rc1
python-dateutil==2.8.1
python-editor==1.0.4
requests==2.24.0
six==1.15.0
sqlalchemy==1.3.18
toml==0.10.1
urllib3==1.25.10
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment