diff --git a/upath/tests/cases.py b/upath/tests/cases.py index 16255fe0..8026b15a 100644 --- a/upath/tests/cases.py +++ b/upath/tests/cases.py @@ -17,36 +17,234 @@ from upath.types import StatResultType -class BaseTests: - SUPPORTS_EMPTY_DIRS = True +class JoinablePathTests: + """Tests for JoinablePath interface. + + These tests verify pure path operations that don't require filesystem access: + - Path parsing and components (parts, parents, name, stem, suffix, etc.) + - Path manipulation (with_name, with_suffix, with_stem, joinpath, etc.) + - Path comparison and hashing + - Serialization (pickling) + - URI handling + """ path: UPath - def test_cwd(self): - with pytest.raises(NotImplementedError): - self.path.cwd() + def test_is_absolute(self): + assert self.path.is_absolute() is True - def test_home(self): - with pytest.raises(NotImplementedError): - self.path.home() + def test_parents(self): + p = self.path.joinpath("folder1", "file1.txt") + assert p.parents[0] == p.parent + assert p.parents[1] == p.parent.parent + assert p.parents[0].name == "folder1" + assert p.parents[1].name == self.path.name + + def test_with_name(self): + path = self.path / "file.txt" + path = path.with_name("file.zip") + assert path.name == "file.zip" + + def test_with_suffix(self): + path = self.path / "file.txt" + path = path.with_suffix(".zip") + assert path.suffix == ".zip" + + def test_suffix(self): + path = self.path / "no_suffix" + assert path.suffix == "" + path = self.path / "file.txt" + assert path.suffix == ".txt" + path = self.path / "archive.tar.gz" + assert path.suffix == ".gz" + + def test_suffixes(self): + path = self.path / "no_suffix" + assert path.suffixes == [] + path = self.path / "file.txt" + assert path.suffixes == [".txt"] + path = self.path / "archive.tar.gz" + assert path.suffixes == [".tar", ".gz"] + + def test_with_stem(self): + if sys.version_info < (3, 9): + pytest.skip("with_stem only available on py3.9+") + path = self.path / "file.txt" + path = path.with_stem("document") + assert path.stem == "document" + + def test_repr_after_with_name(self): + p = self.path.joinpath("file.txt").with_name("file.zip") + assert "file.zip" in repr(p) + + def test_repr_after_with_suffix(self): + p = self.path.joinpath("file.txt").with_suffix(".zip") + assert "file.zip" in repr(p) + + def test_child_path(self): + path_str = self.path.__vfspath__() + path_a = UPath( + path_str, "folder", protocol=self.path.protocol, **self.path.storage_options + ) + path_b = self.path / "folder" + + assert str(path_a) == str(path_b) + assert path_a.root == path_b.root + assert path_a.drive == path_b.drive + + def test_copy_path(self): + path = self.path + copy_path = UPath(path) + + assert type(path) is type(copy_path) + assert str(path) == str(copy_path) + assert path.drive == copy_path.drive + assert path.root == copy_path.root + assert path.parts == copy_path.parts + assert path.fs.storage_options == copy_path.fs.storage_options + + def test_pickling(self): + path = self.path + pickled_path = pickle.dumps(path) + recovered_path = pickle.loads(pickled_path) + + assert type(path) is type(recovered_path) + assert str(path) == str(recovered_path) + assert path.fs.storage_options == recovered_path.fs.storage_options + + def test_pickling_child_path(self): + path = self.path / "subfolder" / "subsubfolder" + pickled_path = pickle.dumps(path) + recovered_path = pickle.loads(pickled_path) + + assert type(path) is type(recovered_path) + assert str(path) == str(recovered_path) + assert path.drive == recovered_path.drive + assert path.root == recovered_path.root + assert path.parts == recovered_path.parts + assert path.fs.storage_options == recovered_path.fs.storage_options + assert path.storage_options == recovered_path.storage_options + + def test_as_uri(self): + # test that we can reconstruct the path from the uri + p0 = self.path + uri = p0.as_uri() + p1 = UPath(uri, **p0.storage_options) + assert p0 == p1 + + def test_protocol(self): + protocol = self.path.protocol + protocols = [p] if isinstance((p := type(self.path.fs).protocol), str) else p + print(protocol, protocols) + assert protocol in protocols + + def test_storage_options(self): + storage_options = self.path.storage_options + assert storage_options == self.path.fs.storage_options + + def test_hashable(self): + assert hash(self.path) + + def test_storage_options_dont_affect_hash(self): + cls = type(self.path) + p0 = cls(str(self.path), test_extra=1, **self.path.storage_options) + p1 = cls(str(self.path), test_extra=2, **self.path.storage_options) + assert hash(p0) == hash(p1) + + def test_eq(self): + cls = type(self.path) + p0 = cls(str(self.path), test_extra=1, **self.path.storage_options) + p1 = cls(str(self.path), test_extra=1, **self.path.storage_options) + p2 = cls(str(self.path), test_extra=2, **self.path.storage_options) + assert p0 == p1 + assert p0 != p2 + assert p1 != p2 + + def test_relative_to(self): + base = self.path + child = self.path / "folder1" / "file1.txt" + relative = child.relative_to(base) + assert str(relative) == "folder1/file1.txt" + + def test_trailing_slash_joinpath_is_identical(self): + # setup + cls = type(self.path) + protocol = self.path.protocol + path = self.path.path + sopts = self.path.storage_options + if not path: + path = "something" + path_with_slash = "something/" + elif path.endswith("/"): + path_with_slash = path + path = path.removeprefix("/") + else: + path_with_slash = path + "/" + key = "key/" + + # test + a = cls(path_with_slash + key, protocol=protocol, **sopts) + b = cls(path_with_slash, key, protocol=protocol, **sopts) + c = cls(path_with_slash, protocol=protocol, **sopts).joinpath(key) + d = cls(path_with_slash, protocol=protocol, **sopts) / key + assert a.path == b.path == c.path == d.path + + def test_trailing_slash_is_stripped(self): + has_meaningful_trailing_slash = getattr( + self.path.parser, "has_meaningful_trailing_slash", False + ) + if has_meaningful_trailing_slash: + assert not self.path.joinpath("key").path.endswith("/") + assert self.path.joinpath("key/").path.endswith("/") + else: + assert not self.path.joinpath("key").path.endswith("/") + assert not self.path.joinpath("key/").path.endswith("/") + + def test_parents_are_absolute(self): + # this is a cross implementation compatible way to ensure that + # the path representing the root is absolute + is_absolute = [p.is_absolute() for p in self.path.parents] + assert all(is_absolute) + + def test_private_url_attr_in_sync(self): + p = self.path + p1 = self.path.joinpath("c") + p2 = self.path / "c" + assert p1._url == p2._url + assert p1._url != p._url + assert p1.protocol == p2.protocol + + +class ReadablePathTests: + """Tests for ReadablePath interface. + + These tests verify operations that read from the filesystem: + - File/directory existence and type checks (exists, is_dir, is_file, etc.) + - File metadata (stat, info) + - Reading file contents (read_bytes, read_text, open for reading) + - Directory listing (iterdir, glob, rglob) + - Copy operations (read source) + """ + + path: UPath def test_stat(self): - stat = self.path.stat() + stat_ = self.path.stat() # for debugging os.stat_result compatibility - attrs = {attr for attr in dir(stat) if attr.startswith("st_")} + attrs = {attr for attr in dir(stat_) if attr.startswith("st_")} print(attrs) - assert isinstance(stat, StatResultType) - assert len(tuple(stat)) == os.stat_result.n_sequence_fields + assert isinstance(stat_, StatResultType) + assert len(tuple(stat_)) == os.stat_result.n_sequence_fields with warnings.catch_warnings(): warnings.simplefilter("error") for idx in range(os.stat_result.n_sequence_fields): - assert isinstance(stat[idx], int) + assert isinstance(stat_[idx], int) for attr in UPathStatResult._fields + UPathStatResult._fields_extra: - assert hasattr(stat, attr) + assert hasattr(stat_, attr) def test_stat_dir_st_mode(self): base = self.path.stat() # base folder @@ -60,10 +258,6 @@ def test_stat_st_size(self): file1 = self.path.joinpath("file1.txt").stat() assert file1.st_size == 11 - def test_chmod(self): - with pytest.raises(NotImplementedError): - self.path.joinpath("file1.txt").chmod(777) - @pytest.mark.parametrize( "url, expected", [("file1.txt", True), ("fakefile.txt", False)] ) @@ -103,10 +297,6 @@ def test_glob(self, pathlib_base, pattern): print(mock_glob_normalized, path_glob_normalized) assert mock_glob_normalized == path_glob_normalized - def test_group(self): - with pytest.raises(NotImplementedError): - self.path.group() - def test_is_dir(self): assert self.path.is_dir() @@ -119,9 +309,6 @@ def test_is_file(self): assert path_exists.is_file() assert not (self.path / "not-existing-file.txt").is_file() - def test_is_absolute(self): - assert self.path.is_absolute() is True - def test_is_mount(self): try: self.path.is_mount() @@ -177,68 +364,25 @@ def test_iterdir_trailing_slash(self): files_slash = list(self.path.joinpath("folder1/").iterdir()) assert files_noslash == files_slash - def test_parents(self): - p = self.path.joinpath("folder1", "file1.txt") - assert p.is_file() - assert p.parents[0] == p.parent - assert p.parents[1] == p.parent.parent - assert p.parents[0].name == "folder1" - assert p.parents[1].name == self.path.name - - def test_lchmod(self): - try: - self.path.lchmod(mode=0o777) - except UnsupportedOperation: - pass - def test_lstat(self): with pytest.warns(UserWarning, match=r"[A-Za-z]+.stat"): st = self.path.lstat() assert st is not None - def test_mkdir(self): - new_dir = self.path.joinpath("new_dir") - new_dir.mkdir() - if not self.SUPPORTS_EMPTY_DIRS: - new_dir.joinpath(".file").touch() - assert new_dir.exists() - - def test_mkdir_exists_ok_true(self): - new_dir = self.path.joinpath("new_dir_may_exists") - new_dir.mkdir() - if not self.SUPPORTS_EMPTY_DIRS: - new_dir.joinpath(".file").touch() - new_dir.mkdir(exist_ok=True) + def test_cwd(self): + with pytest.raises(UnsupportedOperation): + self.path.cwd() - def test_mkdir_exists_ok_false(self): - new_dir = self.path.joinpath("new_dir_may_not_exists") - new_dir.mkdir() - if not self.SUPPORTS_EMPTY_DIRS: - new_dir.joinpath(".file").touch() - with pytest.raises(FileExistsError): - new_dir.mkdir(exist_ok=False) + def test_home(self): + with pytest.raises(UnsupportedOperation): + self.path.home() - def test_mkdir_parents_true_exists_ok_true(self): - new_dir = self.path.joinpath("parent", "new_dir_may_not_exist") - new_dir.mkdir(parents=True) - if not self.SUPPORTS_EMPTY_DIRS: - new_dir.joinpath(".file").touch() - new_dir.mkdir(parents=True, exist_ok=True) - - def test_mkdir_parents_true_exists_ok_false(self): - new_dir = self.path.joinpath("parent", "new_dir_may_exist") - new_dir.mkdir(parents=True) - if not self.SUPPORTS_EMPTY_DIRS: - new_dir.joinpath(".file").touch() - with pytest.raises(FileExistsError): - new_dir.mkdir(parents=True, exist_ok=False) - - def test_open(self): - p = self.path.joinpath("file1.txt") - with p.open(mode="r") as f: - assert f.read() == "hello world" - with p.open(mode="rb") as f: - assert f.read() == b"hello world" + def test_open(self): + p = self.path.joinpath("file1.txt") + with p.open(mode="r") as f: + assert f.read() == "hello world" + with p.open(mode="rb") as f: + assert f.read() == b"hello world" def test_open_buffering(self): p = self.path.joinpath("file1.txt") @@ -254,10 +398,6 @@ def test_open_errors(self): with p.open(mode="r", encoding="ascii", errors="strict") as f: assert f.read() == "hello world" - def test_owner(self): - with pytest.raises(NotImplementedError): - self.path.owner() - def test_read_bytes(self, pathlib_base): mock = self.path.joinpath("file2.txt") pl = pathlib_base.joinpath("file2.txt") @@ -269,10 +409,231 @@ def test_read_text(self, local_testdir): upath.read_text() == Path(local_testdir).joinpath("file1.txt").read_text() ) + def test_rglob(self, pathlib_base): + pattern = "*.txt" + result = [*self.path.rglob(pattern)] + expected = [*pathlib_base.rglob(pattern)] + assert len(result) == len(expected) + + def test_samefile(self): + f1 = self.path.joinpath("file1.txt") + f2 = self.path.joinpath("file2.txt") + + assert f1.samefile(f2) is False + assert f1.samefile(f2.path) is False + assert f1.samefile(f1) is True + assert f1.samefile(f1.path) is True + + def test_info(self): + p0 = self.path.joinpath("file1.txt") + p1 = self.path.joinpath("folder1") + + assert p0.info.exists() is True + assert p0.info.is_file() is True + assert p0.info.is_dir() is False + assert p0.info.is_symlink() is False + assert p1.info.exists() is True + assert p1.info.is_file() is False + assert p1.info.is_dir() is True + assert p1.info.is_symlink() is False + + def test_copy_local(self, tmp_path: Path): + target = UPath(tmp_path) / "target-file1.txt" + + source = self.path / "file1.txt" + content = source.read_text() + source.copy(target) + assert target.exists() + assert target.read_text() == content + + def test_copy_into_local(self, tmp_path: Path): + target_dir = UPath(tmp_path) / "target-dir" + target_dir.mkdir() + + source = self.path / "file1.txt" + content = source.read_text() + source.copy_into(target_dir) + target = target_dir / "file1.txt" + assert target.exists() + assert target.read_text() == content + + def test_copy_memory(self, clear_fsspec_memory_cache): + target = UPath("memory:///target-file1.txt") + source = self.path / "file1.txt" + content = source.read_text() + source.copy(target) + assert target.exists() + assert target.read_text() == content + + def test_copy_into_memory(self, clear_fsspec_memory_cache): + target_dir = UPath("memory:///target-dir") + target_dir.mkdir() + + source = self.path / "file1.txt" + content = source.read_text() + source.copy_into(target_dir) + target = target_dir / "file1.txt" + assert target.exists() + assert target.read_text() == content + + def test_read_with_fsspec(self): + p = self.path.joinpath("file2.txt") + + protocol = p.protocol + storage_options = p.storage_options + path = p.path + + fs = filesystem(protocol, **storage_options) + with fs.open(path) as f: + assert f.read() == b"hello world" + def test_readlink(self): - with pytest.raises(NotImplementedError): + with pytest.raises(UnsupportedOperation): self.path.readlink() + def test_group(self): + with pytest.raises(UnsupportedOperation): + self.path.group() + + def test_owner(self): + with pytest.raises(UnsupportedOperation): + self.path.owner() + + +# ============================================================================= +# WritablePathTests: Tests for writable path operations +# ============================================================================= + + +class WritablePathTests: + """Tests for WritablePath interface. + + These tests verify operations that write to the filesystem: + - Creating directories (mkdir) + - Creating files (touch) + - Writing file contents (write_bytes, write_text) + - Removing files/directories (unlink, rmdir) + """ + + SUPPORTS_EMPTY_DIRS = True + + path: UPath + + def test_mkdir(self): + new_dir = self.path.joinpath("new_dir") + new_dir.mkdir() + if not self.SUPPORTS_EMPTY_DIRS: + new_dir.joinpath(".file").touch() + assert new_dir.exists() + + def test_mkdir_exists_ok_true(self): + new_dir = self.path.joinpath("new_dir_may_exists") + new_dir.mkdir() + if not self.SUPPORTS_EMPTY_DIRS: + new_dir.joinpath(".file").touch() + new_dir.mkdir(exist_ok=True) + + def test_mkdir_exists_ok_false(self): + new_dir = self.path.joinpath("new_dir_may_not_exists") + new_dir.mkdir() + if not self.SUPPORTS_EMPTY_DIRS: + new_dir.joinpath(".file").touch() + with pytest.raises(FileExistsError): + new_dir.mkdir(exist_ok=False) + + def test_mkdir_parents_true_exists_ok_true(self): + new_dir = self.path.joinpath("parent", "new_dir_may_not_exist") + new_dir.mkdir(parents=True) + if not self.SUPPORTS_EMPTY_DIRS: + new_dir.joinpath(".file").touch() + new_dir.mkdir(parents=True, exist_ok=True) + + def test_mkdir_parents_true_exists_ok_false(self): + new_dir = self.path.joinpath("parent", "new_dir_may_exist") + new_dir.mkdir(parents=True) + if not self.SUPPORTS_EMPTY_DIRS: + new_dir.joinpath(".file").touch() + with pytest.raises(FileExistsError): + new_dir.mkdir(parents=True, exist_ok=False) + + def test_touch_exists_ok_false(self): + f = self.path.joinpath("file1.txt") + assert f.exists() + with pytest.raises(FileExistsError): + f.touch(exist_ok=False) + + def test_touch_exists_ok_true(self): + f = self.path.joinpath("file1.txt") + assert f.exists() + data = f.read_text() + f.touch(exist_ok=True) + assert f.read_text() == data + + def test_touch(self): + path = self.path.joinpath("test_touch.txt") + assert not path.exists() + path.touch() + assert path.exists() + + def test_touch_unlink(self): + path = self.path.joinpath("test_touch.txt") + path.touch() + assert path.exists() + path.unlink() + assert not path.exists() + + # should raise FileNotFoundError since file is missing + with pytest.raises(FileNotFoundError): + path.unlink() + + # file doesn't exists, but missing_ok is True + path.unlink(missing_ok=True) + + def test_write_bytes(self, pathlib_base): + fn = "test_write_bytes.txt" + s = b"hello_world" + path = self.path.joinpath(fn) + path.write_bytes(s) + assert path.read_bytes() == s + + def test_write_text(self, pathlib_base): + fn = "test_write_text.txt" + s = "hello_world" + path = self.path.joinpath(fn) + path.write_text(s) + assert path.read_text() == s + + def test_chmod(self): + with pytest.raises(NotImplementedError): + self.path.joinpath("file1.txt").chmod(777) + + def test_lchmod(self): + with pytest.raises(UnsupportedOperation): + self.path.lchmod(mode=0o777) + + def test_symlink_to(self): + with pytest.raises(UnsupportedOperation): + self.path.joinpath("link").symlink_to("target") + + def test_hardlink_to(self): + with pytest.raises(UnsupportedOperation): + self.path.joinpath("link").hardlink_to("target") + + +class ReadWritePathTests: + """Tests requiring both ReadablePath and WritablePath interfaces. + + These tests verify operations that need both read and write access: + - Rename/move operations + - File system setup/teardown + - Operations that verify write results by reading + - rmdir operations + """ + + SUPPORTS_EMPTY_DIRS = True + + path: UPath + def test_rename(self): p_source = self.path.joinpath("file1.txt") p_target = self.path.joinpath("file1_renamed.txt") @@ -376,91 +737,25 @@ def test_replace(self): def test_resolve(self): pass - def test_rglob(self, pathlib_base): - pattern = "*.txt" - result = [*self.path.rglob(pattern)] - expected = [*pathlib_base.rglob(pattern)] - assert len(result) == len(expected) - - def test_symlink_to(self): - pass + def test_rmdir_no_dir(self): + p = self.path.joinpath("file1.txt") + with pytest.raises(NotADirectoryError): + p.rmdir() - def test_touch_exists_ok_false(self): - f = self.path.joinpath("file1.txt") - assert f.exists() - with pytest.raises(FileExistsError): - f.touch(exist_ok=False) + def test_iterdir_no_dir(self): + p = self.path.joinpath("file1.txt") + assert p.is_file() + with pytest.raises(NotADirectoryError): + _ = list(p.iterdir()) - def test_touch_exists_ok_true(self): - f = self.path.joinpath("file1.txt") - assert f.exists() - data = f.read_text() - f.touch(exist_ok=True) - assert f.read_text() == data + def test_rmdir_not_empty(self): + p = self.path.joinpath("folder1") + with pytest.raises(OSError, match="not empty"): + p.rmdir(recursive=False) - def test_touch(self): - path = self.path.joinpath("test_touch.txt") - assert not path.exists() - path.touch() - assert path.exists() - - def test_touch_unlink(self): - path = self.path.joinpath("test_touch.txt") - path.touch() - assert path.exists() - path.unlink() - assert not path.exists() - - # should raise FileNotFoundError since file is missing - with pytest.raises(FileNotFoundError): - path.unlink() - - # file doesn't exists, but missing_ok is True - path.unlink(missing_ok=True) - - def test_link_to(self): - pass - - def test_write_bytes(self, pathlib_base): - fn = "test_write_bytes.txt" - s = b"hello_world" - path = self.path.joinpath(fn) - path.write_bytes(s) - assert path.read_bytes() == s - - def test_write_text(self, pathlib_base): - fn = "test_write_text.txt" - s = "hello_world" - path = self.path.joinpath(fn) - path.write_text(s) - assert path.read_text() == s - - def prepare_file_system(self): - self.make_top_folder() - self.make_test_files() - - def make_top_folder(self): - self.path.mkdir(parents=True, exist_ok=True) - - def make_test_files(self): - folder1 = self.path.joinpath("folder1") - folder1.mkdir(exist_ok=True) - folder1_files = ["file1.txt", "file2.txt"] - for f in folder1_files: - p = folder1.joinpath(f) - p.touch() - p.write_text(f) - - file1 = self.path.joinpath("file1.txt") - file1.touch() - file1.write_text("hello world") - file2 = self.path.joinpath("file2.txt") - file2.touch() - file2.write_bytes(b"hello world") - - def test_fsspec_compat(self): - fs = self.path.fs - content = b"a,b,c\n1,2,3\n4,5,6" + def test_fsspec_compat(self): + fs = self.path.fs + content = b"a,b,c\n1,2,3\n4,5,6" upath1 = self.path / "output1.csv" p1 = upath1.path @@ -482,222 +777,6 @@ def test_fsspec_compat(self): assert upath2.read_bytes() == content upath2.unlink() - def test_pickling(self): - path = self.path - pickled_path = pickle.dumps(path) - recovered_path = pickle.loads(pickled_path) - - assert type(path) is type(recovered_path) - assert str(path) == str(recovered_path) - assert path.fs.storage_options == recovered_path.fs.storage_options - - def test_pickling_child_path(self): - path = self.path / "subfolder" / "subsubfolder" - pickled_path = pickle.dumps(path) - recovered_path = pickle.loads(pickled_path) - - assert type(path) is type(recovered_path) - assert str(path) == str(recovered_path) - assert path.drive == recovered_path.drive - assert path.root == recovered_path.root - assert path.parts == recovered_path.parts - assert path.fs.storage_options == recovered_path.fs.storage_options - assert path.storage_options == recovered_path.storage_options - - def test_child_path(self): - path_str = self.path.__vfspath__() - path_a = UPath( - path_str, "folder", protocol=self.path.protocol, **self.path.storage_options - ) - path_b = self.path / "folder" - - assert str(path_a) == str(path_b) - assert path_a.root == path_b.root - assert path_a.drive == path_b.drive - - def test_copy_path(self): - path = self.path - copy_path = UPath(path) - - assert type(path) is type(copy_path) - assert str(path) == str(copy_path) - assert path.drive == copy_path.drive - assert path.root == copy_path.root - assert path.parts == copy_path.parts - assert path.fs.storage_options == copy_path.fs.storage_options - - def test_with_name(self): - path = self.path / "file.txt" - path = path.with_name("file.zip") - assert path.name == "file.zip" - - def test_with_suffix(self): - path = self.path / "file.txt" - path = path.with_suffix(".zip") - assert path.suffix == ".zip" - - def test_suffix(self): - path = self.path / "no_suffix" - assert path.suffix == "" - path = self.path / "file.txt" - assert path.suffix == ".txt" - path = self.path / "archive.tar.gz" - assert path.suffix == ".gz" - - def test_suffixes(self): - path = self.path / "no_suffix" - assert path.suffixes == [] - path = self.path / "file.txt" - assert path.suffixes == [".txt"] - path = self.path / "archive.tar.gz" - assert path.suffixes == [".tar", ".gz"] - - def test_with_stem(self): - if sys.version_info < (3, 9): - pytest.skip("with_stem only available on py3.9+") - path = self.path / "file.txt" - path = path.with_stem("document") - assert path.stem == "document" - - def test_repr_after_with_name(self): - p = self.path.joinpath("file.txt").with_name("file.zip") - assert "file.zip" in repr(p) - - def test_repr_after_with_suffix(self): - p = self.path.joinpath("file.txt").with_suffix(".zip") - assert "file.zip" in repr(p) - - def test_rmdir_no_dir(self): - p = self.path.joinpath("file1.txt") - with pytest.raises(NotADirectoryError): - p.rmdir() - - def test_iterdir_no_dir(self): - p = self.path.joinpath("file1.txt") - assert p.is_file() - with pytest.raises(NotADirectoryError): - _ = list(p.iterdir()) - - def test_rmdir_not_empty(self): - p = self.path.joinpath("folder1") - with pytest.raises(OSError, match="not empty"): - p.rmdir(recursive=False) - - def test_private_url_attr_in_sync(self): - p = self.path - p1 = self.path.joinpath("c") - p2 = self.path / "c" - assert p1._url == p2._url - assert p1._url != p._url - assert p1.protocol == p2.protocol - - def test_as_uri(self): - # test that we can reconstruct the path from the uri - p0 = self.path - uri = p0.as_uri() - p1 = UPath(uri, **p0.storage_options) - assert p0 == p1 - - def test_protocol(self): - protocol = self.path.protocol - protocols = [p] if isinstance((p := type(self.path.fs).protocol), str) else p - print(protocol, protocols) - assert protocol in protocols - - def test_storage_options(self): - storage_options = self.path.storage_options - assert storage_options == self.path.fs.storage_options - - def test_read_with_fsspec(self): - p = self.path.joinpath("file2.txt") - - protocol = p.protocol - storage_options = p.storage_options - path = p.path - - fs = filesystem(protocol, **storage_options) - with fs.open(path) as f: - assert f.read() == b"hello world" - - def test_hashable(self): - assert hash(self.path) - - def test_storage_options_dont_affect_hash(self): - cls = type(self.path) - p0 = cls(str(self.path), test_extra=1, **self.path.storage_options) - p1 = cls(str(self.path), test_extra=2, **self.path.storage_options) - assert hash(p0) == hash(p1) - - def test_eq(self): - cls = type(self.path) - p0 = cls(str(self.path), test_extra=1, **self.path.storage_options) - p1 = cls(str(self.path), test_extra=1, **self.path.storage_options) - p2 = cls(str(self.path), test_extra=2, **self.path.storage_options) - assert p0 == p1 - assert p0 != p2 - assert p1 != p2 - - def test_samefile(self): - f1 = self.path.joinpath("file1.txt") - f2 = self.path.joinpath("file2.txt") - - assert f1.samefile(f2) is False - assert f1.samefile(f2.path) is False - assert f1.samefile(f1) is True - assert f1.samefile(f1.path) is True - - def test_info(self): - p0 = self.path.joinpath("file1.txt") - p1 = self.path.joinpath("folder1") - - assert p0.info.exists() is True - assert p0.info.is_file() is True - assert p0.info.is_dir() is False - assert p0.info.is_symlink() is False - assert p1.info.exists() is True - assert p1.info.is_file() is False - assert p1.info.is_dir() is True - assert p1.info.is_symlink() is False - - def test_copy_local(self, tmp_path: Path): - target = UPath(tmp_path) / "target-file1.txt" - - source = self.path / "file1.txt" - content = source.read_text() - source.copy(target) - assert target.exists() - assert target.read_text() == content - - def test_copy_into_local(self, tmp_path: Path): - target_dir = UPath(tmp_path) / "target-dir" - target_dir.mkdir() - - source = self.path / "file1.txt" - content = source.read_text() - source.copy_into(target_dir) - target = target_dir / "file1.txt" - assert target.exists() - assert target.read_text() == content - - def test_copy_memory(self, clear_fsspec_memory_cache): - target = UPath("memory:///target-file1.txt") - source = self.path / "file1.txt" - content = source.read_text() - source.copy(target) - assert target.exists() - assert target.read_text() == content - - def test_copy_into_memory(self, clear_fsspec_memory_cache): - target_dir = UPath("memory:///target-dir") - target_dir.mkdir() - - source = self.path / "file1.txt" - content = source.read_text() - source.copy_into(target_dir) - target = target_dir / "file1.txt" - assert target.exists() - assert target.read_text() == content - def test_move_local(self, tmp_path: Path): target = UPath(tmp_path) / "target-file1.txt" @@ -741,48 +820,56 @@ def test_move_into_memory(self, clear_fsspec_memory_cache): assert target.read_text() == content assert not source.exists() - def test_relative_to(self): - base = self.path - child = self.path / "folder1" / "file1.txt" - relative = child.relative_to(base) - assert str(relative) == "folder1/file1.txt" + def prepare_file_system(self): + self.make_top_folder() + self.make_test_files() - def test_trailing_slash_joinpath_is_identical(self): - # setup - cls = type(self.path) - protocol = self.path.protocol - path = self.path.path - sopts = self.path.storage_options - if not path: - path = "something" - path_with_slash = "something/" - elif path.endswith("/"): - path_with_slash = path - path = path.removeprefix("/") - else: - path_with_slash = path + "/" - key = "key/" + def make_top_folder(self): + self.path.mkdir(parents=True, exist_ok=True) - # test - a = cls(path_with_slash + key, protocol=protocol, **sopts) - b = cls(path_with_slash, key, protocol=protocol, **sopts) - c = cls(path_with_slash, protocol=protocol, **sopts).joinpath(key) - d = cls(path_with_slash, protocol=protocol, **sopts) / key - assert a.path == b.path == c.path == d.path + def make_test_files(self): + folder1 = self.path.joinpath("folder1") + folder1.mkdir(exist_ok=True) + folder1_files = ["file1.txt", "file2.txt"] + for f in folder1_files: + p = folder1.joinpath(f) + p.touch() + p.write_text(f) - def test_trailing_slash_is_stripped(self): - has_meaningful_trailing_slash = getattr( - self.path.parser, "has_meaningful_trailing_slash", False - ) - if has_meaningful_trailing_slash: - assert not self.path.joinpath("key").path.endswith("/") - assert self.path.joinpath("key/").path.endswith("/") - else: - assert not self.path.joinpath("key").path.endswith("/") - assert not self.path.joinpath("key/").path.endswith("/") + file1 = self.path.joinpath("file1.txt") + file1.touch() + file1.write_text("hello world") + file2 = self.path.joinpath("file2.txt") + file2.touch() + file2.write_bytes(b"hello world") - def test_parents_are_absolute(self): - # this is a cross implementation compatible way to ensure that - # the path representing the root is absolute - is_absolute = [p.is_absolute() for p in self.path.parents] - assert all(is_absolute) + +class BaseTests( + JoinablePathTests, + ReadablePathTests, + WritablePathTests, + ReadWritePathTests, +): + """Comprehensive test suite combining all path operation tests. + + This class composes all the individual test suites for testing UPath + implementations that support full read/write functionality. For UPath + subclasses with limited functionality (e.g., read-only), use the + appropriate subset of test classes: + + - JoinablePathTests: Pure path operations (no I/O) + - ReadablePathTests: Read-only operations + - WritablePathTests: Write-only operations + - ReadWritePathTests: Operations requiring both read and write + + Example usage for a read-only UPath: + + class TestMyReadOnlyPath(JoinablePathTests, ReadablePathTests): + @pytest.fixture(autouse=True) + def setup(self, ...): + self.path = MyReadOnlyUPath(...) + """ + + SUPPORTS_EMPTY_DIRS = True + + path: UPath