diff --git a/ingestion/Dockerfile b/ingestion/Dockerfile index 5e49e5e0f07c..c262933aaf74 100644 --- a/ingestion/Dockerfile +++ b/ingestion/Dockerfile @@ -102,6 +102,12 @@ RUN [ $(uname -m) = "x86_64" ] \ && pip install "openmetadata-ingestion[db2]~=${RI_VERSION}" \ || echo "DB2 not supported on ARM architectures." +# Ship py-spy so a hung worker can be sampled in place +# (`py-spy dump --pid <PID>`) without first installing anything in the pod. +# Container-only — kept out of setup.py to avoid forcing a native binary on +# dev laptops / CI / non-container installs. +RUN pip install "py-spy>=0.3.14" + # bump python-daemon for https://github.com/apache/airflow/pull/29916 RUN pip install "python-daemon>=3.0.0" # remove all airflow providers except for docker, cncf kubernetes, and standard (required in Airflow 3.x) diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index de64869ea4ba..d0988d3ceddf 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -118,7 +118,10 @@ class ClientConfig(ConfigModel): verify: Optional[Union[bool, str]] = None # noqa: UP007, UP045 cookies: Optional[Any] = None # noqa: UP045 ttl_cache: int = 60 - timeout: Optional[int] = None # noqa: UP045 + # (connect, read) seconds. Default prevents indefinite hangs when a pooled + # socket is silently severed (NAT/LB idle reaping). Override with None to + # disable, or pass a single int to use the same value for both. 
+ timeout: Optional[int | tuple[int, int]] = (10, 300) # noqa: UP045 cert: Optional[Union[str, tuple]] = None # noqa: UP007, UP045 diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py index 0047c3a6a5d9..f8510c2a939b 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/connection.py @@ -205,6 +205,13 @@ def get_connection(self) -> Engine: if keep_alive := self._get_client_session_keep_alive(): connection.connectionArguments.root["client_session_keep_alive"] = keep_alive + # Bound the Snowflake socket so a silently-severed TCP connection + # (NAT/LB idle reaping in K8s/hybrid runners) surfaces as a network + # error within 10 minutes instead of hanging the worker indefinitely. + # User-supplied connectionArguments win via setdefault. + if connection.connectionArguments.root is not None: + connection.connectionArguments.root.setdefault("network_timeout", 600) + engine = create_generic_db_connection( connection=connection, get_connection_url_fn=self.get_connection_url,