diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index 82b6fd0330c9..e2bdc3c6c0f8 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -169,11 +169,11 @@ def process( # we are too ahead of time, let's wait. restriction_tracker.defer_remainder( timestamp.Timestamp(current_output_timestamp)) - return + break if not restriction_tracker.try_claim(current_output_index): # nothing to claim, just stop - return + break output = self._get_output(current_output_index, current_output_timestamp) @@ -186,6 +186,9 @@ def process( current_output_index += 1 + # Don't yield any values here so that the generator + # raises StopIteration when we break out of the while loop. + class PeriodicSequence(PTransform): '''