feat: introduce a full server timeline view on Broker#19212
feat: introduce a full server timeline view on Broker#19212Fly-Style wants to merge 4 commits intoapache:masterfrom
Conversation
d107101 to
757f6f1
Compare
|
Is this related to: #18716? |
|
@jtuglu1 hi! Haven't seen this issue, will take a look tomorrow! Thanks! |
f67d13a to
ee433bc
Compare
|
If this is trying to solve the problem in the issue I linked above, can we add the embedded test added in: #18737 |
|
I find the logic for merging mergedSelectors quite complex and difficult to reason about. Introducing two additional state-tracking variables, metadataSegments and metadataRemovedSegmentIds, makes it even harder to follow. My two cents: avoid merging entirely and use MetadataSegmentView as the source of truth for determining which segments should be visible. With this approach, we would not need an additional MetadataSegmentViewCallback. Instead, in the poll() function, we can directly remove segments from mergedSelectors and mergedTimelines when they are marked as removed. These removals typically come from MarkSegmentsAsUnusedAction, and since we rarely reverse them, simply removing those segments should keep things mostly consistent. For newly added segments, we can reuse the ServerSelector from BrokerServerView, or create an empty one if necessary. We can still keep the BsvCallback to listen for segment location changes and update them only if the segment is visible according to metadata. BrokerServerView would remain the source of truth for which segments are available on which servers, while the updated MetadataSegmentView would be the source of truth for determining which segments should be queried and their locations. |
| public class MetadataSegmentView | ||
| { | ||
| private static final EmittingLogger log = new EmittingLogger(MetadataSegmentView.class); | ||
| private static final String NEW_SEGMENTS_DETECTED_METRIC_NAME = "metadataSegmentView/segments/added"; |
There was a problem hiding this comment.
Elsewhere in this file, Metric.SYNC_DURATION_MILLIS is used for the sync time. The class here parallels the metadata sync on the Coordinator so it uses the same metric names when an equivalent is available. For these two I think the equivalents are Metric.UPDATED_USED_SEGMENTS and Metric.DELETED_SEGMENTS.
There was a problem hiding this comment.
Intention here is to highlight that specific area for ease of testing, and afterwards switch back to normal metrics.
| * Caches the replication factor for segment IDs. In case of coordinator restarts or leadership re-elections, the coordinator API returns `null` replication factor until load rules are evaluated. | ||
| * A set containing the identifiers of the segments currently being managed or tracked by this view. | ||
| * <p> | ||
| * This field is immutable and provides a snapshot of segment identifiers at a given point |
There was a problem hiding this comment.
In what sense is it immutable? It does look like it is mutated on each call to poll.
It looks like it isn't used outside poll, so for clarity the javadoc could say something like "this is not accessed outside poll() and so does not need synchronization".
There was a problem hiding this comment.
It's a leftover after trying different approaches, will remove.
| synchronized (lock) { | ||
| for (SegmentId segmentId : segmentIds) { | ||
| final DataSegment metadataSegment = metadataSegments.remove(segmentId); | ||
| metadataRemovedSegmentIds.add(segmentId); |
There was a problem hiding this comment.
What is this set being used for? It looks like it grows every time a segment is removed, and doesn't shrink unless a segment is added back after being removed. Over time, natural removal of segments from compaction, reindexing, etc will cause this to grow without bound. I wonder if we can solve the problem it is trying to solve some other way.
There was a problem hiding this comment.
Good catch - you're right that the original implementation had unbounded growth. This has been fixed: BsvCallback.segmentRemoved now calls metadataRemovedSegmentIds.remove(segmentId) when BSV
confirms a segment is fully gone, so the set drains as the lag window closes.
It was done to guard against a race condition between the two callback sources. When MSV marks a segment as removed (coordinator says it's unused), we immediately remove it from the merged timeline. However, BSV still may hold the segment because the historical hasn't unloaded it yet.
| @Test | ||
| public void testTimelineContainsAllAvailableSegments() | ||
| { | ||
| final var view = broker.bindings().getInstance(BrokerServerViewOfLatestUsedSegments.class); |
There was a problem hiding this comment.
It's more in the spirit of an embedded test to interact with the services through HTTP APIs rather than by directly interacting with Guice-provided Java objects. Perhaps you can introduce a runtime property to bind the TimelineServerView to this, and then test the resulting behavior.
This patch solves the issue that the Broker's existing server view only knows about segments that are actually loaded on a historical. If a segment exists in coordinator metadata but hasn't been loaded yet — or is being replaced by compaction — the Broker is blind to it. This new
BrokerServerViewOfLatestUsedSegmentsview fixes that by merging both sources.Details:
BrokerServerViewOfLatestUsedSegmentson the Broker. This class implements TimelineServerView and maintains a full timeline of all segments (available on historicals, available on indexers/peons, not available). Here we keep a single timeline for each datasource. When a callback is received for a segment (with or without server) added or removed, we simply add it to the respective datasource timeline.MetadataSegmentViewfinishes a poll from the Coordinator, it determines the delta of segments added/removed so that it can invoke the new callback accordingly.timeline.lookup()will always return the latest complete partition set in the timeline object, irrespective of whether those segments are available or not.metadataSegmentCacheEnableis false, the new server view class will throw an exception when any method is invoked since we do not have info of unavailable segments.This PR has: