introduce clear separation between proposed compaction, eligibility status, and compaction mode, along with improved task state tracking #18968
Conversation
kfaraz left a comment
Thanks for the PR, @cecemei !
I like the idea of separating the CompactionStatus (i.e. the current degree of compaction of an interval) from the Eligibility (i.e. whether an interval should be picked for compaction or not).
I have left some suggestions to aid with the separation. Let me know if they make sense.
server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
FULL_COMPACTION,
INCREMENTAL_COMPACTION,
NOT_ELIGIBLE,
NOT_APPLICABLE
How is "not applicable" different from "not eligible"?
I think eligibility should have only 3 possible states - not eligible, incremental and full.
They are tracked differently in AutoCompactionSnapshot; my original implementation had only three states and it failed: https://github.com/apache/druid/actions/runs/21509820801/job/61973864065. Eventually I'd want evaluate() to return only eligibility and not status, so complete/pending statuses should have different eligibility.
@JsonCreator
public ClientCompactionIntervalSpec(
    @JsonProperty("interval") Interval interval,
    @JsonProperty("uncompactedSegments") @Nullable List<SegmentDescriptor> uncompactedSegments,
It is a little confusing to add segments here since there is already a SpecificSegmentsSpec which specifies the set of segments that should be compacted.
I feel we should add a new type of CompactionInputSpec which will be used for incremental compaction only. It would have both a non-null interval as well as a non-empty set of segment IDs to compact incrementally. The CompactionTask would handle the different input specs accordingly.
SpecificSegmentsSpec is not supported by MSQ, and it feels somewhat deprecated to me, maybe because of the segment lock logic. I like specifying an interval since it gives some certainty. Also, should we check non-null for the interval here? I didn't see any instance with a null interval.
I like specifying an interval since it gives some certainty
Yes, I agree, in the new scheme of things, it is always nice to specify the interval.
That's why I advise adding a new implementation of CompactionInputSpec which will be used only for incremental compaction so that it is easy to distinguish the two on the task side as well. We need not piggyback on the existing CompactionIntervalSpec.
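For illustration only, a minimal sketch of what such an incremental-only input spec could look like; the class name and field shapes here are assumptions, not the PR's actual API:

```java
import java.util.List;
import org.joda.time.Interval;

// Hypothetical sketch of a CompactionInputSpec variant for incremental compaction:
// it requires both a non-null interval and a non-empty list of segment IDs.
public class IncrementalCompactionIntervalSpec
{
  private final Interval interval;
  private final List<String> segmentIds;

  public IncrementalCompactionIntervalSpec(Interval interval, List<String> segmentIds)
  {
    if (interval == null) {
      throw new IllegalArgumentException("interval must be non-null");
    }
    if (segmentIds == null || segmentIds.isEmpty()) {
      throw new IllegalArgumentException("segmentIds must be non-empty");
    }
    this.interval = interval;
    this.segmentIds = segmentIds;
  }

  public Interval getInterval()
  {
    return interval;
  }

  public List<String> getSegmentIds()
  {
    return segmentIds;
  }
}
```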
@@ -64,18 +62,27 @@ Eligibility checkEligibilityForCompaction(
 */
class Eligibility
Since this class is going to have a wider usage now, let's move it out into a separate file of its own.
 * @param searchPolicy the policy used to determine compaction eligibility
 * @return a CompactionCandidate with updated status and potentially filtered segments
 */
public CompactionCandidate evaluate(
This method should not live here. The computation of the eligibility should be done in CompactionStatusTracker.
The overall workflow would be like this (see the sketch after this list):
- identify a CompactionCandidate
- determine its CompactionStatus
- determine the eligibility via CompactionStatusTracker.computeEligibility()
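A rough sketch of that workflow, with signatures inferred from this thread rather than the actual code:

```java
// Sketch only: identify a candidate, compute its status, then ask the
// status tracker for its eligibility. All signatures are assumptions,
// including the type of fingerprintMapper.
Eligibility evaluateCandidate(
    List<DataSegment> segments,
    DataSourceCompactionConfig config,
    CompactionStatusTracker statusTracker,
    ObjectMapper fingerprintMapper
)
{
  // 1. Identify a CompactionCandidate.
  final CompactionCandidate candidate = CompactionCandidate.from(segments, config.getSegmentGranularity());
  // 2. Determine its CompactionStatus.
  final CompactionStatus status = CompactionStatus.compute(candidate, config, fingerprintMapper);
  // 3. Determine the eligibility via the status tracker.
  return statusTracker.computeEligibility(candidate, status);
}
```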
Synced offline: DataSourceCompactibleSegmentIterator would be responsible for filtering out ineligible candidates.
+1
But this method should still not live here.
We should just have a withEligibility() method so that this class remains a bean for the most part.
This method can be moved to CompactionEligibility.compute() similar to CompactionStatus.compute().
 *
 * @return Pair of eligibility status and compaction status with reason for first failed check
 */
Pair<CompactionCandidateSearchPolicy.Eligibility, CompactionStatus> evaluate()
Since the CompactionStatus and eligibility are going to be two separate things now, this class should not deal with eligibility at all. We can retain the old code in this class.
Once the CompactionStatus has been computed, that should be passed to CompactionStatusTracker.computeEligibility() to determine the eligibility.
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
if (inputBytesCheck.isSkipped()) {
  return inputBytesCheck;
final CompactionCandidateSearchPolicy.Eligibility inputBytesCheck = inputBytesAreWithinLimit();
if (inputBytesCheck != null) {
I think the input bytes check should now be moved out of this class and used in CompactionStatusTracker.computeEligibility() instead.
@cecemei , I think this class should remain untouched so that CompactionStatus can remain completely agnostic of CompactionEligibility.
I guess this conversation is out of date after latest refactorings?
final CompactionCandidate candidates = CompactionCandidate.from(segments, config.getSegmentGranularity());
final CompactionStatus compactionStatus = CompactionStatus.compute(candidates, config, fingerprintMapper);
final CompactionCandidate candidatesWithStatus = candidates.withCurrentStatus(compactionStatus);
final CompactionCandidate candidatesWithStatus =
I feel this class should remain untouched since we are only computing the CompactionStatus here. The eligibility computation should happen later in the CompactionJobQueue only.
Why should it be the jobQueue only? Why not both the jobQueue and the Coordinator duty?
Coordinator-based compaction will be deprecated soon.
We want to support incremental compaction on Overlord-based compaction only so that it becomes an incentive for users to migrate to compaction supervisors. We followed the same approach with indexing fingerprints as well.
Hmm, still thinking about this.
For reference, Cece: the changes I have been working on have also only added functionality to the supervisor. The IndexingState fingerprinting that compaction uses is only available for supervisors, and rule-based compaction/reindexing will also be supervisor-only. Maybe it is time we officially do a separate PR to mark this duty as deprecated and update the docs as such, since so much is happening in this space these days.
case FULL_COMPACTION:
  clientCompactionIntervalSpec = new ClientCompactionIntervalSpec(entry.getCompactionInterval(), null, null);
  break;
case INCREMENTAL_COMPACTION:
Let's update this part only when incremental compaction has been implemented on the task side. Also, we should support incremental compaction with CompactionJobQueue (i.e. Overlord-based compaction) only.
Please revert this change since we need not support incremental compaction with Coordinator-based compaction duty.
@cecemei, just so that we are on the same page: it would be nice to get rid of the SKIPPED state too since it is more of an eligibility thing, but I guess we can leave it for now.
/**
 * Describes the eligibility of an interval for compaction.
 */
public class CompactionEligibility
We should add a method to this class:
public static CompactionEligibility compute(CompactionStatus status, CompactionCandidateSearchPolicy policy) {
// determine the eligibility here
}
Added a new evaluate static method.
final CompactionCandidate candidatesWithStatus =
    CompactionCandidate.from(segments, config.getSegmentGranularity())
                       .evaluate(config, searchPolicy, fingerprintMapper);
This part should probably look more like the following:

final CompactionCandidate candidates = CompactionCandidate.from(segments, config.getSegmentGranularity());
final CompactionStatus compactionStatus = CompactionStatus.compute(candidates, config, fingerprintMapper);
final CompactionEligibility eligibility = CompactionEligibility.compute(compactionStatus, searchPolicy);
final CompactionCandidate candidatesWithStatus = candidates.withCurrentStatus(compactionStatus).withEligibility(eligibility);
  );
}

public CompactionCandidate withPolicyEligibility(CompactionEligibility eligibility)
Nit: Thinking about this, I think it would be best to have a single method (e.g. withStatusAndEligibility()) that creates a copy of the CompactionCandidate with both the status and eligibility populated.
Otherwise, we would always create an intermediate copy unnecessarily.
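A hypothetical sketch of that single-copy method; the constructor shape and field names are assumptions:

```java
// Sketch: populate both fields in one copy instead of
// candidate.withCurrentStatus(status).withEligibility(eligibility).
public CompactionCandidate withStatusAndEligibility(CompactionStatus status, CompactionEligibility eligibility)
{
  // assumes a constructor that accepts all fields of this candidate
  return new CompactionCandidate(this.segments, this.interval, status, eligibility);
}
```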
    CompactionCandidate candidate,
    CompactionCandidateSearchPolicy searchPolicy
)
public CompactionStatus computeCompactionStatus(CompactionCandidate candidate)
I suppose we can now remove this method altogether, since we do not want to use the lastTaskStatus anyway.
Not sure I understand. Don't we still track pending/running/completed status?
server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
server/src/main/java/org/apache/druid/server/compaction/CompactionEligibility.java
server/src/test/java/org/apache/druid/server/compaction/CompactionEligibilityEvaluateTest.java
I addressed most comments except for the Coordinator-based compaction and the incremental compaction feature change. The major change is that I moved a lot of CompactionStatus logic to CompactionEligibility and updated each CHECK to return a String (null for the previous CompactionStatus.COMPLETE, non-null for the previous CompactionStatus.pending...). It seems simpler since, after all, we are just looking for a reason to do compaction.
 * <p>
 * This method performs a two-stage evaluation:
 * <ol>
 * <li>First, uses {@link Evaluator} to check if the candidate needs compaction
Did we make any changes to the Evaluator logic in moving it to this file, or is it just a lift-and-place into a new location?
Had a brief discussion with @cecemei on this. I think there has been some confusion regarding the roles of these classes. In short, we shouldn't need to make any major modification to any class in this PR, except as discussed offline; I have shared some details there. We should be able to get this resolved by Monday.
Hi @kfaraz @capistrant, I made some major changes to this PR, and hopefully it's in a much better state. Please take a look when you're available, thanks! The incremental compaction change has been removed from this PR; I'm going to make another one for that.
I think some javadocs in this class would be helpful: a class-level one, plus at least a brief sentence for each mode, even if they are a bit self-explanatory.
if (candidate.getCurrentStatus() != null) {
  autoCompactionContext.put(COMPACTION_REASON_KEY, candidate.getCurrentStatus().getReason());
if (CompactionMode.NOT_APPLICABLE.equals(candidate.getMode())) {
Is this supposed to be if (!CompactionMode.NOT_APPLICABLE.equals(candidate.getMode()))?
Seems to me like the reason will now be missing for compaction jobs, and for NOT_APPLICABLE a task wouldn't even be started anyway, as an exception would get thrown by compactSegments.
@@ -346,8 +361,10 @@ static DimensionRangePartitionsSpec getEffectiveRangePartitionsSpec(DimensionRan
 */
private static class Evaluator
I'm not sure how much I like moving away from a more descriptive return type for the individual checks. It feels like a nitpick since functionally everything is the same, and I guess I understand why you wanted to avoid using CompactionStatus all over the place in the Evaluator like we were. My only alternative suggestion would be yet another simple record-like object signifying a check, with a result and a reason (populated if the check failed). The "return null if check passed" just feels a little unintuitive. Maybe a compromise would be explicitly stating how the checks work in the javadoc for the CHECKS and FINGERPRINT_CHECKS lists (or in the evaluate method javadoc).
Yeah, I don't like the null returns much myself.
We should update each CHECK to return a CheckResult object instead of a CompactionStatus.
The rest of this class should probably remain unchanged as it seems to be bloating up the PR without much value added.
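A minimal sketch of such a CheckResult; only the class name comes from the comments above, so the field and method names are assumptions:

```java
// Sketch: the outcome of a single compaction check. A failed check carries
// a human-readable reason; a passed check carries none.
public final class CheckResult
{
  private final boolean passed;
  private final String reason;

  private CheckResult(boolean passed, String reason)
  {
    this.passed = passed;
    this.reason = reason;
  }

  public static CheckResult passed()
  {
    return new CheckResult(true, null);
  }

  public static CheckResult failed(String reason)
  {
    return new CheckResult(false, reason);
  }

  public boolean isPassed()
  {
    return passed;
  }

  public String getReason()
  {
    return reason;
  }
}
```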
Assert.assertEquals(expectedSegmentCountAwaitingCompaction, snapshot.getSegmentCountAwaitingCompaction());
Assert.assertEquals(expectedSegmentCountCompressed, snapshot.getSegmentCountCompacted());
Assert.assertEquals(expectedSegmentCountSkipped, snapshot.getSegmentCountSkipped());
Assert.assertEquals(new AutoCompactionSnapshot(
Will this make it less clear what is causing the difference than having all the individual assertions? Your way looks better on the eyes, but I wonder if it makes it more annoying for a dev to identify exactly what they broke if it starts failing.
kfaraz left a comment
@cecemei , I feel a lot of changes here are really not needed and probably make the definitions of various compaction entities confusing.
- The only change that is really needed here is adding a CompactionMode enum to Eligibility.
- The existing terminology CompactionCandidate, CompactionStatus, Eligibility, CompactionTaskStatus captures the respective purposes nicely and also provides a clear distinction. Let's stick with the existing terminology unless there is some real gap in the current nomenclature/functionality.
- If you feel it needs some explanation, we may add/update javadocs.
- You may either remove all the refactors from this PR (so that we can quickly unblock incremental compaction in #18996) OR update this PR to only make suggested modifications that do not break the existing definitions.
P.S. Existing definitions
| Class | Definition |
|---|---|
| CompactionCandidate | A potential candidate for compaction, identified by an interval and some segment IDs |
| CompactionStatus | Current status of a CompactionCandidate, indicated by the number of compacted/uncompacted segments/bytes in the interval |
| Eligibility | Given a CompactionCandidate and CompactionStatus, does the policy think it is eligible for compaction? |
| CompactionTaskStatus | Status of past/ongoing compaction tasks for a CompactionCandidate |
All of these are distinct from each other and should preferably remain so.
There is room for improvement in these classes, but we should do them in a manner that does not completely change the existing semantics.
final CompactionStatus compactionStatus = getCurrentStatusForJob(job, policy);
switch (compactionStatus.getState()) {
  case RUNNING:
final CompactionCandidate.TaskState candidateState = getCurrentTaskStateForJob(job);
Please do not add a new task state. Use the existing CompactionTaskStatus class.
  return createCandidate(proposedCompaction, eligibility, null);
}

public CompactionCandidate createCandidate(
We shouldn't really have this method here. The CompactionMode is an attribute that we associate with a CompactionCandidate. One should not be responsible for creating the other.
    '}';
  }
}
CompactionCandidate createCandidate(CompactionCandidate.ProposedCompaction candidate, CompactionStatus eligibility);
It doesn't make sense for the policy to be creating candidates since the candidates are identified much earlier by the DatasourceCompactibleSegmentIterator itself. The policy can only check the eligibility of a candidate for compaction. (I suppose the name CompactionCandidateSearchPolicy might be misleading here, since it is really a CompactionCandidatePickAndPrioritizePolicy, but that is a mouthful, so we can stick with what we have.)
We should retain the original method here. The caller (CompactionJobQueue) should just check the eligibility and then decide whether to launch a job or not. If yes, then whether full or incremental.
 * Used by {@link CompactionStatusTracker#computeCompactionTaskState(CompactionCandidate)}.
 * The callsite then determines whether to launch compaction task or not.
 */
public enum TaskState
Please do not add a new TaskState; use CompactionTaskStatus instead. If it doesn't contain the needed info, let's add it there.
 * Non-empty list of segments of a datasource being proposed for compaction.
 * A proposed compaction typically contains all the segments of a single time chunk.
 */
public static class ProposedCompaction
What we need is a way to distinguish between a candidate before and after the status has been computed, so that we can avoid the null checks on currentStatus. I don't think this change fully achieves that.
Also, the name CompactionCandidate captures the intent better as it defines a potential "candidate" for compaction. So let's stick with it.
Instead of this change, you may consider adding a new class CompactionCandidateAndStatus which will be used by both the methods of the search policy. The new class will contain only a candidate field and a status field.
I don't think we will need this class to contain the Eligibility (or the CompactionMode) since it will be computed and used only by the CompactionJobQueue.
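A minimal sketch of the suggested wrapper; only the class name comes from the comment above, and the field shapes are assumptions:

```java
// Sketch: pairs a candidate with its computed status so that callers never
// have to null-check currentStatus on the candidate itself.
public class CompactionCandidateAndStatus
{
  private final CompactionCandidate candidate;
  private final CompactionStatus status;

  public CompactionCandidateAndStatus(CompactionCandidate candidate, CompactionStatus status)
  {
    this.candidate = candidate;
    this.status = status;
  }

  public CompactionCandidate getCandidate()
  {
    return candidate;
  }

  public CompactionStatus getStatus()
  {
    return status;
  }
}
```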
 * @param eligibility initial eligibility from compaction config checks
 * @return final compaction candidate
 */
class Eligibility
Let's retain the Eligibility class since it answers the question:
"Does the search policy consider this CompactionCandidate eligible for compaction?"
The Eligibility class should now also contain an enum field for CompactionMode. The rest of the class can remain unchanged.
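A sketch of that shape; the field and accessor names are assumptions, and only the new mode field is the suggested change:

```java
// Sketch: the existing Eligibility class extended with a CompactionMode field.
// Everything except the new field is assumed to stay as it is today.
class Eligibility
{
  private final boolean eligible;
  private final String reason;
  private final CompactionMode mode; // e.g. full, incremental, or not eligible

  Eligibility(boolean eligible, String reason, CompactionMode mode)
  {
    this.eligible = eligible;
    this.reason = reason;
    this.mode = mode;
  }

  boolean isEligible()
  {
    return eligible;
  }

  String getReason()
  {
    return reason;
  }

  CompactionMode getMode()
  {
    return mode;
  }
}
```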
CompactionCandidate candidatesWithStatus = CompactionCandidate
    .from(partialEternitySegments, null)
    .withCurrentStatus(CompactionStatus.skipped("Segments have partial-eternity intervals"));
CompactionCandidate candidatesWithStatus =
This class probably doesn't need to change in this PR.
The Eligibility should be computed and used only by the CompactionJobQueue (and maybe CompactSegments for backward compatibility).
 * This method assumes that the given candidate is eligible for compaction
 * based on the current compaction config/supervisor of the datasource.
 */
public CompactionStatus computeCompactionStatus(
The preferable approach here is to completely remove this method and use the existing method getLatestTaskStatus().
This PR refactors compaction eligibility evaluation by introducing clear separation between proposed compaction (what segments to compact), eligibility status (whether compaction is needed), and compaction mode (how to compact), along with improved task state tracking.
Motivation
The previous CompactionStatus implementation mixed multiple concerns: which segments to compact, whether compaction is needed, and how to compact. This made it difficult to reason about compaction decisions and track the state of compaction candidates through the system.
Key Changes
1. Introduced CompactionMode Enum

New enum to represent different compaction strategies:
- FULL_COMPACTION: Compact all segments in the interval
- NOT_APPLICABLE: No compaction should be performed

The mode determines whether a candidate will be queued for compaction or skipped.
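A sketch of the enum as described above; the javadoc wording is assumed:

```java
// Sketch of the CompactionMode enum described in this PR.
public enum CompactionMode
{
  /** Compact all segments in the interval. */
  FULL_COMPACTION,
  /** No compaction should be performed for this candidate. */
  NOT_APPLICABLE
}
```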
2. Refactored CompactionCandidate

New structure:
- ProposedCompaction: Nested class containing the segment list, intervals, and statistics
- CompactionStatus: Eligibility evaluation result (COMPLETE, ELIGIBLE, NOT_ELIGIBLE)
- CompactionMode: How to perform compaction (FULL_COMPACTION or NOT_APPLICABLE)
- policyNote: Optional note from the policy explaining why this candidate was chosen/skipped
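A sketch of that structure; the constructor and field shapes are assumptions based on this description:

```java
// Sketch of the refactored CompactionCandidate described above.
public class CompactionCandidate
{
  /** Nested class holding the segment list, intervals, and statistics. */
  public static class ProposedCompaction
  {
    // segments, umbrella interval, summary statistics, ...
  }

  private final ProposedCompaction proposedCompaction;
  private final CompactionStatus status; // COMPLETE, ELIGIBLE, or NOT_ELIGIBLE
  private final CompactionMode mode;     // FULL_COMPACTION or NOT_APPLICABLE
  private final String policyNote;       // optional note from the policy

  public CompactionCandidate(
      ProposedCompaction proposedCompaction,
      CompactionStatus status,
      CompactionMode mode,
      String policyNote
  )
  {
    this.proposedCompaction = proposedCompaction;
    this.status = status;
    this.mode = mode;
    this.policyNote = policyNote;
  }
}
```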
3. Updated CompactionStatus

New responsibilities:
- Evaluates each candidate to COMPLETE, ELIGIBLE, or NOT_ELIGIBLE

Separation from policy:
- CompactionStatus checks config-based requirements (granularity, rollup, etc.)

4. Simplified CompactionCandidateSearchPolicy

Changed interface:
- checkEligibilityForCompaction(candidate, taskStatus) → Eligibility
- createCandidate(proposedCompaction, eligibility) → CompactionCandidate
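A sketch of the changed interface; the parameter types are inferred from the diff context quoted earlier in this thread:

```java
// Sketch of the two policy methods described above. Per the quoted diff,
// the second argument of createCandidate is the CompactionStatus acting
// as the eligibility result.
public interface CompactionCandidateSearchPolicy
{
  /** Checks whether a candidate is eligible, given the status of its past/ongoing tasks. */
  Eligibility checkEligibilityForCompaction(CompactionCandidate candidate, CompactionTaskStatus taskStatus);

  /** Builds the final candidate, attaching the eligibility and a CompactionMode. */
  CompactionCandidate createCandidate(
      CompactionCandidate.ProposedCompaction proposedCompaction,
      CompactionStatus eligibility
  );
}
```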
5. Improved Task State Tracking
New TaskState enum in CompactionCandidate:
- READY: No task running, can start compaction
- TASK_IN_PROGRESS: Compaction task already running
- RECENTLY_COMPLETED: Task recently finished, segments not yet updated
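A sketch of the enum; the javadoc wording is assumed:

```java
// Sketch of the TaskState enum described above.
public enum TaskState
{
  /** No task running; compaction can start. */
  READY,
  /** A compaction task is already running for this candidate. */
  TASK_IN_PROGRESS,
  /** A task recently finished, but segment metadata is not yet updated. */
  RECENTLY_COMPLETED
}
```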
Code Flow Example
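A rough sketch of the overall flow, based on the sections above; all signatures here are assumptions:

```java
// Sketch: config-based checks produce a status, then the policy attaches
// an eligibility result and a CompactionMode to build the final candidate.
CompactionCandidate buildCandidate(
    CompactionCandidate.ProposedCompaction proposed,
    DataSourceCompactionConfig config,
    CompactionCandidateSearchPolicy policy
)
{
  // 1. Does the config say this interval still needs compaction?
  final CompactionStatus status = CompactionStatus.compute(proposed, config);
  // 2. FULL_COMPACTION queues the candidate; NOT_APPLICABLE skips it.
  return policy.createCandidate(proposed, status);
}
```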
Testing
- CompactionCandidateTest for new candidate structure
- CompactionStatusBuilderTest for eligibility evaluation