[1/3] sdks/python: refactor Milvus-related utilities as preparation step for Milvus Sink I/O integration #35708
Conversation
Regarding dynamic batching, it is probably in the nice-to-have category. For now we can safely use the JDBC I/O connector's default batch size (1000).
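For illustration only, here is a minimal sketch (not this PR's implementation) of pinning a fixed batch size of 1000 with Beam's built-in BatchElements transform instead of dynamic batching; the element dicts are made-up sample data.

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{"id": i, "vector": [0.0, 0.1]} for i in range(5)])
        # Setting min_batch_size == max_batch_size disables adaptive batch
        # sizing; the last batch of a bundle may still be smaller.
        | beam.BatchElements(min_batch_size=1000, max_batch_size=1000)
        | beam.Map(lambda batch: len(batch)))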
Force-pushed b084802 to 288f245
Force-pushed 288f245 to b957194
Will put this PR in a reviewable state after PR #35467 is merged. That PR refactors and unifies some testing-related code shared by the Milvus enrichment handler and the sink I/O.
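For context, a hypothetical sketch of the kind of shared test helper such a refactoring could produce; the helper names below are illustrative (not the actual utilities from #35467), and only the pymilvus MilvusClient calls are real API.

from pymilvus import MilvusClient


def create_test_collection(uri, collection_name, dim):
    """Creates a small collection that both enrichment and sink tests can reuse."""
    client = MilvusClient(uri=uri)
    client.create_collection(collection_name=collection_name, dimension=dim)
    return client


def drop_test_collection(client, collection_name):
    """Cleans up the shared collection after a test run."""
    client.drop_collection(collection_name=collection_name)
    client.close()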
Force-pushed c07bd55 to 5df9628
Force-pushed 4512b63 to 91266a7
Force-pushed 390e69f to 9445aaa
The following test failed in CI. 📋 Failure logs:
[gw5] [ 30%] FAILED apache_beam/ml/inference/pytorch_inference_test.py::PytorchRunInferencePipelineTest::test_gpu_auto_convert_to_cpu
_________ PytorchRunInferencePipelineTest.test_gpu_auto_convert_to_cpu _________
[gw5] linux -- Python 3.9.22 /runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-ml/py39-ml/bin/python
self = <apache_beam.ml.inference.pytorch_inference_test.PytorchRunInferencePipelineTest testMethod=test_gpu_auto_convert_to_cpu>
def test_gpu_auto_convert_to_cpu(self):
"""
This tests the scenario in which the user defines `device='GPU'` for the
PytorchModelHandlerX, but runs the pipeline on a machine without GPU, we
automatically detect this discrepancy and do automatic conversion to CPU.
A warning is also logged to inform the user.
"""
with self.assertLogs() as log:
with TestPipeline() as pipeline:
examples = torch.from_numpy(
np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1))
state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
('linear.bias', torch.Tensor([0.5]))])
path = os.path.join(self.tmpdir, 'my_state_dict_path')
torch.save(state_dict, path)
model_handler = PytorchModelHandlerTensor(
state_dict_path=path,
model_class=PytorchLinearRegression,
model_params={
'input_dim': 1, 'output_dim': 1
},
device='GPU')
# Upon initialization, device is cuda
self.assertEqual(model_handler._device, torch.device('cuda'))
pcoll = pipeline | 'start' >> beam.Create(examples)
# pylint: disable=expression-not-assigned
pcoll | RunInference(model_handler)
# During model loading, device converted to cuda
> self.assertEqual(model_handler._device, torch.device('cuda'))
apache_beam/ml/inference/pytorch_inference_test.py:770:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/apache_beam/pipeline.py:670: in __exit__
self.result = self.run()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:122: in run
state = result.wait_until_finish(duration=self.timeout)
...
The following test also failed in CI. 📋 Failure logs:
[gw3] [ 99%] FAILED apache_beam/yaml/main_test.py::MainTest::test_external_test_specs
______________________ MainTest.test_external_test_specs _______________________
[gw3] linux -- Python 3.9.22 /runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/bin/python
self = <apache_beam.yaml.main_test.MainTest testMethod=test_external_test_specs>
def test_external_test_specs(self):
with tempfile.TemporaryDirectory() as tmpdir:
good_suite = os.path.join(tmpdir, 'good.yaml')
with open(good_suite, 'w') as fout:
fout.write(PASSING_TEST_SUITE)
bad_suite = os.path.join(tmpdir, 'bad.yaml')
with open(bad_suite, 'w') as fout:
fout.write(FAILING_TEST_SUITE)
# Must pass.
> main.run_tests([
'--yaml_pipeline',
TEST_PIPELINE,
'--test_suite',
good_suite,
],
exit=False)
apache_beam/yaml/main_test.py:167:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
argv = ['--yaml_pipeline', '\npipeline:\n type: chain\n transforms:\n - type: Create\n config:\n elements: [...ork/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpuubw5yj6/good.yaml']
exit = False
def run_tests(argv=None, exit=True):
known_args, pipeline_args, _, pipeline_yaml = _build_pipeline_yaml_from_argv(
argv)
pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader)
options = _build_pipeline_options(pipeline_spec, pipeline_args)
if known_args.create_test and known_args.fix_tests:
raise ValueError(
'At most one of --create_test and --fix_tests may be specified.')
elif known_args.create_test:
result = unittest.TestResult()
tests = []
else:
if known_args.test_suite:
with open(known_args.test_suite) as fin:
test_suite_holder = yaml.load(
fin, Loader=yaml_transform.SafeLineLoader) or {}
else:
test_suite_holder = pipeline_spec
test_specs = test_suite_holder.get('tests', [])
if not isinstance(test_specs, list):
raise TypeError('tests attribute must be a list of test specifications.')
elif not test_specs:
raise RuntimeError(
'No tests found. '
"If you haven't added a set of tests yet, you can get started by "
'running your pipeline with the --create_test flag enabled.')
tests = [
yaml_testing.YamlTestCase(
pipeline_spec, test_spec, options, known_args.fix_tests)
for test_spec in test_specs
]
suite = unittest.TestSuite(tests)
result = unittest.TextTestRunner().run(suite)
if known_args.fix_tests or known_args.create_test:
update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests)
if exit:
# emulates unittest.main()
sys.exit(0 if result.wasSuccessful() else 1)
else:
if not result.wasSuccessful():
> raise RuntimeError(result)
E RuntimeError: <unittest.runner.TextTestResult run=1 errors=1 failures=0>
apache_beam/yaml/main.py:196: RuntimeError
----------------------------- Captured stderr call -----------------------------
E
======================================================================
ERROR: ExternalTest (line 3)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/yaml/yaml_testing.py", line 51, in runTest
self._fixes = run_test(
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/yaml/yaml_testing.py", line 102, in run_test
_ = p | yaml_transform.YamlTransform(
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py", line 672, in __exit__
self.result.wait_until_finish()
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 571, in wait_until_finish
raise self._runtime_exception
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 580, in _observe_state
for state_response in self._state_stream:
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 543, in __next__
return self._next()
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 969, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.DEADLINE_EXCEEDED
details = "Deadline Exceeded"
debug_error_string = "UNKNOWN:Error received from peer {created_time:"2025-11-02T17:37:42.69913264+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"

The Publish Test Results CI step in the CI workflow run
Hey @damccorm, I think this PR is now ready for review. I've added the Milvus Sink I/O integration along with unit and integration tests, and did some refactoring by unifying core/testing utilities between the Milvus enrichment handler and sink I/O. All relevant CI tests are passing, and I'd love any feedback when you have time.
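For readers unfamiliar with the sink pattern, here is a minimal, hypothetical sketch of what a batching Milvus write DoFn could look like using the official pymilvus client. This is not the API added in this PR; the class and constructor argument names are illustrative.

import apache_beam as beam
from pymilvus import MilvusClient


class _WriteToMilvusFn(beam.DoFn):
    """Hypothetical sink DoFn: buffers elements and flushes them in batches."""

    def __init__(self, uri, token, collection_name, batch_size=1000):
        self._uri = uri
        self._token = token
        self._collection_name = collection_name
        self._batch_size = batch_size
        self._client = None
        self._buffer = []

    def setup(self):
        # One client per worker, reused across bundles.
        self._client = MilvusClient(uri=self._uri, token=self._token)

    def process(self, element):
        # `element` is a dict matching the collection schema, e.g.
        # {"id": 1, "vector": [0.1, 0.2, 0.3]}.
        self._buffer.append(element)
        if len(self._buffer) >= self._batch_size:
            self._flush()

    def finish_bundle(self):
        # Flush any partial batch at the end of the bundle.
        self._flush()

    def teardown(self):
        if self._client is not None:
            self._client.close()

    def _flush(self):
        if self._buffer:
            self._client.insert(
                collection_name=self._collection_name, data=self._buffer)
            self._buffer = []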
damccorm
left a comment
Thanks! Pausing my review for now since my last comment would cause a major set of changes
In this commit, we remove that builder method to keep this change functional; it will be used in the next Milvus sink integration PR.
damccorm
left a comment
Thanks! It looks like there are now merge conflicts, but other than that and my one comment this looks good
Force-pushed 0ee038c to 0c00044
@damccorm – addressed the recent comments and moved the Jupyter notebook update to after the Beam version is released (as in #35708 (comment)). The relevant CI tests are passing, and the failing CI tests are unrelated to the changeset this PR introduces. I think the PR is now ready for another look.
damccorm
left a comment
Thanks!
Change Description
Towards #35046.
Depends on #35920.
Next #36729.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.