From 1da80bd7c95375312270b5d94373b90f15e24831 Mon Sep 17 00:00:00 2001 From: Tim Utegenov Date: Thu, 29 Jan 2026 17:47:28 -0800 Subject: [PATCH 1/4] Add support for dp-spark-pip magic to allow synchronous installation of packages in spark --- google/cloud/dataproc_spark_connect/magics.py | 79 ++++++++++++ tests/unit/test_magics.py | 122 ++++++++++++++++++ 2 files changed, 201 insertions(+) create mode 100644 google/cloud/dataproc_spark_connect/magics.py create mode 100644 tests/unit/test_magics.py diff --git a/google/cloud/dataproc_spark_connect/magics.py b/google/cloud/dataproc_spark_connect/magics.py new file mode 100644 index 0000000..31886ec --- /dev/null +++ b/google/cloud/dataproc_spark_connect/magics.py @@ -0,0 +1,79 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dataproc magic implementations.""" + +import shlex +from IPython.core.magic import (Magics, magics_class, line_magic) +from pyspark.sql import SparkSession +from google.cloud.dataproc_spark_connect import DataprocSparkSession + + +@magics_class +class DataprocMagics(Magics): + + def __init__( + self, + shell, + **kwargs, + ): + super().__init__(shell, **kwargs) + + def _parse_command(self, args): + if not args or args[0] != "install": + print("Usage: %dp_spark_pip install ...") + return + + # filter out 'install' and the flags (not currently supported) + packages = [pkg for pkg in args[1:] if not pkg.startswith("-")] + return packages + + @line_magic + def dp_spark_pip(self, line): + """ + Custom magic to install pip packages as Spark Connect artifacts. + Usage: %dp_spark_pip install pandas numpy + """ + try: + packages = self._parse_command(shlex.split(line)) + + if not packages: + print("No packages specified.") + return + + sessions = [ + obj + for obj in self.shell.user_ns.values() + if isinstance(obj, DataprocSparkSession) + ] + + if not sessions: + print( + "No active Spark Sessions found. Please create one first." + ) + return + + print("Installing packages: %s", packages) + for session in sessions: + for package in packages: + session.addArtifacts(package, pypi=True) + + print("Packages successfully added as artifacts.") + except Exception as e: + print(f"Failed to add artifacts: {e}") + + +# To register the magic +def load_ipython_extension(ipython): + ipython.register_magics(DataprocMagics) diff --git a/tests/unit/test_magics.py b/tests/unit/test_magics.py new file mode 100644 index 0000000..79aa190 --- /dev/null +++ b/tests/unit/test_magics.py @@ -0,0 +1,122 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io +import unittest +from contextlib import redirect_stdout +from unittest import mock + +from google.cloud.dataproc_spark_connect import DataprocSparkSession +from google.cloud.dataproc_spark_connect.magics import DataprocMagics +from IPython.core.interactiveshell import InteractiveShell +from traitlets.config import Config + + +class DataprocMagicsTest(unittest.TestCase): + + def setUp(self): + self.shell = mock.create_autospec(InteractiveShell, instance=True) + self.shell.user_ns = {} + self.shell.config = Config() + self.magics = DataprocMagics(shell=self.shell) + + def test_parse_command_valid(self): + packages = self.magics._parse_command(["install", "pandas", "numpy"]) + self.assertEqual(packages, ["pandas", "numpy"]) + + def test_parse_command_with_flags(self): + packages = self.magics._parse_command( + ["install", "-U", "pandas", "--upgrade", "numpy"] + ) + self.assertEqual(packages, ["pandas", "numpy"]) + + def test_parse_command_no_install(self): + packages = self.magics._parse_command(["other", "pandas"]) + self.assertIsNone(packages) + + def test_dp_spark_pip_invalid_command(self): + f = io.StringIO() + with redirect_stdout(f): + self.magics.dp_spark_pip("foo bar") + output = f.getvalue() + self.assertIn("Usage: %dp_spark_pip install", output) + self.assertIn("No packages specified", output) + + def test_dp_spark_pip_no_session(self): + f = io.StringIO() + with redirect_stdout(f): + self.magics.dp_spark_pip("install pandas") + self.assertIn("No active Spark Sessions found", f.getvalue()) + + def test_dp_spark_pip_no_packages_specified(self): + f = io.StringIO() + with redirect_stdout(f): + self.magics.dp_spark_pip("install") + self.assertIn("No packages specified", f.getvalue()) + + def test_dp_spark_pip_install_packages_single_session(self): + mock_session = mock.Mock(spec=DataprocSparkSession) + self.shell.user_ns["spark"] = mock_session + + f = io.StringIO() + with redirect_stdout(f): + self.magics.dp_spark_pip("install pandas numpy") + + mock_session.addArtifacts.assert_has_calls([ + mock.call("pandas", pypi=True), + mock.call("numpy", pypi=True), + ]) + self.assertEqual(mock_session.addArtifacts.call_count, 2) + self.assertIn("Packages successfully added as artifacts.", f.getvalue()) + + def test_dp_spark_pip_install_packages_multiple_sessions(self): + mock_session1 = mock.Mock(spec=DataprocSparkSession) + mock_session2 = mock.Mock(spec=DataprocSparkSession) + self.shell.user_ns["spark1"] = mock_session1 + self.shell.user_ns["spark2"] = mock_session2 + self.shell.user_ns["not_a_session"] = 5 + + f = io.StringIO() + with redirect_stdout(f): + self.magics.dp_spark_pip("install pandas") + + mock_session1.addArtifacts.assert_called_once_with("pandas", pypi=True) + mock_session2.addArtifacts.assert_called_once_with("pandas", pypi=True) + self.assertIn("Packages successfully added as artifacts.", f.getvalue()) + + def test_dp_spark_pip_add_artifacts_fails(self): + mock_session = mock.Mock(spec=DataprocSparkSession) + mock_session.addArtifacts.side_effect = Exception("Failed") + self.shell.user_ns["spark"] = mock_session + + f = io.StringIO() + with redirect_stdout(f): + self.magics.dp_spark_pip("install pandas") + + mock_session.addArtifacts.assert_called_once_with("pandas", pypi=True) + self.assertIn("Failed to add artifacts: Failed", f.getvalue()) + + def test_dp_spark_pip_with_flags(self): + mock_session = mock.Mock(spec=DataprocSparkSession) + self.shell.user_ns["spark"] = mock_session + + f = io.StringIO() + with redirect_stdout(f): + self.magics.dp_spark_pip("install -U pandas") + + mock_session.addArtifacts.assert_called_once_with("pandas", pypi=True) + self.assertIn("Packages successfully added as artifacts.", f.getvalue()) + +if __name__ == "__main__": + unittest.main() From b41533fadba8e45d038b1fe98c49951c70f9da48 Mon Sep 17 00:00:00 2001 From: Tim Utegenov Date: Wed, 18 Feb 2026 12:41:15 -0800 Subject: [PATCH 2/4] Add support for %dpip magic to install packages on Spark session --- google/cloud/dataproc_magics/__init__.py | 19 ++ .../magics.py | 12 +- tests/integration/dataproc_magics/__init__.py | 0 .../dataproc_magics/test_magics.py | 198 ++++++++++++++++++ tests/unit/dataproc_magics/__init__.py | 0 .../unit/{ => dataproc_magics}/test_magics.py | 43 ++-- 6 files changed, 243 insertions(+), 29 deletions(-) create mode 100644 google/cloud/dataproc_magics/__init__.py rename google/cloud/{dataproc_spark_connect => dataproc_magics}/magics.py (87%) create mode 100644 tests/integration/dataproc_magics/__init__.py create mode 100644 tests/integration/dataproc_magics/test_magics.py create mode 100644 tests/unit/dataproc_magics/__init__.py rename tests/unit/{ => dataproc_magics}/test_magics.py (78%) diff --git a/google/cloud/dataproc_magics/__init__.py b/google/cloud/dataproc_magics/__init__.py new file mode 100644 index 0000000..a348eb8 --- /dev/null +++ b/google/cloud/dataproc_magics/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .magics import DataprocMagics + + +def load_ipython_extension(ipython): + ipython.register_magics(DataprocMagics) diff --git a/google/cloud/dataproc_spark_connect/magics.py b/google/cloud/dataproc_magics/magics.py similarity index 87% rename from google/cloud/dataproc_spark_connect/magics.py rename to google/cloud/dataproc_magics/magics.py index 31886ec..579e9e3 100644 --- a/google/cloud/dataproc_spark_connect/magics.py +++ b/google/cloud/dataproc_magics/magics.py @@ -16,7 +16,6 @@ import shlex from IPython.core.magic import (Magics, magics_class, line_magic) -from pyspark.sql import SparkSession from google.cloud.dataproc_spark_connect import DataprocSparkSession @@ -32,7 +31,7 @@ def __init__( def _parse_command(self, args): if not args or args[0] != "install": - print("Usage: %dp_spark_pip install ...") + print("Usage: %dpip install ...") return # filter out 'install' and the flags (not currently supported) @@ -40,10 +39,10 @@ def _parse_command(self, args): return packages @line_magic - def dp_spark_pip(self, line): + def dpip(self, line): """ Custom magic to install pip packages as Spark Connect artifacts. - Usage: %dp_spark_pip install pandas numpy + Usage: %dpip install pandas numpy """ try: packages = self._parse_command(shlex.split(line)) @@ -72,8 +71,3 @@ def dp_spark_pip(self, line): print("Packages successfully added as artifacts.") except Exception as e: print(f"Failed to add artifacts: {e}") - - -# To register the magic -def load_ipython_extension(ipython): - ipython.register_magics(DataprocMagics) diff --git a/tests/integration/dataproc_magics/__init__.py b/tests/integration/dataproc_magics/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/dataproc_magics/test_magics.py b/tests/integration/dataproc_magics/test_magics.py new file mode 100644 index 0000000..81b83ff --- /dev/null +++ b/tests/integration/dataproc_magics/test_magics.py @@ -0,0 +1,198 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import pytest +import certifi +from unittest import mock + +from google.cloud.dataproc_spark_connect import DataprocSparkSession + + +_SERVICE_ACCOUNT_KEY_FILE_ = "service_account_key.json" + + +@pytest.fixture(params=[None, "3.0"]) +def image_version(request): + return request.param + + +@pytest.fixture +def test_project(): + return os.getenv("GOOGLE_CLOUD_PROJECT") + + +@pytest.fixture +def test_region(): + return os.getenv("GOOGLE_CLOUD_REGION") + + +def is_ci_environment(): + """Detect if running in CI environment.""" + return os.getenv("CI") == "true" or os.getenv("GITHUB_ACTIONS") == "true" + + +@pytest.fixture +def auth_type(request): + """Auto-detect authentication type based on environment. + + CI environment (CI=true or GITHUB_ACTIONS=true): Uses SERVICE_ACCOUNT + Local environment: Uses END_USER_CREDENTIALS + Test parametrization can still override this default. + """ + # Allow test parametrization to override + if hasattr(request, "param"): + return request.param + + # Auto-detect based on environment + if is_ci_environment(): + return "SERVICE_ACCOUNT" + else: + return "END_USER_CREDENTIALS" + + +@pytest.fixture +def test_subnet(): + return os.getenv("DATAPROC_SPARK_CONNECT_SUBNET") + + +@pytest.fixture +def test_subnetwork_uri(test_subnet): + # Make DATAPROC_SPARK_CONNECT_SUBNET the full URI to align with how user would specify it in the project + return test_subnet + + +@pytest.fixture +def os_environment(auth_type, image_version, test_project, test_region): + original_environment = dict(os.environ) + if os.path.isfile(_SERVICE_ACCOUNT_KEY_FILE_): + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = ( + _SERVICE_ACCOUNT_KEY_FILE_ + ) + os.environ["DATAPROC_SPARK_CONNECT_AUTH_TYPE"] = auth_type + if auth_type == "END_USER_CREDENTIALS": + os.environ.pop("DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT", None) + # Add SSL certificate fix + os.environ["SSL_CERT_FILE"] = certifi.where() + os.environ["REQUESTS_CA_BUNDLE"] = certifi.where() + yield os.environ + os.environ.clear() + os.environ.update(original_environment) + + +@pytest.fixture +def connect_session(test_project, test_region, os_environment): + session = ( + DataprocSparkSession.builder.projectId(test_project) + .location(test_region) + .getOrCreate() + ) + yield session + # Clean up the session after each test to prevent resource conflicts + try: + session.stop() + except Exception: + # Ignore cleanup errors to avoid masking the actual test failure + pass + + +# Tests for magics.py +@pytest.fixture +def ipython_shell(connect_session): + """Provides an IPython shell with a DataprocSparkSession in user_ns.""" + pytest.importorskip("IPython", reason="IPython not available") + try: + from IPython.terminal.interactiveshell import TerminalInteractiveShell + from google.cloud.dataproc_spark_connect import magics + + shell = TerminalInteractiveShell.instance() + shell.user_ns = {"spark": connect_session} + + # Load magics + magics.load_ipython_extension(shell) + + yield shell + finally: + from IPython.terminal.interactiveshell import TerminalInteractiveShell + + TerminalInteractiveShell.clear_instance() + + +def test_dpip_magic_loads(ipython_shell): + """Test that %dpip magic is registered.""" + assert "dpip" in ipython_shell.magics_manager.magics["line"] + + +@mock.patch.object(DataprocSparkSession, "addArtifacts") +def test_dpip_install_single_package(mock_add_artifacts, ipython_shell, capsys): + """Test installing a single package with %dpip.""" + ipython_shell.run_line_magic("dpip", "install pandas") + mock_add_artifacts.assert_called_once_with("pandas", pypi=True) + captured = capsys.readouterr() + assert "Installing packages: " in captured.out + assert "Packages successfully added as artifacts." in captured.out + + +@mock.patch.object(DataprocSparkSession, "addArtifacts") +def test_dpip_install_multiple_packages_with_flags( + mock_add_artifacts, ipython_shell, capsys +): + """Test installing multiple packages with flags like -U.""" + ipython_shell.run_line_magic("dpip", "install -U numpy scikit-learn") + calls = [ + mock.call("numpy", pypi=True), + mock.call("scikit-learn", pypi=True), + ] + mock_add_artifacts.assert_has_calls(calls, any_order=True) + assert mock_add_artifacts.call_count == 2 + captured = capsys.readouterr() + assert "Installing packages: " in captured.out + assert "Packages successfully added as artifacts." in captured.out + + +def test_dpip_no_install_command(ipython_shell, capsys): + """Test usage message when 'install' is missing.""" + ipython_shell.run_line_magic("dpip", "pandas") + captured = capsys.readouterr() + assert "Usage: %dpip install ..." in captured.out + assert "No packages specified." in captured.out + + +def test_dpip_no_packages(ipython_shell, capsys): + """Test message when no packages are specified.""" + ipython_shell.run_line_magic("dpip", "install") + captured = capsys.readouterr() + assert "No packages specified." in captured.out + + +@mock.patch.object(DataprocSparkSession, "addArtifacts") +def test_dpip_no_session(mock_add_artifacts, ipython_shell, capsys): + """Test message when no Spark session is active.""" + ipython_shell.user_ns = {} # Remove spark session from namespace + ipython_shell.run_line_magic("dpip", "install pandas") + captured = capsys.readouterr() + assert "No active Spark Sessions found." in captured.out + mock_add_artifacts.assert_not_called() + + +@mock.patch.object( + DataprocSparkSession, + "addArtifacts", + side_effect=Exception("Install failed"), +) +def test_dpip_install_failure(mock_add_artifacts, ipython_shell, capsys): + """Test error message on installation failure.""" + ipython_shell.run_line_magic("dpip", "install bad-package") + mock_add_artifacts.assert_called_once_with("bad-package", pypi=True) + captured = capsys.readouterr() + assert "Failed to add artifacts: Install failed" in captured.out diff --git a/tests/unit/dataproc_magics/__init__.py b/tests/unit/dataproc_magics/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/test_magics.py b/tests/unit/dataproc_magics/test_magics.py similarity index 78% rename from tests/unit/test_magics.py rename to tests/unit/dataproc_magics/test_magics.py index 79aa190..b2382ff 100644 --- a/tests/unit/test_magics.py +++ b/tests/unit/dataproc_magics/test_magics.py @@ -18,7 +18,7 @@ from unittest import mock from google.cloud.dataproc_spark_connect import DataprocSparkSession -from google.cloud.dataproc_spark_connect.magics import DataprocMagics +from google.cloud.dataproc_magics import DataprocMagics from IPython.core.interactiveshell import InteractiveShell from traitlets.config import Config @@ -45,42 +45,44 @@ def test_parse_command_no_install(self): packages = self.magics._parse_command(["other", "pandas"]) self.assertIsNone(packages) - def test_dp_spark_pip_invalid_command(self): + def test_dpip_invalid_command(self): f = io.StringIO() with redirect_stdout(f): - self.magics.dp_spark_pip("foo bar") + self.magics.dpip("foo bar") output = f.getvalue() - self.assertIn("Usage: %dp_spark_pip install", output) + self.assertIn("Usage: %dpip install", output) self.assertIn("No packages specified", output) - def test_dp_spark_pip_no_session(self): + def test_dpip_no_session(self): f = io.StringIO() with redirect_stdout(f): - self.magics.dp_spark_pip("install pandas") + self.magics.dpip("install pandas") self.assertIn("No active Spark Sessions found", f.getvalue()) - def test_dp_spark_pip_no_packages_specified(self): + def test_dpip_no_packages_specified(self): f = io.StringIO() with redirect_stdout(f): - self.magics.dp_spark_pip("install") + self.magics.dpip("install") self.assertIn("No packages specified", f.getvalue()) - def test_dp_spark_pip_install_packages_single_session(self): + def test_dpip_install_packages_single_session(self): mock_session = mock.Mock(spec=DataprocSparkSession) self.shell.user_ns["spark"] = mock_session f = io.StringIO() with redirect_stdout(f): - self.magics.dp_spark_pip("install pandas numpy") + self.magics.dpip("install pandas numpy") - mock_session.addArtifacts.assert_has_calls([ - mock.call("pandas", pypi=True), - mock.call("numpy", pypi=True), - ]) + mock_session.addArtifacts.assert_has_calls( + [ + mock.call("pandas", pypi=True), + mock.call("numpy", pypi=True), + ] + ) self.assertEqual(mock_session.addArtifacts.call_count, 2) self.assertIn("Packages successfully added as artifacts.", f.getvalue()) - def test_dp_spark_pip_install_packages_multiple_sessions(self): + def test_dpip_install_packages_multiple_sessions(self): mock_session1 = mock.Mock(spec=DataprocSparkSession) mock_session2 = mock.Mock(spec=DataprocSparkSession) self.shell.user_ns["spark1"] = mock_session1 @@ -89,34 +91,35 @@ def test_dp_spark_pip_install_packages_multiple_sessions(self): f = io.StringIO() with redirect_stdout(f): - self.magics.dp_spark_pip("install pandas") + self.magics.dpip("install pandas") mock_session1.addArtifacts.assert_called_once_with("pandas", pypi=True) mock_session2.addArtifacts.assert_called_once_with("pandas", pypi=True) self.assertIn("Packages successfully added as artifacts.", f.getvalue()) - def test_dp_spark_pip_add_artifacts_fails(self): + def test_dpip_add_artifacts_fails(self): mock_session = mock.Mock(spec=DataprocSparkSession) mock_session.addArtifacts.side_effect = Exception("Failed") self.shell.user_ns["spark"] = mock_session f = io.StringIO() with redirect_stdout(f): - self.magics.dp_spark_pip("install pandas") + self.magics.dpip("install pandas") mock_session.addArtifacts.assert_called_once_with("pandas", pypi=True) self.assertIn("Failed to add artifacts: Failed", f.getvalue()) - def test_dp_spark_pip_with_flags(self): + def test_dpip_with_flags(self): mock_session = mock.Mock(spec=DataprocSparkSession) self.shell.user_ns["spark"] = mock_session f = io.StringIO() with redirect_stdout(f): - self.magics.dp_spark_pip("install -U pandas") + self.magics.dpip("install -U pandas") mock_session.addArtifacts.assert_called_once_with("pandas", pypi=True) self.assertIn("Packages successfully added as artifacts.", f.getvalue()) + if __name__ == "__main__": unittest.main() From 83bc885af0904a08bf9853978c70aa2a7e08b0fe Mon Sep 17 00:00:00 2001 From: Tim Utegenov Date: Thu, 26 Feb 2026 16:57:58 -0800 Subject: [PATCH 3/4] Add support for %dpip magic to install packages on Spark session --- google/cloud/dataproc_magics/magics.py | 48 +++++----- .../dataproc_magics/test_magics.py | 92 ++++++++++--------- tests/unit/dataproc_magics/test_magics.py | 86 +++++++---------- 3 files changed, 113 insertions(+), 113 deletions(-) diff --git a/google/cloud/dataproc_magics/magics.py b/google/cloud/dataproc_magics/magics.py index 579e9e3..ef255ed 100644 --- a/google/cloud/dataproc_magics/magics.py +++ b/google/cloud/dataproc_magics/magics.py @@ -29,15 +29,6 @@ def __init__( ): super().__init__(shell, **kwargs) - def _parse_command(self, args): - if not args or args[0] != "install": - print("Usage: %dpip install ...") - return - - # filter out 'install' and the flags (not currently supported) - packages = [pkg for pkg in args[1:] if not pkg.startswith("-")] - return packages - @line_magic def dpip(self, line): """ @@ -45,29 +36,44 @@ def dpip(self, line): Usage: %dpip install pandas numpy """ try: - packages = self._parse_command(shlex.split(line)) + args = shlex.split(line) + + if not args or args[0] != "install": + print("Usage: %dpip install ...") + return + + packages = args[1:] # remove `install` if not packages: - print("No packages specified.") + print("Error: No packages specified.") + return + + if any(pkg.startswith("-") for pkg in packages): + print("Error: Flags are not currently supported.") return sessions = [ - obj - for obj in self.shell.user_ns.values() - if isinstance(obj, DataprocSparkSession) + (key, value) + for key, value in self.shell.user_ns.items() + if isinstance(value, DataprocSparkSession) ] if not sessions: print( - "No active Spark Sessions found. Please create one first." + "No active Dataproc Spark Session found. Please create one first." + ) + return + if len(sessions) > 1: + print( + "Error: Found more than one active Dataproc Spark Sessions." ) return - print("Installing packages: %s", packages) - for session in sessions: - for package in packages: - session.addArtifacts(package, pypi=True) + ((name, session),) = sessions + print(f"Active session found: {name}") + print(f"Installing packages: {packages}") + session.addArtifacts(*packages, pypi=True) - print("Packages successfully added as artifacts.") + print("Finished installing packages.") except Exception as e: - print(f"Failed to add artifacts: {e}") + print(f"Failed to install packages: {e}") diff --git a/tests/integration/dataproc_magics/test_magics.py b/tests/integration/dataproc_magics/test_magics.py index 81b83ff..1f9cde5 100644 --- a/tests/integration/dataproc_magics/test_magics.py +++ b/tests/integration/dataproc_magics/test_magics.py @@ -68,7 +68,8 @@ def test_subnet(): @pytest.fixture def test_subnetwork_uri(test_subnet): - # Make DATAPROC_SPARK_CONNECT_SUBNET the full URI to align with how user would specify it in the project + # Make DATAPROC_SPARK_CONNECT_SUBNET the full URI + # to align with how user would specify it in the project return test_subnet @@ -106,20 +107,18 @@ def connect_session(test_project, test_region, os_environment): pass -# Tests for magics.py @pytest.fixture def ipython_shell(connect_session): """Provides an IPython shell with a DataprocSparkSession in user_ns.""" - pytest.importorskip("IPython", reason="IPython not available") try: from IPython.terminal.interactiveshell import TerminalInteractiveShell - from google.cloud.dataproc_spark_connect import magics + from google.cloud import dataproc_magics shell = TerminalInteractiveShell.instance() shell.user_ns = {"spark": connect_session} # Load magics - magics.load_ipython_extension(shell) + dataproc_magics.load_ipython_extension(shell) yield shell finally: @@ -128,36 +127,37 @@ def ipython_shell(connect_session): TerminalInteractiveShell.clear_instance() +# Tests for magics.py def test_dpip_magic_loads(ipython_shell): """Test that %dpip magic is registered.""" assert "dpip" in ipython_shell.magics_manager.magics["line"] -@mock.patch.object(DataprocSparkSession, "addArtifacts") -def test_dpip_install_single_package(mock_add_artifacts, ipython_shell, capsys): +def test_dpip_install_success(connect_session, ipython_shell, capsys): """Test installing a single package with %dpip.""" - ipython_shell.run_line_magic("dpip", "install pandas") - mock_add_artifacts.assert_called_once_with("pandas", pypi=True) + ipython_shell.run_line_magic("dpip", "install roman numpy") captured = capsys.readouterr() - assert "Installing packages: " in captured.out - assert "Packages successfully added as artifacts." in captured.out + assert "Active session found:" in captured.out + assert "Installing packages:" in captured.out + assert "Finished installing packages." in captured.out + from pyspark.sql.connect.functions import udf + from pyspark.sql.types import StringType -@mock.patch.object(DataprocSparkSession, "addArtifacts") -def test_dpip_install_multiple_packages_with_flags( - mock_add_artifacts, ipython_shell, capsys -): - """Test installing multiple packages with flags like -U.""" - ipython_shell.run_line_magic("dpip", "install -U numpy scikit-learn") - calls = [ - mock.call("numpy", pypi=True), - mock.call("scikit-learn", pypi=True), - ] - mock_add_artifacts.assert_has_calls(calls, any_order=True) - assert mock_add_artifacts.call_count == 2 - captured = capsys.readouterr() - assert "Installing packages: " in captured.out - assert "Packages successfully added as artifacts." in captured.out + df = connect_session.createDataFrame([(1666,)], ["number"]) + + def to_roman(number): + import roman + + return roman.toRoman(number) + + df_result = df.withColumn( + "roman", udf(to_roman, StringType())("number") + ).collect() + + assert df_result[0]["roman"] == "MDCLXVI" + + connect_session.stop() def test_dpip_no_install_command(ipython_shell, capsys): @@ -165,34 +165,44 @@ def test_dpip_no_install_command(ipython_shell, capsys): ipython_shell.run_line_magic("dpip", "pandas") captured = capsys.readouterr() assert "Usage: %dpip install ..." in captured.out - assert "No packages specified." in captured.out def test_dpip_no_packages(ipython_shell, capsys): """Test message when no packages are specified.""" ipython_shell.run_line_magic("dpip", "install") captured = capsys.readouterr() - assert "No packages specified." in captured.out + assert "Error: No packages specified." in captured.out -@mock.patch.object(DataprocSparkSession, "addArtifacts") -def test_dpip_no_session(mock_add_artifacts, ipython_shell, capsys): +def test_dpip_with_flags(ipython_shell, capsys): + """Test installing multiple packages with flags like -U.""" + ipython_shell.run_line_magic("dpip", "install -U numpy scikit-learn") + captured = capsys.readouterr() + assert "Error: Flags are not currently supported." in captured.out + + +def test_dpip_no_session(ipython_shell, capsys): """Test message when no Spark session is active.""" ipython_shell.user_ns = {} # Remove spark session from namespace ipython_shell.run_line_magic("dpip", "install pandas") captured = capsys.readouterr() - assert "No active Spark Sessions found." in captured.out - mock_add_artifacts.assert_not_called() + assert "No active Dataproc Spark Session found." in captured.out -@mock.patch.object( - DataprocSparkSession, - "addArtifacts", - side_effect=Exception("Install failed"), -) -def test_dpip_install_failure(mock_add_artifacts, ipython_shell, capsys): +def test_dpip_install_failure(ipython_shell, capsys): """Test error message on installation failure.""" - ipython_shell.run_line_magic("dpip", "install bad-package") - mock_add_artifacts.assert_called_once_with("bad-package", pypi=True) + ipython_shell.run_line_magic("dpip", "install dp-non-existent-package") captured = capsys.readouterr() - assert "Failed to add artifacts: Install failed" in captured.out + assert "No matching distribution found" in captured.out + + +def test_dpip_multiple_sessions(ipython_shell, connect_session, capsys): + """Test error message when multiple Spark sessions found.""" + ipython_shell.user_ns["sparksession"] = connect_session + ipython_shell.user_ns["sparkanother"] = connect_session + ipython_shell.run_line_magic("dpip", "install pandas") + captured = capsys.readouterr() + assert ( + "Error: Found more than one active Dataproc Spark Sessions." + in captured.out + ) diff --git a/tests/unit/dataproc_magics/test_magics.py b/tests/unit/dataproc_magics/test_magics.py index b2382ff..ecf380d 100644 --- a/tests/unit/dataproc_magics/test_magics.py +++ b/tests/unit/dataproc_magics/test_magics.py @@ -31,41 +31,54 @@ def setUp(self): self.shell.config = Config() self.magics = DataprocMagics(shell=self.shell) - def test_parse_command_valid(self): - packages = self.magics._parse_command(["install", "pandas", "numpy"]) - self.assertEqual(packages, ["pandas", "numpy"]) + def test_dpip_with_flags(self): + f = io.StringIO() + with redirect_stdout(f): + self.magics.dpip("install --upgrade numpy") + self.assertIn("Error: Flags are not currently supported.", f.getvalue()) - def test_parse_command_with_flags(self): - packages = self.magics._parse_command( - ["install", "-U", "pandas", "--upgrade", "numpy"] + def test_dpip_no_install(self): + f = io.StringIO() + with redirect_stdout(f): + self.magics.dpip("pandas numpy") + self.assertIn( + "Usage: %dpip install ...", f.getvalue() ) - self.assertEqual(packages, ["pandas", "numpy"]) - - def test_parse_command_no_install(self): - packages = self.magics._parse_command(["other", "pandas"]) - self.assertIsNone(packages) def test_dpip_invalid_command(self): f = io.StringIO() with redirect_stdout(f): self.magics.dpip("foo bar") - output = f.getvalue() - self.assertIn("Usage: %dpip install", output) - self.assertIn("No packages specified", output) + self.assertIn( + "Usage: %dpip install ...", f.getvalue() + ) def test_dpip_no_session(self): f = io.StringIO() with redirect_stdout(f): self.magics.dpip("install pandas") - self.assertIn("No active Spark Sessions found", f.getvalue()) + self.assertIn("No active Dataproc Spark Session found", f.getvalue()) + + def test_dpip_multiple_sessions(self): + mock_session = mock.Mock(spec=DataprocSparkSession) + self.shell.user_ns["spark1"] = mock_session + self.shell.user_ns["spark2"] = mock_session + + f = io.StringIO() + with redirect_stdout(f): + self.magics.dpip("install pandas") + self.assertIn( + "Error: Found more than one active Dataproc Spark Sessions", + f.getvalue(), + ) def test_dpip_no_packages_specified(self): f = io.StringIO() with redirect_stdout(f): self.magics.dpip("install") - self.assertIn("No packages specified", f.getvalue()) + self.assertIn("Error: No packages specified", f.getvalue()) - def test_dpip_install_packages_single_session(self): + def test_dpip_install_packages_success(self): mock_session = mock.Mock(spec=DataprocSparkSession) self.shell.user_ns["spark"] = mock_session @@ -73,29 +86,11 @@ def test_dpip_install_packages_single_session(self): with redirect_stdout(f): self.magics.dpip("install pandas numpy") - mock_session.addArtifacts.assert_has_calls( - [ - mock.call("pandas", pypi=True), - mock.call("numpy", pypi=True), - ] + mock_session.addArtifacts.assert_called_once_with( + "pandas", "numpy", pypi=True ) - self.assertEqual(mock_session.addArtifacts.call_count, 2) - self.assertIn("Packages successfully added as artifacts.", f.getvalue()) - - def test_dpip_install_packages_multiple_sessions(self): - mock_session1 = mock.Mock(spec=DataprocSparkSession) - mock_session2 = mock.Mock(spec=DataprocSparkSession) - self.shell.user_ns["spark1"] = mock_session1 - self.shell.user_ns["spark2"] = mock_session2 - self.shell.user_ns["not_a_session"] = 5 - - f = io.StringIO() - with redirect_stdout(f): - self.magics.dpip("install pandas") - - mock_session1.addArtifacts.assert_called_once_with("pandas", pypi=True) - mock_session2.addArtifacts.assert_called_once_with("pandas", pypi=True) - self.assertIn("Packages successfully added as artifacts.", f.getvalue()) + self.assertEqual(mock_session.addArtifacts.call_count, 1) + self.assertIn("Finished installing packages.", f.getvalue()) def test_dpip_add_artifacts_fails(self): mock_session = mock.Mock(spec=DataprocSparkSession) @@ -107,18 +102,7 @@ def test_dpip_add_artifacts_fails(self): self.magics.dpip("install pandas") mock_session.addArtifacts.assert_called_once_with("pandas", pypi=True) - self.assertIn("Failed to add artifacts: Failed", f.getvalue()) - - def test_dpip_with_flags(self): - mock_session = mock.Mock(spec=DataprocSparkSession) - self.shell.user_ns["spark"] = mock_session - - f = io.StringIO() - with redirect_stdout(f): - self.magics.dpip("install -U pandas") - - mock_session.addArtifacts.assert_called_once_with("pandas", pypi=True) - self.assertIn("Packages successfully added as artifacts.", f.getvalue()) + self.assertIn("Failed to install packages: Failed", f.getvalue()) if __name__ == "__main__": From 43e9c60a3e630edd805e30ddc20b1fe722eebdae Mon Sep 17 00:00:00 2001 From: Tim Utegenov Date: Thu, 26 Feb 2026 16:57:58 -0800 Subject: [PATCH 4/4] Add support for %dpip magic to install packages on Spark session --- google/cloud/dataproc_magics/magics.py | 21 +++---- .../dataproc_magics/test_magics.py | 59 ++++++++++--------- tests/unit/dataproc_magics/test_magics.py | 52 ++++++++-------- 3 files changed, 63 insertions(+), 69 deletions(-) diff --git a/google/cloud/dataproc_magics/magics.py b/google/cloud/dataproc_magics/magics.py index ef255ed..278cc81 100644 --- a/google/cloud/dataproc_magics/magics.py +++ b/google/cloud/dataproc_magics/magics.py @@ -39,18 +39,17 @@ def dpip(self, line): args = shlex.split(line) if not args or args[0] != "install": - print("Usage: %dpip install ...") - return + raise RuntimeError( + "Usage: %dpip install ..." + ) packages = args[1:] # remove `install` if not packages: - print("Error: No packages specified.") - return + raise RuntimeError("Error: No packages specified.") if any(pkg.startswith("-") for pkg in packages): - print("Error: Flags are not currently supported.") - return + raise RuntimeError("Error: Flags are not currently supported.") sessions = [ (key, value) @@ -59,15 +58,13 @@ def dpip(self, line): ] if not sessions: - print( - "No active Dataproc Spark Session found. Please create one first." + raise RuntimeError( + "Error: No active Dataproc Spark Session found. Please create one first." ) - return if len(sessions) > 1: - print( + raise RuntimeError( "Error: Found more than one active Dataproc Spark Sessions." ) - return ((name, session),) = sessions print(f"Active session found: {name}") @@ -76,4 +73,4 @@ def dpip(self, line): print("Finished installing packages.") except Exception as e: - print(f"Failed to install packages: {e}") + raise RuntimeError(f"Failed to install packages: {e}") from e diff --git a/tests/integration/dataproc_magics/test_magics.py b/tests/integration/dataproc_magics/test_magics.py index 1f9cde5..67a0976 100644 --- a/tests/integration/dataproc_magics/test_magics.py +++ b/tests/integration/dataproc_magics/test_magics.py @@ -4,7 +4,7 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -160,49 +160,52 @@ def to_roman(number): connect_session.stop() -def test_dpip_no_install_command(ipython_shell, capsys): +def test_dpip_no_install_command(ipython_shell): """Test usage message when 'install' is missing.""" - ipython_shell.run_line_magic("dpip", "pandas") - captured = capsys.readouterr() - assert "Usage: %dpip install ..." in captured.out + with pytest.raises( + RuntimeError, match="Usage: %dpip install ..." + ): + ipython_shell.run_line_magic("dpip", "pandas") -def test_dpip_no_packages(ipython_shell, capsys): +def test_dpip_no_packages(ipython_shell): """Test message when no packages are specified.""" - ipython_shell.run_line_magic("dpip", "install") - captured = capsys.readouterr() - assert "Error: No packages specified." in captured.out + with pytest.raises(RuntimeError, match="Error: No packages specified."): + ipython_shell.run_line_magic("dpip", "install") -def test_dpip_with_flags(ipython_shell, capsys): +def test_dpip_with_flags(ipython_shell): """Test installing multiple packages with flags like -U.""" - ipython_shell.run_line_magic("dpip", "install -U numpy scikit-learn") - captured = capsys.readouterr() - assert "Error: Flags are not currently supported." in captured.out + with pytest.raises( + RuntimeError, match="Error: Flags are not currently supported." + ): + ipython_shell.run_line_magic("dpip", "install -U numpy scikit-learn") -def test_dpip_no_session(ipython_shell, capsys): +def test_dpip_no_session(ipython_shell): """Test message when no Spark session is active.""" ipython_shell.user_ns = {} # Remove spark session from namespace - ipython_shell.run_line_magic("dpip", "install pandas") - captured = capsys.readouterr() - assert "No active Dataproc Spark Session found." in captured.out + with pytest.raises( + RuntimeError, match="No active Dataproc Spark Session found." + ): + ipython_shell.run_line_magic("dpip", "install pandas") -def test_dpip_install_failure(ipython_shell, capsys): +def test_dpip_install_failure(ipython_shell): """Test error message on installation failure.""" - ipython_shell.run_line_magic("dpip", "install dp-non-existent-package") - captured = capsys.readouterr() - assert "No matching distribution found" in captured.out + with pytest.raises( + RuntimeError, + match="No matching distribution found", + ): + ipython_shell.run_line_magic("dpip", "install dp-non-existent-package") -def test_dpip_multiple_sessions(ipython_shell, connect_session, capsys): +def test_dpip_multiple_sessions(ipython_shell, connect_session): """Test error message when multiple Spark sessions found.""" ipython_shell.user_ns["sparksession"] = connect_session ipython_shell.user_ns["sparkanother"] = connect_session - ipython_shell.run_line_magic("dpip", "install pandas") - captured = capsys.readouterr() - assert ( - "Error: Found more than one active Dataproc Spark Sessions." - in captured.out - ) + with pytest.raises( + RuntimeError, + match="Error: Found more than one active Dataproc Spark Sessions.", + ): + ipython_shell.run_line_magic("dpip", "install pandas") diff --git a/tests/unit/dataproc_magics/test_magics.py b/tests/unit/dataproc_magics/test_magics.py index ecf380d..83d0b3e 100644 --- a/tests/unit/dataproc_magics/test_magics.py +++ b/tests/unit/dataproc_magics/test_magics.py @@ -4,7 +4,7 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -32,51 +32,45 @@ def setUp(self): self.magics = DataprocMagics(shell=self.shell) def test_dpip_with_flags(self): - f = io.StringIO() - with redirect_stdout(f): + with self.assertRaisesRegex( + RuntimeError, "Error: Flags are not currently supported." + ): self.magics.dpip("install --upgrade numpy") - self.assertIn("Error: Flags are not currently supported.", f.getvalue()) def test_dpip_no_install(self): - f = io.StringIO() - with redirect_stdout(f): + with self.assertRaisesRegex( + RuntimeError, "Usage: %dpip install ..." + ): self.magics.dpip("pandas numpy") - self.assertIn( - "Usage: %dpip install ...", f.getvalue() - ) def test_dpip_invalid_command(self): - f = io.StringIO() - with redirect_stdout(f): + with self.assertRaisesRegex( + RuntimeError, "Usage: %dpip install ..." + ): self.magics.dpip("foo bar") - self.assertIn( - "Usage: %dpip install ...", f.getvalue() - ) def test_dpip_no_session(self): - f = io.StringIO() - with redirect_stdout(f): + with self.assertRaisesRegex( + RuntimeError, "Error: No active Dataproc Spark Session found" + ): self.magics.dpip("install pandas") - self.assertIn("No active Dataproc Spark Session found", f.getvalue()) def test_dpip_multiple_sessions(self): mock_session = mock.Mock(spec=DataprocSparkSession) self.shell.user_ns["spark1"] = mock_session self.shell.user_ns["spark2"] = mock_session - f = io.StringIO() - with redirect_stdout(f): - self.magics.dpip("install pandas") - self.assertIn( + with self.assertRaisesRegex( + RuntimeError, "Error: Found more than one active Dataproc Spark Sessions", - f.getvalue(), - ) + ): + self.magics.dpip("install pandas") def test_dpip_no_packages_specified(self): - f = io.StringIO() - with redirect_stdout(f): + with self.assertRaisesRegex( + RuntimeError, "Error: No packages specified" + ): self.magics.dpip("install") - self.assertIn("Error: No packages specified", f.getvalue()) def test_dpip_install_packages_success(self): mock_session = mock.Mock(spec=DataprocSparkSession) @@ -97,12 +91,12 @@ def test_dpip_add_artifacts_fails(self): mock_session.addArtifacts.side_effect = Exception("Failed") self.shell.user_ns["spark"] = mock_session - f = io.StringIO() - with redirect_stdout(f): + with self.assertRaisesRegex( + RuntimeError, "Failed to install packages: Failed" + ): self.magics.dpip("install pandas") mock_session.addArtifacts.assert_called_once_with("pandas", pypi=True) - self.assertIn("Failed to install packages: Failed", f.getvalue()) if __name__ == "__main__":