Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 260 additions & 5 deletions test/test_sshfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ def test_sshfs(
tst_truncate_path(mnt_dir)
tst_truncate_fd(mnt_dir)
tst_open_unlink(mnt_dir)
tst_open_writeonly_read(mnt_dir)
tst_access(mnt_dir)
tst_mkdir_exist(mnt_dir)
tst_readdir_repeated(mnt_dir)
tst_rename_sibling(mnt_dir)
except Exception as exc:
cleanup(mount_process, mnt_dir)
raise exc
Expand Down Expand Up @@ -224,6 +229,8 @@ def tst_rename(mnt_dir):
os.rename(src_name, dst_name)

assert not os.path.exists(src_name)
assert os.path.basename(src_name) not in os.listdir(mnt_dir)
assert os.path.basename(dst_name) in os.listdir(mnt_dir)
with open(dst_name, "rb") as fh:
assert fh.read() == data

Expand All @@ -244,6 +251,7 @@ def tst_rename_over(mnt_dir):
os.rename(src_name, dst_name)

assert not os.path.exists(src_name)
assert os.path.basename(src_name) not in os.listdir(mnt_dir)
with open(dst_name, "rb") as fh:
assert fh.read() == src_data

Expand Down Expand Up @@ -296,6 +304,9 @@ def tst_symlink(mnt_dir):
assert fstat.st_nlink == 1
assert linkname in os.listdir(mnt_dir)

os.unlink(fullname)
assert linkname not in os.listdir(mnt_dir)


def tst_create(mnt_dir):
name = name_generator()
Expand Down Expand Up @@ -392,7 +403,7 @@ def tst_open_unlink(mnt_dir):
os.unlink(fullname)
with pytest.raises(OSError) as exc_info:
os.stat(fullname)
assert exc_info.value.errno == errno.ENOENT
assert exc_info.value.errno == errno.ENOENT
assert name not in os.listdir(mnt_dir)
fh.write(data2)
fh.seek(0)
Expand All @@ -415,6 +426,83 @@ def tst_statvfs(src_dir, mnt_dir):
assert vfs.f_namemax > 0


def tst_open_writeonly_read(mnt_dir):
name = pjoin(mnt_dir, name_generator())
fd = os.open(name, os.O_CREAT | os.O_WRONLY)
try:
os.write(fd, b"hello")
with pytest.raises(OSError) as exc_info:
os.read(fd, 10)
assert exc_info.value.errno == errno.EBADF
finally:
os.close(fd)
os.unlink(name)


def tst_access(mnt_dir):
filename = pjoin(mnt_dir, name_generator())
with open(filename, "wb") as fh:
fh.write(b"test")
os.chmod(filename, 0o644)
assert os.access(filename, os.R_OK)
if os.getuid() != 0:
assert not os.access(filename, os.X_OK)
os.unlink(filename)


def tst_mkdir_exist(mnt_dir):
name = name_generator()
fullname = pjoin(mnt_dir, name)
os.mkdir(fullname)
with pytest.raises(OSError) as exc_info:
os.mkdir(fullname)
assert exc_info.value.errno == errno.EEXIST
os.rmdir(fullname)


def tst_readdir_repeated(mnt_dir):
dirname = pjoin(mnt_dir, name_generator())
os.mkdir(dirname)
names = []
for i in range(5):
n = name_generator()
names.append(n)
with open(pjoin(dirname, n), "wb") as fh:
fh.write(b"x")

# Verify repeated directory listings return consistent results
listing1 = sorted(os.listdir(dirname))
listing2 = sorted(os.listdir(dirname))
assert listing1 == sorted(names)
assert listing1 == listing2

for n in names:
os.unlink(pjoin(dirname, n))
os.rmdir(dirname)


def tst_rename_sibling(mnt_dir):
# Verify renaming one file doesn't break access to a sibling
name_a = pjoin(mnt_dir, name_generator())
name_b = pjoin(mnt_dir, name_generator())
name_c = pjoin(mnt_dir, name_generator())

with open(name_a, "wb") as fh:
fh.write(b"aaa")
with open(name_b, "wb") as fh:
fh.write(b"bbb")

os.rename(name_a, name_c)

assert not os.path.exists(name_a)
assert os.path.exists(name_b)
with open(name_b, "rb") as fh:
assert fh.read() == b"bbb"

os.unlink(name_b)
os.unlink(name_c)


def tst_link(mnt_dir, cache_timeout):
name1 = pjoin(mnt_dir, name_generator())
name2 = pjoin(mnt_dir, name_generator())
Expand Down Expand Up @@ -455,6 +543,10 @@ def tst_link(mnt_dir, cache_timeout):
assert os.path.basename(name2) not in os.listdir(mnt_dir)
with pytest.raises(FileNotFoundError):
os.lstat(name2)
if cache_timeout:
safe_sleep(cache_timeout + 1)
fstat1 = os.lstat(name1)
assert fstat1.st_nlink == 1

os.unlink(name1)

Expand Down Expand Up @@ -508,6 +600,10 @@ def tst_truncate_path(mnt_dir):
with open(filename, "rb") as fh:
assert fh.read(size) == TEST_DATA[: size - 1024]

# Truncate to zero
os.truncate(filename, 0)
assert os.stat(filename).st_size == 0

os.unlink(filename)


Expand All @@ -533,6 +629,10 @@ def tst_truncate_fd(mnt_dir):
fh.seek(0)
assert fh.read(size) == TEST_DATA[: size - 1024]

# Truncate to zero via fd
os.ftruncate(fd, 0)
assert os.fstat(fd).st_size == 0


def tst_utimens(mnt_dir, tol=0):
filename = pjoin(mnt_dir, name_generator())
Expand Down Expand Up @@ -573,7 +673,7 @@ def tst_utimens_now(mnt_dir):
def tst_passthrough(src_dir, mnt_dir, cache_timeout):
name = name_generator()
src_name = pjoin(src_dir, name)
mnt_name = pjoin(src_dir, name)
mnt_name = pjoin(mnt_dir, name)
assert name not in os.listdir(src_dir)
assert name not in os.listdir(mnt_dir)
with open(src_name, "w") as fh:
Expand All @@ -582,11 +682,16 @@ def tst_passthrough(src_dir, mnt_dir, cache_timeout):
if cache_timeout:
safe_sleep(cache_timeout + 1)
assert name in os.listdir(mnt_dir)
assert os.stat(src_name) == os.stat(mnt_name)
src_st = os.stat(src_name)
mnt_st = os.stat(mnt_name)
assert src_st.st_size == mnt_st.st_size
assert src_st.st_uid == mnt_st.st_uid
assert src_st.st_gid == mnt_st.st_gid
assert abs(src_st.st_mtime - mnt_st.st_mtime) <= 1

name = name_generator()
src_name = pjoin(src_dir, name)
mnt_name = pjoin(src_dir, name)
mnt_name = pjoin(mnt_dir, name)
assert name not in os.listdir(src_dir)
assert name not in os.listdir(mnt_dir)
with open(mnt_name, "w") as fh:
Expand All @@ -595,4 +700,154 @@ def tst_passthrough(src_dir, mnt_dir, cache_timeout):
if cache_timeout:
safe_sleep(cache_timeout + 1)
assert name in os.listdir(mnt_dir)
assert os.stat(src_name) == os.stat(mnt_name)
src_st = os.stat(src_name)
mnt_st = os.stat(mnt_name)
assert src_st.st_size == mnt_st.st_size
assert src_st.st_uid == mnt_st.st_uid
assert src_st.st_gid == mnt_st.st_gid
assert abs(src_st.st_mtime - mnt_st.st_mtime) <= 1


def _check_ssh_localhost():
try:
res = subprocess.call(
["ssh", "-o", "StrictHostKeyChecking=no",
"-o", "KbdInteractiveAuthentication=no",
"-o", "ChallengeResponseAuthentication=no",
"-o", "PasswordAuthentication=no",
"localhost", "--", "true"],
stdin=subprocess.DEVNULL, timeout=10,
)
except subprocess.TimeoutExpired:
res = 1
if res != 0:
pytest.fail("Unable to ssh into localhost without password prompt.")


_mount_ctr = [0]


def _mount_sshfs(tmpdir, extra_opts=None):
"""Helper to mount sshfs with custom options. Returns (mount_process, mnt_dir, src_dir)."""
_check_ssh_localhost()
_mount_ctr[0] += 1
mnt_dir = str(tmpdir.mkdir(f"mnt{_mount_ctr[0]}"))
src_dir = str(tmpdir.mkdir(f"src{_mount_ctr[0]}"))

cmdline = base_cmdline + [
pjoin(basename, "sshfs"),
"-f",
f"localhost:{src_dir}",
mnt_dir,
"-o", "entry_timeout=0",
"-o", "attr_timeout=0",
]
if extra_opts:
for opt in extra_opts:
cmdline += ["-o", opt]

new_env = dict(os.environ)
new_env["G_DEBUG"] = "fatal-warnings"

mount_process = subprocess.Popen(cmdline, env=new_env)
try:
wait_for_mount(mount_process, mnt_dir)
except:
cleanup(mount_process, mnt_dir)
raise
return mount_process, mnt_dir, src_dir


def test_disable_hardlink(tmpdir, capfd):
capfd.register_output(r"^Warning: Permanently added 'localhost' .+", count=0)

# Control: verify hardlinks work without disable_hardlink.
# If the server lacks the extension, skip this test entirely.
mount_process, mnt_dir, src_dir = _mount_sshfs(tmpdir, [])
try:
name1 = pjoin(mnt_dir, name_generator())
name2 = pjoin(mnt_dir, name_generator())
with open(name1, "wb") as fh:
fh.write(b"test")
try:
os.link(name1, name2)
except OSError:
os.unlink(name1)
pytest.skip("server does not support hardlink extension")
os.unlink(name2)
os.unlink(name1)
except Exception:
cleanup(mount_process, mnt_dir)
raise
else:
umount(mount_process, mnt_dir)

# Now test with disable_hardlink — links should fail
mount_process, mnt_dir, src_dir = _mount_sshfs(tmpdir, ["disable_hardlink"])
try:
name1 = pjoin(mnt_dir, name_generator())
name2 = pjoin(mnt_dir, name_generator())
with open(name1, "wb") as fh:
fh.write(b"test")
with pytest.raises(OSError) as exc_info:
os.link(name1, name2)
assert exc_info.value.errno in (errno.ENOSYS, errno.EPERM)
os.unlink(name1)
except Exception:
cleanup(mount_process, mnt_dir)
raise
else:
umount(mount_process, mnt_dir)


def test_follow_symlinks(tmpdir, capfd):
capfd.register_output(r"^Warning: Permanently added 'localhost' .+", count=0)
mount_process, mnt_dir, src_dir = _mount_sshfs(tmpdir, ["follow_symlinks"])
try:
target_name = name_generator()
target = pjoin(src_dir, target_name)
with open(target, "wb") as fh:
fh.write(b"symlink target data")

link = pjoin(src_dir, name_generator())
os.symlink(target_name, link)

mnt_link = pjoin(mnt_dir, os.path.basename(link))
# With follow_symlinks, stat should return the target's attributes
# and the entry should appear as a regular file, not a symlink
fstat = os.lstat(mnt_link)
assert stat.S_ISREG(fstat.st_mode)
with open(mnt_link, "rb") as fh:
assert fh.read() == b"symlink target data"

os.unlink(link)
os.unlink(target)
except Exception:
cleanup(mount_process, mnt_dir)
raise
else:
umount(mount_process, mnt_dir)


def test_direct_io(tmpdir, capfd):
capfd.register_output(r"^Warning: Permanently added 'localhost' .+", count=0)
mount_process, mnt_dir, src_dir = _mount_sshfs(tmpdir, ["direct_io"])
try:
name = name_generator()
mnt_name = pjoin(mnt_dir, name)
src_name = pjoin(src_dir, name)
data = b"direct io test data\n" * 100

with open(mnt_name, "wb") as fh:
fh.write(data)
with open(mnt_name, "rb") as fh:
assert fh.read() == data
with open(src_name, "rb") as fh:
assert fh.read() == data

os.unlink(mnt_name)
except Exception:
cleanup(mount_process, mnt_dir)
raise
else:
umount(mount_process, mnt_dir)
Loading