diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index f2addf6f9d53..2f2d3ac432e0 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1810,7 +1810,7 @@ def _add_argparse_args(cls, parser): parser.add_argument( '--job_server_timeout', '--job-server-timeout', # For backwards compatibility. - default=300, + default=600, type=int, help=( 'Job service request timeout in seconds. The timeout ' diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py index 9fdaabd1a177..a08caa060549 100644 --- a/sdks/python/apache_beam/runners/portability/job_server.py +++ b/sdks/python/apache_beam/runners/portability/job_server.py @@ -88,7 +88,11 @@ def start(self): self._endpoint = self._job_server.start() self._started = True atexit.register(self.stop) - signal.signal(signal.SIGINT, self._sigint_handler) + try: + signal.signal(signal.SIGINT, self._sigint_handler) + except Exception as e: + logging.warning("Unable to install signal handler for SIGINT: %s", e) + pass return self._endpoint def stop(self): diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index ff1a0d9c46aa..5581b48c9700 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -85,15 +85,16 @@ def _next_id(self): def register(self): owner = self._next_id() - self._live_owners.add(owner) + with self._lock: + self._live_owners.add(owner) return owner def purge(self, owner): - if owner not in self._live_owners: - raise ValueError(f"{owner} not in {self._live_owners}") - self._live_owners.remove(owner) to_delete = [] with self._lock: + if owner not in self._live_owners: + raise ValueError(f"{owner} not in {self._live_owners}") + self._live_owners.remove(owner) for key, entry in list(self._cache.items()): if owner in entry.owners: entry.owners.remove(owner) @@ -105,9 +106,9 @@ def purge(self, owner): self._destructor(value) def get(self, *key): - if not self._live_owners: - raise RuntimeError("At least one owner must be registered.") with self._lock: + if not self._live_owners: + raise RuntimeError("At least one owner must be registered.") if key not in self._cache: self._cache[key] = _SharedCacheEntry(self._constructor(*key), set()) for owner in self._live_owners: @@ -439,7 +440,7 @@ def path_to_beam_jar( def _download_jar_to_cache( cls, download_url, cached_jar_path, user_agent=None): """Downloads a jar from the given URL to the specified cache path. - + Args: download_url (str): The URL to download from. cached_jar_path (str): The local path where the jar should be cached. diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 176c84c9966b..15f9e0317450 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -38,6 +38,7 @@ # It is recommended to import setuptools prior to importing distutils to avoid # using legacy behavior from distutils. # https://setuptools.readthedocs.io/en/latest/history.html#v48-0-0 +# Add something here to retrigger tests. from distutils.errors import DistutilsError # isort:skip