
PHOENIX-7794 Eventually Consistent Global Secondary Indexes#2401

Open
virajjasani wants to merge 12 commits into apache:master from virajjasani:PHOENIX-7794-master

Conversation

Contributor

@virajjasani virajjasani commented Apr 7, 2026

Jira: PHOENIX-7794

Achieving consistently low latency at any scale is a major challenge for many critical web applications and services, and it requires such applications to choose the right database. Distributed databases like Apache Phoenix, built on the NoSQL store Apache HBase, offer the scalability and throughput required for such critical workloads. However, when it comes to global secondary indexes, Phoenix provides strongly consistent (synchronous) indexes: index updates are tightly coupled with data table updates, meaning that as the number of indexes grows, write latency on the data table can increase depending on the network overhead and/or WAL replica availability of each index table. As a result, applications with high write volumes and multiple indexes can experience some throughput and availability degradation.

The purpose of this Jira is to provide eventually consistent global secondary indexes, where index updates are managed separately from data table updates. This keeps write latency on the data table consistently low regardless of the number of indexes created on the data table, and allows high write volume applications to take advantage of global secondary indexes in Phoenix without slowing down their writes, while accepting eventual consistency of the indexes.

The design document attached to the Jira describes several possible approaches, and finalizes two of them for providing eventually consistent indexes.

Requirements for Eventually Consistent Indexes

  1. Users should be able to create eventually consistent indexes for both covered and uncovered indexes.
  2. The SQL statement should include a CONSISTENCY clause to determine whether the given covered or uncovered index is strongly consistent or eventually consistent. By default, the index is considered strongly consistent. CREATE INDEX <index-name> ON <data-table> (<col1>, ... <colN>) INCLUDE (<col1>, ... <colN>) CONSISTENCY = EVENTUAL
  3. Users should be able to seamlessly update the CONSISTENCY property of the given index from strong to eventual and vice versa using the ALTER INDEX SQL statement (although when the consistency change takes effect depends on the UPDATE_CACHE_FREQUENCY used at the table level). ALTER INDEX <index-name> ON <data-table> CONSISTENCY = EVENTUAL
  4. Depending on the use case, a data table can have a mix of zero or more strongly consistent indexes and zero or more eventually consistent indexes.
  5. Index verification MapReduce jobs should work for the eventually consistent global secondary indexes similar to how they work for the strongly consistent global secondary indexes.
  6. Concurrent mutations on the data table should work for eventually consistent indexes.
  7. Data table mutations need to produce and store the time ordered metadata (change records) for consumers to replay them and perform the index mutation RPCs.
  8. Updates to eventually consistent indexes should mirror the pre-index and post-index update semantics of strongly consistent updates. However, the separate RPCs for pre-index and post-index updates can be combined into a single RPC call. For instance, if the data table update failed, the consumer should update corresponding indexes with unverified rows (pre-index updates) only. If the data table update succeeded, the consumer should update corresponding indexes with verified rows (post-index update) only. The consumer does not need to perform both pre and post index update RPCs on the indexes.
  9. To improve the scale of index updates, mutations on indexes should be executed by consuming ordered change records per table region. This allows for parallel processing across all table regions.
  10. Once the data table region splits or merges into new daughter regions, any remaining ordered change records from the closed parent region should be processed before consuming newly generated change records for the new daughter regions.
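As an illustration of requirements 7, 9, and 10, the consume-side ordering can be sketched as follows. The class and method names below are hypothetical (not the PR's actual API), and plain timestamps stand in for full change records: within each region, records are replayed in timestamp order, and a closed parent region's remaining records are drained before its daughter's records.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of change-record consumption order; not the PR's code.
public class ReplayOrderSketch {
  // Returns the order in which change records would be consumed: the closed
  // parent region's remaining records first (time ordered), then the
  // daughter region's records (time ordered).
  public static List<Long> replayOrder(List<Long> parentRemaining, List<Long> daughterRecords) {
    List<Long> ordered = new ArrayList<>(parentRemaining);
    ordered.sort(Comparator.naturalOrder());
    List<Long> daughters = new ArrayList<>(daughterRecords);
    daughters.sort(Comparator.naturalOrder());
    ordered.addAll(daughters);
    return ordered;
  }
}
```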

Design doc

Two approaches are finalized for the eventually consistent global secondary indexes:

The configuration property phoenix.index.cdc.mutation.serialize controls which approach is used for implementing eventually consistent global secondary indexes.

Approach 1: Serialized mutations (value = true)

During preBatchMutate(), IndexRegionObserver generates index mutations for each data table mutation and serializes them into a Protobuf IndexMutations message. This serialized payload is written as a column value in the CDC index table row alongside the CDC event. The IndexCDCConsumer later reads these pre-computed mutations from the CDC index, deserializes them, and applies them directly to the index table(s). In this approach, the consumer does not need to understand index structure or re-derive mutations — it simply replays what was already computed on the write path. The trade-off is increased CDC index row size due to the serialized mutation payload, and additional write IO on the CDC index table.

Approach 2: Generated mutations from data row states (default, value = false)

During preBatchMutate(), IndexRegionObserver writes only a lightweight CDC index entry without serialized index mutations. Instead, the CDC event is created with the DATA_ROW_STATE cdc scope. When the IndexCDCConsumer processes these events, it reads the CDC index rows, which trigger a server-side scan of the data table to reconstruct the before-image currentDataRowState and after-image nextDataRowState of the data row at the change timestamp. These raw row states are returned as a Protobuf DataRowStates message. The consumer then feeds these states into generateIndexMutationsForRow() — the same utility used by IndexRegionObserver#prepareIndexMutations on the write path — to derive index mutations at consume time. This approach keeps CDC index rows small, avoids additional write IO, and generates mutations based on the current index definition, but requires an additional data table read per CDC event and is sensitive to data visibility timing. Make sure the max lookback age is long enough to retain the before and after images of the row.
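To make the consume-time derivation concrete, here is a minimal sketch of the idea behind deriving index mutations from the before and after images of a data row. The class, method, and "#" row-key separator are illustrative assumptions, not the actual generateIndexMutationsForRow() implementation: a changed indexed value produces a delete of the old index row and a put of the new one.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: derives index mutations for one data row from
// the before-image and after-image of its indexed column value. The "#"
// separator between indexed value and data row key is an assumption.
public class IndexMutationSketch {
  public static List<String> deriveMutations(String dataRowKey, String before, String after) {
    List<String> mutations = new ArrayList<>();
    if (before != null && !before.equals(after)) {
      // The old index row key no longer matches the data row: delete it.
      mutations.add("DELETE " + before + "#" + dataRowKey);
    }
    if (after != null && !after.equals(before)) {
      // Write the index row keyed by the new indexed value.
      mutations.add("PUT " + after + "#" + dataRowKey);
    }
    return mutations;
  }
}
```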

Use Approach 2 (serialize = false, default) to minimize write IO: no serialized mutations are written to the CDC index, keeping CDC index rows small and write latency uniform. The trade-off is higher read IO at consume time — the consumer performs an additional data table point-lookup with a raw scan per CDC event to reconstruct row states.

Use Approach 1 (serialize = true) to minimize read IO: the consumer reads pre-computed mutations from the CDC index and applies them directly, with no data table scan required at consume time. The trade-off is higher write IO: serialized index mutations are written alongside each CDC index entry, increasing CDC index row size and write-path latency. Note, however, that the CDC index is expected to have a TTL equal to the data table's max lookback age.

@palashc palashc self-requested a review April 14, 2026 19:00
updateTrackerProgress(conn, partitionId, ownerPartitionId, newLastTimestamp,
PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
}
return newLastTimestamp;
Contributor

We should include a metric for lag. We can publish lag = EnvironmentEdgeManager.currentTimeMillis() - newLastTimestamp, which will be sort of like the ageOfLastApplied metric we have in HBase replication. Of course, this would include timestampBufferMs + pollIntervalMs, but if the lag goes beyond 11s (using defaults) we will be able to tell which consumer might need tuning.
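The suggested lag computation is just wall-clock time minus the last processed timestamp; a sketch under the assumption of a hypothetical class name (in Phoenix the current time would come from EnvironmentEdgeManager):

```java
// Hypothetical sketch of the suggested cdcConsumerLagMs computation.
public class CdcLagSketch {
  // Includes timestampBufferMs + pollIntervalMs by construction, since the
  // consumer intentionally trails the current time.
  public static long lagMs(long currentTimeMs, long newLastTimestampMs) {
    return currentTimeMs - newLastTimestampMs;
  }
}
```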

Contributor Author

Yes, let me see if I can incorporate it now or as a follow-up Jira. Initially, I wanted to avoid adding metrics in this PR, but for perf evaluation, checking metrics was also necessary, so I added some.

Contributor

Having lag would definitely be helpful from the beginning, wdyt?

Contributor Author

Let me publish this; I wonder what would be an appropriate name.

Contributor

something like cdcConsumerLagMs should be good I guess?

Contributor

curious to see this metric in the next perf run!

Contributor Author

We have metrics for both: per table and per JVM, but the per-table values cannot be aggregated. Metrics aggregation is only done by various clients.

Contributor

Oh okay so we will be able to see the lag for a table as well, that's good

Contributor Author

All the current metrics already do have table level values too. However, they are exposed only from the server:

  private void incrementTableSpecificHistogram(String baseName, String tableName, long t) {
    MetricHistogram tableHistogram =
      getMetricsRegistry().getHistogram(getMetricName(baseName, tableName));
    tableHistogram.add(t);
  }

Contributor Author

done

updateTrackerProgress(conn, partitionId, ownerPartitionId, newLastTimestamp,
PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS);
}
return newLastTimestamp;
Contributor

Metric for lag can be returned from here as well.

Contributor Author

done

Comment on lines +1001 to +1006
if (schemaName == null || schemaName.isEmpty()) {
cdcObjectName = "\"" + cdcObjectName + "\"";
} else {
cdcObjectName =
"\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + cdcObjectName + "\"";
}
Contributor

Can we use SchemaUtil utility methods?

Contributor Author

Which one?

Contributor

public static String getEscapedTableName(String schemaName, String tableName) {
    if (schemaName == null || schemaName.length() == 0) {
      return "\"" + tableName + "\"";
    }
    return "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + tableName + "\"";
  }
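For illustration, the escaping behavior of the method above can be sketched self-contained, under the assumption that QueryConstants.NAME_SEPARATOR is "." (the class name below is a stand-in, not SchemaUtil itself):

```java
// Self-contained sketch mirroring SchemaUtil.getEscapedTableName; assumes
// QueryConstants.NAME_SEPARATOR is ".".
public class EscapeSketch {
  public static String getEscapedTableName(String schemaName, String tableName) {
    if (schemaName == null || schemaName.length() == 0) {
      return "\"" + tableName + "\"";
    }
    return "\"" + schemaName + "\"" + "." + "\"" + tableName + "\"";
  }
}
```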

Contributor Author

done

return;
}
if (!hasEventuallyConsistentIndexes()) {
LOG.trace("No eventually consistent indexes found for table {}. Exiting consumer.",
Contributor

What if an EC index is created later? How do we handle that? Should the consumer periodically check instead of returning?

Contributor Author

This is an interesting one - when an EC index is created later, the current region's change logs cannot be processed. Instead, the consumer will wait for all the current regions of the data table to either get split, merged, or moved to another server, and only after that will the change logs from their parent regions (or from the region's previous server) start getting processed.

I can add this to the design doc.

Contributor

@palashc palashc Apr 14, 2026

it will wait for all the current regions of the data table to either get split, merged or moved

Does that mean the lag in the consumer can be very large in that case?

Contributor Author

Does that mean the lag in the consumer can be very large in that case?

Yes, that's the penalty of creating the index later. On the other hand, we don't want to refresh PTable asynchronously by making too many syscat RPC calls.

In reality this should not matter, because when the client creates an index later, the index (SC or EC) is always asynchronous. It can take several days to synchronize the index data using MapReduce jobs. So the client is not expected to start using the index immediately anyway, regardless of the type of the index.

Contributor

It can take several days to synchronize the index data using mapreduce jobs.

That's a good point, makes sense. I guess we can keep it this way - you already mentioned you will take care of updating the design for highlighting this point.

Contributor

In production, the gap between index creation and the next region event should not be very large, I guess?

Contributor Author

Yes, exactly. In prod, regions split, merge and move all the time.


doneSignal.await(60, TimeUnit.SECONDS);
if (eventual) {
Thread.sleep(35000);
Contributor

Using sleep with large times is going to make tests slow (and flaky, but I guess we cannot do anything about that in this case). Since we are adding multiple new tests which are going to need their own mini cluster, maybe we can have waitFor(predicate, timeout) style polling instead, to at least let the test make progress as soon as it can.
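A waitFor(predicate, timeout) helper of the kind suggested here could look like the following sketch (hypothetical; Phoenix test utilities may already provide an equivalent):

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper: returns as soon as the predicate holds,
// instead of sleeping for one fixed large interval.
public class WaitSketch {
  public static boolean waitFor(BooleanSupplier predicate, long timeoutMs, long pollMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (true) {
      if (predicate.getAsBoolean()) {
        return true; // condition met, make progress immediately
      }
      if (System.currentTimeMillis() >= deadline) {
        return false; // timed out without the condition holding
      }
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```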

Contributor Author

Using sleep with large times is going to make tests slow (and flaky but I can guess we cannot do anything about that in this case). Since we are adding multiple new tests which are going to need their own mini cluster, maybe we can have waitFor(predicate, timeout) kind of polling instead to atleast let the test make progress as soon as it can.

I have added such polling in the other set of concurrent-mutations extended index IT tests, but here I could not add it because this test also needs to be used by the synchronous index, so we want to keep the changes to the original tests as small as possible.

lastProcessedTimestamp = processCDCBatchGenerated(encodedRegionName, encodedRegionName,
lastProcessedTimestamp, false);
}
if (lastProcessedTimestamp == previousTimestamp) {
Contributor

Here we could expose the currentTime in a metric like cdcLastEmptyPollTimestamp, the wall-clock time of the last poll that returned no new CDC rows. We can report this metric on the table level as minimum across all regions of this table.

The tests can then use this to decide whether the consumers for a table have caught up. If latest mutation in the test was at time Tm and the minimum cdcLastEmptyPollTimestamp for the table is Tc > Tm, then we can say that all consumers have seen the world after the test mutations and found nothing new. This way we can avoid sleeping for large time periods.

I worked out an example of this with Claude, let me know if I am missing something.

Setup

10:00:00  UPSERT INTO T VALUES ('Bob', 42)     → lands in R1
10:00:00  UPSERT INTO T VALUES ('Zoe', 99)     → lands in R2
10:00:00  conn.commit()
10:00:00  afterWrites = now() = 10:00:00

Consumer Timeline

Time        R1 consumer                  R2 consumer                  min()
─────────── ──────────────────────────── ──────────────────────────── ──────────
09:59:58    empty poll → reports 09:59:58  empty poll → reports 09:59:58  09:59:58
10:00:00    [writes arrive]              [writes arrive]
10:00:01    picks up Bob's CDC row       sleeping (poll interval)       09:59:58
10:00:02    processes, writes to index   picks up Zoe's CDC row         09:59:58
10:00:03    empty poll → reports 10:00:03  processes, writes to index   09:59:58
10:00:04    sleeping                     empty poll → reports 10:00:04  10:00:03

Test Polling

10:00:01  min = 09:59:58  < afterWrites(10:00:00)  → keep waiting
10:00:02  min = 09:59:58  < afterWrites(10:00:00)  → keep waiting
10:00:03  min = 09:59:58  < afterWrites(10:00:00)  → keep waiting  (R1 caught up but R2 hasn't)
10:00:04  min = 10:00:03  > afterWrites(10:00:00)  → DONE, both caught up
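The caught-up condition in the timeline above reduces to a min-over-regions comparison; a hypothetical sketch (class and method names are illustrative):

```java
// Hypothetical sketch of the caught-up check: all consumers have seen the
// world after the test's writes once the minimum last-empty-poll timestamp
// across regions exceeds the post-write timestamp.
public class CatchUpSketch {
  public static boolean caughtUp(long[] lastEmptyPollPerRegion, long afterWritesTs) {
    if (lastEmptyPollPerRegion.length == 0) {
      return false; // no region has reported yet
    }
    long min = Long.MAX_VALUE;
    for (long t : lastEmptyPollPerRegion) {
      min = Math.min(min, t);
    }
    return min > afterWritesTs;
  }
}
```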

Contributor

We can report this metric on the table level as minimum across all regions of this table.

This might need some bookkeeping in MetricsIndexCDCConsumerSourceImpl to keep track of cdcLastEmptyPollTimestamp across all regions of a table and only publishing the minimum.

Contributor Author

The value across all regions cannot be provided for a given table without complicating the metrics framework; we don't have such metrics, is that correct? We publish metrics per JVM, not per table.

If this is only for test purposes, we don't need it; we have the poll-and-wait approach.

Contributor Author

As far as tests are concerned, I have been running them for a very long time: https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-mulitbranch/job/tmp-ec/

Contributor

Tests may be fine today, but they need not be in the future or in a different CI env. We already have so many flappers, which are mainly due to waiting on async stuff (especially in internal forks). It will be good to avoid adding more such tests. Maybe we can expose this in an in-memory map, which can be consumed in the tests? We can re-think what more we can expose as a meaningful metric for the consumer in a separate Jira.

Contributor

I guess the point is we can't get rid of polling, but we can decide what to poll on. Verifying data correctness in a loop can be expensive; having a lightweight signal would be really helpful for both Phoenix tests and downstream projects. Fine to track as a separate Jira if not in scope for this PR.

Contributor

Do you think some sort of query check on SYSTEM.IDX_CDC_TRACKER could also be a helpful signal for tests?

SELECT MIN(LAST_TIMESTAMP) FROM SYSTEM.IDX_CDC_TRACKER
WHERE TABLE_NAME = ? AND STATUS = 'i'

MIN(LAST_TIMESTAMP) > afterWritesTimestamp means consumers have processed everything a test wrote?

Contributor Author

Yes it can be, but why do we want to make tests more flaky by complicating them with timestamp comparison rather than using the simple "wait and compare" approach? I see more chances of tests getting flaky when we rely on timestamp-based comparisons - both in HBase and Phoenix. That's why we have tests where we inject a custom EnvironmentEdgeManager just to make sure we can expect a specific timestamp. I don't think we need timestamp-based comparison. As for phoenix-adapters, the test results comparison is quite simple with wait-and-compare.

Contributor Author

@virajjasani virajjasani Apr 15, 2026

@palashc we also don't usually publish timestamp values as metrics because they can be a bit expensive to represent or visualize. We can publish lag or timestamp diff as a histogram, or success/failure events as counts, etc. I wonder how publishing a timestamp would help from a metrics visualization viewpoint even if we decide to publish it eventually, wdyt?

Contributor

I wonder how would publishing timestamp help from metrics visualization viewpoint even if we decide to publish it eventually

I think there is some confusion. I agree we do not need to push this as a metric, but using the timestamps from the tracker table can be a utility for tests.

why we want to make tests more flaky by complicating it with timestamp comparison

I think using timestamps from the tracker table should not be flaky in this case. Those timestamps would not be wall-clock times, right? Using the tracker table involves querying timestamp values that the consumer itself wrote (the CDC row timestamp), from the same EnvironmentEdgeManager - so there should be no concern about clock skew, different JVMs, etc. It can be a lightweight pre-check before running the expensive index verification.

But if you think tests are (and will be) stable in CI envs, we can keep this for separate jira.

Contributor

@palashc palashc left a comment

+1, pending build results

Thank you @virajjasani for addressing review comments and helpful discussion. Great work, as always!
