@wzhero1 commented Dec 22, 2025

Purpose

This PR adds support for accessing system fields (specifically sequence_number) in AuditLogTable and BinlogTable.

Downstream consumers often need sequence_number to ensure strict ordering or to implement exactly-once processing when reading from audit logs or binlogs. However, these system tables do not currently expose this field.
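
As a rough illustration of the consumer-side use case (not code from this PR), a downstream consumer could use the sequence number to skip replayed or stale changes. The class and method names below are hypothetical, and the sketch assumes that sequence numbers increase with each change to a given key:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical consumer-side helper; not part of Paimon or of this PR.
final class SequenceDedupSketch {

    // Highest sequence number already applied, tracked per primary key (here: id).
    private final Map<Integer, Long> lastApplied = new HashMap<>();

    /**
     * Returns true if the change is newer than anything applied for this id,
     * false if it is a replay or an out-of-date change.
     * Assumption: sequence numbers grow with each change to the same key.
     */
    boolean shouldApply(int id, long sequenceNumber) {
        Long previous = lastApplied.get(id);
        if (previous != null && sequenceNumber <= previous) {
            return false;
        }
        lastApplied.put(id, sequenceNumber);
        return true;
    }
}
```

With this check, re-reading the same audit log range after a failure would not apply a change twice.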

This change implements KeyValueSystemFieldsRecordReader to enable the projection of system fields for KeyValue records, allowing users to query sequence_number directly from audit log and binlog tables.
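
The idea of projecting system fields next to the value columns can be sketched as follows. This is a simplified, self-contained illustration under stated assumptions: `ToyKeyValue`, `ROW_KIND`, `SEQUENCE_NUMBER`, and `project` are hypothetical stand-ins and do not reproduce KeyValueSystemFieldsRecordReader or any Paimon class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for illustration; these are not the Paimon classes.
final class SystemFieldProjectionSketch {

    /** Toy key-value record: system fields plus the user-visible value columns. */
    static final class ToyKeyValue {
        final long sequenceNumber;
        final String rowKind;
        final Object[] value;

        ToyKeyValue(long sequenceNumber, String rowKind, Object[] value) {
            this.sequenceNumber = sequenceNumber;
            this.rowKind = rowKind;
            this.value = value;
        }
    }

    // One extractor per system field.
    static final Function<ToyKeyValue, Object> ROW_KIND = kv -> kv.rowKind;
    static final Function<ToyKeyValue, Object> SEQUENCE_NUMBER = kv -> kv.sequenceNumber;

    /** Builds an output row: requested system fields first, then the value columns. */
    static Object[] project(ToyKeyValue kv, List<Function<ToyKeyValue, Object>> extractors) {
        List<Object> row = new ArrayList<>();
        for (Function<ToyKeyValue, Object> extractor : extractors) {
            row.add(extractor.apply(kv));
        }
        row.addAll(Arrays.asList(kv.value));
        return row.toArray();
    }

    public static void main(String[] args) {
        ToyKeyValue kv = new ToyKeyValue(100002L, "-D", new Object[] {999});
        System.out.println(Arrays.toString(project(kv, List.of(ROW_KIND, SEQUENCE_NUMBER))));
    }
}
```

The row layout here (row kind, sequence number, then value columns) follows the order seen in the updated test expectation in this PR.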

Tests

  • Added IncrementalReadSystemFieldsTest to verify functionality.
  • Updated AuditLogTableTest and BinlogTableTest to cover sequence_number read cases.

API and Format

  • No change to storage format.
  • No public API change.

Documentation

  • docs/content/concepts/system-tables.md needs to be updated.

@wzhero1 force-pushed the auditlog-support-seqnum branch 9 times, most recently from ed453a8 to eae1a70 (December 25, 2025 03:40)
@wzhero1 force-pushed the auditlog-support-seqnum branch from eae1a70 to 7ab955a (December 29, 2025 02:23)
Contributor

@yunfengzhou-hub left a comment

Thanks for the PR! Left some comments as below.

@Nullable int[] projection) {
if (systemFieldExtractors.isEmpty()) {
// No system fields, use the default unwrap logic
return KeyValueTableRead.unwrap(reader);
Contributor

It might be better to merge this method into KeyValueTableRead#unwrap. As a decorator/wrapper to this method, extending the original method might be more beneficial to extensibility.

Author

Done. I've moved the logic into KeyValueTableRead.java. The method structure is now:

  • unwrap(RecordReader<KeyValue> reader, List<SpecialFieldExtractor> specialFieldExtractors, @Nullable int[] projection) as public method for extensibility
  • unwrap(RecordReader<KeyValue> reader) as private method for internal use
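
The overload structure described above can be sketched as follows. This is only an illustration under assumptions: `Iterator` stands in for RecordReader, `ToyKeyValue` and `SEQUENCE_NUMBER` are hypothetical, and the projection parameter is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in; not the actual KeyValueTableRead implementation.
final class UnwrapOverloadSketch {

    static final class ToyKeyValue {
        final long sequenceNumber;
        final String value;

        ToyKeyValue(long sequenceNumber, String value) {
            this.sequenceNumber = sequenceNumber;
            this.value = value;
        }
    }

    static final Function<ToyKeyValue, Object> SEQUENCE_NUMBER = kv -> kv.sequenceNumber;

    /** Public entry point: falls back to the plain path when no extractors are requested. */
    public static Iterator<List<Object>> unwrap(
            Iterator<ToyKeyValue> reader, List<Function<ToyKeyValue, Object>> extractors) {
        if (extractors.isEmpty()) {
            return unwrap(reader);
        }
        return new Iterator<List<Object>>() {
            @Override
            public boolean hasNext() {
                return reader.hasNext();
            }

            @Override
            public List<Object> next() {
                ToyKeyValue kv = reader.next();
                List<Object> row = new ArrayList<>();
                for (Function<ToyKeyValue, Object> extractor : extractors) {
                    row.add(extractor.apply(kv)); // system fields come first
                }
                row.add(kv.value);
                return row;
            }
        };
    }

    /** Private default path: emits only the value, with no system fields. */
    private static Iterator<List<Object>> unwrap(Iterator<ToyKeyValue> reader) {
        return new Iterator<List<Object>>() {
            @Override
            public boolean hasNext() {
                return reader.hasNext();
            }

            @Override
            public List<Object> next() {
                List<Object> row = new ArrayList<>();
                row.add(reader.next().value);
                return row;
            }
        };
    }
}
```

Callers that do not need system fields can pass an empty extractor list and get the original behavior.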

checkAnswer(
sql("SELECT * FROM paimon_incremental_query('`t$audit_log`', 'tag1', 'tag2') ORDER BY id"),
- Seq(Row("-D", 999)))
+ Seq(Row("-D", 100002L, 999)))
Contributor

100002L is the value of the id column, rather than the sequence number. I'm not sure whether it is correct here.

Author

999 is the value of id; 100002L is the sequence number.

}

/** Creates a table with changelog producer enabled. */
private FileStoreTable createChangelogTable(String tableName) throws Exception {
Contributor

This method is only used in one place. So compared with introducing this method, it might be better to preserve the original code structure and commit history.

Author

While it's currently used in one place, I prefer keeping this helper method for single responsibility and better extensibility. It encapsulates the changelog table creation logic cleanly, making future test additions easier without duplicating configuration code.

*/
@Nullable
public static SystemFieldExtractor getExtractor(String fieldName) {
return EXTRACTOR_REGISTRY.get(fieldName);
Contributor

This method only checks field names. It might be better to also check field data types.

Author

For special fields (like _ROWKIND, _SEQUENCE_NUMBER), field name matching is sufficient.
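
A name-only lookup of this kind can be sketched as follows. The registry contents and `ToyKeyValue` are hypothetical; only the field names _ROWKIND and _SEQUENCE_NUMBER and the `getExtractor` shape come from this thread.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a name-keyed extractor registry; not the PR's actual class.
final class ExtractorRegistrySketch {

    static final class ToyKeyValue {
        final long sequenceNumber;
        final String rowKind;

        ToyKeyValue(long sequenceNumber, String rowKind) {
            this.sequenceNumber = sequenceNumber;
            this.rowKind = rowKind;
        }
    }

    // Reserved field names map directly to extractors; no data type is consulted.
    private static final Map<String, Function<ToyKeyValue, Object>> EXTRACTOR_REGISTRY =
            Map.of(
                    "_SEQUENCE_NUMBER", kv -> kv.sequenceNumber,
                    "_ROWKIND", kv -> kv.rowKind);

    /** Returns the extractor registered for the field name, or null for ordinary columns. */
    static Function<ToyKeyValue, Object> getExtractor(String fieldName) {
        return EXTRACTOR_REGISTRY.get(fieldName);
    }
}
```

Because the lookup is by name alone, a user column that happened to share a reserved name would also match, which is the case the reviewer's data type check would guard against.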
