Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES/1305.feature
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Change the default chunking value to "infinite". If users wants to, they can still specify a value based on their needs.
Removed the default `chunk_size` value. If necessary, a suitable value should be provided with the call or configured for the profile.
1 change: 1 addition & 0 deletions CHANGES/pulp-glue/+chunk_size.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow to specify `None` for the `chunk_size` of content upload commands to disable chunking.
2 changes: 1 addition & 1 deletion pulp-glue/src/pulp_glue/ansible/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class PulpAnsibleCollectionVersionContext(PulpContentContext):
def upload(self, file: t.IO[bytes], **kwargs: t.Any) -> t.Any: # type:ignore
repository: PulpRepositoryContext | None = kwargs.pop("repository", None)
if self.capable("upload"):
chunk_size: int = kwargs.pop("chunk_size", 1000000)
chunk_size: int | None = kwargs.pop("chunk_size", None)
return super().upload(file, chunk_size, repository, **kwargs)
else:
result = self.call("upload", body={"file": file})
Expand Down
68 changes: 34 additions & 34 deletions pulp-glue/src/pulp_glue/common/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import typing as t
import warnings
from contextlib import ExitStack
from pathlib import Path

from packaging.specifiers import SpecifierSet

Expand Down Expand Up @@ -248,6 +249,7 @@ def __init__(
fake_mode: bool = False,
verify_ssl: bool | str | None = None,
verify: bool | str | None = None, # Deprecated
chunk_size: int | None = None,
) -> None:
self._api: OpenAPI | None = None
self._api_root: str = api_root
Expand All @@ -273,6 +275,7 @@ def __init__(
self.fake_mode: bool = fake_mode
if self.fake_mode:
self._api_kwargs["dry_run"] = True
self.chunk_size = chunk_size

@classmethod
def from_config_files(
Expand Down Expand Up @@ -1565,6 +1568,7 @@ def __init__(
repository_ctx: PulpRepositoryContext | None = None,
):
super().__init__(pulp_ctx, pulp_href=pulp_href, entity=entity)
assert (repository_ctx is None) or (repository_ctx.pulp_ctx is pulp_ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tell me what the intent is here? I'm missing something

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated. But it's a safeguard that the repository this content context is tied to here belongs to the same server (pulp_ctx).

self.repository_ctx = repository_ctx

def list(self, limit: int, offset: int, parameters: dict[str, t.Any]) -> list[t.Any]:
Expand All @@ -1578,6 +1582,29 @@ def find(self, **kwargs: t.Any) -> t.Any:
kwargs["repository_version"] = self.repository_ctx.entity["latest_version_href"]
return super().find(**kwargs)

def _prepare_upload(
self,
body: EntityDefinition,
file: t.IO[bytes],
chunk_size: int | None,
) -> None:
_chunk_size: int | None = chunk_size or self.pulp_ctx.chunk_size
size = os.path.getsize(file.name)
if not self.pulp_ctx.fake_mode: # Skip the uploading part in fake_mode
if _chunk_size is None or _chunk_size > size:
body["file"] = file
elif self.pulp_ctx.has_plugin(PluginRequirement("core", specifier=">=3.20.0")):
self.needs_capability("upload")
from pulp_glue.core.context import PulpUploadContext

upload_href = PulpUploadContext(self.pulp_ctx).upload_file(file, _chunk_size)
body["upload"] = upload_href
else:
from pulp_glue.core.context import PulpArtifactContext

artifact_href = PulpArtifactContext(self.pulp_ctx).upload(file, _chunk_size)
body["artifact"] = artifact_href

def create(
self,
body: EntityDefinition,
Expand All @@ -1589,24 +1616,10 @@ def create(
file = body.pop("file", None)
chunk_size: int | None = body.pop("chunk_size", None)
if file:
if isinstance(file, str):
if isinstance(file, str | Path):
file = open(file, "rb")
cleanup.enter_context(file)
size = os.path.getsize(file.name)
if not self.pulp_ctx.fake_mode: # Skip the uploading part in fake_mode
if chunk_size is None or chunk_size > size:
body["file"] = file
elif self.pulp_ctx.has_plugin(PluginRequirement("core", specifier=">=3.20.0")):
self.needs_capability("upload")
from pulp_glue.core.context import PulpUploadContext

upload_href = PulpUploadContext(self.pulp_ctx).upload_file(file, chunk_size)
body["upload"] = upload_href
else:
from pulp_glue.core.context import PulpArtifactContext

artifact_href = PulpArtifactContext(self.pulp_ctx).upload(file, chunk_size)
body["artifact"] = artifact_href
self._prepare_upload(body, file, chunk_size)
if self.repository_ctx is not None:
body["repository"] = self.repository_ctx
return super().create(body=body, parameters=parameters, non_blocking=non_blocking)
Expand All @@ -1618,7 +1631,7 @@ def delete(self, non_blocking: bool = False) -> None:
def upload(
self,
file: t.IO[bytes],
chunk_size: int,
chunk_size: int | None,
repository: PulpRepositoryContext | None,
**kwargs: t.Any,
) -> t.Any:
Expand All @@ -1629,31 +1642,18 @@ def upload(

Parameters:
file: A file like object that supports `os.path.getsize`.
chunk_size: Size of the chunks to upload independently.
chunk_size: Size of the chunks to upload independently. `None` to disable chunking.
repository: Repository context to add the newly created content to.
kwargs: Extra args specific to the content type, passed to the create call.

Returns:
The result of the create task.
"""
self.needs_capability("upload")
size = os.path.getsize(file.name)
body: dict[str, t.Any] = {**kwargs}
if not self.pulp_ctx.fake_mode: # Skip the uploading part in fake_mode
if chunk_size > size:
body["file"] = file
elif self.pulp_ctx.has_plugin(PluginRequirement("core", specifier=">=3.20.0")):
from pulp_glue.core.context import PulpUploadContext

upload_href = PulpUploadContext(self.pulp_ctx).upload_file(file, chunk_size)
body["upload"] = upload_href
else:
from pulp_glue.core.context import PulpArtifactContext

artifact_href = PulpArtifactContext(self.pulp_ctx).upload(file, chunk_size)
body["artifact"] = artifact_href
if repository:
body["repository"] = repository
self._prepare_upload(body, file, chunk_size)
if repository is not None:
body["repository"] = repository
return self.create(body=body)


Expand Down
11 changes: 7 additions & 4 deletions pulp-glue/src/pulp_glue/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@ class PulpArtifactContext(PulpEntityContext):
ID_PREFIX = "artifacts"

def upload(
self, file: t.IO[bytes], chunk_size: int = 1000000, sha256: str | None = None
self,
file: t.IO[bytes],
chunk_size: int | None = None,
sha256: str | None = None,
) -> t.Any:
size = os.path.getsize(file.name)

sha256_hasher = hashlib.sha256()
for chunk in iter(lambda: file.read(chunk_size), b""):
for chunk in iter(lambda: file.read(10_000_000), b""):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason for 10Mb here? Or is it just "a reasonable amount at a time" to hash?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not any sort of chunk to be used on the network. Here we only hash the file to secure the upload later.
So no reason but that the "other" chunk_size now became unreliable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right - I was just curious if 10Mb was "magic" in some way, or just "a reasonable amount" :)

sha256_hasher.update(chunk)
sha256_digest = sha256_hasher.hexdigest()
file.seek(0)
Expand All @@ -71,8 +74,8 @@ def upload(
self._entity = {"pulp_href": "<FAKE_ENTITY>", "sha256": sha256, "size": size}
self._entity_lookup = {}
return self._entity["pulp_href"]
if chunk_size > size:
# if chunk_size is bigger than the file size, just upload it directly
if chunk_size is None or chunk_size > size:
# upload it directly
artifact: dict[str, t.Any] = self.create({"sha256": sha256_digest, "file": file})
self.pulp_href = artifact["pulp_href"]
return artifact["pulp_href"]
Expand Down
27 changes: 4 additions & 23 deletions pulp-glue/src/pulp_glue/rpm/context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
import typing as t

from pulp_glue.common.context import (
Expand Down Expand Up @@ -156,7 +155,7 @@ def list_iterator(
def upload(
self,
file: t.IO[bytes],
chunk_size: int,
chunk_size: int | None,
repository: PulpRepositoryContext | None,
**kwargs: t.Any,
) -> t.Any:
Expand All @@ -167,42 +166,24 @@ def upload(

Parameters:
file: A file like object that supports `os.path.getsize`.
chunk_size: Size of the chunks to upload independently.
chunk_size: Size of the chunks to upload independently. `None` to disable chunking.
repository: Repository context to add the newly created content to.
kwargs: Extra args specific to the content type, passed to the create call.

Returns:
The result of the create task.
"""
self.needs_capability("upload")
size = os.path.getsize(file.name)
body: dict[str, t.Any] = {**kwargs}

if not self.pulp_ctx.fake_mode:
if chunk_size > size:
# Small file: direct upload
body["file"] = file
else:
# Large file: chunked upload
if self.pulp_ctx.has_plugin(PluginRequirement("core", specifier=">=3.20.0")):
from pulp_glue.core.context import PulpUploadContext

upload_href = PulpUploadContext(self.pulp_ctx).upload_file(file, chunk_size)
body["upload"] = upload_href
else:
from pulp_glue.core.context import PulpArtifactContext

artifact_href = PulpArtifactContext(self.pulp_ctx).upload(file, chunk_size)
body["artifact"] = artifact_href
self._prepare_upload(body, file, chunk_size)

# For rpm plugin >= 3.32.5, use synchronous upload endpoint when no repository is provided
# For older versions, always use the create endpoint (backward compatibility)
if repository is None and self.pulp_ctx.has_plugin(
PluginRequirement("rpm", specifier=">=3.32.5")
):
# "Synchronous upload"
return self.call("upload", body=body)

# Repository is specified or older rpm version: use create endpoint (async path)
if repository is not None:
body["repository"] = repository
return self.create(body=body)
Expand Down
2 changes: 2 additions & 0 deletions src/pulp_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def main(
verbose: int,
background: bool,
refresh_api: bool,
chunk_size: int | None,
dry_run: bool,
timeout: int,
cid: str,
Expand Down Expand Up @@ -231,6 +232,7 @@ def main(
password=password,
oauth2_client_id=client_id,
oauth2_client_secret=client_secret,
chunk_size=chunk_size,
)


Expand Down
23 changes: 21 additions & 2 deletions src/pulp_cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@

from pulp_glue.common.i18n import get_translation

from pulp_cli.generic import HEADER_REGEX, REGISTERED_OUTPUT_FORMATTERS, _unset, pulp_group
from pulp_cli.generic import (
HEADER_REGEX,
REGISTERED_OUTPUT_FORMATTERS,
_unset,
chunk_size_callback,
parse_size,
pulp_group,
)

if sys.version_info >= (3, 11):
import tomllib
Expand Down Expand Up @@ -43,6 +50,7 @@
"key",
"verify_ssl",
"format",
"chunk_size",
"dry_run",
"timeout",
"verbose",
Expand Down Expand Up @@ -101,6 +109,12 @@ def headers_callback(
default="json",
help=_("Format of the response"),
),
click.option(
"--chunk-size",
help=_("Chunk size to break up {entity} into. Defaults to not chunking at all."),
default=None,
callback=chunk_size_callback,
),
click.option(
"--dry-run/--force",
default=False,
Expand Down Expand Up @@ -158,6 +172,11 @@ def validate_config(config: dict[str, t.Any], strict: bool = False) -> None:
errors.append(_("'format' is not one of {choices}").format(choices=FORMAT_CHOICES))
if "verify_ssl" in config and not isinstance(config["verify_ssl"], bool):
errors.append(_("'verify_ssl' is not a bool"))
if "chunk_size" in config:
try:
parse_size(config["chunk_size"])
except click.ClickException as e:
errors.append(e.message)
if "dry_run" in config and not isinstance(config["dry_run"], bool):
errors.append(_("'dry_run' is not a bool"))
if "timeout" in config and not isinstance(config["timeout"], int):
Expand Down Expand Up @@ -186,7 +205,7 @@ def validate_config(config: dict[str, t.Any], strict: bool = False) -> None:
missing_settings = (
set(SETTINGS)
- set(config.keys())
- {"plugins", "username", "password", "client_id", "client_secret"}
- {"plugins", "username", "password", "client_id", "client_secret", "chunk_size"}
)
if missing_settings:
errors.append(_("Missing settings: '{}'.").format("','".join(missing_settings)))
Expand Down
33 changes: 29 additions & 4 deletions src/pulp_cli/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@
from pulp_glue.common.i18n import get_translation
from pulp_glue.common.openapi import AuthProviderBase

if sys.version_info >= (3, 13):
from warnings import deprecated
else:
T = t.TypeVar("T")

def deprecated(s: str) -> t.Callable[[T], T]:
def _inner(f: T) -> T:
return f

return _inner


try:
from pygments import highlight
from pygments.formatters import Terminal256Formatter
Expand Down Expand Up @@ -67,7 +79,6 @@ def _unset(value: t.Any) -> bool:
translation = get_translation(__package__)
_ = translation.gettext


_AnyCallable = t.Callable[..., t.Any]
FC = t.TypeVar("FC", bound=_AnyCallable | click.Command)

Expand Down Expand Up @@ -156,6 +167,7 @@ def __init__(
password: str | None = None,
oauth2_client_id: str | None = None,
oauth2_client_secret: str | None = None,
chunk_size: int | None = None,
) -> None:
self.username = username
self.password = password
Expand All @@ -172,6 +184,7 @@ def __init__(
background_tasks=background_tasks,
timeout=timeout,
domain=domain,
chunk_size=chunk_size,
)
self.format = format

Expand Down Expand Up @@ -717,9 +730,9 @@ def _callback(
units = {"B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12}


def parse_size_callback(ctx: click.Context, param: click.Parameter, value: str | None) -> int:
def parse_size(value: str | None) -> int | None:
if value is None:
return 8 * 10**9
return None
size = value.strip().upper()
match = re.match(r"^([0-9]+)\s*([KMGT]?B)?$", size)
if not match:
Expand All @@ -728,6 +741,18 @@ def parse_size_callback(ctx: click.Context, param: click.Parameter, value: str |
return int(float(number) * units[unit])


def chunk_size_callback(
ctx: click.Context, param: click.Parameter, value: str | None
) -> int | None:
if value == "":
# Actually override the default.
return None
Comment on lines +747 to +749
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we push these into parse_size()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We commonly "use" "" to represent resetting over None which was "not specified". I don't think we need that in the settings file.

return parse_size(value)


parse_size_callback = deprecated("Use 'chunk_size_callback' instead.")(chunk_size_callback)


def null_callback(ctx: click.Context, param: click.Parameter, value: str | None) -> str | None:
if value == "":
return "null"
Expand Down Expand Up @@ -1220,7 +1245,7 @@ def _type_callback(ctx: click.Context, param: click.Parameter, value: str | None
"--chunk-size",
help=_("Chunk size to break up {entity} into. Defaults to not chunking at all."),
default=None,
callback=parse_size_callback,
callback=chunk_size_callback,
)

pulp_created_gte_option = pulp_option(
Expand Down
Loading