From 8a99e8e6296c410d15ff2f5246b8a28d493a455b Mon Sep 17 00:00:00 2001 From: Oleksander Piskun Date: Tue, 17 Feb 2026 14:35:13 +0200 Subject: [PATCH 1/5] fix: use FileLock with atomic rename for safe concurrent model downloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When KEDA scales up multiple ExApp pods sharing storage, concurrent downloads of the same model can corrupt files. Use OS-level FileLock (auto-releases on process death) with temp file + atomic os.replace() to ensure the final file is always complete or absent, never partial. Unlike SoftFileLock, FileLock cannot leave stale locks after pod SIGKILL. No lock added for snapshot_download — huggingface_hub handles this internally. --- nc_py_api/ex_app/integration_fastapi.py | 28 ++++++++++++++++--------- pyproject.toml | 1 + 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/nc_py_api/ex_app/integration_fastapi.py b/nc_py_api/ex_app/integration_fastapi.py index 42b3bf94..7aa6939a 100644 --- a/nc_py_api/ex_app/integration_fastapi.py +++ b/nc_py_api/ex_app/integration_fastapi.py @@ -20,6 +20,7 @@ status, ) from fastapi.responses import JSONResponse, PlainTextResponse +from filelock import FileLock from starlette.requests import HTTPConnection, Request from starlette.types import ASGIApp, Receive, Scope, Send @@ -205,7 +206,8 @@ def __fetch_model_as_file( current_progress: int, progress_for_task: int, nc: NextcloudApp, model_path: str, download_options: dict ) -> str: result_path = download_options.pop("save_path", urlparse(model_path).path.split("/")[-1]) - with niquests.get(model_path, stream=True) as response: + tmp_path = result_path + ".tmp" + with FileLock(result_path + ".lock", timeout=7200), niquests.get(model_path, stream=True) as response: if not response.ok: raise ModelFetchError( f"Downloading of '{model_path}' failed, returned ({response.status_code}) {response.text}" @@ -232,15 +234,21 @@ def __fetch_model_as_file( 
nc.set_init_status(min(current_progress + progress_for_task, 99)) return result_path - with builtins.open(result_path, "wb") as file: - last_progress = current_progress - for chunk in response.iter_raw(-1): - downloaded_size += file.write(chunk) - if total_size: - new_progress = min(current_progress + int(progress_for_task * downloaded_size / total_size), 99) - if new_progress != last_progress: - nc.set_init_status(new_progress) - last_progress = new_progress + try: + with builtins.open(tmp_path, "wb") as file: + last_progress = current_progress + for chunk in response.iter_raw(-1): + downloaded_size += file.write(chunk) + if total_size: + new_progress = min(current_progress + int(progress_for_task * downloaded_size / total_size), 99) + if new_progress != last_progress: + nc.set_init_status(new_progress) + last_progress = new_progress + os.replace(tmp_path, result_path) + except BaseException: + if os.path.exists(tmp_path): + os.remove(tmp_path) + raise return result_path diff --git a/pyproject.toml b/pyproject.toml index 69dfc528..ae184769 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ dynamic = [ ] dependencies = [ "fastapi>=0.109.2", + "filelock>=3.16,<4", "niquests>=3,<4", "pydantic>=2.1.1", "python-dotenv>=1", From fb2f362cfd766f9b0b5f9bee3fd83299ac8730e1 Mon Sep 17 00:00:00 2001 From: Oleksander Piskun Date: Wed, 18 Feb 2026 18:28:23 +0200 Subject: [PATCH 2/5] test: add unit tests for FileLock-guarded atomic model file download Signed-off-by: Oleksander Piskun --- .pre-commit-config.yaml | 6 +- pyproject.toml | 5 + tests_unit/test_fetch_model_file.py | 199 ++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+), 2 deletions(-) create mode 100644 tests_unit/test_fetch_model_file.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a83573b5..79e8284d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -20,7 +20,8 @@ repos: nc_py_api/| benchmarks/| examples/| - tests/ + tests/| + tests_unit/ ) - repo: https://github.com/psf/black @@ -32,7 +33,8 @@ repos: 
nc_py_api/| benchmarks/| examples/| - tests/ + tests/| + tests_unit/ ) - repo: https://github.com/tox-dev/pyproject-fmt diff --git a/pyproject.toml b/pyproject.toml index ae184769..c3896533 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -164,6 +164,10 @@ lint.extend-per-file-ignores."tests/**/*.py" = [ "S", "UP", ] +lint.extend-per-file-ignores."tests_unit/**/*.py" = [ + "D", + "S", +] lint.mccabe.max-complexity = 16 [tool.isort] @@ -214,6 +218,7 @@ messages_control.disable = [ minversion = "6.0" testpaths = [ "tests", + "tests_unit", ] filterwarnings = [ "ignore::DeprecationWarning", diff --git a/tests_unit/test_fetch_model_file.py b/tests_unit/test_fetch_model_file.py new file mode 100644 index 00000000..2936f210 --- /dev/null +++ b/tests_unit/test_fetch_model_file.py @@ -0,0 +1,199 @@ +"""Tests for model file download with FileLock and atomic rename.""" + +import hashlib +import os +from threading import Thread +from unittest import mock + +import pytest +from filelock import FileLock + +from nc_py_api._exceptions import ModelFetchError +from nc_py_api.ex_app.integration_fastapi import fetch_models_task + + +class FakeResponse: + """Mock HTTP response for niquests.get() with streaming support.""" + + def __init__(self, content: bytes, etag: str = "", status_code: int = 200, ok: bool = True): + self.content = content + self.status_code = status_code + self.ok = ok + self.text = "" if ok else "Not Found" + self.history = [] + sha = hashlib.sha256(content).hexdigest() + self.headers = { + "Content-Length": str(len(content)), + "ETag": etag or f'"{sha}"', + } + + def iter_raw(self, _chunk_size): + yield self.content + + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + +def _mock_nc(): + nc = mock.MagicMock() + nc.set_init_status = mock.MagicMock() + return nc + + +class TestFetchModelAsFile: + """Tests for __fetch_model_as_file via fetch_models_task.""" + + def test_download_creates_file(self, tmp_path): + content = b"model-data-abc" 
+ save_path = str(tmp_path / "model.bin") + + with mock.patch("nc_py_api.ex_app.integration_fastapi.niquests.get", return_value=FakeResponse(content)): + fetch_models_task(_mock_nc(), {"https://example.com/model.bin": {"save_path": save_path}}, 0) + + assert os.path.isfile(save_path) + with open(save_path, "rb") as f: + assert f.read() == content + + def test_no_tmp_file_remains_after_success(self, tmp_path): + save_path = str(tmp_path / "model.bin") + + with mock.patch("nc_py_api.ex_app.integration_fastapi.niquests.get", return_value=FakeResponse(b"data")): + fetch_models_task(_mock_nc(), {"https://example.com/m.bin": {"save_path": save_path}}, 0) + + assert not os.path.exists(save_path + ".tmp") + + def test_lock_file_released_after_download(self, tmp_path): + save_path = str(tmp_path / "model.bin") + + with mock.patch("nc_py_api.ex_app.integration_fastapi.niquests.get", return_value=FakeResponse(b"data")): + fetch_models_task(_mock_nc(), {"https://example.com/m.bin": {"save_path": save_path}}, 0) + + lock_path = save_path + ".lock" + # Lock file may or may not exist (FileLock implementation detail), + # but it must not be held — acquiring it should succeed immediately. 
+ lock = FileLock(lock_path, timeout=1) + lock.acquire() + lock.release() + + def test_skips_download_when_file_matches_etag(self, tmp_path): + content = b"existing-model-data" + sha = hashlib.sha256(content).hexdigest() + etag = f'"{sha}"' + save_path = str(tmp_path / "model.bin") + with open(save_path, "wb") as f: + f.write(content) + + call_count = {"iter_raw": 0} + original_iter_raw = FakeResponse.iter_raw + + def tracking_iter_raw(self, chunk_size): + call_count["iter_raw"] += 1 + yield from original_iter_raw(self, chunk_size) + + resp = FakeResponse(content, etag=etag) + resp.iter_raw = lambda cs: tracking_iter_raw(resp, cs) + + with mock.patch("nc_py_api.ex_app.integration_fastapi.niquests.get", return_value=resp): + fetch_models_task(_mock_nc(), {"https://example.com/model.bin": {"save_path": save_path}}, 0) + + assert call_count["iter_raw"] == 0 + with open(save_path, "rb") as f: + assert f.read() == content + + def test_tmp_file_cleaned_up_on_download_error(self, tmp_path): + save_path = str(tmp_path / "model.bin") + + def failing_iter_raw(_chunk_size): + yield b"partial" + raise ConnectionError("network down") + + resp = FakeResponse(b"full-content") + resp.iter_raw = failing_iter_raw + + with ( + mock.patch("nc_py_api.ex_app.integration_fastapi.niquests.get", return_value=resp), + pytest.raises(ModelFetchError), + ): + fetch_models_task(_mock_nc(), {"https://example.com/m.bin": {"save_path": save_path}}, 0) + + assert not os.path.exists(save_path + ".tmp") + assert not os.path.exists(save_path) + + def test_original_file_untouched_on_download_error(self, tmp_path): + save_path = str(tmp_path / "model.bin") + with open(save_path, "wb") as f: + f.write(b"original-good-data") + + def failing_iter_raw(_chunk_size): + yield b"partial" + raise ConnectionError("network down") + + resp = FakeResponse(b"new-content", etag='"different-etag"') + resp.iter_raw = failing_iter_raw + + with ( + mock.patch("nc_py_api.ex_app.integration_fastapi.niquests.get", 
return_value=resp), + pytest.raises(ModelFetchError), + ): + fetch_models_task(_mock_nc(), {"https://example.com/m.bin": {"save_path": save_path}}, 0) + + with open(save_path, "rb") as f: + assert f.read() == b"original-good-data" + + def test_http_error_raises_model_fetch_error(self, tmp_path): + save_path = str(tmp_path / "model.bin") + resp = FakeResponse(b"", status_code=404, ok=False) + + with ( + mock.patch("nc_py_api.ex_app.integration_fastapi.niquests.get", return_value=resp), + pytest.raises(ModelFetchError), + ): + fetch_models_task(_mock_nc(), {"https://example.com/m.bin": {"save_path": save_path}}, 0) + + def test_concurrent_downloads_do_not_corrupt(self, tmp_path): + save_path = str(tmp_path / "model.bin") + errors = [] + + def download(): + try: + fetch_models_task(_mock_nc(), {"https://example.com/m.bin": {"save_path": save_path}}, 0) + except Exception as e: # noqa pylint: disable=broad-exception-caught + errors.append(e) + + # Patch once around both threads to avoid mock.patch context manager + # race: independent per-thread patches can restore the original + # function while the other thread still needs the mock. 
+ responses = iter([FakeResponse(b"A" * 10000), FakeResponse(b"B" * 10000)]) + + def mock_get(_url, **_kwargs): + return next(responses) + + with mock.patch("nc_py_api.ex_app.integration_fastapi.niquests.get", side_effect=mock_get): + t1 = Thread(target=download) + t2 = Thread(target=download) + t1.start() + t2.start() + t1.join(timeout=60) + t2.join(timeout=60) + + assert not errors, f"Threads raised errors: {errors}" + assert os.path.isfile(save_path) + with open(save_path, "rb") as f: + data = f.read() + # File must be entirely one content or the other — never mixed + assert data in (b"A" * 10000, b"B" * 10000) + + def test_progress_updates_sent(self, tmp_path): + save_path = str(tmp_path / "model.bin") + nc = _mock_nc() + + with mock.patch("nc_py_api.ex_app.integration_fastapi.niquests.get", return_value=FakeResponse(b"data")): + fetch_models_task(nc, {"https://example.com/m.bin": {"save_path": save_path}}, 0) + + # set_init_status should be called at least for completion (100) + assert nc.set_init_status.called + # Last call should be 100 (completion) + assert nc.set_init_status.call_args_list[-1] == mock.call(100) From 0807acddbc6865516db4126db75a439bff6ce5a0 Mon Sep 17 00:00:00 2001 From: Oleksander Piskun Date: Thu, 19 Feb 2026 15:16:52 +0200 Subject: [PATCH 3/5] fix(deps): bump filelock minimum to 3.20.3 for CVE-2025-68146 and CVE-2026-22701 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5e97c41d..904019be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,7 +46,7 @@ dynamic = [ ] dependencies = [ "fastapi>=0.109.2", - "filelock>=3.16,<4", + "filelock>=3.20.3,<4", "niquests>=3,<4", "pydantic>=2.1.1", "python-dotenv>=1", From d1d4d9378f1138646fb97153e41e59e9b324afa6 Mon Sep 17 00:00:00 2001 From: bigcat88 Date: Fri, 20 Feb 2026 15:33:10 +0200 Subject: [PATCH 4/5] fix: handle FileLock timeout explicitly, rename misleading var, align lint ignores MIME-Version: 1.0 Content-Type: 
text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Catch filelock.Timeout separately from generic errors so operators see an actionable message when a pod is stuck holding the download lock - Rename each_history → redirect_resp (response.history entries are redirect responses, not "histories") - Add E402 and UP to tests_unit/ ruff ignores to match tests/ config - Add test for FileLockTimeout → ModelFetchError propagation --- nc_py_api/ex_app/integration_fastapi.py | 92 ++++++++++++++----------- pyproject.toml | 2 + tests_unit/test_fetch_model_file.py | 15 ++++ 3 files changed, 67 insertions(+), 42 deletions(-) diff --git a/nc_py_api/ex_app/integration_fastapi.py b/nc_py_api/ex_app/integration_fastapi.py index 7aa6939a..b89c2513 100644 --- a/nc_py_api/ex_app/integration_fastapi.py +++ b/nc_py_api/ex_app/integration_fastapi.py @@ -21,6 +21,7 @@ ) from fastapi.responses import JSONResponse, PlainTextResponse from filelock import FileLock +from filelock import Timeout as FileLockTimeout from starlette.requests import HTTPConnection, Request from starlette.types import ASGIApp, Receive, Scope, Send @@ -207,48 +208,55 @@ def __fetch_model_as_file( ) -> str: result_path = download_options.pop("save_path", urlparse(model_path).path.split("/")[-1]) tmp_path = result_path + ".tmp" - with FileLock(result_path + ".lock", timeout=7200), niquests.get(model_path, stream=True) as response: - if not response.ok: - raise ModelFetchError( - f"Downloading of '{model_path}' failed, returned ({response.status_code}) {response.text}" - ) - downloaded_size = 0 - linked_etag = "" - for each_history in response.history: - linked_etag = each_history.headers.get("X-Linked-ETag", "") - if linked_etag: - break - if not linked_etag: - linked_etag = response.headers.get("X-Linked-ETag", response.headers.get("ETag", "")) - total_size = int(response.headers.get("Content-Length")) - try: - existing_size = os.path.getsize(result_path) - except OSError: - existing_size = 0 - if linked_etag 
and total_size == existing_size: - with builtins.open(result_path, "rb") as file: - sha256_hash = hashlib.sha256() - for byte_block in iter(lambda: file.read(4096), b""): - sha256_hash.update(byte_block) - if f'"{sha256_hash.hexdigest()}"' == linked_etag: - nc.set_init_status(min(current_progress + progress_for_task, 99)) - return result_path - - try: - with builtins.open(tmp_path, "wb") as file: - last_progress = current_progress - for chunk in response.iter_raw(-1): - downloaded_size += file.write(chunk) - if total_size: - new_progress = min(current_progress + int(progress_for_task * downloaded_size / total_size), 99) - if new_progress != last_progress: - nc.set_init_status(new_progress) - last_progress = new_progress - os.replace(tmp_path, result_path) - except BaseException: - if os.path.exists(tmp_path): - os.remove(tmp_path) - raise + try: + with FileLock(result_path + ".lock", timeout=7200), niquests.get(model_path, stream=True) as response: + if not response.ok: + raise ModelFetchError( + f"Downloading of '{model_path}' failed, returned ({response.status_code}) {response.text}" + ) + downloaded_size = 0 + linked_etag = "" + for redirect_resp in response.history: + linked_etag = redirect_resp.headers.get("X-Linked-ETag", "") + if linked_etag: + break + if not linked_etag: + linked_etag = response.headers.get("X-Linked-ETag", response.headers.get("ETag", "")) + total_size = int(response.headers.get("Content-Length")) + try: + existing_size = os.path.getsize(result_path) + except OSError: + existing_size = 0 + if linked_etag and total_size == existing_size: + with builtins.open(result_path, "rb") as file: + sha256_hash = hashlib.sha256() + for byte_block in iter(lambda: file.read(4096), b""): + sha256_hash.update(byte_block) + if f'"{sha256_hash.hexdigest()}"' == linked_etag: + nc.set_init_status(min(current_progress + progress_for_task, 99)) + return result_path + + try: + with builtins.open(tmp_path, "wb") as file: + last_progress = current_progress + for 
chunk in response.iter_raw(-1): + downloaded_size += file.write(chunk) + if total_size: + new_progress = min( + current_progress + int(progress_for_task * downloaded_size / total_size), 99 + ) + if new_progress != last_progress: + nc.set_init_status(new_progress) + last_progress = new_progress + os.replace(tmp_path, result_path) + except BaseException: + if os.path.exists(tmp_path): + os.remove(tmp_path) + raise + except FileLockTimeout as exc: + raise ModelFetchError( + f"Timed out waiting for lock on '{result_path}' after 7200s — another process may be stuck downloading" + ) from exc return result_path diff --git a/pyproject.toml b/pyproject.toml index 904019be..d6be19df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -151,7 +151,9 @@ lint.extend-per-file-ignores."tests/**/*.py" = [ ] lint.extend-per-file-ignores."tests_unit/**/*.py" = [ "D", + "E402", "S", + "UP", ] lint.mccabe.max-complexity = 16 diff --git a/tests_unit/test_fetch_model_file.py b/tests_unit/test_fetch_model_file.py index 2936f210..2692e1cd 100644 --- a/tests_unit/test_fetch_model_file.py +++ b/tests_unit/test_fetch_model_file.py @@ -7,6 +7,7 @@ import pytest from filelock import FileLock +from filelock import Timeout as FileLockTimeout from nc_py_api._exceptions import ModelFetchError from nc_py_api.ex_app.integration_fastapi import fetch_models_task @@ -186,6 +187,20 @@ def mock_get(_url, **_kwargs): # File must be entirely one content or the other — never mixed assert data in (b"A" * 10000, b"B" * 10000) + def test_filelock_timeout_raises_model_fetch_error(self, tmp_path): + save_path = str(tmp_path / "model.bin") + lock = FileLock(save_path + ".lock") + nc = _mock_nc() + + with ( + mock.patch("nc_py_api.ex_app.integration_fastapi.FileLock", side_effect=FileLockTimeout(lock)), + pytest.raises(ModelFetchError), + ): + fetch_models_task(nc, {"https://example.com/m.bin": {"save_path": save_path}}, 0) + + status_msg = nc.set_init_status.call_args_list[-1][0][1] + assert "Timed out waiting for 
lock" in status_msg + def test_progress_updates_sent(self, tmp_path): save_path = str(tmp_path / "model.bin") nc = _mock_nc() From 0ea6f98cbc3322cb3872e9e3d84282f8f2c75907 Mon Sep 17 00:00:00 2001 From: bigcat88 Date: Fri, 20 Feb 2026 16:33:12 +0200 Subject: [PATCH 5/5] fix: handle missing Content-Length header, fix docstring param name MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Default Content-Length to 0 when absent to avoid int(None) TypeError - Guard ETag skip with `and total_size` so 0==0 doesn't open a nonexistent file - Fix docstring: models_to_fetch → models to match actual param name --- nc_py_api/ex_app/integration_fastapi.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nc_py_api/ex_app/integration_fastapi.py b/nc_py_api/ex_app/integration_fastapi.py index b89c2513..38e682fe 100644 --- a/nc_py_api/ex_app/integration_fastapi.py +++ b/nc_py_api/ex_app/integration_fastapi.py @@ -160,7 +160,7 @@ def fetch_models_task(nc: NextcloudApp, models: dict[str, dict], progress_init_s """Use for cases when you want to define custom `/init` but still need to easy download models. :param nc: NextcloudApp instance. - :param models_to_fetch: Dictionary describing which models should be downloaded of the form: + :param models: Dictionary describing which models should be downloaded of the form: .. 
code-block:: python { "model_url_1": { @@ -222,12 +222,12 @@ def __fetch_model_as_file( break if not linked_etag: linked_etag = response.headers.get("X-Linked-ETag", response.headers.get("ETag", "")) - total_size = int(response.headers.get("Content-Length")) + total_size = int(response.headers.get("Content-Length", 0)) try: existing_size = os.path.getsize(result_path) except OSError: existing_size = 0 - if linked_etag and total_size == existing_size: + if linked_etag and total_size and total_size == existing_size: with builtins.open(result_path, "rb") as file: sha256_hash = hashlib.sha256() for byte_block in iter(lambda: file.read(4096), b""):