diff --git a/pom.xml b/pom.xml index 2577888..39307c1 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.tidesdb tidesdb-java - 0.6.3 + 0.6.4 jar TidesDB Java diff --git a/src/main/c/com_tidesdb_TidesDB.c b/src/main/c/com_tidesdb_TidesDB.c index 4edc165..70af321 100644 --- a/src/main/c/com_tidesdb_TidesDB.c +++ b/src/main/c/com_tidesdb_TidesDB.c @@ -913,6 +913,158 @@ JNIEXPORT void JNICALL Java_com_tidesdb_TidesDBIterator_nativeFree(JNIEnv *env, } } +/** + * Context stored as the commit hook ctx pointer. + * Holds the JavaVM and a global reference to the Java CommitHook object. + */ +typedef struct +{ + JavaVM *jvm; + jobject hook_obj; /* global reference to CommitHook */ +} java_hook_ctx_t; + +/** + * C trampoline that bridges the tidesdb_commit_hook_fn callback to the Java CommitHook.onCommit + * method. Fires synchronously on the committing thread (which is always a Java thread). + */ +static int java_commit_hook_trampoline(const tidesdb_commit_op_t *ops, int num_ops, + uint64_t commit_seq, void *ctx) +{ + java_hook_ctx_t *hctx = (java_hook_ctx_t *)ctx; + JNIEnv *env = NULL; + int need_detach = 0; + + jint rc = (*hctx->jvm)->GetEnv(hctx->jvm, (void **)&env, JNI_VERSION_1_6); + if (rc == JNI_EDETACHED) + { + if ((*hctx->jvm)->AttachCurrentThread(hctx->jvm, (void **)&env, NULL) != 0) + return -1; + need_detach = 1; + } + else if (rc != JNI_OK) + { + return -1; + } + + jint ret = -1; + + /* Find CommitOp class and constructor: CommitOp(byte[], byte[], long, boolean) */ + jclass commitOpClass = (*env)->FindClass(env, "com/tidesdb/CommitOp"); + if (commitOpClass == NULL) + goto cleanup; + + jmethodID ctor = (*env)->GetMethodID(env, commitOpClass, "", "([B[BJZ)V"); + if (ctor == NULL) + goto cleanup; + + /* Create CommitOp[] array */ + jobjectArray opsArray = (*env)->NewObjectArray(env, num_ops, commitOpClass, NULL); + if (opsArray == NULL) + goto cleanup; + + for (int i = 0; i < num_ops; i++) + { + jbyteArray jkey = (*env)->NewByteArray(env, (jsize)ops[i].key_size); + (*env)->SetByteArrayRegion(env, jkey, 0, (jsize)ops[i].key_size, (jbyte *)ops[i].key); + + jbyteArray jvalue = NULL; + if (ops[i].value != NULL && ops[i].value_size > 0) + { + jvalue = (*env)->NewByteArray(env, (jsize)ops[i].value_size); + (*env)->SetByteArrayRegion(env, jvalue, 0, (jsize)ops[i].value_size, + (jbyte *)ops[i].value); + } + + jobject opObj = (*env)->NewObject(env, commitOpClass, ctor, jkey, jvalue, + (jlong)ops[i].ttl, + ops[i].is_delete ? JNI_TRUE : JNI_FALSE); + (*env)->SetObjectArrayElement(env, opsArray, i, opObj); + + (*env)->DeleteLocalRef(env, opObj); + (*env)->DeleteLocalRef(env, jkey); + if (jvalue != NULL) + (*env)->DeleteLocalRef(env, jvalue); + } + + /* Call CommitHook.onCommit(CommitOp[], long) */ + jclass hookClass = (*env)->GetObjectClass(env, hctx->hook_obj); + jmethodID onCommit = + (*env)->GetMethodID(env, hookClass, "onCommit", "([Lcom/tidesdb/CommitOp;J)I"); + + ret = (*env)->CallIntMethod(env, hctx->hook_obj, onCommit, opsArray, (jlong)commit_seq); + + if ((*env)->ExceptionCheck(env)) + { + (*env)->ExceptionClear(env); + ret = -1; + } + + (*env)->DeleteLocalRef(env, opsArray); + (*env)->DeleteLocalRef(env, commitOpClass); + (*env)->DeleteLocalRef(env, hookClass); + + if (need_detach) + (*hctx->jvm)->DetachCurrentThread(hctx->jvm); + + return (int)ret; + +cleanup: + if ((*env)->ExceptionCheck(env)) + (*env)->ExceptionClear(env); + if (need_detach) + (*hctx->jvm)->DetachCurrentThread(hctx->jvm); + return -1; +} + +JNIEXPORT jlong JNICALL Java_com_tidesdb_ColumnFamily_nativeSetCommitHook(JNIEnv *env, jclass cls, + jlong cfHandle, + jobject hook, + jlong oldCtxHandle) +{ + tidesdb_column_family_t *cf = (tidesdb_column_family_t *)(uintptr_t)cfHandle; + + /* Free old context if present */ + if (oldCtxHandle != 0) + { + java_hook_ctx_t *old_ctx = (java_hook_ctx_t *)(uintptr_t)oldCtxHandle; + (*env)->DeleteGlobalRef(env, old_ctx->hook_obj); + free(old_ctx); + } + + /* If hook is NULL, clear the hook */ + if (hook == NULL) + { + int result = tidesdb_cf_set_commit_hook(cf, NULL, NULL); + if (result != TDB_SUCCESS) + { + throwTidesDBException(env, result, getErrorMessage(result)); + } + return 0; + } + + /* Allocate new context */ + java_hook_ctx_t *ctx = (java_hook_ctx_t *)malloc(sizeof(java_hook_ctx_t)); + if (ctx == NULL) + { + throwTidesDBException(env, TDB_ERR_MEMORY, "Failed to allocate commit hook context"); + return 0; + } + + (*env)->GetJavaVM(env, &ctx->jvm); + ctx->hook_obj = (*env)->NewGlobalRef(env, hook); + + int result = tidesdb_cf_set_commit_hook(cf, java_commit_hook_trampoline, ctx); + if (result != TDB_SUCCESS) + { + (*env)->DeleteGlobalRef(env, ctx->hook_obj); + free(ctx); + throwTidesDBException(env, result, getErrorMessage(result)); + return 0; + } + + return (jlong)(uintptr_t)ctx; +} + JNIEXPORT jdouble JNICALL Java_com_tidesdb_ColumnFamily_nativeRangeCost(JNIEnv *env, jclass cls, jlong handle, jbyteArray keyA, diff --git a/src/main/java/com/tidesdb/ColumnFamily.java b/src/main/java/com/tidesdb/ColumnFamily.java index 944c0ed..cb2708b 100644 --- a/src/main/java/com/tidesdb/ColumnFamily.java +++ b/src/main/java/com/tidesdb/ColumnFamily.java @@ -30,6 +30,7 @@ public class ColumnFamily { private final long nativeHandle; private final String name; + private long commitHookCtxHandle = 0; ColumnFamily(long nativeHandle, String name) { this.nativeHandle = nativeHandle; @@ -146,6 +147,35 @@ public double rangeCost(byte[] keyA, byte[] keyB) throws TidesDBException { return nativeRangeCost(nativeHandle, keyA, keyB); } + /** + * Sets a commit hook (Change Data Capture) for this column family. + * The hook fires synchronously after every transaction commit, receiving the full + * batch of committed operations atomically. Keep the callback fast to avoid + * stalling writers. + * + *

Hooks are runtime-only and not persisted. After a database restart, + * hooks must be re-registered by the application.

+ * + * @param hook the commit hook callback + * @throws TidesDBException if the hook cannot be set + */ + public void setCommitHook(CommitHook hook) throws TidesDBException { + if (hook == null) { + throw new IllegalArgumentException("Hook cannot be null, use clearCommitHook() instead"); + } + commitHookCtxHandle = nativeSetCommitHook(nativeHandle, hook, commitHookCtxHandle); + } + + /** + * Clears the commit hook for this column family. + * After this call, no further commit callbacks will fire. + * + * @throws TidesDBException if the hook cannot be cleared + */ + public void clearCommitHook() throws TidesDBException { + commitHookCtxHandle = nativeSetCommitHook(nativeHandle, null, commitHookCtxHandle); + } + long getNativeHandle() { return nativeHandle; } @@ -159,4 +189,5 @@ private static native void nativeUpdateRuntimeConfig(long handle, long writeBuff int skipListMaxLevel, float skipListProbability, double bloomFPR, int indexSampleRatio, int syncMode, long syncIntervalUs, boolean persistToDisk) throws TidesDBException; private static native double nativeRangeCost(long handle, byte[] keyA, byte[] keyB) throws TidesDBException; + private static native long nativeSetCommitHook(long handle, CommitHook hook, long oldCtxHandle) throws TidesDBException; } diff --git a/src/main/java/com/tidesdb/CommitHook.java b/src/main/java/com/tidesdb/CommitHook.java new file mode 100644 index 0000000..35903a6 --- /dev/null +++ b/src/main/java/com/tidesdb/CommitHook.java @@ -0,0 +1,37 @@ +/** + * + * Copyright (C) TidesDB + * + * Original Author: Alex Gaetano Padula + * + * Licensed under the Mozilla Public License, v. 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.mozilla.org/en-US/MPL/2.0/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tidesdb; + +/** + * Callback interface for commit hooks (Change Data Capture). + * Invoked synchronously after a transaction commits to a column family. + * The hook receives the full batch of committed operations atomically. + */ +@FunctionalInterface +public interface CommitHook { + + /** + * Called after a transaction commits to a column family. + * + * @param ops array of committed operations + * @param commitSeq monotonic commit sequence number + * @return 0 on success, non-zero on failure (logged as warning, does not roll back) + */ + int onCommit(CommitOp[] ops, long commitSeq); +} diff --git a/src/main/java/com/tidesdb/CommitOp.java b/src/main/java/com/tidesdb/CommitOp.java new file mode 100644 index 0000000..80cc9a6 --- /dev/null +++ b/src/main/java/com/tidesdb/CommitOp.java @@ -0,0 +1,82 @@ +/** + * + * Copyright (C) TidesDB + * + * Original Author: Alex Gaetano Padula + * + * Licensed under the Mozilla Public License, v. 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.mozilla.org/en-US/MPL/2.0/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.tidesdb; + +/** + * Represents a single operation in a committed transaction batch. + * Passed to the commit hook callback after a transaction commits. + */ +public class CommitOp { + + private final byte[] key; + private final byte[] value; + private final long ttl; + private final boolean delete; + + /** + * Creates a new CommitOp. + * + * @param key the key + * @param value the value (null for deletes) + * @param ttl time-to-live (-1 for no expiry) + * @param delete true if this is a delete operation + */ + public CommitOp(byte[] key, byte[] value, long ttl, boolean delete) { + this.key = key; + this.value = value; + this.ttl = ttl; + this.delete = delete; + } + + /** + * Gets the key for this operation. + * + * @return the key bytes + */ + public byte[] getKey() { + return key; + } + + /** + * Gets the value for this operation. + * + * @return the value bytes, or null for delete operations + */ + public byte[] getValue() { + return value; + } + + /** + * Gets the TTL for this operation. + * + * @return the TTL in seconds since epoch, or -1 for no expiry + */ + public long getTtl() { + return ttl; + } + + /** + * Returns whether this is a delete operation. + * + * @return true if delete, false if put + */ + public boolean isDelete() { + return delete; + } +} diff --git a/src/test/java/com/tidesdb/TidesDBTest.java b/src/test/java/com/tidesdb/TidesDBTest.java index 9f6955d..d7494ba 100644 --- a/src/test/java/com/tidesdb/TidesDBTest.java +++ b/src/test/java/com/tidesdb/TidesDBTest.java @@ -24,6 +24,9 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import static org.junit.jupiter.api.Assertions.*; @@ -909,6 +912,158 @@ void testRangeCostNullKeys() throws TidesDBException { } } + @Test + @Order(25) + void testCommitHookBasic() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb23").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .build(); + + try (TidesDB db = TidesDB.open(config)) { + ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig(); + db.createColumnFamily("test_cf", cfConfig); + + ColumnFamily cf = db.getColumnFamily("test_cf"); + + List received = new ArrayList<>(); + AtomicLong lastSeq = new AtomicLong(); + + cf.setCommitHook((ops, commitSeq) -> { + received.add(ops); + lastSeq.set(commitSeq); + return 0; + }); + + // Commit a put operation + try (Transaction txn = db.beginTransaction()) { + txn.put(cf, "key1".getBytes(), "value1".getBytes()); + txn.commit(); + } + + // Hook fires synchronously, so data is available immediately + assertEquals(1, received.size()); + assertEquals(1, received.get(0).length); + assertArrayEquals("key1".getBytes(), received.get(0)[0].getKey()); + assertArrayEquals("value1".getBytes(), received.get(0)[0].getValue()); + assertFalse(received.get(0)[0].isDelete()); + assertTrue(lastSeq.get() > 0); + + cf.clearCommitHook(); + } + } + + @Test + @Order(26) + void testCommitHookMultipleOps() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb24").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .build(); + + try (TidesDB db = TidesDB.open(config)) { + ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig(); + db.createColumnFamily("test_cf", cfConfig); + + ColumnFamily cf = db.getColumnFamily("test_cf"); + + List received = new ArrayList<>(); + + cf.setCommitHook((ops, commitSeq) -> { + received.add(ops); + return 0; + }); + + // Commit multiple operations in one transaction + try (Transaction txn = db.beginTransaction()) { + txn.put(cf, "key1".getBytes(), "value1".getBytes()); + txn.put(cf, "key2".getBytes(), "value2".getBytes()); + txn.delete(cf, "key1".getBytes()); + txn.commit(); + } + + // Should fire once with all operations + assertEquals(1, received.size()); + assertEquals(3, received.get(0).length); + + // Last op should be a delete + assertTrue(received.get(0)[2].isDelete()); + assertArrayEquals("key1".getBytes(), received.get(0)[2].getKey()); + + cf.clearCommitHook(); + } + } + + @Test + @Order(27) + void testCommitHookClear() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb25").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .build(); + + try (TidesDB db = TidesDB.open(config)) { + ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig(); + db.createColumnFamily("test_cf", cfConfig); + + ColumnFamily cf = db.getColumnFamily("test_cf"); + + List received = new ArrayList<>(); + + cf.setCommitHook((ops, commitSeq) -> { + received.add(ops); + return 0; + }); + + // First commit — hook should fire + try (Transaction txn = db.beginTransaction()) { + txn.put(cf, "key1".getBytes(), "value1".getBytes()); + txn.commit(); + } + assertEquals(1, received.size()); + + // Clear the hook + cf.clearCommitHook(); + + // Second commit — hook should NOT fire + try (Transaction txn = db.beginTransaction()) { + txn.put(cf, "key2".getBytes(), "value2".getBytes()); + txn.commit(); + } + assertEquals(1, received.size(), "Hook should not fire after clearing"); + } + } + + @Test + @Order(28) + void testCommitHookNullThrows() throws TidesDBException { + Config config = Config.builder(tempDir.resolve("testdb26").toString()) + .numFlushThreads(2) + .numCompactionThreads(2) + .logLevel(LogLevel.INFO) + .blockCacheSize(64 * 1024 * 1024) + .maxOpenSSTables(256) + .build(); + + try (TidesDB db = TidesDB.open(config)) { + ColumnFamilyConfig cfConfig = ColumnFamilyConfig.defaultConfig(); + db.createColumnFamily("test_cf", cfConfig); + + ColumnFamily cf = db.getColumnFamily("test_cf"); + + assertThrows(IllegalArgumentException.class, () -> cf.setCommitHook(null)); + } + } + @Test @Order(21) void testTransactionResetNullIsolation() throws TidesDBException {