From c67247983b9c8af9b73afe6efe919c841e1c9723 Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Tue, 17 Feb 2026 12:21:31 +0100 Subject: [PATCH] fix: use FileLock to avoid downloading models concurrently into shared storage Signed-off-by: Marcel Klehr --- nc_py_api/ex_app/integration_fastapi.py | 86 ++++++++++++++----------- pyproject.toml | 1 + 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/nc_py_api/ex_app/integration_fastapi.py b/nc_py_api/ex_app/integration_fastapi.py index 42b3bf94..9b92e90f 100644 --- a/nc_py_api/ex_app/integration_fastapi.py +++ b/nc_py_api/ex_app/integration_fastapi.py @@ -20,6 +20,7 @@ status, ) from fastapi.responses import JSONResponse, PlainTextResponse +from filelock import SoftFileLock from starlette.requests import HTTPConnection, Request from starlette.types import ASGIApp, Receive, Scope, Send @@ -205,42 +206,43 @@ def __fetch_model_as_file( current_progress: int, progress_for_task: int, nc: NextcloudApp, model_path: str, download_options: dict ) -> str: result_path = download_options.pop("save_path", urlparse(model_path).path.split("/")[-1]) - with niquests.get(model_path, stream=True) as response: - if not response.ok: - raise ModelFetchError( - f"Downloading of '{model_path}' failed, returned ({response.status_code}) {response.text}" - ) - downloaded_size = 0 - linked_etag = "" - for each_history in response.history: - linked_etag = each_history.headers.get("X-Linked-ETag", "") - if linked_etag: - break - if not linked_etag: - linked_etag = response.headers.get("X-Linked-ETag", response.headers.get("ETag", "")) - total_size = int(response.headers.get("Content-Length")) - try: - existing_size = os.path.getsize(result_path) - except OSError: - existing_size = 0 - if linked_etag and total_size == existing_size: - with builtins.open(result_path, "rb") as file: - sha256_hash = hashlib.sha256() - for byte_block in iter(lambda: file.read(4096), b""): - sha256_hash.update(byte_block) - if f'"{sha256_hash.hexdigest()}"' == linked_etag: - nc.set_init_status(min(current_progress + progress_for_task, 99)) - return result_path - - with builtins.open(result_path, "wb") as file: - last_progress = current_progress - for chunk in response.iter_raw(-1): - downloaded_size += file.write(chunk) - if total_size: - new_progress = min(current_progress + int(progress_for_task * downloaded_size / total_size), 99) - if new_progress != last_progress: - nc.set_init_status(new_progress) - last_progress = new_progress + with SoftFileLock(result_path + ".lock"): + with niquests.get(model_path, stream=True) as response: + if not response.ok: + raise ModelFetchError( + f"Downloading of '{model_path}' failed, returned ({response.status_code}) {response.text}" + ) + downloaded_size = 0 + linked_etag = "" + for each_history in response.history: + linked_etag = each_history.headers.get("X-Linked-ETag", "") + if linked_etag: + break + if not linked_etag: + linked_etag = response.headers.get("X-Linked-ETag", response.headers.get("ETag", "")) + total_size = int(response.headers.get("Content-Length")) + try: + existing_size = os.path.getsize(result_path) + except OSError: + existing_size = 0 + if linked_etag and total_size == existing_size: + with builtins.open(result_path, "rb") as file: + sha256_hash = hashlib.sha256() + for byte_block in iter(lambda: file.read(4096), b""): + sha256_hash.update(byte_block) + if f'"{sha256_hash.hexdigest()}"' == linked_etag: + nc.set_init_status(min(current_progress + progress_for_task, 99)) + return result_path + + with builtins.open(result_path, "wb") as file: + last_progress = current_progress + for chunk in response.iter_raw(-1): + downloaded_size += file.write(chunk) + if total_size: + new_progress = min(current_progress + int(progress_for_task * downloaded_size / total_size), 99) + if new_progress != last_progress: + nc.set_init_status(new_progress) + last_progress = new_progress return result_path @@ -265,9 +267,15 @@ def display(self, msg=None, pos=None): workers = download_options.pop("max_workers", 2) cache = download_options.pop("cache_dir", persistent_storage()) - return snapshot_download( - model_name, tqdm_class=TqdmProgress, **download_options, max_workers=workers, cache_dir=cache - ) + safe_model_name = model_name + for sep in (os.sep, os.altsep): + if sep: + safe_model_name = safe_model_name.replace(sep, "_") + lock_path = os.path.join(cache, f"{safe_model_name}.lock") + with SoftFileLock(lock_path): + return snapshot_download( + model_name, tqdm_class=TqdmProgress, **download_options, max_workers=workers, cache_dir=cache + ) def __nc_app(request: HTTPConnection) -> dict: diff --git a/pyproject.toml b/pyproject.toml index 69dfc528..f84d469b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ dynamic = [ ] dependencies = [ "fastapi>=0.109.2", + "filelock>=3.24.2,<4", "niquests>=3,<4", "pydantic>=2.1.1", "python-dotenv>=1",