Commit 3fa54b76 authored by Markus Suhr's avatar Markus Suhr

Switched to Redis for job queue handling, added documentation

parent 1752ed87
Pipeline #178608 passed with stage
in 25 seconds
......@@ -5,7 +5,8 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
_Types of changes_:
* `Added` for new features.
* `Changed` for changes in existing functionality.
* `Deprecated` for soon-to-be removed features.
......@@ -15,12 +16,41 @@ _Types of changes_:
## [Unreleased]
## [0.8.0] - unreleased
### Added
* Supplied an [example compose file](docker-compose.yml) for deployment with _docker-compose_
including the newly required Redis database service.
### Changed
* Switched from synchronous to **asynchronous file processing**: instead of moving
(copying, deleting) files procedurally when a "receive" or "check" HTTP request is handled, jobs will now be stored in
a **Redis** database and executed as background tasks by the FastAPI engine.
See https://fastapi.tiangolo.com/tutorial/background-tasks/
for details; a minimal sketch follows below this list.
* Connection to the Redis database requires new environment variables to be set when operating the File Mover
Agent: `redis_host`, `redis_port`, `redis_password`. If not specified, default values for a local Redis node within
the Docker network are assumed, i.e.
```yaml
environment:
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_PASSWORD: ""
```
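For illustration, the new enqueue-and-process pattern looks roughly like this (a minimal sketch with hypothetical names, not the Agent's exact code):
```python
import json

from fastapi import BackgroundTasks, FastAPI
from redis import StrictRedis

app = FastAPI()
redis = StrictRedis(host="redis", port=6379, password="")

def process_job(task_id: str) -> None:
    # Runs after the HTTP response has been sent: read the job
    # parameters back from Redis, then move/copy/delete files.
    params = json.loads(redis.get(task_id))
    ...

@app.post("/mover/receive")
def receive(background_tasks: BackgroundTasks):
    task_id = "a1b2c3"  # the real Agent derives an id from a timestamp hash
    redis.set(task_id, json.dumps({"action": "move", "params": {}}))
    background_tasks.add_task(process_job, task_id=task_id)
    return {"status": "queued"}
```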
### Removed
* The Agent no longer processes files when queried via the "check" REST endpoint. To trigger actual file manipulations,
an HTTP request to the "receive" endpoint is required; it will be enqueued for asynchronous execution.
## [0.7.0] - 2021-02-21
### Added
- Introduced new option to limit the number of files to process. Limit can be specified as a parameter within
the `criteria` block of config options:
```json
"criteria": {
"limit": 10
......
......@@ -9,8 +9,7 @@ verify_ssl = true
fastapi = "*"
uvicorn = "*"
pydantic = "*"
pickledb = "*"
redis = "*"
[requires]
python_version = "3.8"
{
"_meta": {
"hash": {
"sha256": "ed061717393de7a37a099618ff032584331ce7a6e1224f647a424095f19aa75a"
"sha256": "914fd8cc9b85f09b3bfc5177137cf854daa6eccc1f715e78d5533efc0ac72060"
},
"pipfile-spec": 6,
"requires": {
......@@ -38,48 +38,41 @@
],
"version": "==0.12.0"
},
"pickledb": {
"hashes": [
"sha256:ec6973e65d7d112849e78ce522840aa908efb2523470bb8ce5c7942310192240"
],
"index": "pypi",
"version": "==0.9.2"
},
"puckdb": {
"pydantic": {
"hashes": [
"sha256:8c5c11b33f482311c768dc01578ac226b55349a0bfdd5bfb162fd811c94fea7c",
"sha256:e47fbd4dea1534265775632069d672216840244a2b51d5295933c9a6d8f0e5d1"
"sha256:0c40162796fc8d0aa744875b60e4dc36834db9f2a25dbf9ba9664b1915a23850",
"sha256:20d42f1be7c7acc352b3d09b0cf505a9fab9deb93125061b376fbe1f06a5459f",
"sha256:2287ebff0018eec3cc69b1d09d4b7cebf277726fa1bd96b45806283c1d808683",
"sha256:258576f2d997ee4573469633592e8b99aa13bda182fcc28e875f866016c8e07e",
"sha256:26cf3cb2e68ec6c0cfcb6293e69fb3450c5fd1ace87f46b64f678b0d29eac4c3",
"sha256:2f2736d9a996b976cfdfe52455ad27462308c9d3d0ae21a2aa8b4cd1a78f47b9",
"sha256:3114d74329873af0a0e8004627f5389f3bb27f956b965ddd3e355fe984a1789c",
"sha256:3bbd023c981cbe26e6e21c8d2ce78485f85c2e77f7bab5ec15b7d2a1f491918f",
"sha256:3bcb9d7e1f9849a6bdbd027aabb3a06414abd6068cb3b21c49427956cce5038a",
"sha256:4bbc47cf7925c86a345d03b07086696ed916c7663cb76aa409edaa54546e53e2",
"sha256:6388ef4ef1435364c8cc9a8192238aed030595e873d8462447ccef2e17387125",
"sha256:830ef1a148012b640186bf4d9789a206c56071ff38f2460a32ae67ca21880eb8",
"sha256:8fbb677e4e89c8ab3d450df7b1d9caed23f254072e8597c33279460eeae59b99",
"sha256:c17a0b35c854049e67c68b48d55e026c84f35593c66d69b278b8b49e2484346f",
"sha256:dd4888b300769ecec194ca8f2699415f5f7760365ddbe243d4fd6581485fa5f0",
"sha256:dde4ca368e82791de97c2ec019681ffb437728090c0ff0c3852708cf923e0c7d",
"sha256:e3f8790c47ac42549dc8b045a67b0ca371c7f66e73040d0197ce6172b385e520",
"sha256:e8bc082afef97c5fd3903d05c6f7bb3a6af9fc18631b4cc9fedeb4720efb0c58",
"sha256:eb8ccf12295113ce0de38f80b25f736d62f0a8d87c6b88aca645f168f9c78771",
"sha256:fb77f7a7e111db1832ae3f8f44203691e15b1fa7e5a1cb9691d4e2659aee41c4",
"sha256:fbfb608febde1afd4743c6822c19060a8dbdd3eb30f98e36061ba4973308059e",
"sha256:fff29fe54ec419338c522b908154a2efabeee4f483e48990f87e189661f31ce3"
],
"index": "pypi",
"version": "==0.9.2"
"version": "==1.8.1"
},
"pydantic": {
"redis": {
"hashes": [
"sha256:0b71ca069c16470cb00be0acaf0657eb74cbc4ff5f11b42e79647f170956cda3",
"sha256:12ed0b175bba65e29dfc5859cd539d3512f58bb776bf620a3d3338501fd0f389",
"sha256:22fe5756c6c57279234e4c4027a3549507aca29e9ee832d6aa39c367cb43c99f",
"sha256:26821f61623b01d618bd8b3243f790ac8bd7ae31b388c0e41aa586002cf350eb",
"sha256:2bc9e9f5d91a29dec53346efc5c719d82297885d89c8a62b971492fba222c68d",
"sha256:42b8fb1e4e4783c4aa31df44b64714f96aa4deeacbacf3713a8a238ee7df3b2b",
"sha256:4a83d24bcf9ce8e6fa55c379bba1359461eedb85721bfb3151e240871e2b13a8",
"sha256:5759a4b276bda5ac2360f00e9b1e711aaac51fabd155b422d27f3339710f4264",
"sha256:77e04800d19acc2a8fbb95fe3d47ff397ce137aa5a2b32cc23a87bac70dda343",
"sha256:865410a6df71fb60294887770d19c67d499689f7ce64245182653952cdbd4183",
"sha256:91baec8ed771d4c53d71ef549d8e36b0f92a31c32296062d562d1d7074dd1d6e",
"sha256:999cc108933425752e45d1bf2f57d3cf091f2a5e8b9b8afab5b8872d2cc7645f",
"sha256:a0ff36e3f929d76b91d1624c6673dbdc1407358700d117bb7f29d5696c52d288",
"sha256:a989924324513215ad2b2cfd187426e6372f76f507b17361142c0b792294960c",
"sha256:ad2fae68e185cfae5b6d83e7915352ff0b6e5fa84d84bc6a94c3e2de58327114",
"sha256:b4e03c84f4e96e3880c9d34508cccbd0f0df6e7dc14b17f960ea8c71448823a3",
"sha256:c26d380af3e9a8eb9abe3b6337cea28f057b5425330817c918cf74d0a0a2303d",
"sha256:c8a3600435b83a4f28f5379f3bb574576521180f691828268268e9f172f1b1eb",
"sha256:ccc2ab0a240d01847f3d5f0f9e1582d450a2fc3389db33a7af8e7447b205a935",
"sha256:d361d181a3fb53ebfdc2fb1e3ca55a6b2ad717578a5e119c99641afd11b31a47",
"sha256:d5aeab86837f8799df0d84bec1190e6cc0062d5c5374636b5599234f2b39fe0a",
"sha256:edf37d30ea60179ef067add9772cf42299ea6cd490b3c94335a68f1021944ac4"
"sha256:0e7e0cfca8660dea8b7d5cd8c4f6c5e29e11f31158c0b0ae91a397f00e5a05a2",
"sha256:432b788c4530cfe16d8d943a09d40ca6c16149727e4afe8c2c9d5580c59d9f24"
],
"index": "pypi",
"version": "==1.8"
"version": "==3.5.3"
},
"starlette": {
"hashes": [
......
# FileMover Active Workflow Agent
This Agent complies with
the [external REST-API](https://github.com/automaticmode/active_workflow/blob/master/docs/remote_agent_api.md)
of [ActiveWorkflow](https://github.com/automaticmode/active_workflow). It moves files from a source path to a
destination path according to the parameters sent.
## Configuration
Since Version **0.8** the File Mover Agent requires a **Redis database** to handle the job queue. See the
example [compose file](docker-compose.yml) supplied and be sure to set the Redis environment variable(s).
### Environment Variables
The following environment variables can be specified when running the _File Mover_ agent with Docker:
* `application_name`: Overrides the default name of the Agent, i.e. the Agent Name displayed in the ActiveWorkflow user
interface. (**NB:** This name is also used internally by the ActiveWorkflow engine to identify agents. **Use
different names for multiple instances** of the same Remote Agent across one or multiple host servers. **Changing the
name** of an instance that is already in use **may break the internal reference to that agent**.)
* `application_version`: Overrides the Agent version. No reason to set this variable.
* `api_key`: (optional) If set, _File Mover_ expects that the specified value is transmitted as an option with all
incoming `check` and `receive` requests. See agent
[usage description](mover/description.md#Options) for details.
* `default_source_dir`: Default source directory for copy or move operations. See agent
[usage description](mover/description.md#Options) for details.
* `default_destination_dir`: Default target directory for copy or move operations. See agent
[usage description](mover/description.md#Options) for details.
* `redis_host`: Hostname of the Redis service, defaults to `redis`
* `redis_port`: Port of the Redis service, defaults to `6379`
* `redis_password`: Password for the Redis database. Empty by default.
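Internally these variables are read via pydantic's `BaseSettings` (see `mover/config.py` further down in this diff), which matches environment variable names case-insensitively, so `REDIS_HOST` populates `redis_host`. A minimal sketch of the Redis-related settings:
```python
from pydantic import BaseSettings

class BasicSettings(BaseSettings):
    # Each field is populated from the environment, matched
    # case-insensitively: REDIS_HOST overrides redis_host, etc.
    redis_host: str = "redis"
    redis_port: int = 6379
    redis_password: str = ""
```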
## Usage
`./build_dev.sh`
`docker run -d -p 8000:80 harbor.umg.eu/medic/agent_mover:latest`
`docker-compose up -d`
Find your app at `localhost:8000`; you can verify the app is running by
checking the docs under `localhost:8000/docs` or `localhost:8000/redoc`.
Find your app at `localhost:80`; you can verify the app is running by checking the docs under `localhost/docs`
or `localhost/redoc`.
## ActiveWorkflow
### ActiveWorkflow
After starting the FileMover Agent, you can supply
the `mover` endpoint as an environment variable to your ActiveWorkflow controller:
`REMOTE_AGENT_URL=http://localhost:8000/mover`
`REMOTE_AGENT_URL=http://localhost/mover`
See agent [usage description](mover/description.md) for details.
......@@ -47,7 +52,7 @@ File Mover is licensed under terms of the [GPLv3 license](LICENSE)
```
File Mover
Copyright (C) 2020 UMG MeDIC
Copyright (C) 2020-2021 UMG MeDIC
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......
version: "3.5"
services:
agent_mover:
image: harbor.umg.eu/medic/agent_mover:dev
build: .
environment:
API_KEY: change_me_now
REDIS_HOST: redis
ports:
- 80:80
redis:
image: redis
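Once the stack is up, one quick way to confirm that the Agent's container can reach the Redis service is a `PING` from Python (a hedged check, assuming the `redis` client pinned in the Pipfile):
```python
from redis import StrictRedis

# The defaults the Agent assumes for a local Redis node in the Docker network
r = StrictRedis(host="redis", port=6379, password="")
print(r.ping())  # True when the Redis service is reachable
```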
VERSION = (0, 7, 0)
VERSION = (0, 8, 0)
__version__ = ".".join(map(str, VERSION))
import json
import os
from json.encoder import JSONEncoder
from pickledb import PickleDB
from pydantic import BaseSettings
from redis import StrictRedis
from mover import __version__
......@@ -13,7 +14,9 @@ class BasicSettings(BaseSettings):
    api_key: str = None
    default_source_dir: str = "/tmp/source"
    default_destination_dir: str = "/tmp/destination"
    queue: str = "/tmp/queue.db"
    redis_host: str = "redis"
    redis_port: int = 6379
    redis_password: str = ""
def get_version():
......@@ -27,8 +30,23 @@ with open(
agent_description = markdown.read()
def get_queue():
    filepath = BasicSettings().queue
    # Set default JSON encoding engine to "str" to enable datetime serialization
    JSONEncoder.default = str
    return PickleDB(location=filepath, auto_dump=True, sig=False)
class Queue:
    """
    Queue class to encapsulate Redis connection and enforce JSON encoding of values
    """

    def __init__(self, settings: BasicSettings = BasicSettings()):
        self.redis = StrictRedis(host=settings.redis_host, port=settings.redis_port, password=settings.redis_password)

    def set(self, key, value):
        # Fall back to str() for values json cannot serialize natively (e.g. datetime)
        JSONEncoder.default = str
        return self.redis.set(key, json.dumps(value))

    def get(self, key):
        return json.loads(self.redis.get(key))

    def delete(self, key):
        return self.redis.delete(key)

    def keys(self, pattern: str = '*'):
        return self.redis.keys(pattern)
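For illustration, the wrapper would be used along these lines (a sketch; key and payload are hypothetical):
```python
from datetime import datetime

q = Queue()  # connects with the BasicSettings defaults above
q.set("a1b2c3", {"received": datetime.now(), "action": "move", "params": {}})
job = q.get("a1b2c3")    # decoded from JSON; the datetime comes back as a string
for job_id in q.keys():  # all stored job ids (redis-py returns them as bytes)
    print(job_id, q.get(job_id))
q.delete("a1b2c3")
```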
......@@ -7,6 +7,7 @@ from a specified _source_ to _destination_ path, or deletes all files from
the specified _source_ directory.
## Usage
Source and destination path can be set via options:
```
......
......@@ -6,7 +6,7 @@ from fastapi import FastAPI, BackgroundTasks, status
from fastapi.responses import JSONResponse
from mover.auth import authorize
from mover.config import get_queue
from mover.config import Queue
from mover.models import *
from mover.mover import move
......@@ -32,7 +32,7 @@ def mover(payload: RequestCommon, background_tasks: BackgroundTasks):
@app.post('/mover/register', response_model=ResponseRegister)
def register(payload: RequestRegister):
"""
Handle register method of active_workflow. Same as sending method = "register" to the jobname API endpoint.
Handle register method of active_workflow.
Check [active_workflow docs](https://github.com/automaticmode/active_workflow/blob/master/docs/remote_agent_api.md#register-method)
for more information.
"""
......@@ -68,9 +68,12 @@ def receive(payload: RequestReceive, background_tasks: BackgroundTasks):
# generate id
timestamp = datetime.now()
task_id = sha1(str(timestamp).encode()).hexdigest()[0:15]
# store params in pickledb
q = get_queue()
# store params in queue
q = Queue()
q.set(task_id, {'received': timestamp, 'action': 'move', 'params': params})
# Add task to background tasks for asynchronous execution
background_tasks.add_task(move, task_id=task_id, **params)
response = ResponseReceiveCheck()
......@@ -95,10 +98,10 @@ def check(payload: RequestCheck):
content={'result': ResultReceiveCheck(errors=[info_message]).dict()}
)
q = get_queue()
q = Queue()
response = ResponseReceiveCheck()
done = []
for job_id in q.getall():
for job_id in q.keys():
data = q.get(job_id)
if 'result' in data.keys():
r = data['result']
......@@ -109,7 +112,7 @@ def check(payload: RequestCheck):
else:
response.result.logs.append(f"Job with id {job_id} queued since {data['received']}.")
for job_id in done:
q.rem(job_id)
q.delete(job_id)
return JSONResponse(content=response.dict())
......
......@@ -3,7 +3,7 @@ import shutil
from datetime import date, timedelta, datetime
from os.path import getctime
from mover.config import get_queue
from mover.config import Queue
from mover.models import ResultReceiveCheck
......@@ -14,7 +14,8 @@ def move(task_id: str, source: str, destination: str, method: str, criteria: dic
result = ResultReceiveCheck()
result.logs = [
f"[id={task_id}] FileMover agent started. Source path: {source}, destination path: {destination}, method: {method}, criteria: {criteria}"]
f"[{task_id}] FileMover agent started. Source path: {source}, destination path: {destination}, " +
f"method: {method}, criteria: {criteria}"]
# Active Workflow response message
msg = {}
......@@ -91,24 +92,24 @@ def move(task_id: str, source: str, destination: str, method: str, criteria: dic
else:
not_moved.append(file)
if log_message is not None:
result.logs.append(f"[id={task_id}] " + log_message)
result.logs.append(f"[{task_id}] " + log_message)
code = 2
count = len(moved)
count_not = len(source_files) - count
if code == 0:
if count_not == 0:
result.logs.append(f"[id={task_id}] All {count} files {verb} from {source} to {destination}")
result.logs.append(f"[{task_id}] All {count} files {verb} from {source} to {destination}")
else:
result.logs.append(f"[id={task_id}] {count} files {verb} from {source} to {destination}, \
result.logs.append(f"[{task_id}] {count} files {verb} from {source} to {destination}, \
{count_not} files skipped.")
elif code == 3:
result.logs.append(f"[id={task_id}] All {count} files {verb} from {source}")
result.logs.append(f"[{task_id}] All {count} files {verb} from {source}")
elif code == 2:
result.logs.append(f"[id={task_id}] Files have been partially {verb} from {source} to {destination}. \
result.logs.append(f"[{task_id}] Files have been partially {verb} from {source} to {destination}. \
{count} files {verb}. \
See log messages for further details.")
result.logs.append(f"[id={task_id}] {count_not} files not processed: {not_moved}")
result.logs.append(f"[{task_id}] {count_not} files not processed: {not_moved}")
finished_timestamp = datetime.now()
......@@ -125,9 +126,10 @@ def move(task_id: str, source: str, destination: str, method: str, criteria: dic
print(e)
result.errors = [str(e)]
# According to Active Workflow convention/logic, do not return a message on error
finished_timestamp = datetime.now()
# write Result to queue storage
q = get_queue()
q = Queue()
data = q.get(task_id)
data['result'] = result.dict()
data['started'] = started_timestamp
......
click==7.1.2
fastapi==0.63.0
h11==0.12.0
pickledb==0.9.2
pydantic==1.8
pydantic==1.8.1
redis==3.5.3
starlette==0.13.6
typing-extensions==3.7.4.3
uvicorn==0.13.4