Skip to content

Commit caedb08

Browse files
authored
Add partitioned group creation time to visit marker (#7217)
Signed-off-by: Anna Tran <trananna@amazon.com>
1 parent 2e4bc85 commit caedb08

6 files changed

Lines changed: 16 additions & 10 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* [ENHANCEMENT] Ingester: Instrument Ingester CPU profile with userID for read APIs. #7184
3636
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
3737
* [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210
38+
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
3839
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
3940
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
4041
* [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132

pkg/compactor/blocks_cleaner_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,9 +1000,9 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
10001000
require.NoError(t, err)
10011001
require.False(t, partitionedGroupFileExists)
10021002

1003-
partitionedGroupFileExists, err = userBucket.Exists(ctx, visitMarker.GetVisitMarkerFilePath())
1003+
visitMarkerExists, err := userBucket.Exists(ctx, visitMarker.GetVisitMarkerFilePath())
10041004
require.NoError(t, err)
1005-
require.False(t, partitionedGroupFileExists)
1005+
require.False(t, visitMarkerExists)
10061006
}
10071007

10081008
func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {

pkg/compactor/partition_compaction_grouper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,7 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact
632632
partitionCount := partitionedGroup.partitionedGroupInfo.PartitionCount
633633
partitionID := partitionedGroup.partition.PartitionID
634634
partitionedGroupLogger := log.With(g.logger, "rangeStart", partitionedGroup.rangeStartTime().String(), "rangeEnd", partitionedGroup.rangeEndTime().String(), "rangeDuration", partitionedGroup.rangeDuration().String(), "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group_hash", groupHash)
635-
visitMarker := newPartitionVisitMarker(g.ringLifecyclerID, partitionedGroupID, partitionID)
635+
visitMarker := newPartitionVisitMarker(g.ringLifecyclerID, partitionedGroupID, partitionedGroup.partitionedGroupInfo.CreationTime, partitionID)
636636
visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker)
637637
if isVisited, err := g.isGroupVisited(partitionID, visitMarkerManager); err != nil {
638638
level.Warn(partitionedGroupLogger).Log("msg", "unable to check if partition is visited", "err", err, "group", partitionedGroup.String())

pkg/compactor/partition_compaction_planner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB
8585
// claimed same partition in grouper at same time.
8686
time.Sleep(p.plannerDelay)
8787

88-
visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionID)
88+
visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionInfo.PartitionedGroupCreationTime, partitionID)
8989
visitMarkerManager := NewVisitMarkerManager(p.bkt, p.logger, p.ringLifecyclerID, visitMarker)
9090
existingPartitionVisitMarker := &partitionVisitMarker{}
9191
err := visitMarkerManager.ReadVisitMarker(p.ctx, existingPartitionVisitMarker)

pkg/compactor/partition_visit_marker.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,22 @@ type partitionVisitMarker struct {
2828
CompactorID string `json:"compactorID"`
2929
Status VisitStatus `json:"status"`
3030
PartitionedGroupID uint32 `json:"partitionedGroupID"`
31-
PartitionID int `json:"partitionID"`
31+
// VisitTime is a unix timestamp of when the partitioning group plan was created, in order to validate if the marker
32+
// is referring to the latest version of the group plan
33+
PartitionedGroupCreationTime int64 `json:"partitionedGroupCreationTime"`
34+
PartitionID int `json:"partitionID"`
3235
// VisitTime is a unix timestamp of when the partition was visited (mark updated).
3336
VisitTime int64 `json:"visitTime"`
3437
// Version of the file.
3538
Version int `json:"version"`
3639
}
3740

38-
func newPartitionVisitMarker(compactorID string, partitionedGroupID uint32, partitionID int) *partitionVisitMarker {
41+
func newPartitionVisitMarker(compactorID string, partitionedGroupID uint32, partitioned_group_creation_time int64, partitionID int) *partitionVisitMarker {
3942
return &partitionVisitMarker{
40-
CompactorID: compactorID,
41-
PartitionedGroupID: partitionedGroupID,
42-
PartitionID: partitionID,
43+
CompactorID: compactorID,
44+
PartitionedGroupID: partitionedGroupID,
45+
PartitionedGroupCreationTime: partitioned_group_creation_time,
46+
PartitionID: partitionID,
4347
}
4448
}
4549

pkg/compactor/partitioned_group_info.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus(
156156
status.PendingPartitions++
157157
allPartitionCompleted = false
158158
status.PendingOrFailedPartitions = append(status.PendingOrFailedPartitions, partition)
159-
} else if visitMarker.VisitTime < p.CreationTime {
159+
} else if visitMarker.VisitTime < p.CreationTime ||
160+
(visitMarker.PartitionedGroupCreationTime > 0 && visitMarker.PartitionedGroupCreationTime < p.CreationTime) {
160161
status.VisitMarkersToDelete = append(status.VisitMarkersToDelete, visitMarker)
161162
allPartitionCompleted = false
162163
} else if (visitMarker.GetStatus() == Pending || visitMarker.GetStatus() == InProgress) && !visitMarker.IsExpired(partitionVisitMarkerTimeout) {

0 commit comments

Comments
 (0)