Skip to content
Snippets Groups Projects
Verified Commit 1b551e9b authored by Stefan Hynek's avatar Stefan Hynek :drooling_face:
Browse files

feat(stream_tools): add a modified version of wsgidav's FileLikeQueue

provide iterator api and a `__len__` method; read and write according to the length of the incoming stream
parent 5ad6f4d2
No related branches found
No related tags found
1 merge request!8Resolve "integrate with tgclients lib"
# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2009-2022 Martin Wendt and contributors <https://github.com/mar10/wsgidav>
# SPDX-FileCopyrightText: 2022 Stefan Hynek
#
# SPDX-License-Identifier: MIT
#
# Original file: https://github.com/mar10/wsgidav/blob/d22ada5db70812ac9201c6861c53ce5cf2157342/wsgidav/stream_tools.py
"""Implement the FileLikeQueue helper class.
This helper class is intended to handle use cases where an incoming PUT
request should be directly streamed to a remote target.
Usage: return an instance of this class to`begin_write` and pass it to the
consumer at the same time::
def begin_write(self, contentType=None):
queue = FileLikeQueue(length)
requests.post(..., data=queue)
return queue
"""
import logging
import queue
_logger = logging.getLogger(__name__)
# ============================================================================
# FileLikeQueue
# ============================================================================
class FileLikeQueue:
"""A queue for chunks that behaves like a file-like.
read() and write() are typically called from different threads.
This helper class is intended to handle use cases where an incoming PUT
request should be directly streamed to a remote target:
def begin_write(self, contentType=None):
# Create a proxy buffer
queue = FileLikeQueue(length)
# ... and use it as source for the consumer:
requests.post(..., data=queue)
# pass it to the PUT handler as target
return queue
"""
def __init__(self, length: int, max_size: int=0):
self.is_closed = False
self.queue = queue.Queue(max_size)
self.unread = b""
self.len = length
def __len__(self) -> int:
_logger.debug(
"Called FileLikeQueue.__len__(self). Returned %s", self.len)
return self.len
#: File API
def read(self, size: int=0) -> bytes:
"""Read a chunk of bytes from queue.
This method blocks until the requested size become available.
However, if close() was called, '' is returned immediately.
Args:
size: Chunk length.
size = 0: Read next chunk (arbitrary length)
> 0: Read one chunk of `size` bytes (or less if stream was closed)
< 0: Read all bytes as single chunk (i.e. blocks until stream is closed)
Returns:
Chunk in bytes.
"""
_logger.debug(
"Called FileLikeQueue.read(self, size=%s).", size)
res = self.unread
self.unread = b""
# Get next chunk, cumulating requested size as needed
while res == b"" or size < 0 or (size > 0 and len(res) < size):
try:
# Read pending data, blocking if neccessary
# (but handle the case that close() is called while waiting)
res += self.queue.get(True, 0.1)
except queue.Empty:
# There was no pending data: wait for more, unless close() was called
if self.is_closed:
break
# Deliver `size` bytes from buffer
if len(res) > size > 0:
self.unread = res[size:]
res = res[:size]
# Reduce the Queue's length by the number of bytes read
self.len -= len(res)
return res
def write(self, chunk: bytes):
"""Put a chunk of bytes (or an iterable) to the queue.
May block if max_size number of chunks is reached.
Args:
chunk: A chunk of bytes or an iterable
"""
_logger.debug(
"Called FileLikeQueue.write(self, chunk=%s).", len(chunk))
if self.is_closed:
raise ValueError("Cannot write to closed object")
# Add chunk to queue (blocks if queue is full)
if isinstance(chunk, (str, bytes)):
self.queue.put(chunk)
else: # if not a string, assume an iterable
for o in chunk:
self.queue.put(o)
def close(self):
"""Close the Queue for incoming data.
"""
_logger.debug("Called FileLikeQueue.close(self).")
self.is_closed = True
#: Iterator API
def __iter__(self):
_logger.debug("Called FileLikeQueue.__iter__(self).")
return self
def __next__(self):
_logger.debug("Called FileLikeQueue.__next__(self).")
result = self.read()
if not result:
raise StopIteration
return result
next = __next__ # Python 2.x
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment