From c5eaf081a357b17e4f44f5ed0c9b04aea1347010 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 14 May 2026 16:01:53 -0700 Subject: [PATCH 1/4] fix(ingestion): cap OM and Snowflake socket waits so a silently dropped TCP connection no longer hangs the worker A Snowflake metadata ingestion run was observed to silently pause: the Airflow pod stayed alive (no OOM, no crash), no further API calls hit the OpenMetadata server, no further log lines appeared. The pod-side log ended at `Source.GET executed in 59.057ms` -- a successful HTTP GET to the OM server -- with the next expected `Source executed` / `Sink.PATCH` lines never reaching the server. Streamable-log shipping continued from its daemon thread for ~13 minutes (draining its buffer) and then went quiet too. Root cause is a structural one: HTTP calls from the connector to the OM server, and the SQLAlchemy/Snowflake connection itself, both go out with no read timeout. `requests.Session()` and the Snowflake driver keep their TCP connections alive in pools; intermediate network devices (K8s NAT, ingress NLB, AWS NLB default 350s, Azure LB default 4 min) silently sever idle sockets. The next call grabs a stale connection from the pool and blocks in `socket.recv` waiting for bytes that will never arrive. Linux TCP keepalive defaults to 2 hours. Three minimal changes: - `ingestion/src/metadata/ingestion/ometa/client.py`: default `ClientConfig.timeout` to `(10, 300)` (10s connect, 5min read) instead of `None`. The existing `if self._timeout` guard still honours an explicit `None` override for callers that need it. - `ingestion/src/metadata/ingestion/source/database/snowflake/connection.py`: `setdefault` `client_session_keep_alive=True` and `network_timeout=600` on the connection arguments. User-supplied values in `connectionArguments` still win. - `ingestion/setup.py`: add `py-spy>=0.3.14` to `base_requirements` so the next time a worker hangs we can `py-spy dump --pid ` in place, without first having to install anything in the container. After this, the same kind of dropped socket surfaces as a `requests.exceptions.ReadTimeout` (caught by the existing `except Exception` in `_one_request`, logged, and the workflow continues) instead of wedging the pod indefinitely. SSE streaming uses its own httpx client and is unaffected. --- ingestion/setup.py | 3 +++ ingestion/src/metadata/ingestion/ometa/client.py | 7 +++++-- .../ingestion/source/database/snowflake/connection.py | 6 ++++++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/ingestion/setup.py b/ingestion/setup.py index dfad58c4e707..4c2a138a0b08 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -154,6 +154,9 @@ "kubernetes>=21.0.0", # Kubernetes client for secrets manager "memory-profiler", "mypy_extensions>=0.4.3", + # Ships in every container so a hung worker can be sampled in place + # (`py-spy dump --pid `) without first installing anything. + "py-spy>=0.3.14", VERSIONS["pydantic"], VERSIONS["pydantic-settings"], VERSIONS["pymysql"], diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index de64869ea4ba..c4ce1bc29db1 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -15,7 +15,7 @@ import time import traceback from datetime import datetime, timezone -from typing import Any, Callable, Dict, List, Optional, Union # noqa: UP035 +from typing import Any, Callable, Dict, List, Optional, Tuple, Union # noqa: UP035 import requests from requests.exceptions import HTTPError, JSONDecodeError @@ -118,7 +118,10 @@ class ClientConfig(ConfigModel): verify: Optional[Union[bool, str]] = None # noqa: UP007, UP045 cookies: Optional[Any] = None # noqa: UP045 ttl_cache: int = 60 - timeout: Optional[int] = None # noqa: UP045 + # (connect, read) seconds. Default prevents indefinite hangs when a pooled + # socket is silently severed (NAT/LB idle reaping). Override with None to + # disable, or pass a single int to use the same value for both. + timeout: Optional[Union[int, Tuple[int, int]]] = (10, 300) # noqa: UP045 cert: Optional[Union[str, tuple]] = None # noqa: UP007, UP045 diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py index 0047c3a6a5d9..e839bf606e20 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py @@ -205,6 +205,12 @@ def get_connection(self) -> Engine: if keep_alive := self._get_client_session_keep_alive(): connection.connectionArguments.root["client_session_keep_alive"] = keep_alive + # Safe defaults: prevent indefinite hangs when the Snowflake socket is + # silently severed (NAT/LB idle reaping in K8s/hybrid runners). Any + # value the user supplied via connectionArguments wins via setdefault. + connection.connectionArguments.root.setdefault("client_session_keep_alive", True) + connection.connectionArguments.root.setdefault("network_timeout", 600) + engine = create_generic_db_connection( connection=connection, get_connection_url_fn=self.get_connection_url, From 20a72d024d98352ffaed8c2f9bd5075484009257 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 14 May 2026 16:08:32 -0700 Subject: [PATCH 2/4] =?UTF-8?q?review:=20address=20PR=20comments=20?= =?UTF-8?q?=E2=80=94=20defer=20keep=5Falive=20to=20UI=20toggle,=20move=20p?= =?UTF-8?q?y-spy=20to=20Dockerfile?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - snowflake/connection.py: drop the setdefault("client_session_keep_alive", True). The IngestionPipeline UI already exposes a clientSessionKeepAlive toggle (handled at lines 205-206); imposing a default of True here would silently override a user who unchecks it. Hang protection is now covered by network_timeout=600 alone, which bounds any silently-severed Snowflake socket to a 10-minute ReadTimeout instead of hanging the worker forever. Keep-alive only matters for Snowflake-side session expiry, which is a separate concern from the original hang. - setup.py + ingestion/Dockerfile: move py-spy from base_requirements into the ingestion Dockerfile. The original goal — "available in the running container without pip install" — is met without forcing a native Rust binary on dev laptops, CI, and non-container installs that won't have CAP_SYS_PTRACE anyway. --- ingestion/Dockerfile | 6 ++++++ ingestion/setup.py | 3 --- .../ingestion/source/database/snowflake/connection.py | 8 ++++---- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/ingestion/Dockerfile b/ingestion/Dockerfile index 5e49e5e0f07c..c262933aaf74 100644 --- a/ingestion/Dockerfile +++ b/ingestion/Dockerfile @@ -102,6 +102,12 @@ RUN [ $(uname -m) = "x86_64" ] \ && pip install "openmetadata-ingestion[db2]~=${RI_VERSION}" \ || echo "DB2 not supported on ARM architectures." +# Ship py-spy so a hung worker can be sampled in place +# (`py-spy dump --pid `) without first installing anything in the pod. +# Container-only — kept out of setup.py to avoid forcing a native binary on +# dev laptops / CI / non-container installs. +RUN pip install "py-spy>=0.3.14" + # bump python-daemon for https://github.com/apache/airflow/pull/29916 RUN pip install "python-daemon>=3.0.0" # remove all airflow providers except for docker, cncf kubernetes, and standard (required in Airflow 3.x) diff --git a/ingestion/setup.py b/ingestion/setup.py index 4c2a138a0b08..dfad58c4e707 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -154,9 +154,6 @@ "kubernetes>=21.0.0", # Kubernetes client for secrets manager "memory-profiler", "mypy_extensions>=0.4.3", - # Ships in every container so a hung worker can be sampled in place - # (`py-spy dump --pid `) without first installing anything. - "py-spy>=0.3.14", VERSIONS["pydantic"], VERSIONS["pydantic-settings"], VERSIONS["pymysql"], diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py index e839bf606e20..7efed861c5f9 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py @@ -205,10 +205,10 @@ def get_connection(self) -> Engine: if keep_alive := self._get_client_session_keep_alive(): connection.connectionArguments.root["client_session_keep_alive"] = keep_alive - # Safe defaults: prevent indefinite hangs when the Snowflake socket is - # silently severed (NAT/LB idle reaping in K8s/hybrid runners). Any - # value the user supplied via connectionArguments wins via setdefault. - connection.connectionArguments.root.setdefault("client_session_keep_alive", True) + # Bound the Snowflake socket so a silently-severed TCP connection + # (NAT/LB idle reaping in K8s/hybrid runners) surfaces as a network + # error within 10 minutes instead of hanging the worker indefinitely. + # User-supplied connectionArguments win via setdefault. connection.connectionArguments.root.setdefault("network_timeout", 600) engine = create_generic_db_connection( From 6ef0f906839f70ab50391ac1aec24e6d8f8b56f9 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 14 May 2026 20:18:24 -0700 Subject: [PATCH 3/4] fix py check --- ingestion/src/metadata/ingestion/ometa/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index c4ce1bc29db1..d0988d3ceddf 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -15,7 +15,7 @@ import time import traceback from datetime import datetime, timezone -from typing import Any, Callable, Dict, List, Optional, Tuple, Union # noqa: UP035 +from typing import Any, Callable, Dict, List, Optional, Union # noqa: UP035 import requests from requests.exceptions import HTTPError, JSONDecodeError @@ -121,7 +121,7 @@ class ClientConfig(ConfigModel): # (connect, read) seconds. Default prevents indefinite hangs when a pooled # socket is silently severed (NAT/LB idle reaping). Override with None to # disable, or pass a single int to use the same value for both. - timeout: Optional[Union[int, Tuple[int, int]]] = (10, 300) # noqa: UP045 + timeout: Optional[int | tuple[int, int]] = (10, 300) # noqa: UP045 cert: Optional[Union[str, tuple]] = None # noqa: UP007, UP045 From b3635e7f1fbb1697e5a530d3ad7a20a1124b93e4 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 15 May 2026 07:32:53 +0200 Subject: [PATCH 4/4] fix(ingestion): guard connectionArguments.root before setdefault to fix basedpyright static-check The new network_timeout line called .setdefault() directly on connection.connectionArguments.root, which is statically Optional[Dict] (ConnectionArguments RootModel defaults to root=None). basedpyright flags this as reportOptionalMemberAccess (configured as "error"), a new violation not in the baseline, failing CI's --baselinemode=discard. Guard with 'is not None' (matching the existing pattern at line 219). Uses 'is not None' rather than truthiness because init_empty_connection_arguments() yields an empty dict which is falsy, so a truthiness check would skip setting the default in the common no-user-args case. Co-Authored-By: Claude Opus 4.7 --- .../metadata/ingestion/source/database/snowflake/connection.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py index 7efed861c5f9..f8510c2a939b 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py @@ -209,7 +209,8 @@ def get_connection(self) -> Engine: # (NAT/LB idle reaping in K8s/hybrid runners) surfaces as a network # error within 10 minutes instead of hanging the worker indefinitely. # User-supplied connectionArguments win via setdefault. - connection.connectionArguments.root.setdefault("network_timeout", 600) + if connection.connectionArguments.root is not None: + connection.connectionArguments.root.setdefault("network_timeout", 600) engine = create_generic_db_connection( connection=connection,