diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000000000000000000000000000000000000..2c211f7f99c781fb6efd51c9c34e1e87a22e9380 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,11 @@ +{ + "configurations": [ + { + "name": "Python: main.py", + "type": "python", + "request": "launch", + "program": "${workspaceFolder}/src/main.py", + "justMyCode": false + } + ] +} diff --git a/requirements.txt b/requirements.txt index 78aade9fefbd74d48110ed5621d249c71aec8e8e..24416cd71eefdf06ca8b1788e2ddf9b61b12ae54 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +https://gitlab.gwdg.de/dariah-de/textgridrep/textgrid-python-clients/-/archive/e6971b39e19efb0e81d35a634bd616917b3543b3/textgrid-python-clients-e6971b39e19efb0e81d35a634bd616917b3543b3.tar.gz cheroot==8.5.2 requests==2.26.0 sentry-sdk==1.4.3 diff --git a/src/repdav/stream_tools.py b/src/repdav/stream_tools.py new file mode 100644 index 0000000000000000000000000000000000000000..390a101d447409d78ce633aa54f4de7cf9af0952 --- /dev/null +++ b/src/repdav/stream_tools.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# SPDX-FileCopyrightText: 2009-2022 Martin Wendt and contributors <https://github.com/mar10/wsgidav> +# SPDX-FileCopyrightText: 2022 Stefan Hynek +# +# SPDX-License-Identifier: MIT +# +# Original file: https://github.com/mar10/wsgidav/blob/d22ada5db70812ac9201c6861c53ce5cf2157342/wsgidav/stream_tools.py +"""Implement the FileLikeQueue helper class. + +This helper class is intended to handle use cases where an incoming PUT +request should be directly streamed to a remote target. + +Usage: return an instance of this class to`begin_write` and pass it to the +consumer at the same time:: + + def begin_write(self, contentType=None): + queue = FileLikeQueue(length) + requests.post(..., data=queue) + return queue + +""" +import logging +import queue + +_logger = logging.getLogger(__name__) + + +# ============================================================================ +# FileLikeQueue +# ============================================================================ + + +class FileLikeQueue: + """A queue for chunks that behaves like a file-like. + + read() and write() are typically called from different threads. + + This helper class is intended to handle use cases where an incoming PUT + request should be directly streamed to a remote target: + + def begin_write(self, contentType=None): + # Create a proxy buffer + queue = FileLikeQueue(length) + # ... and use it as source for the consumer: + requests.post(..., data=queue) + # pass it to the PUT handler as target + return queue + """ + + def __init__(self, length: int, max_size: int=0): + self.is_closed = False + self.queue = queue.Queue(max_size) + self.unread = b"" + self.len = length + + def __len__(self) -> int: + _logger.debug( + "Called FileLikeQueue.__len__(self). Returned %s", self.len) + return self.len + + #: File API + def read(self, size: int=0) -> bytes: + """Read a chunk of bytes from queue. + + This method blocks until the requested size become available. + However, if close() was called, '' is returned immediately. + + Args: + size: Chunk length. + size = 0: Read next chunk (arbitrary length) + > 0: Read one chunk of `size` bytes (or less if stream was closed) + < 0: Read all bytes as single chunk (i.e. blocks until stream is closed) + + Returns: + Chunk in bytes. + """ + _logger.debug( + "Called FileLikeQueue.read(self, size=%s).", size) + + res = self.unread + self.unread = b"" + # Get next chunk, cumulating requested size as needed + while res == b"" or size < 0 or (size > 0 and len(res) < size): + try: + # Read pending data, blocking if neccessary + # (but handle the case that close() is called while waiting) + res += self.queue.get(True, 0.1) + except queue.Empty: + # There was no pending data: wait for more, unless close() was called + if self.is_closed: + break + # Deliver `size` bytes from buffer + if len(res) > size > 0: + self.unread = res[size:] + res = res[:size] + # Reduce the Queue's length by the number of bytes read + self.len -= len(res) + + return res + + def write(self, chunk: bytes): + """Put a chunk of bytes (or an iterable) to the queue. + + May block if max_size number of chunks is reached. + + Args: + chunk: A chunk of bytes or an iterable + """ + _logger.debug( + "Called FileLikeQueue.write(self, chunk=%s).", len(chunk)) + if self.is_closed: + raise ValueError("Cannot write to closed object") + # Add chunk to queue (blocks if queue is full) + if isinstance(chunk, (str, bytes)): + self.queue.put(chunk) + else: # if not a string, assume an iterable + for o in chunk: + self.queue.put(o) + + def close(self): + """Close the Queue for incoming data. + """ + _logger.debug("Called FileLikeQueue.close(self).") + self.is_closed = True + + #: Iterator API + def __iter__(self): + _logger.debug("Called FileLikeQueue.__iter__(self).") + return self + + def __next__(self): + _logger.debug("Called FileLikeQueue.__next__(self).") + result = self.read() + if not result: + raise StopIteration + return result + + next = __next__ # Python 2.x diff --git a/src/repdav/textgrid_dav_provider.py b/src/repdav/textgrid_dav_provider.py index 5f4ad4a14b5069451e5c94b16816fc52fd8fa6c9..b13f8a679a732fc9ded6fedf8950fa0ddcb59c63 100644 --- a/src/repdav/textgrid_dav_provider.py +++ b/src/repdav/textgrid_dav_provider.py @@ -2,18 +2,22 @@ """ import io import logging +import threading +from pprint import pformat +from tgclients.auth import TextgridAuth +from tgclients.config import TextgridConfig +from tgclients.crud import TextgridCRUD from wsgidav.dav_provider import DAVCollection, DAVNonCollection, DAVProvider from wsgidav.util import join_uri, pop_path +from repdav.stream_tools import FileLikeQueue -from .tgapi import TextgridAuth, TextgridCRUD, TextgridSearch +from .tgapi import TextgridSearch _logger = logging.getLogger(__name__) -# TODO think about caching with the session ID as key - class TextgridRoot(DAVCollection): """Top level collection that incorporates Textgrid projects. @@ -23,14 +27,15 @@ class TextgridRoot(DAVCollection): def __init__(self, path, environ): DAVCollection.__init__(self, path, environ) self._sid = environ["wsgidav.auth.user_name"] - _logger.debug("MY SID: %s", self._sid) def get_display_info(self): return {"type": "Textgrid root collection"} def get_member_names(self): _logger.debug("Called TextgridRoot.get_member_names(self).") - projects = tuple(TextgridAuth().assigned_projects(self._sid)) + config = TextgridConfig() + auth = TextgridAuth(config) + projects = tuple(auth.list_assigned_projects(self._sid)) _logger.debug("MY PROJECTS: %s", projects) return projects @@ -38,20 +43,32 @@ class TextgridRoot(DAVCollection): _logger.debug("Called TextgridRoot.get_member(self, %s).", name) return TextgridProject(join_uri(self.path, name), self.environ) + def support_etag(self): + """Return True, if this resource supports ETags.""" + return False + + def get_etag(self): + """ + See http://www.webdav.org/specs/rfc4918.html#PROPERTY_getetag + + This method SHOULD be implemented, especially by non-collections. + """ + return None + # temporary override for debugging def resolve(self, script_name, path_info): """Return a _DAVResource object for the path (None, if not found). `path_info`: is a URL relative to this object. """ - _logger.debug("Called TextgridRoot.resolve(self, %s, %s).", - script_name, path_info) + _logger.debug( + "Called TextgridRoot.resolve(self, %s, %s).", script_name, path_info + ) if path_info in ("", "/"): return self assert path_info.startswith("/") name, rest = pop_path(path_info) res = self.get_member(name) - _logger.debug("TextgridRoot_NAME: %s, REST: %s, RES: %s", - name, rest, res) + _logger.debug("TextgridRoot_NAME: %s, REST: %s, RES: %s", name, rest, res) if res is None or rest in ("", "/"): return res _logger.debug("RES: %s", res) @@ -60,8 +77,7 @@ class TextgridRoot(DAVCollection): class TextgridProject(DAVCollection): def __init__(self, path, environ): - _logger.debug( - "Called TextgridProject.__init__(self, %s, environ).", path) + _logger.debug("Called TextgridProject.__init__(self, %s, environ).", path) DAVCollection.__init__(self, path, environ) self._sid = environ["wsgidav.auth.user_name"] @@ -84,8 +100,8 @@ class TextgridProject(DAVCollection): # # path resolution has to be rewritten before we can work with resource titles resources = TextgridSearch().get_project_contents( - self._sid, self.path.split("/")[-1]) - _logger.debug("RESOURCES: %s", resources) + self._sid, self.path.split("/")[-1] + ) return resources.keys() def get_member(self, name): @@ -102,31 +118,46 @@ class TextgridProject(DAVCollection): def copy_move_single(self, dest_path, is_move): pass + def support_etag(self): + """Return True, if this resource supports ETags.""" + return False + + def get_etag(self): + """ + See http://www.webdav.org/specs/rfc4918.html#PROPERTY_getetag + + This method SHOULD be implemented, especially by non-collections. + """ + return None + # temporary override for debugging def resolve(self, script_name, path_info): """Return a _DAVResource object for the path (None, if not found). `path_info`: is a URL relative to this object. """ _logger.debug( - "Called TextgridProject.resolve(self, %s, %s).", script_name, path_info) + "Called TextgridProject.resolve(self, %s, %s).", script_name, path_info + ) if path_info in ("", "/"): return self assert path_info.startswith("/") name, rest = pop_path(path_info) res = self.get_member(name) - _logger.debug( - "TextgridProject_NAME: %s, REST: %s, RES: %s", name, rest, res) + _logger.debug("TextgridProject_NAME: %s, REST: %s, RES: %s", name, rest, res) if res is None or rest in ("", "/"): return res return res.resolve(join_uri(script_name, name), rest) -# TODO: merge TextgridProject with TextgridAggregation and/or derive from a common base class. + +# TODO: merge TextgridProject with TextgridAggregation +# and/or derive from a common base class. class TextgridAggregation(DAVCollection): def __init__(self, path, environ, info): _logger.debug( - "Called TextgridAggregation.__init__(self, %s, environ).", path) + "Called TextgridAggregation.__init__(self, %s, environ, info).", path + ) DAVCollection.__init__(self, path, environ) self._sid = environ["wsgidav.auth.user_name"] self._info = info @@ -143,8 +174,9 @@ class TextgridAggregation(DAVCollection): def get_member_names(self): _logger.debug("Called TextgridAggregation.get_member_names(self).") resources = TextgridSearch().get_aggregation_contents( - self._sid, self.path.split("/")[-1]) - #_logger.debug("RESOURCES: %s", resources) + self._sid, self.path.split("/")[-1] + ) + # _logger.debug("RESOURCES: %s", resources) return resources.keys() def get_member(self, name): @@ -161,20 +193,34 @@ class TextgridAggregation(DAVCollection): def copy_move_single(self, dest_path, is_move): pass + def support_etag(self): + """Return True, if this resource supports ETags.""" + return False + + def get_etag(self): + """ + See http://www.webdav.org/specs/rfc4918.html#PROPERTY_getetag + + This method SHOULD be implemented, especially by non-collections. + """ + return None + # temporary override for debugging def resolve(self, script_name, path_info): """Return a _DAVResource object for the path (None, if not found). `path_info`: is a URL relative to this object. """ _logger.debug( - "Called TextgridAggregation.resolve(self, %s, %s).", script_name, path_info) + "Called TextgridAggregation.resolve(self, %s, %s).", script_name, path_info + ) if path_info in ("", "/"): return self assert path_info.startswith("/") name, rest = pop_path(path_info) res = self.get_member(name) _logger.debug( - "TextgridAggregation_NAME: %s, REST: %s, RES: %s", name, rest, res) + "TextgridAggregation_NAME: %s, REST: %s, RES: %s", name, rest, res + ) if res is None or rest in ("", "/"): return res return res.resolve(join_uri(script_name, name), rest) @@ -184,42 +230,95 @@ class TextgridResource(DAVNonCollection): """Non-Aggregation resources.""" def __init__(self, path, environ, info): - _logger.debug( - "Called TextgridResource.__init__(self, %s, environ).", path) + _logger.debug("Called TextgridResource.__init__(self, %s, environ).", path) DAVNonCollection.__init__(self, path, environ) + self._size = environ.get("CONTENT_LENGTH") self._sid = environ["wsgidav.auth.user_name"] self._info = info + self.upload_thread = None def get_content_length(self): _logger.debug("Called TextgridResource.get_content_length(self).") - # return TextgridCRUD().get_metadata(self._sid, self.path.split("/")[-1])["content-length"] return self._info[self.name]["extent"] def get_content_type(self): _logger.debug("Called TextgridResource.get_content_type(self).") - # return TextgridCRUD().get_metadata(self._sid, self.path.split("/")[-1])["content-type"] return self._info[self.name]["format"] def get_content(self): - return io.BytesIO(TextgridCRUD().get_data(self._sid, self.path.split("/")[-1])) + config = TextgridConfig() + crud = TextgridCRUD(config.crud) + return io.BytesIO(crud.read_data(self.path.split("/")[-1], self._sid).content) + + def get_content_title(self): + _logger.debug("Called TextgridResource.get_content_title(self).") + return self._info[self.name]["title"] def begin_write(self, content_type=None): - pass + _logger.debug( + "Called TextgridResource.begin_write(self, content_type=%s).", content_type + ) + + queue = FileLikeQueue(int(self._size)) + config = TextgridConfig("http://textgridlab.org/") + crud = TextgridCRUD(config.crud) + metadata = crud.read_metadata(self.path.split("/")[-1], self._sid).content + + def worker(): + _logger.debug("Called TextgridResource.begin_write.worker().") + crud.update_resource(self._sid, self.path.split("/")[-1], queue, metadata) + + thread = threading.Thread(target=worker) + thread.setDaemon(True) + thread.start() + self.upload_thread = thread + return queue + + def end_write(self, with_errors): + _logger.debug( + "Called TextgridResource.end_write(self, with_errors=%s)", with_errors + ) + if self.upload_thread: + self.upload_thread.join() + self.upload_thread = None + + def support_etag(self): + """Return True, if this resource supports ETags.""" + return False + + def get_etag(self): + """ + See http://www.webdav.org/specs/rfc4918.html#PROPERTY_getetag + + This method SHOULD be implemented, especially by non-collections. + """ + return None + + # temporary override for debugging + def resolve(self, script_name, path_info): + """Return a _DAVResource object for the path (None, if not found). + `path_info`: is a URL relative to this object. + """ + _logger.debug( + "Called TextgridResource.resolve(self, %s, %s).", script_name, path_info + ) + if path_info in ("", "/"): + return self + return None # ============================================================================ # TextgridResourceProvider # ============================================================================ class TextgridResourceProvider(DAVProvider): - """DAV provider that serves Textgrid resources. - """ - - def __init__(self): - super(TextgridResourceProvider, self).__init__() + """DAV provider that serves Textgrid resources.""" def get_resource_inst(self, path, environ): _logger.debug( - "Called TextgridResourceProvider.get_resource_inst(self, %s, %s).", path, environ) + "Called TextgridResourceProvider.get_resource_inst(self, %s, %s).", + path, + pformat(environ), + ) self._count_get_resource_inst += 1 root = TextgridRoot("/", environ) # an instance of _DAVResource (i.e. either DAVCollection or DAVNonCollection)