Commit 6cf4cd16 authored by Markus Suhr's avatar Markus Suhr
Browse files

Added AW state handling via memory attribute to enable distinction between multiple workflows

parent 3fa54b76
Pipeline #198131 passed with stage
in 45 seconds
...@@ -26,7 +26,7 @@ _Types of changes_: ...@@ -26,7 +26,7 @@ _Types of changes_:
### Changed ### Changed
* Switched from synchronous to **asynchronous file processing**: instead of moving * Switched from synchronous to **asynchronous file processing**: instead of moving
(copying, deleting) files procedurally when a "receive" or "chech" HTTP request is handled, jobs now will be stored in (copying, deleting) files procedurally when a "receive" or "check" HTTP request is handled, jobs now will be stored in
a **Redis** database and will be executed as background tasks by the FastAPI engine. a **Redis** database and will be executed as background tasks by the FastAPI engine.
See https://fastapi.tiangolo.com/tutorial/background-tasks/ See https://fastapi.tiangolo.com/tutorial/background-tasks/
for details. for details.
......
...@@ -78,6 +78,16 @@ def receive(payload: RequestReceive, background_tasks: BackgroundTasks): ...@@ -78,6 +78,16 @@ def receive(payload: RequestReceive, background_tasks: BackgroundTasks):
response = ResponseReceiveCheck() response = ResponseReceiveCheck()
response.result.logs = [f"Job with id {task_id} added to queue."] response.result.logs = [f"Job with id {task_id} added to queue."]
# This Remote Agent may be part of multiple ActiveWorkflow Workflows
# We have to rely on the AW memory to store job-IDs which are part of the correct Workflow
memory = payload.params.memory
if 'jobs' in memory and type(memory['jobs']) is list:
memory['jobs'].append(task_id)
else:
memory['jobs'] = [task_id]
response.result.memory = memory
return JSONResponse(content=response.dict()) return JSONResponse(content=response.dict())
...@@ -98,21 +108,32 @@ def check(payload: RequestCheck): ...@@ -98,21 +108,32 @@ def check(payload: RequestCheck):
content={'result': ResultReceiveCheck(errors=[info_message]).dict()} content={'result': ResultReceiveCheck(errors=[info_message]).dict()}
) )
# This Remote Agent may be part of multiple ActiveWorkflow Workflows
# We have to rely on the AW memory to store job-IDs which are part of the correct Workflow
memory = payload.params.memory
if 'jobs' not in memory or type(memory['jobs']) is not list:
memory['jobs'] = []
q = Queue() q = Queue()
response = ResponseReceiveCheck() response = ResponseReceiveCheck()
done = [] done = []
for job_id in q.keys(): for job_id in q.keys():
data = q.get(job_id) data = q.get(job_id)
if 'result' in data.keys(): job_id = job_id.decode()
r = data['result'] if job_id in memory['jobs']:
response.result.logs.extend(r['logs']) if 'result' in data.keys():
response.result.errors.extend(r['errors']) r = data['result']
response.result.messages.extend(r['messages']) response.result.logs.extend(r['logs'])
done.append(job_id) response.result.errors.extend(r['errors'])
else: response.result.messages.extend(r['messages'])
response.result.logs.append(f"Job with id {job_id} queued since {data['received']}.") done.append(job_id)
else:
response.result.logs.append(f"Job with id {job_id} queued since {data['received']}.")
for job_id in done: for job_id in done:
q.delete(job_id) q.delete(job_id)
memory['jobs'].remove(job_id)
response.result.memory = memory
return JSONResponse(content=response.dict()) return JSONResponse(content=response.dict())
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment