PHOENIX-7794 Eventually Consistent Global Secondary Indexes #2401
virajjasani wants to merge 12 commits into apache:master
Conversation
    updateTrackerProgress(conn, partitionId, ownerPartitionId, newLastTimestamp,
        PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
}
return newLastTimestamp;
We should include a metric for lag. We can publish lag = EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp, which would be sort of like the ageOfLastApplied metric we have in HBase replication. Of course this would include timestampBufferMs + pollIntervalMs, but if lag goes beyond 11s (using defaults) we will be able to tell which consumer might need tuning.
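A minimal sketch of the suggested computation (EnvironmentEdgeManager is the real clock abstraction; the metrics-source field and method names here are hypothetical):

// Sketch only: publish consumer lag, similar in spirit to ageOfLastApplied in
// HBase replication. updateCdcConsumerLag(...) is a hypothetical metrics-source
// method, not an existing Phoenix API.
long cdcConsumerLagMs =
    EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp;
metricsSource.updateCdcConsumerLag(cdcConsumerLagMs);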
Yes, let me see if I can incorporate it now or as a follow-up Jira. Initially, I wanted to avoid adding metrics in this PR, but checking metrics was also necessary for perf evaluation, so I added some.
Having lag would definitely be helpful from the beginning, wdyt?
Let me publish this. I wonder what would be an appropriate name.
Something like cdcConsumerLagMs should be good, I guess?
curious to see this metric in the next perf run!
We have metrics at both levels, per table and per JVM, but the per-table values cannot be aggregated; metrics aggregation is only done by the various clients.
Oh okay so we will be able to see the lag for a table as well, that's good
All the current metrics already have table-level values too. However, they are exposed only from the server:
private void incrementTableSpecificHistogram(String baseName, String tableName, long t) {
    MetricHistogram tableHistogram =
        getMetricsRegistry().getHistogram(getMetricName(baseName, tableName));
    tableHistogram.add(t);
}
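So the lag could presumably be recorded per table through that same path; an illustrative call, using the metric name proposed above:

// Illustrative only: record per-table consumer lag via the table-specific
// histogram helper shown above; "cdcConsumerLagMs" is the name proposed in
// this thread, not a metric that exists yet.
incrementTableSpecificHistogram("cdcConsumerLagMs", tableName,
    EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp);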
    updateTrackerProgress(conn, partitionId, ownerPartitionId, newLastTimestamp,
        PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
}
return newLastTimestamp;
The lag metric can be published from here as well.
if (schemaName == null || schemaName.isEmpty()) {
    cdcObjectName = "\"" + cdcObjectName + "\"";
} else {
    cdcObjectName =
        "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + cdcObjectName + "\"";
}
Can we use SchemaUtil utility methods?
public static String getEscapedTableName(String schemaName, String tableName) {
    if (schemaName == null || schemaName.length() == 0) {
        return "\"" + tableName + "\"";
    }
    return "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + tableName + "\"";
}
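If so, the escaping block above could presumably collapse to a single call (assuming cdcObjectName is the table-name argument here):

cdcObjectName = SchemaUtil.getEscapedTableName(schemaName, cdcObjectName);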
    return;
}
if (!hasEventuallyConsistentIndexes()) {
    LOG.trace("No eventually consistent indexes found for table {}. Exiting consumer.",
What if an EC index is created later? How do we handle that? Should the consumer periodically check instead of returning?
This is an interesting one: when an EC index is created later, the current regions' change logs cannot be processed. Instead, it will wait for all the current regions of the data table to either get split, merged or moved to another server, and only after that will the change logs from their parent regions or their own region (from the previous server) start getting processed.
I can add this to the design doc.
it will wait for all the current regions of the data table to either get split, merged or moved
Does that mean the lag in the consumer can be very large in that case?
Does that mean the lag in the consumer can be very large in that case?
Yes, that's the penalty of creating the index later. On the other hand, we don't want to refresh PTable asynchronously by making too many syscat RPC calls.
In reality this should not matter, because when a client creates an index later, the index (SC or EC) is always asynchronous. It can take several days to synchronize the index data using mapreduce jobs. So the client is not expected to start using the index immediately anyway, regardless of the type of the index.
It can take several days to synchronize the index data using mapreduce jobs.
That's a good point, makes sense. I guess we can keep it this way; you already mentioned you will take care of updating the design doc to highlight this point.
In production, the gap between index creation and the next region event should not be very large, I guess?
Yes, exactly. In prod, regions split, merge and move all the time.
doneSignal.await(60, TimeUnit.SECONDS);
if (eventual) {
    Thread.sleep(35000);
Using sleep with large times is going to make tests slow (and flaky, though I guess we cannot do anything about that in this case). Since we are adding multiple new tests which are going to need their own mini cluster, maybe we can have waitFor(predicate, timeout) style polling instead, to at least let the test make progress as soon as it can.
Using sleep with large times is going to make tests slow (and flaky, though I guess we cannot do anything about that in this case). Since we are adding multiple new tests which are going to need their own mini cluster, maybe we can have waitFor(predicate, timeout) style polling instead, to at least let the test make progress as soon as it can.
I have added such polling in the other set of concurrent-mutation extended index IT tests, but here I could not add it because this test also needs to be used by the synchronous index, so we want to keep the changes to the original tests as small as possible.
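For reference, a waitFor-style poll along the lines suggested above might look roughly like this (a minimal sketch; the helper name and signature are illustrative, not an existing Phoenix test utility):

import java.util.function.BooleanSupplier;

// Illustrative polling helper: returns as soon as the condition holds instead
// of always sleeping for the full duration.
static void waitFor(BooleanSupplier condition, long timeoutMs, long pollMs)
        throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
        if (condition.getAsBoolean()) {
            return;
        }
        Thread.sleep(pollMs);
    }
    throw new AssertionError("Condition not met within " + timeoutMs + " ms");
}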
    lastProcessedTimestamp = processCDCBatchGenerated(encodedRegionName, encodedRegionName,
        lastProcessedTimestamp, false);
}
if (lastProcessedTimestamp == previousTimestamp) {
Here we could expose the current time in a metric like cdcLastEmptyPollTimestamp: the wall-clock time of the last poll that returned no new CDC rows. We can report this metric on the table level as the minimum across all regions of this table.
The tests can then use this to decide whether the consumers for a table have caught up. If the latest mutation in the test was at time Tm and the minimum cdcLastEmptyPollTimestamp for the table is Tc > Tm, then we can say that all consumers have seen the world after the test mutations and found nothing new. This way we can avoid sleeping for long periods.
I worked out an example of this with Claude, let me know if I am missing something.
Setup
10:00:00 UPSERT INTO T VALUES ('Bob', 42) → lands in R1
10:00:00 UPSERT INTO T VALUES ('Zoe', 99) → lands in R2
10:00:00 conn.commit()
10:00:00 afterWrites = now() = 10:00:00
Consumer Timeline
Time       R1 consumer                     R2 consumer                     min()
─────────  ──────────────────────────────  ──────────────────────────────  ────────
09:59:58   empty poll → reports 09:59:58   empty poll → reports 09:59:58   09:59:58
10:00:00   [writes arrive]                 [writes arrive]
10:00:01   picks up Bob's CDC row          sleeping (poll interval)        09:59:58
10:00:02   processes, writes to index      picks up Zoe's CDC row          09:59:58
10:00:03   empty poll → reports 10:00:03   processes, writes to index      09:59:58
10:00:04   sleeping                        empty poll → reports 10:00:04   10:00:03
Test Polling
10:00:01 min = 09:59:58 < afterWrites(10:00:00) → keep waiting
10:00:02 min = 09:59:58 < afterWrites(10:00:00) → keep waiting
10:00:03 min = 09:59:58 < afterWrites(10:00:00) → keep waiting (R1 caught up but R2 hasn't)
10:00:04 min = 10:00:03 > afterWrites(10:00:00) → DONE, both caught up
We can report this metric on the table level as the minimum across all regions of this table.
This might need some bookkeeping in MetricsIndexCDCConsumerSourceImpl to keep track of cdcLastEmptyPollTimestamp across all regions of a table and only publish the minimum.
The value across all regions cannot be provided for a given table without complicating the metrics framework; we don't have such metrics, is that correct? We publish metrics per JVM, not per table.
If this is only for test purposes, we don't need it; we have the poll-and-wait approach.
As far as tests are concerned, I have been running them for a very long time: https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-mulitbranch/job/tmp-ec/
Tests may be fine today, but they need not be in the future or in a different CI env. We already have so many flappers, mainly due to waiting on async stuff (especially in internal forks). It would be good to avoid adding more such tests. Maybe we can expose this in an in-memory map, which can be consumed in the tests? We can rethink what more we can expose as a meaningful metric for the consumer in a separate Jira.
I guess the point is we can't get rid of polling, but we can decide what to poll on. Verifying data correctness in a loop can be expensive; having a lightweight signal would be really helpful for both Phoenix tests and downstream projects. Fine to track as a separate Jira if not in scope for this PR.
Do you think some sort of query check on SYSTEM.IDX_CDC_TRACKER could also be a helpful signal for tests?
SELECT MIN(LAST_TIMESTAMP) FROM SYSTEM.IDX_CDC_TRACKER
WHERE TABLE_NAME = ? AND STATUS = 'i'
MIN(LAST_TIMESTAMP) > afterWritesTimestamp would mean the consumers have processed everything the test wrote?
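A rough sketch of how a test might use that as a lightweight pre-check (the tracker-table schema and 'i' status value are as proposed above, not necessarily the final layout):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Illustrative pre-check: true once every in-progress tracker partition for
// the table has advanced past the timestamp recorded after the test's writes.
static boolean consumersCaughtUp(Connection conn, String tableName, long afterWritesTs)
        throws SQLException {
    String sql = "SELECT MIN(LAST_TIMESTAMP) FROM SYSTEM.IDX_CDC_TRACKER"
        + " WHERE TABLE_NAME = ? AND STATUS = 'i'";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, tableName);
        try (ResultSet rs = ps.executeQuery()) {
            return rs.next() && rs.getLong(1) > afterWritesTs;
        }
    }
}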
Yes, it can be, but why would we want to make tests more flaky by complicating them with timestamp comparisons rather than using the simple "wait and compare" approach? I see more chances of tests getting flaky when we rely on timestamp-based comparisons, both in HBase and Phoenix. That's why we have tests that inject a custom EnvironmentEdgeManager just to make sure we can expect a specific timestamp. I don't think we need timestamp-based comparison. As for phoenix-adapters, the test result comparison is quite simple with wait-and-compare.
@palashc we also don't usually publish timestamp values as metrics because they can be a bit expensive to represent or visualize. We can publish lag or timestamp diffs as histograms, or success/failure events as counts, etc. I wonder how publishing a timestamp would help from a metrics visualization viewpoint even if we decide to publish it eventually, wdyt?
I wonder how publishing a timestamp would help from a metrics visualization viewpoint even if we decide to publish it eventually
I think there is some confusion. I agree we do not need to push this as a metric, but using the timestamps from the tracker table can be a utility for tests.
why would we want to make tests more flaky by complicating them with timestamp comparisons
I think using timestamps from the tracker table should not be flaky in this case. Those timestamps would not be wall-clock times, right? Using the tracker table involves querying timestamp values that the consumer itself wrote (the CDC row timestamp), from the same EnvironmentEdgeManager, so there should be no concern about clock skew, different JVMs, etc. It can be a lightweight pre-check before running the expensive index verification.
But if you think tests are (and will be) stable in CI envs, we can keep this for a separate Jira.
palashc left a comment
+1, pending build results
Thank you @virajjasani for addressing the review comments and for the helpful discussion. Great work, as always!
Jira: PHOENIX-7794
Achieving consistently low latency at any scale is a major challenge for many critical web applications and services, and it requires choosing the right database. Distributed databases like Apache Phoenix (built on Apache HBase) offer the scalability and throughput required for such critical workloads. However, when it comes to global secondary indexes, Phoenix provides strongly consistent (synchronous) indexes: index updates are tightly coupled with the data table updates, meaning that as the number of indexes grows, write latency on the data table can increase depending on the network overhead and/or WAL replica availability of each index table. As a result, applications with high write volumes and multiple indexes can experience throughput and availability degradation.
The purpose of this Jira is to provide Eventually Consistent Global Secondary Indexes, where index updates are managed separately from the data table updates. This keeps write latency on the data table consistently low regardless of the number of indexes created on it, allowing high-write-volume applications to take advantage of global secondary indexes in Phoenix without slowing down their writes, while accepting eventual consistency of the indexes.
The design document attached to the Jira describes several possible approaches, of which two were finalized for providing eventually consistent indexes.
Requirements for Eventually Consistent Indexes
CREATE INDEX <index-name> ON <data-table> (<col1>, ..., <colN>) INCLUDE (<col1>, ..., <colN>) CONSISTENCY = EVENTUAL
ALTER INDEX <index-name> ON <data-table> CONSISTENCY = EVENTUAL
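For example, with hypothetical table and index names (and assuming ALTER converts an existing index):

CREATE INDEX order_cust_idx ON orders (customer_id) INCLUDE (total_amount) CONSISTENCY = EVENTUAL
ALTER INDEX existing_idx ON orders CONSISTENCY = EVENTUAL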
Design doc
Two approaches are finalized for the eventually consistent global secondary indexes. phoenix.index.cdc.mutation.serialize controls which approach is used for implementing them.
Approach 1: Serialized mutations (value = true)
During preBatchMutate(), IndexRegionObserver generates index mutations for each data table mutation and serializes them into a Protobuf IndexMutations message. This serialized payload is written as a column value in the CDC index table row alongside the CDC event. The IndexCDCConsumer later reads these pre-computed mutations from the CDC index, deserializes them, and applies them directly to the index table(s). In this approach, the consumer does not need to understand index structure or re-derive mutations — it simply replays what was already computed on the write path. The trade-off is increased CDC index row size due to the serialized mutation payload, and additional write IO on the CDC index table.
Approach 2: Generated mutations from data row states (default, value = false)
During preBatchMutate(), IndexRegionObserver writes only a lightweight CDC index entry without serialized index mutations. Instead, the CDC event is created with the DATA_ROW_STATE cdc scope. When the IndexCDCConsumer processes these events, it reads the CDC index rows, which triggers a server-side scan of the data table to reconstruct the before-image (currentDataRowState) and after-image (nextDataRowState) of the data row at the change timestamp. These raw row states are returned as a Protobuf DataRowStates message. The consumer then feeds these states into generateIndexMutationsForRow() (the same utility used by IndexRegionObserver#prepareIndexMutations on the write path) to derive index mutations at consume time. This approach keeps CDC index rows small, avoids additional write IO, and generates mutations based on the current index definition, but it requires an additional data table read per CDC event and is sensitive to data visibility timing; make sure the max lookback age is long enough to retain the before and after images of the row.
Use Approach 2 (serialize = false, default) to minimize write IO: no serialized mutations are written to the CDC index, keeping CDC index rows small and write latency uniform. The trade-off is higher read IO at consume time, since the consumer performs an additional data table point lookup with a raw scan per CDC event to reconstruct row states.
Use Approach 1 (serialize = true) to minimize read IO: the consumer reads pre-computed mutations from the CDC index and applies them directly, with no data table scan required at consume time. The trade-off is higher write IO, since serialized index mutations are written alongside each CDC index entry, increasing CDC index row size and write-path latency. The CDC index is, however, expected to have a TTL equal to the data table's max lookback age.