forked from pulp/pulp_python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_crud_content_unit.py
More file actions
149 lines (126 loc) · 6.58 KB
/
test_crud_content_unit.py
File metadata and controls
149 lines (126 loc) · 6.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import pytest
from urllib.parse import urljoin
from pypi_simple import PyPISimple
from pulpcore.tests.functional.utils import PulpTaskError
from pulp_python.tests.functional.constants import (
PYTHON_FIXTURES_URL,
PYTHON_PACKAGE_DATA,
PYTHON_EGG_FILENAME,
PYTHON_EGG_URL,
PYTHON_SM_FIXTURE_CHECKSUMS,
)
def test_content_crud(
    python_bindings, pulpcore_bindings, python_repo_factory, download_python_file, monitor_task
):
    """Exercise create/read/update/delete behaviour of python package content units."""
    packages_api = python_bindings.ContentPackagesApi

    def purge_orphans():
        """Run an orphan cleanup with a zero protection time and wait for the task."""
        cleanup = pulpcore_bindings.OrphansCleanupApi.cleanup({"orphan_protection_time": 0})
        monitor_task(cleanup.task)

    def check_package_data(unit):
        """Assert that each PYTHON_PACKAGE_DATA entry equals the attribute on ``unit``."""
        for field, expected in PYTHON_PACKAGE_DATA.items():
            assert getattr(unit, field) == expected

    purge_orphans()
    egg_file = download_python_file(PYTHON_EGG_FILENAME, PYTHON_EGG_URL)
    artifact = pulpcore_bindings.ArtifactsApi.create(egg_file)

    # Create: from an already uploaded artifact
    created = monitor_task(
        packages_api.create(
            relative_path=PYTHON_EGG_FILENAME, artifact=artifact.pulp_href
        ).task
    )
    content = packages_api.read(created.created_resources[0])
    check_package_data(content)

    # Read: listing by filename yields exactly the unit just created
    listing = packages_api.list(filename=content.filename)
    assert listing.count == 1
    assert listing.results[0] == content

    # Partial update: the bindings expose no such method
    with pytest.raises(AttributeError) as update_err:
        packages_api.partial_update(content.pulp_href, {"filename": "te"})
    assert "object has no attribute 'partial_update'" in update_err.value.args[0]

    # Delete: the bindings expose no such method either
    with pytest.raises(AttributeError) as delete_err:
        packages_api.delete(content.pulp_href)
    assert "object has no attribute 'delete'" in delete_err.value.args[0]

    purge_orphans()

    # Create: directly from a file
    created = monitor_task(
        packages_api.create(relative_path=PYTHON_EGG_FILENAME, file=egg_file).task
    )
    content = packages_api.read(created.created_resources[0])
    check_package_data(content)

    purge_orphans()

    # Create: from a file, adding it to a repository in the same call
    repo = python_repo_factory()
    created = monitor_task(
        packages_api.create(
            repository=repo.pulp_href, relative_path=PYTHON_EGG_FILENAME, file=egg_file
        ).task
    )
    assert len(created.created_resources) == 2
    added = packages_api.list(repository_version_added=created.created_resources[0])
    content = packages_api.read(added.results[0].pulp_href)
    check_package_data(content)

    # Duplicate upload: the same file resolves to the existing content unit
    created = monitor_task(
        packages_api.create(relative_path=PYTHON_EGG_FILENAME, file=egg_file).task
    )
    assert created.created_resources[0] == content.pulp_href

    # Same filename with a different artifact yields a distinct content unit
    aiohttp_name = "aiohttp-3.3.0.tar.gz"
    packages_base_url = urljoin(PYTHON_FIXTURES_URL, "packages/")
    aiohttp_file = download_python_file(aiohttp_name, urljoin(packages_base_url, aiohttp_name))
    created = monitor_task(
        packages_api.create(relative_path=PYTHON_EGG_FILENAME, file=aiohttp_file).task
    )
    content2 = packages_api.read(created.created_resources[0])
    assert content2.pulp_href != content.pulp_href

    # Same filename with different artifacts in the same repository:
    # the repo already has EGG_FILENAME w/ EGG_ARTIFACT, now upload EGG_FILENAME w/ AIO_ARTIFACT
    # and check that only the second content unit is in the repo after the upload.
    created = monitor_task(
        packages_api.create(
            repository=repo.pulp_href, relative_path=PYTHON_EGG_FILENAME, file=aiohttp_file
        ).task
    )
    assert len(created.created_resources) == 2
    assert content2.pulp_href in created.created_resources
    second_version = created.created_resources[0]
    in_version = packages_api.list(repository_version=second_version)
    assert in_version.count == 1
    assert in_version.results[0].pulp_href == content2.pulp_href

    # Upload with a mismatched sha256.
    # Without removing the repo and cleaning up orphans first, the upload fails with a
    # different error.
    monitor_task(python_bindings.RepositoriesPythonApi.delete(repo.pulp_href).task)
    purge_orphans()
    wrong_sha256 = PYTHON_SM_FIXTURE_CHECKSUMS["aiohttp-3.3.0.tar.gz"]
    with pytest.raises(PulpTaskError) as task_err:
        response = packages_api.create(
            relative_path=PYTHON_EGG_FILENAME, file=egg_file, sha256=wrong_sha256
        )
        monitor_task(response.task)
    expected_msg = "The uploaded artifact's sha256 checksum does not match the one provided"
    assert expected_msg in task_err.value.task.error["description"]
@pytest.mark.parallel
def test_upload_metadata_23_spec(python_content_factory):
    """Test that packages using metadata spec 2.3 can be uploaded to pulp."""
    filename = "urllib3-2.2.2-py3-none-any.whl"
    with PyPISimple() as client:
        page = client.get_project_page("urllib3")
        package = next((pkg for pkg in page.packages if pkg.filename == filename), None)
        # Fail loudly if the file is missing from the index; otherwise the test
        # would finish without checking anything and report a false pass.
        assert package is not None, f"{filename} not found on the urllib3 project page"
        content = python_content_factory(filename, url=package.url)
        assert content.metadata_version == "2.3"
@pytest.mark.parallel
def test_upload_requires_python(python_content_factory):
    """Test that an uploaded package reports the expected requires_python value."""
    filename = "pip-24.3.1-py3-none-any.whl"
    with PyPISimple() as client:
        page = client.get_project_page("pip")
        package = next((pkg for pkg in page.packages if pkg.filename == filename), None)
        # Fail loudly if the file is missing from the index; otherwise the test
        # would finish without checking anything and report a false pass.
        assert package is not None, f"{filename} not found on the pip project page"
        content = python_content_factory(filename, url=package.url)
        assert content.requires_python == ">=3.8"
@pytest.mark.parallel
def test_upload_metadata_24_spec(python_content_factory):
    """Test that packages using metadata spec 2.4 can be uploaded to pulp."""
    filename = "urllib3-2.3.0-py3-none-any.whl"
    with PyPISimple() as client:
        page = client.get_project_page("urllib3")
        package = next((pkg for pkg in page.packages if pkg.filename == filename), None)
        # Fail loudly if the file is missing from the index; otherwise the test
        # would finish without checking anything and report a false pass.
        assert package is not None, f"{filename} not found on the urllib3 project page"
        content = python_content_factory(filename, url=package.url)
        assert content.metadata_version == "2.4"