From 947b4585a918a6b9bb23dbf8334ba4d6cacdddb5 Mon Sep 17 00:00:00 2001 From: Andreas Poehlmann Date: Sat, 20 Dec 2025 18:56:27 +0100 Subject: [PATCH 1/9] tests: split base test cases into joinable/readable/writable tests --- upath/tests/cases.py | 984 ++++++++++++++++++++++++------------------- 1 file changed, 554 insertions(+), 430 deletions(-) diff --git a/upath/tests/cases.py b/upath/tests/cases.py index 16255fe0..4b6c0c41 100644 --- a/upath/tests/cases.py +++ b/upath/tests/cases.py @@ -16,37 +16,244 @@ from upath._stat import UPathStatResult from upath.types import StatResultType +# ============================================================================= +# JoinablePathTests: Tests for pure path operations (no I/O) +# ============================================================================= -class BaseTests: - SUPPORTS_EMPTY_DIRS = True + +class JoinablePathTests: + """Tests for JoinablePath interface. + + These tests verify pure path operations that don't require filesystem access: + - Path parsing and components (parts, parents, name, stem, suffix, etc.) + - Path manipulation (with_name, with_suffix, with_stem, joinpath, etc.) + - Path comparison and hashing + - Serialization (pickling) + - URI handling + """ path: UPath - def test_cwd(self): - with pytest.raises(NotImplementedError): - self.path.cwd() + def test_is_absolute(self): + assert self.path.is_absolute() is True - def test_home(self): - with pytest.raises(NotImplementedError): - self.path.home() + def test_parents(self): + p = self.path.joinpath("folder1", "file1.txt") + assert p.parents[0] == p.parent + assert p.parents[1] == p.parent.parent + assert p.parents[0].name == "folder1" + assert p.parents[1].name == self.path.name + + def test_with_name(self): + path = self.path / "file.txt" + path = path.with_name("file.zip") + assert path.name == "file.zip" + + def test_with_suffix(self): + path = self.path / "file.txt" + path = path.with_suffix(".zip") + assert path.suffix == ".zip" + + def test_suffix(self): + path = self.path / "no_suffix" + assert path.suffix == "" + path = self.path / "file.txt" + assert path.suffix == ".txt" + path = self.path / "archive.tar.gz" + assert path.suffix == ".gz" + + def test_suffixes(self): + path = self.path / "no_suffix" + assert path.suffixes == [] + path = self.path / "file.txt" + assert path.suffixes == [".txt"] + path = self.path / "archive.tar.gz" + assert path.suffixes == [".tar", ".gz"] + + def test_with_stem(self): + if sys.version_info < (3, 9): + pytest.skip("with_stem only available on py3.9+") + path = self.path / "file.txt" + path = path.with_stem("document") + assert path.stem == "document" + + def test_repr_after_with_name(self): + p = self.path.joinpath("file.txt").with_name("file.zip") + assert "file.zip" in repr(p) + + def test_repr_after_with_suffix(self): + p = self.path.joinpath("file.txt").with_suffix(".zip") + assert "file.zip" in repr(p) + + def test_child_path(self): + path_str = self.path.__vfspath__() + path_a = UPath( + path_str, "folder", protocol=self.path.protocol, **self.path.storage_options + ) + path_b = self.path / "folder" + + assert str(path_a) == str(path_b) + assert path_a.root == path_b.root + assert path_a.drive == path_b.drive + + def test_copy_path(self): + path = self.path + copy_path = UPath(path) + + assert type(path) is type(copy_path) + assert str(path) == str(copy_path) + assert path.drive == copy_path.drive + assert path.root == copy_path.root + assert path.parts == copy_path.parts + assert path.fs.storage_options == copy_path.fs.storage_options + + def test_pickling(self): + path = self.path + pickled_path = pickle.dumps(path) + recovered_path = pickle.loads(pickled_path) + + assert type(path) is type(recovered_path) + assert str(path) == str(recovered_path) + assert path.fs.storage_options == recovered_path.fs.storage_options + + def test_pickling_child_path(self): + path = self.path / "subfolder" / "subsubfolder" + pickled_path = pickle.dumps(path) + recovered_path = pickle.loads(pickled_path) + + assert type(path) is type(recovered_path) + assert str(path) == str(recovered_path) + assert path.drive == recovered_path.drive + assert path.root == recovered_path.root + assert path.parts == recovered_path.parts + assert path.fs.storage_options == recovered_path.fs.storage_options + assert path.storage_options == recovered_path.storage_options + + def test_as_uri(self): + # test that we can reconstruct the path from the uri + p0 = self.path + uri = p0.as_uri() + p1 = UPath(uri, **p0.storage_options) + assert p0 == p1 + + def test_protocol(self): + protocol = self.path.protocol + protocols = [p] if isinstance((p := type(self.path.fs).protocol), str) else p + print(protocol, protocols) + assert protocol in protocols + + def test_storage_options(self): + storage_options = self.path.storage_options + assert storage_options == self.path.fs.storage_options + + def test_hashable(self): + assert hash(self.path) + + def test_storage_options_dont_affect_hash(self): + cls = type(self.path) + p0 = cls(str(self.path), test_extra=1, **self.path.storage_options) + p1 = cls(str(self.path), test_extra=2, **self.path.storage_options) + assert hash(p0) == hash(p1) + + def test_eq(self): + cls = type(self.path) + p0 = cls(str(self.path), test_extra=1, **self.path.storage_options) + p1 = cls(str(self.path), test_extra=1, **self.path.storage_options) + p2 = cls(str(self.path), test_extra=2, **self.path.storage_options) + assert p0 == p1 + assert p0 != p2 + assert p1 != p2 + + def test_relative_to(self): + base = self.path + child = self.path / "folder1" / "file1.txt" + relative = child.relative_to(base) + assert str(relative) == "folder1/file1.txt" + + def test_trailing_slash_joinpath_is_identical(self): + # setup + cls = type(self.path) + protocol = self.path.protocol + path = self.path.path + sopts = self.path.storage_options + if not path: + path = "something" + path_with_slash = "something/" + elif path.endswith("/"): + path_with_slash = path + path = path.removeprefix("/") + else: + path_with_slash = path + "/" + key = "key/" + + # test + a = cls(path_with_slash + key, protocol=protocol, **sopts) + b = cls(path_with_slash, key, protocol=protocol, **sopts) + c = cls(path_with_slash, protocol=protocol, **sopts).joinpath(key) + d = cls(path_with_slash, protocol=protocol, **sopts) / key + assert a.path == b.path == c.path == d.path + + def test_trailing_slash_is_stripped(self): + has_meaningful_trailing_slash = getattr( + self.path.parser, "has_meaningful_trailing_slash", False + ) + if has_meaningful_trailing_slash: + assert not self.path.joinpath("key").path.endswith("/") + assert self.path.joinpath("key/").path.endswith("/") + else: + assert not self.path.joinpath("key").path.endswith("/") + assert not self.path.joinpath("key/").path.endswith("/") + + def test_parents_are_absolute(self): + # this is a cross implementation compatible way to ensure that + # the path representing the root is absolute + is_absolute = [p.is_absolute() for p in self.path.parents] + assert all(is_absolute) + + def test_private_url_attr_in_sync(self): + p = self.path + p1 = self.path.joinpath("c") + p2 = self.path / "c" + assert p1._url == p2._url + assert p1._url != p._url + assert p1.protocol == p2.protocol + + +# ============================================================================= +# ReadablePathTests: Tests for readable path operations +# ============================================================================= + + +class ReadablePathTests: + """Tests for ReadablePath interface. + + These tests verify operations that read from the filesystem: + - File/directory existence and type checks (exists, is_dir, is_file, etc.) + - File metadata (stat, info) + - Reading file contents (read_bytes, read_text, open for reading) + - Directory listing (iterdir, glob, rglob) + - Copy operations (read source) + """ + + path: UPath def test_stat(self): - stat = self.path.stat() + stat_ = self.path.stat() # for debugging os.stat_result compatibility - attrs = {attr for attr in dir(stat) if attr.startswith("st_")} + attrs = {attr for attr in dir(stat_) if attr.startswith("st_")} print(attrs) - assert isinstance(stat, StatResultType) - assert len(tuple(stat)) == os.stat_result.n_sequence_fields + assert isinstance(stat_, StatResultType) + assert len(tuple(stat_)) == os.stat_result.n_sequence_fields with warnings.catch_warnings(): warnings.simplefilter("error") for idx in range(os.stat_result.n_sequence_fields): - assert isinstance(stat[idx], int) + assert isinstance(stat_[idx], int) for attr in UPathStatResult._fields + UPathStatResult._fields_extra: - assert hasattr(stat, attr) + assert hasattr(stat_, attr) def test_stat_dir_st_mode(self): base = self.path.stat() # base folder @@ -60,10 +267,6 @@ def test_stat_st_size(self): file1 = self.path.joinpath("file1.txt").stat() assert file1.st_size == 11 - def test_chmod(self): - with pytest.raises(NotImplementedError): - self.path.joinpath("file1.txt").chmod(777) - @pytest.mark.parametrize( "url, expected", [("file1.txt", True), ("fakefile.txt", False)] ) @@ -103,10 +306,6 @@ def test_glob(self, pathlib_base, pattern): print(mock_glob_normalized, path_glob_normalized) assert mock_glob_normalized == path_glob_normalized - def test_group(self): - with pytest.raises(NotImplementedError): - self.path.group() - def test_is_dir(self): assert self.path.is_dir() @@ -119,9 +318,6 @@ def test_is_file(self): assert path_exists.is_file() assert not (self.path / "not-existing-file.txt").is_file() - def test_is_absolute(self): - assert self.path.is_absolute() is True - def test_is_mount(self): try: self.path.is_mount() @@ -177,48 +373,164 @@ def test_iterdir_trailing_slash(self): files_slash = list(self.path.joinpath("folder1/").iterdir()) assert files_noslash == files_slash - def test_parents(self): - p = self.path.joinpath("folder1", "file1.txt") - assert p.is_file() - assert p.parents[0] == p.parent - assert p.parents[1] == p.parent.parent - assert p.parents[0].name == "folder1" - assert p.parents[1].name == self.path.name - - def test_lchmod(self): - try: - self.path.lchmod(mode=0o777) - except UnsupportedOperation: - pass - def test_lstat(self): with pytest.warns(UserWarning, match=r"[A-Za-z]+.stat"): st = self.path.lstat() assert st is not None - def test_mkdir(self): - new_dir = self.path.joinpath("new_dir") - new_dir.mkdir() - if not self.SUPPORTS_EMPTY_DIRS: - new_dir.joinpath(".file").touch() - assert new_dir.exists() + def test_open(self): + p = self.path.joinpath("file1.txt") + with p.open(mode="r") as f: + assert f.read() == "hello world" + with p.open(mode="rb") as f: + assert f.read() == b"hello world" - def test_mkdir_exists_ok_true(self): - new_dir = self.path.joinpath("new_dir_may_exists") - new_dir.mkdir() - if not self.SUPPORTS_EMPTY_DIRS: - new_dir.joinpath(".file").touch() - new_dir.mkdir(exist_ok=True) + def test_open_buffering(self): + p = self.path.joinpath("file1.txt") + p.open(buffering=-1) - def test_mkdir_exists_ok_false(self): - new_dir = self.path.joinpath("new_dir_may_not_exists") - new_dir.mkdir() - if not self.SUPPORTS_EMPTY_DIRS: - new_dir.joinpath(".file").touch() - with pytest.raises(FileExistsError): - new_dir.mkdir(exist_ok=False) + def test_open_block_size(self): + p = self.path.joinpath("file1.txt") + with p.open(mode="r", block_size=8192) as f: + assert f.read() == "hello world" - def test_mkdir_parents_true_exists_ok_true(self): + def test_open_errors(self): + p = self.path.joinpath("file1.txt") + with p.open(mode="r", encoding="ascii", errors="strict") as f: + assert f.read() == "hello world" + + def test_read_bytes(self, pathlib_base): + mock = self.path.joinpath("file2.txt") + pl = pathlib_base.joinpath("file2.txt") + assert mock.read_bytes() == pl.read_bytes() + + def test_read_text(self, local_testdir): + upath = self.path.joinpath("file1.txt") + assert ( + upath.read_text() == Path(local_testdir).joinpath("file1.txt").read_text() + ) + + def test_rglob(self, pathlib_base): + pattern = "*.txt" + result = [*self.path.rglob(pattern)] + expected = [*pathlib_base.rglob(pattern)] + assert len(result) == len(expected) + + def test_samefile(self): + f1 = self.path.joinpath("file1.txt") + f2 = self.path.joinpath("file2.txt") + + assert f1.samefile(f2) is False + assert f1.samefile(f2.path) is False + assert f1.samefile(f1) is True + assert f1.samefile(f1.path) is True + + def test_info(self): + p0 = self.path.joinpath("file1.txt") + p1 = self.path.joinpath("folder1") + + assert p0.info.exists() is True + assert p0.info.is_file() is True + assert p0.info.is_dir() is False + assert p0.info.is_symlink() is False + assert p1.info.exists() is True + assert p1.info.is_file() is False + assert p1.info.is_dir() is True + assert p1.info.is_symlink() is False + + def test_copy_local(self, tmp_path: Path): + target = UPath(tmp_path) / "target-file1.txt" + + source = self.path / "file1.txt" + content = source.read_text() + source.copy(target) + assert target.exists() + assert target.read_text() == content + + def test_copy_into_local(self, tmp_path: Path): + target_dir = UPath(tmp_path) / "target-dir" + target_dir.mkdir() + + source = self.path / "file1.txt" + content = source.read_text() + source.copy_into(target_dir) + target = target_dir / "file1.txt" + assert target.exists() + assert target.read_text() == content + + def test_copy_memory(self, clear_fsspec_memory_cache): + target = UPath("memory:///target-file1.txt") + source = self.path / "file1.txt" + content = source.read_text() + source.copy(target) + assert target.exists() + assert target.read_text() == content + + def test_copy_into_memory(self, clear_fsspec_memory_cache): + target_dir = UPath("memory:///target-dir") + target_dir.mkdir() + + source = self.path / "file1.txt" + content = source.read_text() + source.copy_into(target_dir) + target = target_dir / "file1.txt" + assert target.exists() + assert target.read_text() == content + + def test_read_with_fsspec(self): + p = self.path.joinpath("file2.txt") + + protocol = p.protocol + storage_options = p.storage_options + path = p.path + + fs = filesystem(protocol, **storage_options) + with fs.open(path) as f: + assert f.read() == b"hello world" + + +# ============================================================================= +# WritablePathTests: Tests for writable path operations +# ============================================================================= + + +class WritablePathTests: + """Tests for WritablePath interface. + + These tests verify operations that write to the filesystem: + - Creating directories (mkdir) + - Creating files (touch) + - Writing file contents (write_bytes, write_text) + - Removing files/directories (unlink, rmdir) + """ + + SUPPORTS_EMPTY_DIRS = True + + path: UPath + + def test_mkdir(self): + new_dir = self.path.joinpath("new_dir") + new_dir.mkdir() + if not self.SUPPORTS_EMPTY_DIRS: + new_dir.joinpath(".file").touch() + assert new_dir.exists() + + def test_mkdir_exists_ok_true(self): + new_dir = self.path.joinpath("new_dir_may_exists") + new_dir.mkdir() + if not self.SUPPORTS_EMPTY_DIRS: + new_dir.joinpath(".file").touch() + new_dir.mkdir(exist_ok=True) + + def test_mkdir_exists_ok_false(self): + new_dir = self.path.joinpath("new_dir_may_not_exists") + new_dir.mkdir() + if not self.SUPPORTS_EMPTY_DIRS: + new_dir.joinpath(".file").touch() + with pytest.raises(FileExistsError): + new_dir.mkdir(exist_ok=False) + + def test_mkdir_parents_true_exists_ok_true(self): new_dir = self.path.joinpath("parent", "new_dir_may_not_exist") new_dir.mkdir(parents=True) if not self.SUPPORTS_EMPTY_DIRS: @@ -233,45 +545,78 @@ def test_mkdir_parents_true_exists_ok_false(self): with pytest.raises(FileExistsError): new_dir.mkdir(parents=True, exist_ok=False) - def test_open(self): - p = self.path.joinpath("file1.txt") - with p.open(mode="r") as f: - assert f.read() == "hello world" - with p.open(mode="rb") as f: - assert f.read() == b"hello world" + def test_touch_exists_ok_false(self): + f = self.path.joinpath("file1.txt") + assert f.exists() + with pytest.raises(FileExistsError): + f.touch(exist_ok=False) - def test_open_buffering(self): - p = self.path.joinpath("file1.txt") - p.open(buffering=-1) + def test_touch_exists_ok_true(self): + f = self.path.joinpath("file1.txt") + assert f.exists() + data = f.read_text() + f.touch(exist_ok=True) + assert f.read_text() == data - def test_open_block_size(self): - p = self.path.joinpath("file1.txt") - with p.open(mode="r", block_size=8192) as f: - assert f.read() == "hello world" + def test_touch(self): + path = self.path.joinpath("test_touch.txt") + assert not path.exists() + path.touch() + assert path.exists() - def test_open_errors(self): - p = self.path.joinpath("file1.txt") - with p.open(mode="r", encoding="ascii", errors="strict") as f: - assert f.read() == "hello world" + def test_touch_unlink(self): + path = self.path.joinpath("test_touch.txt") + path.touch() + assert path.exists() + path.unlink() + assert not path.exists() - def test_owner(self): - with pytest.raises(NotImplementedError): - self.path.owner() + # should raise FileNotFoundError since file is missing + with pytest.raises(FileNotFoundError): + path.unlink() - def test_read_bytes(self, pathlib_base): - mock = self.path.joinpath("file2.txt") - pl = pathlib_base.joinpath("file2.txt") - assert mock.read_bytes() == pl.read_bytes() + # file doesn't exists, but missing_ok is True + path.unlink(missing_ok=True) - def test_read_text(self, local_testdir): - upath = self.path.joinpath("file1.txt") - assert ( - upath.read_text() == Path(local_testdir).joinpath("file1.txt").read_text() - ) + def test_write_bytes(self, pathlib_base): + fn = "test_write_bytes.txt" + s = b"hello_world" + path = self.path.joinpath(fn) + path.write_bytes(s) + assert path.read_bytes() == s - def test_readlink(self): - with pytest.raises(NotImplementedError): - self.path.readlink() + def test_write_text(self, pathlib_base): + fn = "test_write_text.txt" + s = "hello_world" + path = self.path.joinpath(fn) + path.write_text(s) + assert path.read_text() == s + + def test_symlink_to(self): + pass + + def test_link_to(self): + pass + + +# ============================================================================= +# ReadWritePathTests: Tests requiring both read and write operations +# ============================================================================= + + +class ReadWritePathTests: + """Tests requiring both ReadablePath and WritablePath interfaces. + + These tests verify operations that need both read and write access: + - Rename/move operations + - File system setup/teardown + - Operations that verify write results by reading + - rmdir operations + """ + + SUPPORTS_EMPTY_DIRS = True + + path: UPath def test_rename(self): p_source = self.path.joinpath("file1.txt") @@ -376,91 +721,25 @@ def test_replace(self): def test_resolve(self): pass - def test_rglob(self, pathlib_base): - pattern = "*.txt" - result = [*self.path.rglob(pattern)] - expected = [*pathlib_base.rglob(pattern)] - assert len(result) == len(expected) - - def test_symlink_to(self): - pass + def test_rmdir_no_dir(self): + p = self.path.joinpath("file1.txt") + with pytest.raises(NotADirectoryError): + p.rmdir() - def test_touch_exists_ok_false(self): - f = self.path.joinpath("file1.txt") - assert f.exists() - with pytest.raises(FileExistsError): - f.touch(exist_ok=False) + def test_iterdir_no_dir(self): + p = self.path.joinpath("file1.txt") + assert p.is_file() + with pytest.raises(NotADirectoryError): + _ = list(p.iterdir()) - def test_touch_exists_ok_true(self): - f = self.path.joinpath("file1.txt") - assert f.exists() - data = f.read_text() - f.touch(exist_ok=True) - assert f.read_text() == data + def test_rmdir_not_empty(self): + p = self.path.joinpath("folder1") + with pytest.raises(OSError, match="not empty"): + p.rmdir(recursive=False) - def test_touch(self): - path = self.path.joinpath("test_touch.txt") - assert not path.exists() - path.touch() - assert path.exists() - - def test_touch_unlink(self): - path = self.path.joinpath("test_touch.txt") - path.touch() - assert path.exists() - path.unlink() - assert not path.exists() - - # should raise FileNotFoundError since file is missing - with pytest.raises(FileNotFoundError): - path.unlink() - - # file doesn't exists, but missing_ok is True - path.unlink(missing_ok=True) - - def test_link_to(self): - pass - - def test_write_bytes(self, pathlib_base): - fn = "test_write_bytes.txt" - s = b"hello_world" - path = self.path.joinpath(fn) - path.write_bytes(s) - assert path.read_bytes() == s - - def test_write_text(self, pathlib_base): - fn = "test_write_text.txt" - s = "hello_world" - path = self.path.joinpath(fn) - path.write_text(s) - assert path.read_text() == s - - def prepare_file_system(self): - self.make_top_folder() - self.make_test_files() - - def make_top_folder(self): - self.path.mkdir(parents=True, exist_ok=True) - - def make_test_files(self): - folder1 = self.path.joinpath("folder1") - folder1.mkdir(exist_ok=True) - folder1_files = ["file1.txt", "file2.txt"] - for f in folder1_files: - p = folder1.joinpath(f) - p.touch() - p.write_text(f) - - file1 = self.path.joinpath("file1.txt") - file1.touch() - file1.write_text("hello world") - file2 = self.path.joinpath("file2.txt") - file2.touch() - file2.write_bytes(b"hello world") - - def test_fsspec_compat(self): - fs = self.path.fs - content = b"a,b,c\n1,2,3\n4,5,6" + def test_fsspec_compat(self): + fs = self.path.fs + content = b"a,b,c\n1,2,3\n4,5,6" upath1 = self.path / "output1.csv" p1 = upath1.path @@ -482,222 +761,6 @@ def test_fsspec_compat(self): assert upath2.read_bytes() == content upath2.unlink() - def test_pickling(self): - path = self.path - pickled_path = pickle.dumps(path) - recovered_path = pickle.loads(pickled_path) - - assert type(path) is type(recovered_path) - assert str(path) == str(recovered_path) - assert path.fs.storage_options == recovered_path.fs.storage_options - - def test_pickling_child_path(self): - path = self.path / "subfolder" / "subsubfolder" - pickled_path = pickle.dumps(path) - recovered_path = pickle.loads(pickled_path) - - assert type(path) is type(recovered_path) - assert str(path) == str(recovered_path) - assert path.drive == recovered_path.drive - assert path.root == recovered_path.root - assert path.parts == recovered_path.parts - assert path.fs.storage_options == recovered_path.fs.storage_options - assert path.storage_options == recovered_path.storage_options - - def test_child_path(self): - path_str = self.path.__vfspath__() - path_a = UPath( - path_str, "folder", protocol=self.path.protocol, **self.path.storage_options - ) - path_b = self.path / "folder" - - assert str(path_a) == str(path_b) - assert path_a.root == path_b.root - assert path_a.drive == path_b.drive - - def test_copy_path(self): - path = self.path - copy_path = UPath(path) - - assert type(path) is type(copy_path) - assert str(path) == str(copy_path) - assert path.drive == copy_path.drive - assert path.root == copy_path.root - assert path.parts == copy_path.parts - assert path.fs.storage_options == copy_path.fs.storage_options - - def test_with_name(self): - path = self.path / "file.txt" - path = path.with_name("file.zip") - assert path.name == "file.zip" - - def test_with_suffix(self): - path = self.path / "file.txt" - path = path.with_suffix(".zip") - assert path.suffix == ".zip" - - def test_suffix(self): - path = self.path / "no_suffix" - assert path.suffix == "" - path = self.path / "file.txt" - assert path.suffix == ".txt" - path = self.path / "archive.tar.gz" - assert path.suffix == ".gz" - - def test_suffixes(self): - path = self.path / "no_suffix" - assert path.suffixes == [] - path = self.path / "file.txt" - assert path.suffixes == [".txt"] - path = self.path / "archive.tar.gz" - assert path.suffixes == [".tar", ".gz"] - - def test_with_stem(self): - if sys.version_info < (3, 9): - pytest.skip("with_stem only available on py3.9+") - path = self.path / "file.txt" - path = path.with_stem("document") - assert path.stem == "document" - - def test_repr_after_with_name(self): - p = self.path.joinpath("file.txt").with_name("file.zip") - assert "file.zip" in repr(p) - - def test_repr_after_with_suffix(self): - p = self.path.joinpath("file.txt").with_suffix(".zip") - assert "file.zip" in repr(p) - - def test_rmdir_no_dir(self): - p = self.path.joinpath("file1.txt") - with pytest.raises(NotADirectoryError): - p.rmdir() - - def test_iterdir_no_dir(self): - p = self.path.joinpath("file1.txt") - assert p.is_file() - with pytest.raises(NotADirectoryError): - _ = list(p.iterdir()) - - def test_rmdir_not_empty(self): - p = self.path.joinpath("folder1") - with pytest.raises(OSError, match="not empty"): - p.rmdir(recursive=False) - - def test_private_url_attr_in_sync(self): - p = self.path - p1 = self.path.joinpath("c") - p2 = self.path / "c" - assert p1._url == p2._url - assert p1._url != p._url - assert p1.protocol == p2.protocol - - def test_as_uri(self): - # test that we can reconstruct the path from the uri - p0 = self.path - uri = p0.as_uri() - p1 = UPath(uri, **p0.storage_options) - assert p0 == p1 - - def test_protocol(self): - protocol = self.path.protocol - protocols = [p] if isinstance((p := type(self.path.fs).protocol), str) else p - print(protocol, protocols) - assert protocol in protocols - - def test_storage_options(self): - storage_options = self.path.storage_options - assert storage_options == self.path.fs.storage_options - - def test_read_with_fsspec(self): - p = self.path.joinpath("file2.txt") - - protocol = p.protocol - storage_options = p.storage_options - path = p.path - - fs = filesystem(protocol, **storage_options) - with fs.open(path) as f: - assert f.read() == b"hello world" - - def test_hashable(self): - assert hash(self.path) - - def test_storage_options_dont_affect_hash(self): - cls = type(self.path) - p0 = cls(str(self.path), test_extra=1, **self.path.storage_options) - p1 = cls(str(self.path), test_extra=2, **self.path.storage_options) - assert hash(p0) == hash(p1) - - def test_eq(self): - cls = type(self.path) - p0 = cls(str(self.path), test_extra=1, **self.path.storage_options) - p1 = cls(str(self.path), test_extra=1, **self.path.storage_options) - p2 = cls(str(self.path), test_extra=2, **self.path.storage_options) - assert p0 == p1 - assert p0 != p2 - assert p1 != p2 - - def test_samefile(self): - f1 = self.path.joinpath("file1.txt") - f2 = self.path.joinpath("file2.txt") - - assert f1.samefile(f2) is False - assert f1.samefile(f2.path) is False - assert f1.samefile(f1) is True - assert f1.samefile(f1.path) is True - - def test_info(self): - p0 = self.path.joinpath("file1.txt") - p1 = self.path.joinpath("folder1") - - assert p0.info.exists() is True - assert p0.info.is_file() is True - assert p0.info.is_dir() is False - assert p0.info.is_symlink() is False - assert p1.info.exists() is True - assert p1.info.is_file() is False - assert p1.info.is_dir() is True - assert p1.info.is_symlink() is False - - def test_copy_local(self, tmp_path: Path): - target = UPath(tmp_path) / "target-file1.txt" - - source = self.path / "file1.txt" - content = source.read_text() - source.copy(target) - assert target.exists() - assert target.read_text() == content - - def test_copy_into_local(self, tmp_path: Path): - target_dir = UPath(tmp_path) / "target-dir" - target_dir.mkdir() - - source = self.path / "file1.txt" - content = source.read_text() - source.copy_into(target_dir) - target = target_dir / "file1.txt" - assert target.exists() - assert target.read_text() == content - - def test_copy_memory(self, clear_fsspec_memory_cache): - target = UPath("memory:///target-file1.txt") - source = self.path / "file1.txt" - content = source.read_text() - source.copy(target) - assert target.exists() - assert target.read_text() == content - - def test_copy_into_memory(self, clear_fsspec_memory_cache): - target_dir = UPath("memory:///target-dir") - target_dir.mkdir() - - source = self.path / "file1.txt" - content = source.read_text() - source.copy_into(target_dir) - target = target_dir / "file1.txt" - assert target.exists() - assert target.read_text() == content - def test_move_local(self, tmp_path: Path): target = UPath(tmp_path) / "target-file1.txt" @@ -741,48 +804,109 @@ def test_move_into_memory(self, clear_fsspec_memory_cache): assert target.read_text() == content assert not source.exists() - def test_relative_to(self): - base = self.path - child = self.path / "folder1" / "file1.txt" - relative = child.relative_to(base) - assert str(relative) == "folder1/file1.txt" + def prepare_file_system(self): + self.make_top_folder() + self.make_test_files() - def test_trailing_slash_joinpath_is_identical(self): - # setup - cls = type(self.path) - protocol = self.path.protocol - path = self.path.path - sopts = self.path.storage_options - if not path: - path = "something" - path_with_slash = "something/" - elif path.endswith("/"): - path_with_slash = path - path = path.removeprefix("/") - else: - path_with_slash = path + "/" - key = "key/" + def make_top_folder(self): + self.path.mkdir(parents=True, exist_ok=True) - # test - a = cls(path_with_slash + key, protocol=protocol, **sopts) - b = cls(path_with_slash, key, protocol=protocol, **sopts) - c = cls(path_with_slash, protocol=protocol, **sopts).joinpath(key) - d = cls(path_with_slash, protocol=protocol, **sopts) / key - assert a.path == b.path == c.path == d.path + def make_test_files(self): + folder1 = self.path.joinpath("folder1") + folder1.mkdir(exist_ok=True) + folder1_files = ["file1.txt", "file2.txt"] + for f in folder1_files: + p = folder1.joinpath(f) + p.touch() + p.write_text(f) - def test_trailing_slash_is_stripped(self): - has_meaningful_trailing_slash = getattr( - self.path.parser, "has_meaningful_trailing_slash", False - ) - if has_meaningful_trailing_slash: - assert not self.path.joinpath("key").path.endswith("/") - assert self.path.joinpath("key/").path.endswith("/") - else: - assert not self.path.joinpath("key").path.endswith("/") - assert not self.path.joinpath("key/").path.endswith("/") + file1 = self.path.joinpath("file1.txt") + file1.touch() + file1.write_text("hello world") + file2 = self.path.joinpath("file2.txt") + file2.touch() + file2.write_bytes(b"hello world") - def test_parents_are_absolute(self): - # this is a cross implementation compatible way to ensure that - # the path representing the root is absolute - is_absolute = [p.is_absolute() for p in self.path.parents] - assert all(is_absolute) + +# ============================================================================= +# UPathNotImplementedTests: Tests for UPath-specific unsupported operations +# ============================================================================= + + +class UPathNotImplementedTests: + """Tests for UPath operations that are typically not implemented. + + These tests verify that operations not supported by most UPath + implementations raise the appropriate exceptions (NotImplementedError + or UnsupportedOperation). + """ + + path: UPath + + def test_cwd(self): + with pytest.raises(NotImplementedError): + self.path.cwd() + + def test_home(self): + with pytest.raises(NotImplementedError): + self.path.home() + + def test_chmod(self): + with pytest.raises(NotImplementedError): + self.path.joinpath("file1.txt").chmod(777) + + def test_group(self): + with pytest.raises(NotImplementedError): + self.path.group() + + def test_lchmod(self): + try: + self.path.lchmod(mode=0o777) + except UnsupportedOperation: + pass + + def test_owner(self): + with pytest.raises(NotImplementedError): + self.path.owner() + + def test_readlink(self): + with pytest.raises(NotImplementedError): + self.path.readlink() + + +# ============================================================================= +# BaseTests: Composite test suite for full UPath functionality +# ============================================================================= + + +class BaseTests( + JoinablePathTests, + ReadablePathTests, + WritablePathTests, + ReadWritePathTests, + UPathNotImplementedTests, +): + """Comprehensive test suite combining all path operation tests. + + This class composes all the individual test suites for testing UPath + implementations that support full read/write functionality. For UPath + subclasses with limited functionality (e.g., read-only), use the + appropriate subset of test classes: + + - JoinablePathTests: Pure path operations (no I/O) + - ReadablePathTests: Read-only operations + - WritablePathTests: Write-only operations + - ReadWritePathTests: Operations requiring both read and write + - UPathNotImplementedTests: Tests for unsupported operations + + Example usage for a read-only UPath: + + class TestMyReadOnlyPath(JoinablePathTests, ReadablePathTests): + @pytest.fixture(autouse=True) + def setup(self, ...): + self.path = MyReadOnlyUPath(...) + """ + + SUPPORTS_EMPTY_DIRS = True + + path: UPath From f7379f04e0f2d5fb4f375d0780e708d0899714cc Mon Sep 17 00:00:00 2001 From: Andreas Poehlmann Date: Sat, 20 Dec 2025 21:28:51 +0100 Subject: [PATCH 2/9] tests: cleanup cases more --- upath/tests/cases.py | 105 +++++++++++++++---------------------------- 1 file changed, 37 insertions(+), 68 deletions(-) diff --git a/upath/tests/cases.py b/upath/tests/cases.py index 4b6c0c41..bed17e34 100644 --- a/upath/tests/cases.py +++ b/upath/tests/cases.py @@ -16,10 +16,6 @@ from upath._stat import UPathStatResult from upath.types import StatResultType -# ============================================================================= -# JoinablePathTests: Tests for pure path operations (no I/O) -# ============================================================================= - class JoinablePathTests: """Tests for JoinablePath interface. @@ -219,11 +215,6 @@ def test_private_url_attr_in_sync(self): assert p1.protocol == p2.protocol -# ============================================================================= -# ReadablePathTests: Tests for readable path operations -# ============================================================================= - - class ReadablePathTests: """Tests for ReadablePath interface. @@ -378,6 +369,14 @@ def test_lstat(self): st = self.path.lstat() assert st is not None + def test_cwd(self): + with pytest.raises(UnsupportedOperation): + self.path.cwd() + + def test_home(self): + with pytest.raises(UnsupportedOperation): + self.path.home() + def test_open(self): p = self.path.joinpath("file1.txt") with p.open(mode="r") as f: @@ -488,6 +487,18 @@ def test_read_with_fsspec(self): with fs.open(path) as f: assert f.read() == b"hello world" + def test_readlink(self): + with pytest.raises(UnsupportedOperation): + self.path.readlink() + + def test_group(self): + with pytest.raises(UnsupportedOperation): + self.path.group() + + def test_owner(self): + with pytest.raises(UnsupportedOperation): + self.path.owner() + # ============================================================================= # WritablePathTests: Tests for writable path operations @@ -592,16 +603,27 @@ def test_write_text(self, pathlib_base): path.write_text(s) assert path.read_text() == s + def test_chmod(self): + with pytest.raises(NotImplementedError): + self.path.joinpath("file1.txt").chmod(777) + + def test_lchmod(self): + with pytest.raises(UnsupportedOperation): + self.path.lchmod(mode=0o777) + def test_symlink_to(self): - pass + with pytest.raises(UnsupportedOperation): + self.path.joinpath("link").symlink_to("target") - def test_link_to(self): - pass + def test_hardlink_to(self): + with pytest.raises(UnsupportedOperation): + self.path.joinpath("link").hardlink_to("target") + if sys.version_info < (3, 12): -# ============================================================================= -# ReadWritePathTests: Tests requiring both read and write operations -# ============================================================================= + def test_link_to(self): + with pytest.raises(UnsupportedOperation): + self.path.link_to("link") class ReadWritePathTests: @@ -828,63 +850,11 @@ def make_test_files(self): file2.write_bytes(b"hello world") -# ============================================================================= -# UPathNotImplementedTests: Tests for UPath-specific unsupported operations -# ============================================================================= - - -class UPathNotImplementedTests: - """Tests for UPath operations that are typically not implemented. - - These tests verify that operations not supported by most UPath - implementations raise the appropriate exceptions (NotImplementedError - or UnsupportedOperation). - """ - - path: UPath - - def test_cwd(self): - with pytest.raises(NotImplementedError): - self.path.cwd() - - def test_home(self): - with pytest.raises(NotImplementedError): - self.path.home() - - def test_chmod(self): - with pytest.raises(NotImplementedError): - self.path.joinpath("file1.txt").chmod(777) - - def test_group(self): - with pytest.raises(NotImplementedError): - self.path.group() - - def test_lchmod(self): - try: - self.path.lchmod(mode=0o777) - except UnsupportedOperation: - pass - - def test_owner(self): - with pytest.raises(NotImplementedError): - self.path.owner() - - def test_readlink(self): - with pytest.raises(NotImplementedError): - self.path.readlink() - - -# ============================================================================= -# BaseTests: Composite test suite for full UPath functionality -# ============================================================================= - - class BaseTests( JoinablePathTests, ReadablePathTests, WritablePathTests, ReadWritePathTests, - UPathNotImplementedTests, ): """Comprehensive test suite combining all path operation tests. @@ -897,7 +867,6 @@ class BaseTests( - ReadablePathTests: Read-only operations - WritablePathTests: Write-only operations - ReadWritePathTests: Operations requiring both read and write - - UPathNotImplementedTests: Tests for unsupported operations Example usage for a read-only UPath: From 34e4af25255bfe95ef610b4a709338d1d85b4352 Mon Sep 17 00:00:00 2001 From: Andreas Poehlmann Date: Sat, 20 Dec 2025 21:29:29 +0100 Subject: [PATCH 3/9] tests: better github rate limit catching --- upath/tests/implementations/test_github.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/upath/tests/implementations/test_github.py b/upath/tests/implementations/test_github.py index a5318917..2032e191 100644 --- a/upath/tests/implementations/test_github.py +++ b/upath/tests/implementations/test_github.py @@ -32,6 +32,8 @@ def wrapped_method(self, *args, **kwargs): except AssertionError as e: if "nodename nor servname provided, or not known" in str(e): pytest.xfail(reason="No internet connection") + if "rate limit exceeded" in str(e): + pytest.xfail("GitHub API rate limit exceeded") raise except requests.exceptions.ConnectionError: pytest.xfail(reason="No internet connection") From 9970cd96f0fe942849abbdc9ad194616298b31a8 Mon Sep 17 00:00:00 2001 From: Andreas Poehlmann Date: Sat, 20 Dec 2025 21:35:13 +0100 Subject: [PATCH 4/9] tests: update symlink and hardlink tests --- upath/tests/cases.py | 6 ------ upath/tests/test_extensions.py | 12 ++++++++++++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/upath/tests/cases.py b/upath/tests/cases.py index bed17e34..8026b15a 100644 --- a/upath/tests/cases.py +++ b/upath/tests/cases.py @@ -619,12 +619,6 @@ def test_hardlink_to(self): with pytest.raises(UnsupportedOperation): self.path.joinpath("link").hardlink_to("target") - if sys.version_info < (3, 12): - - def test_link_to(self): - with pytest.raises(UnsupportedOperation): - self.path.link_to("link") - class ReadWritePathTests: """Tests requiring both ReadablePath and WritablePath interfaces. diff --git a/upath/tests/test_extensions.py b/upath/tests/test_extensions.py index c3976957..e1e3a7ee 100644 --- a/upath/tests/test_extensions.py +++ b/upath/tests/test_extensions.py @@ -130,6 +130,18 @@ def test_cwd(self): with pytest.raises(UnsupportedOperation): type(self.path).cwd() + def test_lchmod(self): + self.path.lchmod(mode=0o777) + + def test_symlink_to(self): + self.path.joinpath("link").symlink_to(self.path) + + def test_hardlink_to(self): + try: + self.path.joinpath("link").hardlink_to(self.path) + except PermissionError: + pass # hardlink may require elevated permissions + def test_custom_subclass(): From 9b02db6c4467eb874e0381171f0afe37035df84b Mon Sep 17 00:00:00 2001 From: Andreas Poehlmann Date: Sat, 20 Dec 2025 21:36:20 +0100 Subject: [PATCH 5/9] upath: small fixes for hardlink_to backports --- upath/extensions.py | 5 +++++ upath/implementations/local.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/upath/extensions.py b/upath/extensions.py index 84d56798..d75dd43b 100644 --- a/upath/extensions.py +++ b/upath/extensions.py @@ -16,6 +16,7 @@ from urllib.parse import SplitResult from fsspec import AbstractFileSystem +from pathlib_abc import vfspath from upath._chain import Chain from upath._chain import ChainSegment @@ -160,6 +161,8 @@ def symlink_to( target: ReadablePathLike, target_is_directory: bool = False, ) -> None: + if not isinstance(target, str): + target = vfspath(target) self.__wrapped__.symlink_to(target, target_is_directory=target_is_directory) def mkdir( @@ -431,6 +434,8 @@ def is_relative_to(self, other, /, *_deprecated) -> bool: # type: ignore[overri return self.__wrapped__.is_relative_to(other, *_deprecated) def hardlink_to(self, target: ReadablePathLike) -> None: + if not isinstance(target, str): + target = vfspath(target) return self.__wrapped__.hardlink_to(target) def match(self, pattern: str, *, case_sensitive: bool | None = None) -> bool: diff --git a/upath/implementations/local.py b/upath/implementations/local.py index 7506b435..739c78aa 100644 --- a/upath/implementations/local.py +++ b/upath/implementations/local.py @@ -642,7 +642,7 @@ def relative_to( # type: ignore[override] def hardlink_to(self, target: ReadablePathLike) -> None: try: - os.link(target, self) # type: ignore[arg-type] + os.link(os.fspath(target), os.fspath(self)) # type: ignore[arg-type] except AttributeError: raise UnsupportedOperation("hardlink operation not supported") From c14d656e0c400b13cc71fcfd5c6f5cc6844cf13d Mon Sep 17 00:00:00 2001 From: Andreas Poehlmann Date: Sat, 20 Dec 2025 23:09:18 +0100 Subject: [PATCH 6/9] tests: lchmod might not be available --- upath/tests/test_extensions.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/upath/tests/test_extensions.py b/upath/tests/test_extensions.py index e1e3a7ee..8f2d256f 100644 --- a/upath/tests/test_extensions.py +++ b/upath/tests/test_extensions.py @@ -1,5 +1,6 @@ import os import sys +from contextlib import nullcontext import pytest @@ -131,7 +132,12 @@ def test_cwd(self): type(self.path).cwd() def test_lchmod(self): - self.path.lchmod(mode=0o777) + if hasattr(os, "lchmod") and os.lchmod in os.supports_follow_symlinks: + cm = nullcontext() + else: + cm = pytest.raises(UnsupportedOperation) + with cm: + self.path.lchmod(mode=0o777) def test_symlink_to(self): self.path.joinpath("link").symlink_to(self.path) From eb95cf6f0bdc0e29cd1c1b9ad00b9e6622a0b78e Mon Sep 17 00:00:00 2001 From: Andreas Poehlmann Date: Sat, 20 Dec 2025 23:47:20 +0100 Subject: [PATCH 7/9] tests: better check for lchmod test --- upath/tests/test_extensions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/upath/tests/test_extensions.py b/upath/tests/test_extensions.py index 8f2d256f..55cebddd 100644 --- a/upath/tests/test_extensions.py +++ b/upath/tests/test_extensions.py @@ -132,7 +132,8 @@ def test_cwd(self): type(self.path).cwd() def test_lchmod(self): - if hasattr(os, "lchmod") and os.lchmod in os.supports_follow_symlinks: + # see: https://github.com/python/cpython/issues/108660#issuecomment-1854645898 + if hasattr(os, "lchmod") or os.chmod in os.supports_follow_symlinks: cm = nullcontext() else: cm = pytest.raises(UnsupportedOperation) From e7d128151389479dfc728316f8e81e3de6b22111 Mon Sep 17 00:00:00 2001 From: Andreas Poehlmann Date: Sun, 21 Dec 2025 00:49:55 +0100 Subject: [PATCH 8/9] tests: adjust lchmod --- upath/tests/test_extensions.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/upath/tests/test_extensions.py b/upath/tests/test_extensions.py index 55cebddd..5ca38d91 100644 --- a/upath/tests/test_extensions.py +++ b/upath/tests/test_extensions.py @@ -132,13 +132,19 @@ def test_cwd(self): type(self.path).cwd() def test_lchmod(self): + # setup + a = self.path.joinpath("a") + b = self.path.joinpath("b") + a.touch() + b.symlink_to(a) + # see: https://github.com/python/cpython/issues/108660#issuecomment-1854645898 if hasattr(os, "lchmod") or os.chmod in os.supports_follow_symlinks: cm = nullcontext() else: cm = pytest.raises(UnsupportedOperation) with cm: - self.path.lchmod(mode=0o777) + b.lchmod(mode=0o777) def test_symlink_to(self): self.path.joinpath("link").symlink_to(self.path) From a2b2d8857b221f2f324ab5d7477629fcea4b96a3 Mon Sep 17 00:00:00 2001 From: Andreas Poehlmann Date: Sun, 21 Dec 2025 01:01:04 +0100 Subject: [PATCH 9/9] tests: lchmod raises notimplemented on linux --- upath/tests/test_extensions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/upath/tests/test_extensions.py b/upath/tests/test_extensions.py index 5ca38d91..fdd802a5 100644 --- a/upath/tests/test_extensions.py +++ b/upath/tests/test_extensions.py @@ -142,7 +142,7 @@ def test_lchmod(self): if hasattr(os, "lchmod") or os.chmod in os.supports_follow_symlinks: cm = nullcontext() else: - cm = pytest.raises(UnsupportedOperation) + cm = pytest.raises((UnsupportedOperation, NotImplementedError)) with cm: b.lchmod(mode=0o777)