Commit b1d45bb1 authored by Marcel Hellkamp's avatar Marcel Hellkamp
Browse files

Implemented "put --force/update/skip" and client-side conflict checking.

The "put" command is now safe and fail-fast by default. Conflicting
files will stop the entire upload before it starts. Pass --skip to
get the old behavior.
parent e62f8960
......@@ -6,10 +6,10 @@ command. It works a bit as a swiss army knife for simple archive creation or
manipulation. For everything not covered here, there are more specialized commands
available.
File upload is safe by default: Existing remote files are not overwritten.
You can --force re-upload or just --update newer files. Note that --update relies on a
working clock on both local and remote side, as it compares the file modification times
only.
Existing remote files are not overwritten by default to prevent accidental data
loss. The command will fail if there are any file name conflicts. To change that,
you can --force, --update or --skip conflicting files instead. Note that
--update will overwrite remote files based on modification time.
All uploads and changes are wrapped in a transaction. If something goes wrong,
nothing is committed on the remote side and you can simply re-run the same command.
......@@ -21,6 +21,8 @@ from contextlib import ExitStack
from pycdstar3 import FormUpdate, ApiError
from pycdstar3.cli import CliError
from pycdstar3.cli._utils import hbytes, kvtype, globtype
import collections
import iso8601
def register(subparsers):
......@@ -29,8 +31,15 @@ def register(subparsers):
)
_grp = parser.add_mutually_exclusive_group()
# _grp.add_argument("-u", "--update", action="store_true",
# help="Update outdated remote files. (default: skip)")
_grp.add_argument(
"-s", "--skip", action="store_true", help="Skip files that exist remotely."
)
_grp.add_argument(
"-u",
"--update",
action="store_true",
help="Update remote files if local file is newer, skip others.",
)
_grp.add_argument(
"-f",
"--force",
......@@ -102,7 +111,7 @@ def register(subparsers):
"-n",
"--dry-run",
action="store_true",
help="Do not upload or change anything, just print what would have been done",
help="Just simulate an upload and print what would have been done.",
)
parser.add_argument(
"-%", "--progress", action="store_true", help="Show progress bar."
......@@ -132,18 +141,18 @@ def remote_name(base, file, prefix="", flatten=False):
target = os.path.basename(target)
if prefix:
target = os.path.join(prefix, target)
if target != os.path.normpath(target):
raise ValueError("Unable to upload relative path: {!r}".format(target))
raise CliError("Unable to upload relative path: {!r}".format(target))
return target
UploadTask = collections.namedtuple("UploadTask", "local remote stats")
def command(ctx, args): # noqa: C901
client = ctx.client
vault = ctx.vault
archive = args.ARCHIVE
force = args.force
prefix = args.prefix
prefix = prefix.lstrip("/")
......@@ -151,70 +160,107 @@ def command(ctx, args): # noqa: C901
inc_rules = args.include or []
exc_rules = args.exclude or []
uploads = {}
total = 0
for path in args.PATH:
for file in collect_files(path, args.include_hidden):
if inc_rules and not any(rule.match(file) for rule in inc_rules):
ctx.print.v("Skipping: {} (not included)", file)
elif any(rule.match(file) for rule in exc_rules):
ctx.print.v("Skipping: {} (excluded)", file)
else:
fstat = os.stat(file)
total += fstat.st_size
uploads[remote_name(".", file, prefix, args.flat)] = file, fstat
dryrun = "[dryrun] " if args.dry_run else ""
# Collect meta changes
meta = {}
for key, val in args.meta or []:
meta.setdefault(key, []).append(val)
# Collect ACL changes
acl = {}
for key, val in args.acl or []:
acl.setdefault(key, []).append(val)
if args.dry_run:
for target in sorted(uploads):
ctx.print(target)
ctx.print(
"\nWould upload {} files ({}) to {}",
len(uploads),
hbytes(total),
"new archive" if archive == "new" else "archive: " + vault + "/" + archive,
)
return
form = FormUpdate()
for key, values in meta.items():
form.meta(key, *values)
for key, values in acl.items():
form.acl(key, *values)
# Collect files to upload
uploads = {}
for path in args.PATH:
for file in collect_files(path, args.include_hidden):
if inc_rules and not any(rule.match(file) for rule in inc_rules):
ctx.print.v("Not included: {}", file)
elif any(rule.match(file) for rule in exc_rules):
ctx.print.v("Excluded: {}", file)
else:
fstat = os.stat(file)
rname = remote_name(".", file, prefix, args.flat)
task = UploadTask(file, rname, fstat)
if rname in uploads:
a, b = uploads[rname].local, task.local
msg = "Conflicting uploads: Both {!r} and {!r} map to {!r}"
raise CliError(msg.format(a, b, rname))
uploads[rname] = task
with ExitStack() as stack:
stack.enter_context(client.begin(autocommit=True))
if not dryrun:
stack.enter_context(client.begin(autocommit=True))
# Create archive if necessary, or check if it exists
created = False
if archive == "new":
archive = client.create_archive(vault, form=form)["id"]
ctx.print("Created new archive: /{}/{} ", vault, archive)
elif form.fields:
client.update_archive(vault, archive, form=form)
ctx.print("Updated archive: /{}/{} ", vault, archive)
if not dryrun:
archive = client.create_archive(vault, form=form)["id"]
ctx.print("{}Created new archive: ", dryrun, archive)
created = True
elif not client.exists(vault, archive):
raise CliError("Archive /{}/{} does not exist.".format(vault, archive))
ctx.print(
"Uploading {} files ({}) to archive: /{}/{}",
len(uploads),
hbytes(total),
vault,
archive,
)
pbar = None
if args.progress and not ctx.print.quiet:
from tqdm import tqdm
total = sum(stat.st_size for (file, stat) in uploads.values())
pbar = tqdm(
raise CliError("Archive does not exist: {}".format(archive))
# In any but --force mode, check for upload conflicts
if uploads and not created and not args.force:
ctx.print.v("Fetching remote file list...")
for remote in client.iter_files(vault, archive):
rname = remote["name"]
if rname not in uploads:
continue
if args.skip:
ctx.print.v("Skipping (--skip): {}", rname)
del uploads[rname]
elif args.update:
lmtime = uploads[rname].stats.st_mtime
rmtime = iso8601.parse_date(remote["modified"]).timestamp()
if lmtime <= rmtime:
ctx.print.v("Skipping (--update): {}", rname)
del uploads[rname]
else:
msg = "Remote file exists: {!r}\n".format(rname)
msg += "Enable --force, --update or --skip and try again."
raise CliError(msg)
# Update archive metadata
if meta:
ctx.print("{}Updating archive metadata ...", dryrun)
form = FormUpdate()
for key, values in meta.items():
form.meta(key, *values)
if not dryrun:
client.update_archive(vault, archive, form=form)
# Update archive ACLs
if acl:
ctx.print("{}Updating archive ACL ...", dryrun)
form = FormUpdate()
for key, values in acl.items():
form.acl(key, *values)
if not dryrun:
client.update_archive(vault, archive, form=form)
# Early exit if we do not have any files to upload
if not uploads:
ctx.print("{}Done!", dryrun)
return
# Start file upload ...
total = sum(e.stats.st_size for e in uploads.values())
ctx.print("{}Uploading {} files ({}) ...", dryrun, len(uploads), hbytes(total))
println = ctx.print
replace = args.force or args.update
wrap = False
# Prepare progress bar, if applicable
if args.progress and not (ctx.print.quiet or dryrun):
import tqdm
pbar = tqdm.tqdm(
total=total,
unit="b",
unit_scale=True,
......@@ -223,36 +269,27 @@ def command(ctx, args): # noqa: C901
file=ctx.print.file,
)
stack.enter_context(pbar)
println = pbar.write
def wrap(fp):
    """Yield the content of ``fp`` in 64 KiB chunks, updating the progress bar.

    ``fp`` is a file object opened in binary mode. Each chunk read from it
    is reported to ``pbar`` (the progress bar created in the enclosing
    scope) by its length before being yielded, so the bar advances as the
    consumer pulls data from this generator.
    """
    # The two-argument form of iter() needs a *callable* that is invoked
    # until it returns the sentinel. Passing the result of fp.read(...)
    # directly raises "TypeError: iter(v, w): v must be callable".
    for chunk in iter(lambda: fp.read(1024 * 64), b""):
        pbar.update(len(chunk))
        yield chunk
for i, upload in enumerate(sorted(uploads.values())):
prefix = "[{}/{}] ".format(i + 1, len(uploads))
suffix = " (" + hbytes(upload.stats.st_size) + ")"
println(dryrun + prefix + upload.remote + suffix)
if dryrun:
continue
with open(upload.local, "rb") as fp:
fp = wrap(fp) if wrap else fp
client.put_file(vault, archive, upload.remote, fp, replace=replace)
for i, target in enumerate(sorted(uploads)):
file, stat = uploads[target]
with open(file, "rb") as fp:
line = "[{}/{}] {} ({})".format(
i + 1, len(uploads), target, hbytes(stat.st_size)
)
if pbar:
pbar.write(line)
read = fp.read
chunks = iter(lambda: read(1024 * 64), b"")
chunks = (chunk for chunk in chunks if not pbar.update(len(chunk)))
fp = chunks
else:
ctx.print(line)
try:
client.put_file(vault, archive, target, fp, replace=force)
except ApiError as e:
if e.status == 412:
# TODO: Make more specific as soon as CDSTAR returns a proper
# error code.
ctx.print.warn("Upload failed (file exists): {}", target)
continue
raise
ctx.print(
"Done! Uploaded {} files ({}) to archive: /{}/{}",
len(uploads),
hbytes(total),
vault,
archive,
)
ctx.print(
"\n{}Done! Uploaded {} files ({}) to archive: {}",
dryrun,
len(uploads),
hbytes(total),
archive,
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment