diff --git a/test/test_sshfs.py b/test/test_sshfs.py index 7ee53e2..2fb6c24 100755 --- a/test/test_sshfs.py +++ b/test/test_sshfs.py @@ -163,6 +163,11 @@ def test_sshfs( tst_truncate_path(mnt_dir) tst_truncate_fd(mnt_dir) tst_open_unlink(mnt_dir) + tst_open_writeonly_read(mnt_dir) + tst_access(mnt_dir) + tst_mkdir_exist(mnt_dir) + tst_readdir_repeated(mnt_dir) + tst_rename_sibling(mnt_dir) except Exception as exc: cleanup(mount_process, mnt_dir) raise exc @@ -224,6 +229,8 @@ def tst_rename(mnt_dir): os.rename(src_name, dst_name) assert not os.path.exists(src_name) + assert os.path.basename(src_name) not in os.listdir(mnt_dir) + assert os.path.basename(dst_name) in os.listdir(mnt_dir) with open(dst_name, "rb") as fh: assert fh.read() == data @@ -244,6 +251,7 @@ def tst_rename_over(mnt_dir): os.rename(src_name, dst_name) assert not os.path.exists(src_name) + assert os.path.basename(src_name) not in os.listdir(mnt_dir) with open(dst_name, "rb") as fh: assert fh.read() == src_data @@ -296,6 +304,9 @@ def tst_symlink(mnt_dir): assert fstat.st_nlink == 1 assert linkname in os.listdir(mnt_dir) + os.unlink(fullname) + assert linkname not in os.listdir(mnt_dir) + def tst_create(mnt_dir): name = name_generator() @@ -392,7 +403,7 @@ def tst_open_unlink(mnt_dir): os.unlink(fullname) with pytest.raises(OSError) as exc_info: os.stat(fullname) - assert exc_info.value.errno == errno.ENOENT + assert exc_info.value.errno == errno.ENOENT assert name not in os.listdir(mnt_dir) fh.write(data2) fh.seek(0) @@ -415,6 +426,83 @@ def tst_statvfs(src_dir, mnt_dir): assert vfs.f_namemax > 0 +def tst_open_writeonly_read(mnt_dir): + name = pjoin(mnt_dir, name_generator()) + fd = os.open(name, os.O_CREAT | os.O_WRONLY) + try: + os.write(fd, b"hello") + with pytest.raises(OSError) as exc_info: + os.read(fd, 10) + assert exc_info.value.errno == errno.EBADF + finally: + os.close(fd) + os.unlink(name) + + +def tst_access(mnt_dir): + filename = pjoin(mnt_dir, name_generator()) + with open(filename, "wb") as fh: + fh.write(b"test") + os.chmod(filename, 0o644) + assert os.access(filename, os.R_OK) + if os.getuid() != 0: + assert not os.access(filename, os.X_OK) + os.unlink(filename) + + +def tst_mkdir_exist(mnt_dir): + name = name_generator() + fullname = pjoin(mnt_dir, name) + os.mkdir(fullname) + with pytest.raises(OSError) as exc_info: + os.mkdir(fullname) + assert exc_info.value.errno == errno.EEXIST + os.rmdir(fullname) + + +def tst_readdir_repeated(mnt_dir): + dirname = pjoin(mnt_dir, name_generator()) + os.mkdir(dirname) + names = [] + for i in range(5): + n = name_generator() + names.append(n) + with open(pjoin(dirname, n), "wb") as fh: + fh.write(b"x") + + # Verify repeated directory listings return consistent results + listing1 = sorted(os.listdir(dirname)) + listing2 = sorted(os.listdir(dirname)) + assert listing1 == sorted(names) + assert listing1 == listing2 + + for n in names: + os.unlink(pjoin(dirname, n)) + os.rmdir(dirname) + + +def tst_rename_sibling(mnt_dir): + # Verify renaming one file doesn't break access to a sibling + name_a = pjoin(mnt_dir, name_generator()) + name_b = pjoin(mnt_dir, name_generator()) + name_c = pjoin(mnt_dir, name_generator()) + + with open(name_a, "wb") as fh: + fh.write(b"aaa") + with open(name_b, "wb") as fh: + fh.write(b"bbb") + + os.rename(name_a, name_c) + + assert not os.path.exists(name_a) + assert os.path.exists(name_b) + with open(name_b, "rb") as fh: + assert fh.read() == b"bbb" + + os.unlink(name_b) + os.unlink(name_c) + + def tst_link(mnt_dir, cache_timeout): name1 = pjoin(mnt_dir, name_generator()) name2 = pjoin(mnt_dir, name_generator()) @@ -455,6 +543,10 @@ def tst_link(mnt_dir, cache_timeout): assert os.path.basename(name2) not in os.listdir(mnt_dir) with pytest.raises(FileNotFoundError): os.lstat(name2) + if cache_timeout: + safe_sleep(cache_timeout + 1) + fstat1 = os.lstat(name1) + assert fstat1.st_nlink == 1 os.unlink(name1) @@ -508,6 +600,10 @@ def tst_truncate_path(mnt_dir): with open(filename, "rb") as fh: assert fh.read(size) == TEST_DATA[: size - 1024] + # Truncate to zero + os.truncate(filename, 0) + assert os.stat(filename).st_size == 0 + os.unlink(filename) @@ -533,6 +629,10 @@ def tst_truncate_fd(mnt_dir): fh.seek(0) assert fh.read(size) == TEST_DATA[: size - 1024] + # Truncate to zero via fd + os.ftruncate(fd, 0) + assert os.fstat(fd).st_size == 0 + def tst_utimens(mnt_dir, tol=0): filename = pjoin(mnt_dir, name_generator()) @@ -573,7 +673,7 @@ def tst_utimens_now(mnt_dir): def tst_passthrough(src_dir, mnt_dir, cache_timeout): name = name_generator() src_name = pjoin(src_dir, name) - mnt_name = pjoin(src_dir, name) + mnt_name = pjoin(mnt_dir, name) assert name not in os.listdir(src_dir) assert name not in os.listdir(mnt_dir) with open(src_name, "w") as fh: @@ -582,11 +682,16 @@ def tst_passthrough(src_dir, mnt_dir, cache_timeout): if cache_timeout: safe_sleep(cache_timeout + 1) assert name in os.listdir(mnt_dir) - assert os.stat(src_name) == os.stat(mnt_name) + src_st = os.stat(src_name) + mnt_st = os.stat(mnt_name) + assert src_st.st_size == mnt_st.st_size + assert src_st.st_uid == mnt_st.st_uid + assert src_st.st_gid == mnt_st.st_gid + assert abs(src_st.st_mtime - mnt_st.st_mtime) <= 1 name = name_generator() src_name = pjoin(src_dir, name) - mnt_name = pjoin(src_dir, name) + mnt_name = pjoin(mnt_dir, name) assert name not in os.listdir(src_dir) assert name not in os.listdir(mnt_dir) with open(mnt_name, "w") as fh: @@ -595,4 +700,154 @@ def tst_passthrough(src_dir, mnt_dir, cache_timeout): if cache_timeout: safe_sleep(cache_timeout + 1) assert name in os.listdir(mnt_dir) - assert os.stat(src_name) == os.stat(mnt_name) + src_st = os.stat(src_name) + mnt_st = os.stat(mnt_name) + assert src_st.st_size == mnt_st.st_size + assert src_st.st_uid == mnt_st.st_uid + assert src_st.st_gid == mnt_st.st_gid + assert abs(src_st.st_mtime - mnt_st.st_mtime) <= 1 + + +def _check_ssh_localhost(): + try: + res = subprocess.call( + ["ssh", "-o", "StrictHostKeyChecking=no", + "-o", "KbdInteractiveAuthentication=no", + "-o", "ChallengeResponseAuthentication=no", + "-o", "PasswordAuthentication=no", + "localhost", "--", "true"], + stdin=subprocess.DEVNULL, timeout=10, + ) + except subprocess.TimeoutExpired: + res = 1 + if res != 0: + pytest.fail("Unable to ssh into localhost without password prompt.") + + +_mount_ctr = [0] + + +def _mount_sshfs(tmpdir, extra_opts=None): + """Helper to mount sshfs with custom options. Returns (mount_process, mnt_dir, src_dir).""" + _check_ssh_localhost() + _mount_ctr[0] += 1 + mnt_dir = str(tmpdir.mkdir(f"mnt{_mount_ctr[0]}")) + src_dir = str(tmpdir.mkdir(f"src{_mount_ctr[0]}")) + + cmdline = base_cmdline + [ + pjoin(basename, "sshfs"), + "-f", + f"localhost:{src_dir}", + mnt_dir, + "-o", "entry_timeout=0", + "-o", "attr_timeout=0", + ] + if extra_opts: + for opt in extra_opts: + cmdline += ["-o", opt] + + new_env = dict(os.environ) + new_env["G_DEBUG"] = "fatal-warnings" + + mount_process = subprocess.Popen(cmdline, env=new_env) + try: + wait_for_mount(mount_process, mnt_dir) + except: + cleanup(mount_process, mnt_dir) + raise + return mount_process, mnt_dir, src_dir + + +def test_disable_hardlink(tmpdir, capfd): + capfd.register_output(r"^Warning: Permanently added 'localhost' .+", count=0) + + # Control: verify hardlinks work without disable_hardlink. + # If the server lacks the extension, skip this test entirely. + mount_process, mnt_dir, src_dir = _mount_sshfs(tmpdir, []) + try: + name1 = pjoin(mnt_dir, name_generator()) + name2 = pjoin(mnt_dir, name_generator()) + with open(name1, "wb") as fh: + fh.write(b"test") + try: + os.link(name1, name2) + except OSError: + os.unlink(name1) + pytest.skip("server does not support hardlink extension") + os.unlink(name2) + os.unlink(name1) + except Exception: + cleanup(mount_process, mnt_dir) + raise + else: + umount(mount_process, mnt_dir) + + # Now test with disable_hardlink — links should fail + mount_process, mnt_dir, src_dir = _mount_sshfs(tmpdir, ["disable_hardlink"]) + try: + name1 = pjoin(mnt_dir, name_generator()) + name2 = pjoin(mnt_dir, name_generator()) + with open(name1, "wb") as fh: + fh.write(b"test") + with pytest.raises(OSError) as exc_info: + os.link(name1, name2) + assert exc_info.value.errno in (errno.ENOSYS, errno.EPERM) + os.unlink(name1) + except Exception: + cleanup(mount_process, mnt_dir) + raise + else: + umount(mount_process, mnt_dir) + + +def test_follow_symlinks(tmpdir, capfd): + capfd.register_output(r"^Warning: Permanently added 'localhost' .+", count=0) + mount_process, mnt_dir, src_dir = _mount_sshfs(tmpdir, ["follow_symlinks"]) + try: + target_name = name_generator() + target = pjoin(src_dir, target_name) + with open(target, "wb") as fh: + fh.write(b"symlink target data") + + link = pjoin(src_dir, name_generator()) + os.symlink(target_name, link) + + mnt_link = pjoin(mnt_dir, os.path.basename(link)) + # With follow_symlinks, stat should return the target's attributes + # and the entry should appear as a regular file, not a symlink + fstat = os.lstat(mnt_link) + assert stat.S_ISREG(fstat.st_mode) + with open(mnt_link, "rb") as fh: + assert fh.read() == b"symlink target data" + + os.unlink(link) + os.unlink(target) + except Exception: + cleanup(mount_process, mnt_dir) + raise + else: + umount(mount_process, mnt_dir) + + +def test_direct_io(tmpdir, capfd): + capfd.register_output(r"^Warning: Permanently added 'localhost' .+", count=0) + mount_process, mnt_dir, src_dir = _mount_sshfs(tmpdir, ["direct_io"]) + try: + name = name_generator() + mnt_name = pjoin(mnt_dir, name) + src_name = pjoin(src_dir, name) + data = b"direct io test data\n" * 100 + + with open(mnt_name, "wb") as fh: + fh.write(data) + with open(mnt_name, "rb") as fh: + assert fh.read() == data + with open(src_name, "rb") as fh: + assert fh.read() == data + + os.unlink(mnt_name) + except Exception: + cleanup(mount_process, mnt_dir) + raise + else: + umount(mount_process, mnt_dir)