# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2009-2022 Martin Wendt and contributors <https://github.com/mar10/wsgidav>
# SPDX-FileCopyrightText: 2022 Stefan Hynek
#
# SPDX-License-Identifier: MIT
#
# Original file: https://github.com/mar10/wsgidav/blob/d22ada5db70812ac9201c6861c53ce5cf2157342/wsgidav/stream_tools.py
#
# Recovered from the patch for commit 1b551e9bad9b6793c09c3ba722d3a704e6b62d9e
# (Stefan Hynek <stefan.hynek@uni-goettingen.de>, Wed, 16 Feb 2022 14:10:18 +0100):
#   feat(stream_tools): add a modified version of wsgidav's FileLikeQueue
#   provide iterator api and a `__len__` method; read and write according to
#   the length of the incoming stream
# Target path in that patch: src/repdav/stream_tools.py
"""Implement the FileLikeQueue helper class.

This helper class is intended to handle use cases where an incoming PUT
request should be directly streamed to a remote target.

Usage: return an instance of this class to `begin_write` and pass it to the
consumer at the same time::

    def begin_write(self, contentType=None):
        queue = FileLikeQueue(length)
        requests.post(..., data=queue)
        return queue

"""
import logging
import queue
from typing import Iterable, Union

_logger = logging.getLogger(__name__)

#: Seconds `read()` waits for a chunk before re-checking whether the queue
#: was closed in the meantime.
_POLL_INTERVAL = 0.1

#: Types that `write()` treats as one single chunk (not as an iterable of chunks).
_CHUNK_TYPES = (str, bytes, bytearray, memoryview)


def _as_bytes(chunk) -> bytes:
    """Return a single chunk as an immutable ``bytes`` object.

    Args:
        chunk: ``bytes``, ``bytearray``, ``memoryview`` or ``str``
            (``str`` is encoded as UTF-8).

    Raises:
        TypeError: if `chunk` is of any other type. Rejecting it here gives
            the writer the error, instead of breaking a later `read()` call
            on the consumer side.
    """
    if isinstance(chunk, bytes):
        return chunk
    if isinstance(chunk, str):
        return chunk.encode("utf-8")
    if isinstance(chunk, (bytearray, memoryview)):
        return bytes(chunk)
    raise TypeError(
        "Expected a bytes-like object or str, got {}".format(
            type(chunk).__name__))


# ============================================================================
# FileLikeQueue
# ============================================================================


class FileLikeQueue:
    """A queue for chunks that behaves like a file-like.

    read() and write() are typically called from different threads.

    This helper class is intended to handle use cases where an incoming PUT
    request should be directly streamed to a remote target:

    def begin_write(self, contentType=None):
        # Create a proxy buffer
        queue = FileLikeQueue(length)
        # ... and use it as source for the consumer:
        requests.post(..., data=queue)
        # pass it to the PUT handler as target
        return queue

    Attributes:
        is_closed: True once close() was called; no more data will arrive.
        queue: The underlying `queue.Queue` holding the pending chunks.
        unread: Bytes already taken from the queue but not yet returned by
            read() (left over from a sized read).
        len: Declared stream length minus the number of bytes read so far.
    """

    def __init__(self, length: int, max_size: int = 0):
        """Create an open, empty queue.

        Args:
            length: Number of bytes the incoming stream is expected to carry.
            max_size: Maximum number of queued chunks (0: unlimited); passed
                to `queue.Queue`.
        """
        self.is_closed = False
        self.queue = queue.Queue(max_size)
        self.unread = b""
        self.len = length

    def __len__(self) -> int:
        """Return the number of bytes that are still expected to be read.

        The value is clamped to 0: `__len__` must never return a negative
        number, which would otherwise happen as soon as more bytes were read
        than the declared length.
        """
        remaining = max(self.len, 0)
        _logger.debug(
            "Called FileLikeQueue.__len__(self). Returned %s", remaining)
        return remaining

    #: File API
    def read(self, size: int = 0) -> bytes:
        """Read a chunk of bytes from queue.

        This method blocks until the requested size becomes available.
        However, if close() was called and no data is pending, the bytes
        collected so far (possibly b"") are returned.

        Args:
            size: Chunk length.
                size = 0: Read next chunk (arbitrary length)
                     > 0: Read one chunk of `size` bytes (or less if stream was closed)
                     < 0: Read all bytes as single chunk (i.e. blocks until stream is closed)

        Returns:
            Chunk in bytes.
        """
        _logger.debug(
            "Called FileLikeQueue.read(self, size=%s).", size)

        # Start with the leftover of a previous sized read. Chunks are
        # collected in a list and joined once, so that read(-1) does not
        # degrade to quadratic copying.
        parts = [self.unread] if self.unread else []
        buffered = len(self.unread)
        self.unread = b""
        # Get next chunk, cumulating requested size as needed
        while buffered == 0 or size < 0 or (size > 0 and buffered < size):
            try:
                # Read pending data, blocking if necessary
                # (but handle the case that close() is called while waiting)
                chunk = self.queue.get(True, _POLL_INTERVAL)
            except queue.Empty:
                # Only stop when the writer is done AND nothing is pending:
                # the writer may have queued its last chunk and closed right
                # after our get() timed out; that chunk must not be dropped.
                if self.is_closed and self.queue.empty():
                    break
                continue
            parts.append(chunk)
            buffered += len(chunk)

        res = b"".join(parts)
        # Deliver `size` bytes from buffer
        if len(res) > size > 0:
            self.unread = res[size:]
            res = res[:size]
        # Reduce the Queue's length by the number of bytes read
        self.len -= len(res)

        return res

    def write(self, chunk: Union[bytes, Iterable[bytes]]):
        """Put a chunk of bytes (or an iterable) to the queue.

        May block if max_size number of chunks is reached.

        Args:
            chunk: A chunk of bytes or an iterable of chunks. A chunk may be
                `bytes`, `bytearray`, `memoryview` or `str` (encoded as
                UTF-8); it is always queued as `bytes`.

        Raises:
            ValueError: if close() was already called.
            TypeError: if a chunk is neither bytes-like nor `str`.
        """
        # Generators and other unsized iterables have no len(); logging must
        # not make write() fail for them.
        try:
            logged_size = len(chunk)
        except TypeError:
            logged_size = "?"
        _logger.debug(
            "Called FileLikeQueue.write(self, chunk=%s).", logged_size)
        if self.is_closed:
            raise ValueError("Cannot write to closed object")
        # Add chunk to queue (blocks if queue is full)
        if isinstance(chunk, _CHUNK_TYPES):
            self.queue.put(_as_bytes(chunk))
        else:  # if not a single chunk, assume an iterable of chunks
            for item in chunk:
                self.queue.put(_as_bytes(item))

    def close(self):
        """Close the Queue for incoming data.

        Chunks that are already queued can still be read.
        """
        _logger.debug("Called FileLikeQueue.close(self).")
        self.is_closed = True

    #: Iterator API
    def __iter__(self):
        """Return self; iterating yields the queued chunks one by one."""
        _logger.debug("Called FileLikeQueue.__iter__(self).")
        return self

    def __next__(self):
        """Return the next chunk.

        Raises:
            StopIteration: when read() returns no data, i.e. the queue was
                closed and is drained.
        """
        _logger.debug("Called FileLikeQueue.__next__(self).")
        result = self.read()
        if not result:
            raise StopIteration
        return result

    next = __next__  # Python 2.x