
Support metadata push mode in BaseSingleSegmentConversionExecutor #17632

Merged
swaminathanmanish merged 11 commits into apache:master from shounakmk219:metadata-push-support-for-single-sce
Feb 16, 2026

Conversation

shounakmk219 (Collaborator) commented Feb 4, 2026

Summary

Adds METADATA (and URI) segment push mode to BaseSingleSegmentConversionExecutor so single-segment conversion tasks can push via metadata (segment on output PinotFS + metadata to controller) instead of only TAR (HTTP upload). Moves shared segment-push logic into BaseTaskExecutor so both single- and multiple-segment conversion executors reuse the same helpers.

Changes

BaseTaskExecutor

  • Added shared segment push constants: SEGMENT_PUSH_DEFAULT_ATTEMPTS, SEGMENT_PUSH_DEFAULT_PARALLELISM, SEGMENT_PUSH_DEFAULT_RETRY_INTERVAL_MILLIS.
  • Added protected helpers used by both conversion executors:
    • getPushJobSpec(configs) – builds PushJobSpec from task config.
    • generateSegmentGenerationJobSpec(tableName, configs, pushJobSpec) – builds SegmentGenerationJobSpec for controller push.
    • moveSegmentToOutputPinotFS(configs, localSegmentTarFile) – copies local segment tar to output PinotFS; requires output.segment.dir.uri.
    • getSegmentPushCommonParams(tableNameWithType) – common HTTP params (parallel push protection, table name, table type).
    • getSegmentPushMetadataHeaders(pinotTaskConfig, authProvider, segmentConversionResult) – headers for metadata push (ZK metadata modifier + auth).
    • getSegmentPushType(configs) – resolves push mode from config (push.mode, default TAR); subclasses can override.
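The push-mode resolution described above can be sketched as follows. This is a simplified, self-contained illustration, not the actual Pinot helper: the class name `PushModeResolver` and the fallback-on-unknown-value behavior are assumptions; the real code reads `push.mode` via `BatchConfigProperties` and defaults to TAR.

```java
import java.util.Locale;
import java.util.Map;

// Illustrative sketch of a getSegmentPushType(configs)-style helper:
// read "push.mode" from the task config and fall back to TAR.
// Enum values mirror Pinot's SegmentPushType; the class itself is hypothetical.
public class PushModeResolver {
  public enum SegmentPushType { TAR, URI, METADATA }

  public static SegmentPushType getSegmentPushType(Map<String, String> configs) {
    String pushMode = configs.get("push.mode");
    if (pushMode == null) {
      return SegmentPushType.TAR;  // default when push.mode is absent
    }
    try {
      return SegmentPushType.valueOf(pushMode.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      return SegmentPushType.TAR;  // assumption: unknown values also fall back to TAR
    }
  }
}
```

Subclasses that always push one way can simply override the helper to return a constant.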

BaseSingleSegmentConversionExecutor

  • Supports configurable push mode (TAR vs METADATA/URI) via getSegmentPushType(configs) (overridable).
  • After conversion, upload step branches on push type: TAR → existing HTTP upload; METADATA/URI → move segment to output PinotFS (when output.segment.dir.uri and push.controllerUri are set) and call SegmentPushUtils.sendSegmentUriAndMetadata.
  • Uses base getSegmentPushCommonParams for upload parameters and the new base helpers for the METADATA path (no duplicated push logic).
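The branch on push type can be pictured with the sketch below. It only names the chosen path rather than performing uploads; the class and method are hypothetical stand-ins for the real executor, where TAR goes through the existing HTTP upload and METADATA/URI stage the tar on the output PinotFS before calling SegmentPushUtils.sendSegmentUriAndMetadata.

```java
// Illustrative sketch of the upload branch in BaseSingleSegmentConversionExecutor.
// TAR keeps the existing HTTP upload; METADATA and URI share one path that first
// copies the segment tar to the output PinotFS and then pushes only metadata.
public class UploadDispatch {
  public enum SegmentPushType { TAR, URI, METADATA }

  public static String chooseUploadPath(SegmentPushType pushType) {
    switch (pushType) {
      case TAR:
        return "HTTP_UPLOAD";                  // existing single-segment upload
      case METADATA:
      case URI:
        // requires output.segment.dir.uri and push.controllerUri in the task config
        return "PINOTFS_COPY_THEN_METADATA";
      default:
        throw new IllegalStateException("Unsupported push type: " + pushType);
    }
  }
}
```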

BaseMultipleSegmentsConversionExecutor

  • Removed duplicate push logic: dropped local constants and implementations of getPushJobSpec, generateSegmentGenerationJobSpec, moveSegmentToOutputPinotFS, and getSegmentPushCommonParams; uses base implementations.
  • getSegmentPushCommonHeaders now delegates to base getSegmentPushMetadataHeaders.
  • pushSegment uses base getSegmentPushType(taskConfigs) for resolving push mode.

SegmentGenerationAndPushTaskExecutor

  • moveSegmentToOutputPinotFS overrides to handle missing output.segment.dir.uri (return local file URI) and delegates to super.moveSegmentToOutputPinotFS when the config is present.
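The override pattern above can be sketched like this. The class and method below are illustrative, not the actual SegmentGenerationAndPushTaskExecutor: when `output.segment.dir.uri` is absent the local tar file's URI is returned, otherwise the real code delegates to the base `moveSegmentToOutputPinotFS`.

```java
import java.io.File;
import java.net.URI;
import java.util.Map;

// Sketch of the fallback: no output dir configured means the segment stays local
// and its file URI is used; with an output dir, the base helper would copy it.
public class OverrideSketch {
  public static URI resolveSegmentUri(Map<String, String> configs, File localSegmentTarFile) {
    String outputDir = configs.get("output.segment.dir.uri");
    if (outputDir == null || outputDir.isEmpty()) {
      return localSegmentTarFile.toURI();  // missing config: return the local file URI
    }
    // The real override delegates to super.moveSegmentToOutputPinotFS(...) here;
    // for illustration we just build the would-be destination URI.
    return URI.create(outputDir.endsWith("/") ? outputDir + localSegmentTarFile.getName()
        : outputDir + "/" + localSegmentTarFile.getName());
  }
}
```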

MinionTaskUtils

  • New flag allowMetadataPushWithLocalFs skips the override to TAR push mode when the output segment dir is on local FS. It is introduced to enable testing of METADATA push mode and is not intended for production use (unless the local disk is shared and all components have access to it).
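The guard described above can be sketched as follows. The config key names (`push.mode`, `output.segment.dir.uri`, `allowMetadataPushWithLocalFs`) follow the PR description, but the class and the exact key spellings are illustrative stand-ins for the real MinionTaskUtils constants.

```java
import java.net.URI;
import java.util.Map;

// Sketch of the local-FS guard: by default a local output segment dir forces TAR
// mode (other nodes cannot download from the minion's disk); the new flag skips
// that override so METADATA mode can be exercised in tests.
public class LocalFsGuard {
  public static String effectivePushMode(Map<String, String> configs) {
    String pushMode = configs.getOrDefault("push.mode", "TAR");
    String outputDir = configs.get("output.segment.dir.uri");
    boolean allowLocal = Boolean.parseBoolean(configs.get("allowMetadataPushWithLocalFs"));
    boolean isLocalFs = outputDir != null
        && ("file".equals(URI.create(outputDir).getScheme())
            || URI.create(outputDir).getScheme() == null);
    if (isLocalFs && !allowLocal && !"TAR".equals(pushMode)) {
      return "TAR";  // override: a local dir is not reachable by other nodes
    }
    return pushMode;
  }
}
```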

Usage (single-segment METADATA push)

  • Set task config push.mode = METADATA (or override getSegmentPushType in a subclass).
  • Provide output.segment.dir.uri and push.controllerUri in task config for the METADATA path.
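The two settings above might look like this in a task config. The task name, bucket, and controller host are hypothetical examples; the keys apply to any task extending BaseSingleSegmentConversionExecutor.

```
"taskTypeConfigsMap": {
  "SomeSingleSegmentTask": {
    "push.mode": "METADATA",
    "output.segment.dir.uri": "s3://my-bucket/pinot/output",
    "push.controllerUri": "http://controller-host:9000"
  }
}
```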

codecov-commenter commented Feb 4, 2026

❌ 3 Tests Failed:

Tests completed: 12196 | Failed: 3 | Passed: 12193 | Skipped: 59
View the full list of 3 ❄️ flaky test(s)
org.apache.pinot.integration.tests.KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest::setUp

Flake rate in main: 100.00% (Passed 0 times, Failed 76 times)

Stack Traces | 12.1s run time
Could not find a valid Docker environment. Please see logs and check configuration
org.apache.pinot.plugin.inputformat.json.confluent.JsonConfluentSchemaTest::@BeforeClass setup

Flake rate in main: 100.00% (Passed 0 times, Failed 49 times)

Stack Traces | 0.44s run time
Could not find a valid Docker environment. Please see logs and check configuration
org.apache.pinot.plugin.inputformat.json.confluent.JsonConfluentSchemaTest::setup

Flake rate in main: 100.00% (Passed 0 times, Failed 98 times)

Stack Traces | 0.44s run time
Could not find a valid Docker environment. Please see logs and check configuration

To view more test analytics, go to the Test Analytics Dashboard


@krishan1390 krishan1390 left a comment


Have we tested it with METADATA push mode?

uploadURL, convertedTarredSegmentFile);
break;
case METADATA:
case URI:

BaseMultipleSegmentsConversionExecutor doesn't support URI mode. Is it fine to add here ?

Collaborator Author

Yeah, let me revert that; not much ROI compared to the increased testing scope for this PR.


@noob-se7en noob-se7en left a comment


Let's add a test for the new code (metadata push via BaseSingleSegmentConverter).

Comment on lines 185 to 189
List<Header> httpHeaders = new ArrayList<>();
httpHeaders.add(ifMatchHeader);
httpHeaders.add(refreshOnlyHeader);
httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider));

These headers are not required for metadata push?

Collaborator Author

uploadSegmentWithMetadata internally calls getSegmentPushMetadataHeaders to populate the required headers.

@shounakmk219

Updated the code a bit to allow METADATA push mode even with local FS, just for testing. Added tests for the tasks extending BaseSingleSegmentConversionExecutor.

@shounakmk219 shounakmk219 added the enhancement (Improvement to existing functionality) and minion (Related to Pinot Minion task framework) labels Feb 12, 2026

@swaminathanmanish swaminathanmanish left a comment


LGTM !

LOGGER.warn("Local output dir found, defaulting to TAR: {}.", outputSegmentDirURI);
singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.TAR.toString());
boolean allowMetadataPushWithLocalFs = Boolean.parseBoolean(

Why is the new param needed? Instead of this extra param, ALLOW_METADATA_PUSH_WITH_LOCAL_FS, can we not just take the metadata push path when configured, regardless of whether the FS is local or not?

Collaborator Author

It's risky for multi-node setups, where the segment download URL will point to a local path (usually on the minion) and servers will look for it on their own local FS when downloading the segment.

@swaminathanmanish
Contributor

Please note that there are unrelated test failures. We should address the flaky tests separately.

KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.setUp:292->BaseRealtimeClusterIntegrationTest.setUp:67->startKafka:88->startSchemaRegistry:99 » IllegalState Could not find a valid Docker environment. Please see logs and check configuration

@swaminathanmanish swaminathanmanish merged commit 657e7f0 into apache:master Feb 16, 2026
36 of 48 checks passed
@xiangfu0 xiangfu0 added the ingestion Related to data ingestion pipeline label Mar 20, 2026