Conversation

@mohamedawnallah
Contributor

@mohamedawnallah mohamedawnallah commented Jul 27, 2025

Change Description

Towards #35046.
Depends on #35920.
Next #36729.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@mohamedawnallah mohamedawnallah changed the title sdks/python: sink data with Milvus Search I/O connector sdks/python: sink data with Milvus Search I/O Connector Jul 27, 2025
@mohamedawnallah mohamedawnallah changed the title sdks/python: sink data with Milvus Search I/O Connector sdks/python: sink data with Milvus Search I/O connector Jul 27, 2025
@mohamedawnallah
Contributor Author

stop reviewer notifications

@github-actions
Contributor

Stopping reviewer notifications for this pull request: requested by reviewer. If you'd like to restart, comment assign set of reviewers

@mohamedawnallah
Contributor Author

mohamedawnallah commented Aug 6, 2025

Regarding dynamic batching, it is perhaps in the nice-to-have category. For now we can safely use the JDBC I/O connector's default batch size (1000).
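For reference, the fixed-size batching strategy with the 1000-element default mentioned above can be sketched in plain Python. This is an illustrative sketch only: the helper name `batched` and its shape are assumptions, not code from the Beam Milvus connector; only the default batch size (1000) comes from the comment above.

```python
from itertools import islice

def batched(iterable, batch_size=1000):
    """Yield successive lists of at most batch_size elements.

    Illustrative helper (not from the Beam codebase): a sink using this
    strategy would issue one bulk insert per yielded batch instead of
    one request per element. batch_size defaults to 1000, mirroring the
    JDBC I/O connector default mentioned above.
    """
    it = iter(iterable)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

# 2500 elements split into fixed-size batches: 1000, 1000, 500
sizes = [len(b) for b in batched(range(2500), batch_size=1000)]
```

A dynamic-batching variant would adjust `batch_size` at runtime (e.g. based on observed latency), which is the nice-to-have discussed above.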

@mohamedawnallah mohamedawnallah changed the title sdks/python: sink data with Milvus Search I/O connector [1/2] sdks/python: sink data with Milvus Search I/O connector Aug 6, 2025
@mohamedawnallah mohamedawnallah changed the title [1/2] sdks/python: sink data with Milvus Search I/O connector [2/3] sdks/python: sink data with Milvus Search I/O connector Aug 6, 2025
@mohamedawnallah mohamedawnallah changed the title [2/3] sdks/python: sink data with Milvus Search I/O connector [1/3] sdks/python: sink data with Milvus Search I/O connector Aug 6, 2025
@mohamedawnallah mohamedawnallah changed the title [1/3] sdks/python: sink data with Milvus Search I/O connector [1/2] sdks/python: sink data with Milvus Search I/O connector Aug 6, 2025
@mohamedawnallah
Contributor Author

Will move this PR into a reviewable state after PR #35467 is merged. That PR refactors and unifies some testing-related code shared by the Milvus Enrichment Handler and the Sink I/O.

@mohamedawnallah mohamedawnallah marked this pull request as draft October 31, 2025 15:34
@mohamedawnallah
Contributor Author

mohamedawnallah commented Nov 2, 2025

The test test_gpu_auto_convert_to_cpu in the CI workflow run beam_PreCommit_Python_ML (Run Python_ML PreCommit 3.9) failed with an error unrelated to this PR's changeset. This appears to be a benign transient timeout error.
CI Run: https://github.com/apache/beam/actions/runs/19015233042/job/54302103752?pr=35708

📋 Failure Logs
[gw5] [ 30%] FAILED apache_beam/ml/inference/pytorch_inference_test.py::PytorchRunInferencePipelineTest::test_gpu_auto_convert_to_cpu 
_________ PytorchRunInferencePipelineTest.test_gpu_auto_convert_to_cpu _________
[gw5] linux -- Python 3.9.22 /runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-ml/py39-ml/bin/python

self = <apache_beam.ml.inference.pytorch_inference_test.PytorchRunInferencePipelineTest testMethod=test_gpu_auto_convert_to_cpu>

    def test_gpu_auto_convert_to_cpu(self):
      """
      This tests the scenario in which the user defines `device='GPU'` for the
      PytorchModelHandlerX, but runs the pipeline on a machine without GPU, we
      automatically detect this discrepancy and do automatic conversion to CPU.
      A warning is also logged to inform the user.
      """
      with self.assertLogs() as log:
        with TestPipeline() as pipeline:
          examples = torch.from_numpy(
              np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1))
    
          state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
                                    ('linear.bias', torch.Tensor([0.5]))])
          path = os.path.join(self.tmpdir, 'my_state_dict_path')
          torch.save(state_dict, path)
    
          model_handler = PytorchModelHandlerTensor(
              state_dict_path=path,
              model_class=PytorchLinearRegression,
              model_params={
                  'input_dim': 1, 'output_dim': 1
              },
              device='GPU')
          # Upon initialization, device is cuda
          self.assertEqual(model_handler._device, torch.device('cuda'))
    
          pcoll = pipeline | 'start' >> beam.Create(examples)
          # pylint: disable=expression-not-assigned
          pcoll | RunInference(model_handler)
    
          # During model loading, device converted to cuda
>         self.assertEqual(model_handler._device, torch.device('cuda'))

apache_beam/ml/inference/pytorch_inference_test.py:770: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/apache_beam/pipeline.py:670: in __exit__
    self.result = self.run()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:122: in run
    state = result.wait_until_finish(duration=self.timeout)
...

The test test_external_test_specs in the CI workflow run beam_PreCommit_Python (Run Python PreCommit 3.9) failed with an error unrelated to this PR's changeset. This appears to be a benign transient gRPC timeout error.
CI Run: https://github.com/apache/beam/actions/runs/19015233009/job/54302103686?pr=35708

📋 Failure Logs
[gw3] [ 99%] FAILED apache_beam/yaml/main_test.py::MainTest::test_external_test_specs
______________________ MainTest.test_external_test_specs _______________________
[gw3] linux -- Python 3.9.22 /runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/bin/python

self = <apache_beam.yaml.main_test.MainTest testMethod=test_external_test_specs>

    def test_external_test_specs(self):
      with tempfile.TemporaryDirectory() as tmpdir:
        good_suite = os.path.join(tmpdir, 'good.yaml')
        with open(good_suite, 'w') as fout:
          fout.write(PASSING_TEST_SUITE)
        bad_suite = os.path.join(tmpdir, 'bad.yaml')
        with open(bad_suite, 'w') as fout:
          fout.write(FAILING_TEST_SUITE)
    
        # Must pass.
>       main.run_tests([
            '--yaml_pipeline',
            TEST_PIPELINE,
            '--test_suite',
            good_suite,
        ],
                       exit=False)

apache_beam/yaml/main_test.py:167: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

argv = ['--yaml_pipeline', '\npipeline:\n  type: chain\n  transforms:\n    - type: Create\n      config:\n        elements: [...ork/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpuubw5yj6/good.yaml']
exit = False

    def run_tests(argv=None, exit=True):
      known_args, pipeline_args, _, pipeline_yaml = _build_pipeline_yaml_from_argv(
          argv)
      pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader)
      options = _build_pipeline_options(pipeline_spec, pipeline_args)
    
      if known_args.create_test and known_args.fix_tests:
        raise ValueError(
            'At most one of --create_test and --fix_tests may be specified.')
      elif known_args.create_test:
        result = unittest.TestResult()
        tests = []
      else:
        if known_args.test_suite:
          with open(known_args.test_suite) as fin:
            test_suite_holder = yaml.load(
                fin, Loader=yaml_transform.SafeLineLoader) or {}
        else:
          test_suite_holder = pipeline_spec
        test_specs = test_suite_holder.get('tests', [])
        if not isinstance(test_specs, list):
          raise TypeError('tests attribute must be a list of test specifications.')
        elif not test_specs:
          raise RuntimeError(
              'No tests found. '
              "If you haven't added a set of tests yet, you can get started by "
              'running your pipeline with the --create_test flag enabled.')
    
        tests = [
            yaml_testing.YamlTestCase(
                pipeline_spec, test_spec, options, known_args.fix_tests)
            for test_spec in test_specs
        ]
        suite = unittest.TestSuite(tests)
        result = unittest.TextTestRunner().run(suite)
    
      if known_args.fix_tests or known_args.create_test:
        update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests)
    
      if exit:
        # emulates unittest.main()
        sys.exit(0 if result.wasSuccessful() else 1)
      else:
        if not result.wasSuccessful():
>         raise RuntimeError(result)
E         RuntimeError: <unittest.runner.TextTestResult run=1 errors=1 failures=0>

apache_beam/yaml/main.py:196: RuntimeError
----------------------------- Captured stderr call -----------------------------
E
======================================================================
ERROR: ExternalTest (line 3)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/yaml/yaml_testing.py", line 51, in runTest
    self._fixes = run_test(
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/yaml/yaml_testing.py", line 102, in run_test
    _ = p | yaml_transform.YamlTransform(
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py", line 672, in __exit__
    self.result.wait_until_finish()
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 571, in wait_until_finish
    raise self._runtime_exception
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 580, in _observe_state
    for state_response in self._state_stream:
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 969, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-11-02T17:37:42.69913264+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"

The Publish Test Results step in the PreCommit Python PVR Flink CI workflow failed to publish the test results. This is unrelated to this PR's changeset and appears to be a transient error.
CI Run: https://github.com/apache/beam/pull/35708/checks?check_run_id=54302996157

@mohamedawnallah
Contributor Author

Hey @damccorm, I think this PR is now ready for review. I've added the Milvus Sink I/O integration along with unit and integration tests, and did some refactoring by unifying core/testing utilities between the Milvus enrichment handler and the sink I/O. All relevant CI tests are passing, and I'd appreciate any feedback when you have time.

Contributor

@damccorm damccorm left a comment

Thanks! Pausing my review for now since my last comment would cause a major set of changes

@mohamedawnallah mohamedawnallah changed the title [1/2] sdks/python: sink data with Milvus Search I/O connector [1/3] sdks/python: refactor Milvus-related utilities as prepration step for Milvus Sink I/O integration Nov 5, 2025
In this commit, we remove that builder method to remain functional
and be used in the next Milvus sink integration PR
@mohamedawnallah mohamedawnallah changed the title [1/3] sdks/python: refactor Milvus-related utilities as prepration step for Milvus Sink I/O integration [1/3] sdks/python: refactor Milvus-related utilities as preparation step for Milvus Sink I/O integration Nov 5, 2025
Contributor

@damccorm damccorm left a comment

Thanks! It looks like there are now merge conflicts, but other than that and my one comment this looks good

@mohamedawnallah
Contributor Author

mohamedawnallah commented Nov 10, 2025

@damccorm – addressed the recent comments and deferred the Jupyter notebook update until after the Beam version is released (as in #35708 (comment)). The relevant CI tests are passing, and the failed CI tests are unrelated to the changeset this PR introduces. I think the PR is now ready for another look.

Contributor

@damccorm damccorm left a comment

Thanks!

@damccorm damccorm merged commit 83ebe73 into apache:master Nov 12, 2025
104 of 105 checks passed