Commit 10ac7d78 authored by mhellka's avatar mhellka
Browse files

Added support for resumeable uploads via PATCH.

Fallback gracefully to simple retry if server does not support PATCH.
parent 84c5b873
......@@ -4,15 +4,17 @@ here.
"""
import os
import ssl
import threading
import typing
import requests
from requests import HTTPError
from pycdstar3._utils import PATH_TYPES, url_split_auth, IntervalTimer
from pycdstar3.model import ApiError, JsonObject, FileDownload, FormUpdate
__all__ = "CDStar", "CDStarVault", "FormUpdate", "ApiError"
__all__ = "CDStar", "CDStarVault", "FormUpdate", "ApiError", "ResumeableUpload"
class CDStar:
......@@ -554,3 +556,83 @@ class CDStarFile:
)
# TODO: Implement meee
class ResumeableUpload:
""" A utility class for file uploads that can retry or resume failed uploads. """
def __init__(self, client, vault, archive, file, fp, total):
self.archive = archive
self.vault = vault
self.client = client
self.file = file
self.fp = fp
self.total = total
self.headers = {}
self.last_error = None
self.errors = 0
def type(self, mime_type):
self.headers["Content-Type"] = mime_type
def upload(self):
""" Upload as much as possible and return FileInfo on success.
On recoverable errors, None is returned and the next call will
resume the upload, if supported. Callers should implement some kind
of timeout or max-retry policy to prevent infinite loops. """
if not self.client.tx:
raise RuntimeError("Resumeable uploads require a valid transaction")
offset = 0
# Find out the correct offset after any error. Skip this step for first request.
try:
if self.last_error is not None:
offset = self.find_offset()
headers = self.headers.copy()
if offset > 0:
headers["Patch-Offset"] = str(offset)
headers["Patch-Length"] = str(self.total)
self.fp.seek(offset)
result = self.client.rest(
"PATCH" if offset else "PUT",
self.vault,
self.archive,
self.file,
data=self.fp,
headers=headers,
)
if not result: # 204 NO CONTENT -> incomplete
raise RuntimeError("Unexpected end of file: " + repr(self.fp))
return result
except (ConnectionError, HTTPError, ssl.SSLError, ApiError) as e:
if isinstance(e, ApiError) and e.error != "InvalidPatchOffset":
raise
self.last_error = e
self.errors += 1
return False
def find_offset(self, offset):
""" Issue a HEAD request to find the current offset. """
head = self.client.raw(
"HEAD",
self.vault,
self.archive,
self.file,
expect_status=(200, 404),
)
# Upload incomplete
if head.status_code == 200 and "Patch-Offset" in head.headers:
return int(head.headers["Patch-Offset"])
# Upload completed
if self.total == int(head.headers["Content-Length"]):
return self.total
# 404 or no PATCH support
return 0
......@@ -16,9 +16,11 @@ nothing is committed on remote side and you can simply re-run the same command.
"""
import os
import time
from contextlib import ExitStack
from pycdstar3 import FormUpdate
from pycdstar3.api import ResumeableUpload
from pycdstar3.cli import CliError
from pycdstar3.cli._utils import hbytes, KvArgType, globtype
import collections
......@@ -116,6 +118,12 @@ def register(subparsers):
parser.add_argument(
"-%", "--progress", action="store_true", help="Show progress bar."
)
parser.add_argument(
"--retry",
type=int,
default=10,
help="Retry N times on recoverable upload " "errors (default: 10)",
)
parser.add_argument("ARCHIVE", help="Archive ID, or 'new' to create a new archive")
parser.add_argument("PATH", nargs="+", help="Files or directories to upload")
......@@ -253,7 +261,6 @@ def command(ctx, args): # noqa: C901
ctx.print("{}Uploading {} files ({}) ...", dryrun, len(uploads), hbytes(total))
println = ctx.print
replace = args.force or args.update
wrap = False
# Prepare progress bar, if applicable
......@@ -267,14 +274,13 @@ def command(ctx, args): # noqa: C901
unit_divisor=1024,
dynamic_ncols=True,
file=ctx.print.file,
miniters=1,
)
stack.enter_context(pbar)
println = pbar.write
def wrap(fp):
for chunk in iter(lambda: fp.read(1024 * 64), b""):
pbar.update(len(chunk))
yield chunk
return SeekableTQDMWrapper(fp, pbar)
for i, upload in enumerate(sorted(uploads.values())):
prefix = "[{}/{}] ".format(i + 1, len(uploads))
......@@ -283,13 +289,56 @@ def command(ctx, args): # noqa: C901
if dryrun:
continue
with open(upload.local, "rb") as fp:
fp = wrap(fp) if wrap else fp
client.put_file(vault, archive, upload.remote, fp, replace=replace)
ctx.print(
"\n{}Done! Uploaded {} files ({}) to archive: {}",
dryrun,
len(uploads),
hbytes(total),
archive,
)
if wrap:
fp = wrap(fp)
upload = ResumeableUpload(
client, vault, archive, upload.remote, fp, upload.stats.st_size
)
while True:
result = upload.upload()
if result:
break
elif upload.errors < args.retry:
sleep = errors = upload.errors
println(
"WARN: Upload interrupted: {!r}\n"
"WARN: Will retry in {} seconds ({}/{})".format(
upload.last_error, sleep, errors, args.retry
)
)
time.sleep(sleep)
else:
raise upload.last_error
ctx.print(
"\n{}Done! Uploaded {} files ({}) to archive: {}",
dryrun,
len(uploads),
hbytes(total),
archive,
)
class SeekableTQDMWrapper:
def __init__(self, fp, pbar):
self.fp = fp
self.pbar = pbar
def tell(self):
return self.fp.tell()
def seek(self, offset, whence=0):
current = self.fp.tell()
self.fp.seek(offset, whence)
new = self.fp.tell()
if current != new:
self.pbar.update(new - current)
def read(self, n):
chunk = self.fp.read(n)
if chunk:
self.pbar.update(len(chunk))
return chunk
def __iter__(self):
return iter(lambda: self.read(1024 * 64), b"")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment