Support metadata push mode in BaseSingleSegmentConversionExecutor#17632
Conversation
❌ 3 Tests Failed:
View the full list of 3 ❄️ flaky test(s)
To view more test analytics, go to the Test Analytics Dashboard |
krishan1390
left a comment
There was a problem hiding this comment.
Have we tested it with METADATA push mode ?
| uploadURL, convertedTarredSegmentFile); | ||
| break; | ||
| case METADATA: | ||
| case URI: |
There was a problem hiding this comment.
BaseMultipleSegmentsConversionExecutor doesn't support URI mode. Is it fine to add here ?
There was a problem hiding this comment.
Yeah let me revert that, not much roi compared to the increased testing scope for this PR
noob-se7en
left a comment
There was a problem hiding this comment.
Lets add a test for new code (metadata push via BaseSingleSegmentConverter).
| List<Header> httpHeaders = new ArrayList<>(); | ||
| httpHeaders.add(ifMatchHeader); | ||
| httpHeaders.add(refreshOnlyHeader); | ||
| httpHeaders.add(segmentZKMetadataCustomMapModifierHeader); | ||
| httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider)); |
There was a problem hiding this comment.
These headers are not required for metadata push?
There was a problem hiding this comment.
uploadSegmentWithMetadata internally calls getSegmentPushMetadataHeaders to populate the required headers
…port-for-single-sce
…port-for-single-sce
…port-for-single-sce
|
Updated the code a bit to allow METADATA push mode even with local fs just for testing. Added tests for the tasks extending the |
…port-for-single-sce
| LOGGER.warn("Local output dir found, defaulting to TAR: {}.", outputSegmentDirURI); | ||
| singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, | ||
| BatchConfigProperties.SegmentPushType.TAR.toString()); | ||
| boolean allowMetadataPushWithLocalFs = Boolean.parseBoolean( |
There was a problem hiding this comment.
Why is the new param needed. Instead of this extra param - ALLOW_METADATA_PUSH_WITH_LOCAL_FS, can we not just take the metadata push path when configured regardless of LOCAL_FS or not ?
There was a problem hiding this comment.
Its risky for multi node setup where segment download url will point to a local path (mostly on the minion) and server will look for it on their own local fs for downloading the segment.
…port-for-single-sce
|
Please note that there unrelated test failures. We should take it separately to address flaky tests.
|
Summary
Adds METADATA (and URI) segment push mode to
BaseSingleSegmentConversionExecutorso single-segment conversion tasks can push via metadata (segment on output PinotFS + metadata to controller) instead of only TAR (HTTP upload). Moves shared segment-push logic intoBaseTaskExecutorso both single- and multiple-segment conversion executors reuse the same helpers.Changes
BaseTaskExecutor
SEGMENT_PUSH_DEFAULT_ATTEMPTS,SEGMENT_PUSH_DEFAULT_PARALLELISM,SEGMENT_PUSH_DEFAULT_RETRY_INTERVAL_MILLIS.getPushJobSpec(configs)– buildsPushJobSpecfrom task config.generateSegmentGenerationJobSpec(tableName, configs, pushJobSpec)– buildsSegmentGenerationJobSpecfor controller push.moveSegmentToOutputPinotFS(configs, localSegmentTarFile)– copies local segment tar to output PinotFS; requiresoutput.segment.dir.uri.getSegmentPushCommonParams(tableNameWithType)– common HTTP params (parallel push protection, table name, table type).getSegmentPushMetadataHeaders(pinotTaskConfig, authProvider, segmentConversionResult)– headers for metadata push (ZK metadata modifier + auth).getSegmentPushType(configs)– resolves push mode from config (push.mode, default TAR); subclasses can override.BaseSingleSegmentConversionExecutor
getSegmentPushType(configs)(overridable).output.segment.dir.uriandpush.controllerUriare set) and callSegmentPushUtils.sendSegmentUriAndMetadata.getSegmentPushCommonParamsfor upload parameters and the new base helpers for the METADATA path (no duplicated push logic).BaseMultipleSegmentsConversionExecutor
getPushJobSpec,generateSegmentGenerationJobSpec,moveSegmentToOutputPinotFS, andgetSegmentPushCommonParams; uses base implementations.getSegmentPushCommonHeadersnow delegates to basegetSegmentPushMetadataHeaders.pushSegmentuses basegetSegmentPushType(taskConfigs)for resolving push mode.SegmentGenerationAndPushTaskExecutor
moveSegmentToOutputPinotFSoverrides to handle missingoutput.segment.dir.uri(return local file URI) and delegates tosuper.moveSegmentToOutputPinotFSwhen the config is present.MinionTaskUtils
allowMetadataPushWithLocalFsto skip the override to TAR push mode when output segment dir is on local FS. This flag is introduced to enable testing of METADATA push mode and is not intended for production use (unless the local disk is somehow shared and all components have access to it).Usage (single-segment METADATA push)
push.mode=METADATA(or overridegetSegmentPushTypein a subclass).output.segment.dir.uriandpush.controllerUriin task config for the METADATA path.