", "([B[BJZ)V");
+ if (ctor == NULL)
+ goto cleanup;
+
+ /* Create CommitOp[] array */
+ jobjectArray opsArray = (*env)->NewObjectArray(env, num_ops, commitOpClass, NULL);
+ if (opsArray == NULL)
+ goto cleanup;
+
+ for (int i = 0; i < num_ops; i++)
+ {
+ jbyteArray jkey = (*env)->NewByteArray(env, (jsize)ops[i].key_size);
+ (*env)->SetByteArrayRegion(env, jkey, 0, (jsize)ops[i].key_size, (jbyte *)ops[i].key);
+
+ jbyteArray jvalue = NULL;
+ if (ops[i].value != NULL && ops[i].value_size > 0)
+ {
+ jvalue = (*env)->NewByteArray(env, (jsize)ops[i].value_size);
+ (*env)->SetByteArrayRegion(env, jvalue, 0, (jsize)ops[i].value_size,
+ (jbyte *)ops[i].value);
+ }
+
+ jobject opObj = (*env)->NewObject(env, commitOpClass, ctor, jkey, jvalue,
+ (jlong)ops[i].ttl,
+ ops[i].is_delete ? JNI_TRUE : JNI_FALSE);
+ (*env)->SetObjectArrayElement(env, opsArray, i, opObj);
+
+ (*env)->DeleteLocalRef(env, opObj);
+ (*env)->DeleteLocalRef(env, jkey);
+ if (jvalue != NULL)
+ (*env)->DeleteLocalRef(env, jvalue);
+ }
+
+ /* Call CommitHook.onCommit(CommitOp[], long) */
+ jclass hookClass = (*env)->GetObjectClass(env, hctx->hook_obj);
+ jmethodID onCommit =
+ (*env)->GetMethodID(env, hookClass, "onCommit", "([Lcom/tidesdb/CommitOp;J)I");
+
+ ret = (*env)->CallIntMethod(env, hctx->hook_obj, onCommit, opsArray, (jlong)commit_seq);
+
+ if ((*env)->ExceptionCheck(env))
+ {
+ (*env)->ExceptionClear(env);
+ ret = -1;
+ }
+
+ (*env)->DeleteLocalRef(env, opsArray);
+ (*env)->DeleteLocalRef(env, commitOpClass);
+ (*env)->DeleteLocalRef(env, hookClass);
+
+ if (need_detach)
+ (*hctx->jvm)->DetachCurrentThread(hctx->jvm);
+
+ return (int)ret;
+
+cleanup:
+ if ((*env)->ExceptionCheck(env))
+ (*env)->ExceptionClear(env);
+ if (need_detach)
+ (*hctx->jvm)->DetachCurrentThread(hctx->jvm);
+ return -1;
+}
+
+JNIEXPORT jlong JNICALL Java_com_tidesdb_ColumnFamily_nativeSetCommitHook(JNIEnv *env, jclass cls,
+ jlong cfHandle,
+ jobject hook,
+ jlong oldCtxHandle)
+{
+ tidesdb_column_family_t *cf = (tidesdb_column_family_t *)(uintptr_t)cfHandle;
+
+ /* Free old context if present */
+ if (oldCtxHandle != 0)
+ {
+ java_hook_ctx_t *old_ctx = (java_hook_ctx_t *)(uintptr_t)oldCtxHandle;
+ (*env)->DeleteGlobalRef(env, old_ctx->hook_obj);
+ free(old_ctx);
+ }
+
+ /* If hook is NULL, clear the hook */
+ if (hook == NULL)
+ {
+ int result = tidesdb_cf_set_commit_hook(cf, NULL, NULL);
+ if (result != TDB_SUCCESS)
+ {
+ throwTidesDBException(env, result, getErrorMessage(result));
+ }
+ return 0;
+ }
+
+ /* Allocate new context */
+ java_hook_ctx_t *ctx = (java_hook_ctx_t *)malloc(sizeof(java_hook_ctx_t));
+ if (ctx == NULL)
+ {
+ throwTidesDBException(env, TDB_ERR_MEMORY, "Failed to allocate commit hook context");
+ return 0;
+ }
+
+ (*env)->GetJavaVM(env, &ctx->jvm);
+ ctx->hook_obj = (*env)->NewGlobalRef(env, hook);
+
+ int result = tidesdb_cf_set_commit_hook(cf, java_commit_hook_trampoline, ctx);
+ if (result != TDB_SUCCESS)
+ {
+ (*env)->DeleteGlobalRef(env, ctx->hook_obj);
+ free(ctx);
+ throwTidesDBException(env, result, getErrorMessage(result));
+ return 0;
+ }
+
+ return (jlong)(uintptr_t)ctx;
+}
+
JNIEXPORT jdouble JNICALL Java_com_tidesdb_ColumnFamily_nativeRangeCost(JNIEnv *env, jclass cls,
jlong handle,
jbyteArray keyA,
diff --git a/src/main/java/com/tidesdb/ColumnFamily.java b/src/main/java/com/tidesdb/ColumnFamily.java
index 944c0ed..cb2708b 100644
--- a/src/main/java/com/tidesdb/ColumnFamily.java
+++ b/src/main/java/com/tidesdb/ColumnFamily.java
@@ -30,6 +30,7 @@ public class ColumnFamily {
private final long nativeHandle;
private final String name;
+ private long commitHookCtxHandle = 0;
ColumnFamily(long nativeHandle, String name) {
this.nativeHandle = nativeHandle;
@@ -146,6 +147,35 @@ public double rangeCost(byte[] keyA, byte[] keyB) throws TidesDBException {
return nativeRangeCost(nativeHandle, keyA, keyB);
}
+ /**
+ * Sets a commit hook (Change Data Capture) for this column family.
+ * The hook fires synchronously after every transaction commit, receiving the full
+ * batch of committed operations atomically. Keep the callback fast to avoid
+ * stalling writers.
+ *
+ * Hooks are runtime-only and not persisted. After a database restart,
+ * hooks must be re-registered by the application.
+ *
+ * @param hook the commit hook callback
+ * @throws TidesDBException if the hook cannot be set
+ */
+ public void setCommitHook(CommitHook hook) throws TidesDBException {
+ if (hook == null) {
+ throw new IllegalArgumentException("Hook cannot be null, use clearCommitHook() instead");
+ }
+ commitHookCtxHandle = nativeSetCommitHook(nativeHandle, hook, commitHookCtxHandle);
+ }
+
+ /**
+ * Clears the commit hook for this column family.
+ * After this call, no further commit callbacks will fire.
+ *
+ * @throws TidesDBException if the hook cannot be cleared
+ */
+ public void clearCommitHook() throws TidesDBException {
+ commitHookCtxHandle = nativeSetCommitHook(nativeHandle, null, commitHookCtxHandle);
+ }
+
long getNativeHandle() {
return nativeHandle;
}
@@ -159,4 +189,5 @@ private static native void nativeUpdateRuntimeConfig(long handle, long writeBuff
int skipListMaxLevel, float skipListProbability, double bloomFPR, int indexSampleRatio,
int syncMode, long syncIntervalUs, boolean persistToDisk) throws TidesDBException;
private static native double nativeRangeCost(long handle, byte[] keyA, byte[] keyB) throws TidesDBException;
+ private static native long nativeSetCommitHook(long handle, CommitHook hook, long oldCtxHandle) throws TidesDBException;
}
diff --git a/src/main/java/com/tidesdb/CommitHook.java b/src/main/java/com/tidesdb/CommitHook.java
new file mode 100644
index 0000000..35903a6
--- /dev/null
+++ b/src/main/java/com/tidesdb/CommitHook.java
@@ -0,0 +1,37 @@
+/**
+ *
+ * Copyright (C) TidesDB
+ *
+ * Original Author: Alex Gaetano Padula
+ *
+ * Licensed under the Mozilla Public License, v. 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.tidesdb;
+
+/**
+ * Callback interface for commit hooks (Change Data Capture).
+ * Invoked synchronously after a transaction commits to a column family.
+ * The hook receives the full batch of committed operations atomically.
+ */
+@FunctionalInterface
+public interface CommitHook {
+
+ /**
+ * Called after a transaction commits to a column family.
+ *
+ * @param ops array of committed operations
+ * @param commitSeq monotonic commit sequence number
+ * @return 0 on success, non-zero on failure (logged as warning, does not roll back)
+ */
+ int onCommit(CommitOp[] ops, long commitSeq);
+}
diff --git a/src/main/java/com/tidesdb/CommitOp.java b/src/main/java/com/tidesdb/CommitOp.java
new file mode 100644
index 0000000..80cc9a6
--- /dev/null
+++ b/src/main/java/com/tidesdb/CommitOp.java
@@ -0,0 +1,82 @@
+/**
+ *
+ * Copyright (C) TidesDB
+ *
+ * Original Author: Alex Gaetano Padula
+ *
+ * Licensed under the Mozilla Public License, v. 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.tidesdb;
+
+/**
+ * Represents a single operation in a committed transaction batch.
+ * Passed to the commit hook callback after a transaction commits.
+ */
+public class CommitOp {
+
+ private final byte[] key;
+ private final byte[] value;
+ private final long ttl;
+ private final boolean delete;
+
+ /**
+ * Creates a new CommitOp.
+ *
+ * @param key the key
+ * @param value the value (null for deletes)
+ * @param ttl time-to-live (-1 for no expiry)
+ * @param delete true if this is a delete operation
+ */
+ public CommitOp(byte[] key, byte[] value, long ttl, boolean delete) {
+ this.key = key;
+ this.value = value;
+ this.ttl = ttl;
+ this.delete = delete;
+ }
+
+ /**
+ * Gets the key for this operation.
+ *
+ * @return the key bytes
+ */
+ public byte[] getKey() {
+ return key;
+ }
+
+ /**
+ * Gets the value for this operation.
+ *
+ * @return the value bytes, or null for delete operations
+ */
+ public byte[] getValue() {
+ return value;
+ }
+
+ /**
+ * Gets the TTL for this operation.
+ *
+ * @return the TTL in seconds since epoch, or -1 for no expiry
+ */
+ public long getTtl() {
+ return ttl;
+ }
+
+ /**
+ * Returns whether this is a delete operation.
+ *
+ * @return true if delete, false if put
+ */
+ public boolean isDelete() {
+ return delete;
+ }
+}
diff --git a/src/test/java/com/tidesdb/TidesDBTest.java b/src/test/java/com/tidesdb/TidesDBTest.java
index 9f6955d..d7494ba 100644
--- a/src/test/java/com/tidesdb/TidesDBTest.java
+++ b/src/test/java/com/tidesdb/TidesDBTest.java
@@ -24,6 +24,9 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.*;
@@ -909,6 +912,158 @@ void testRangeCostNullKeys() throws TidesDBException {
}
}
+ @Test
+ @Order(25)
+ void testCommitHookBasic() throws TidesDBException {
+ Config config = Config.builder(tempDir.resolve("testdb23").toString())
+ .numFlushThreads(2)
+ .numCompactionThreads(2)
+ .logLevel(LogLevel.INFO)
+ .blockCacheSize(64 * 1024 * 1024)
+ .maxOpenSSTables(256)
+ .build();
+
+ try (TidesDB db = TidesDB.open(config)) {
+ ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig();
+ db.createColumnFamily("test_cf", cfConfig);
+
+ ColumnFamily cf = db.getColumnFamily("test_cf");
+
+ List received = new ArrayList<>();
+ AtomicLong lastSeq = new AtomicLong();
+
+ cf.setCommitHook((ops, commitSeq) -> {
+ received.add(ops);
+ lastSeq.set(commitSeq);
+ return 0;
+ });
+
+ // Commit a put operation
+ try (Transaction txn = db.beginTransaction()) {
+ txn.put(cf, "key1".getBytes(), "value1".getBytes());
+ txn.commit();
+ }
+
+ // Hook fires synchronously, so data is available immediately
+ assertEquals(1, received.size());
+ assertEquals(1, received.get(0).length);
+ assertArrayEquals("key1".getBytes(), received.get(0)[0].getKey());
+ assertArrayEquals("value1".getBytes(), received.get(0)[0].getValue());
+ assertFalse(received.get(0)[0].isDelete());
+ assertTrue(lastSeq.get() > 0);
+
+ cf.clearCommitHook();
+ }
+ }
+
+ @Test
+ @Order(26)
+ void testCommitHookMultipleOps() throws TidesDBException {
+ Config config = Config.builder(tempDir.resolve("testdb24").toString())
+ .numFlushThreads(2)
+ .numCompactionThreads(2)
+ .logLevel(LogLevel.INFO)
+ .blockCacheSize(64 * 1024 * 1024)
+ .maxOpenSSTables(256)
+ .build();
+
+ try (TidesDB db = TidesDB.open(config)) {
+ ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig();
+ db.createColumnFamily("test_cf", cfConfig);
+
+ ColumnFamily cf = db.getColumnFamily("test_cf");
+
+ List received = new ArrayList<>();
+
+ cf.setCommitHook((ops, commitSeq) -> {
+ received.add(ops);
+ return 0;
+ });
+
+ // Commit multiple operations in one transaction
+ try (Transaction txn = db.beginTransaction()) {
+ txn.put(cf, "key1".getBytes(), "value1".getBytes());
+ txn.put(cf, "key2".getBytes(), "value2".getBytes());
+ txn.delete(cf, "key1".getBytes());
+ txn.commit();
+ }
+
+ // Should fire once with all operations
+ assertEquals(1, received.size());
+ assertEquals(3, received.get(0).length);
+
+ // Last op should be a delete
+ assertTrue(received.get(0)[2].isDelete());
+ assertArrayEquals("key1".getBytes(), received.get(0)[2].getKey());
+
+ cf.clearCommitHook();
+ }
+ }
+
+ @Test
+ @Order(27)
+ void testCommitHookClear() throws TidesDBException {
+ Config config = Config.builder(tempDir.resolve("testdb25").toString())
+ .numFlushThreads(2)
+ .numCompactionThreads(2)
+ .logLevel(LogLevel.INFO)
+ .blockCacheSize(64 * 1024 * 1024)
+ .maxOpenSSTables(256)
+ .build();
+
+ try (TidesDB db = TidesDB.open(config)) {
+ ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig();
+ db.createColumnFamily("test_cf", cfConfig);
+
+ ColumnFamily cf = db.getColumnFamily("test_cf");
+
+ List received = new ArrayList<>();
+
+ cf.setCommitHook((ops, commitSeq) -> {
+ received.add(ops);
+ return 0;
+ });
+
+ // First commit — hook should fire
+ try (Transaction txn = db.beginTransaction()) {
+ txn.put(cf, "key1".getBytes(), "value1".getBytes());
+ txn.commit();
+ }
+ assertEquals(1, received.size());
+
+ // Clear the hook
+ cf.clearCommitHook();
+
+ // Second commit — hook should NOT fire
+ try (Transaction txn = db.beginTransaction()) {
+ txn.put(cf, "key2".getBytes(), "value2".getBytes());
+ txn.commit();
+ }
+ assertEquals(1, received.size(), "Hook should not fire after clearing");
+ }
+ }
+
+ @Test
+ @Order(28)
+ void testCommitHookNullThrows() throws TidesDBException {
+ Config config = Config.builder(tempDir.resolve("testdb26").toString())
+ .numFlushThreads(2)
+ .numCompactionThreads(2)
+ .logLevel(LogLevel.INFO)
+ .blockCacheSize(64 * 1024 * 1024)
+ .maxOpenSSTables(256)
+ .build();
+
+ try (TidesDB db = TidesDB.open(config)) {
+ ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig();
+ db.createColumnFamily("test_cf", cfConfig);
+
+ ColumnFamily cf = db.getColumnFamily("test_cf");
+
+ assertThrows(IllegalArgumentException.class, () -> cf.setCommitHook(null));
+ }
+ }
+
@Test
@Order(21)
void testTransactionResetNullIsolation() throws TidesDBException {