Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.tidesdb</groupId>
<artifactId>tidesdb-java</artifactId>
<version>0.6.3</version>
<version>0.6.4</version>
<packaging>jar</packaging>

<name>TidesDB Java</name>
Expand Down
152 changes: 152 additions & 0 deletions src/main/c/com_tidesdb_TidesDB.c
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,158 @@ JNIEXPORT void JNICALL Java_com_tidesdb_TidesDBIterator_nativeFree(JNIEnv *env,
}
}

/**
* Context stored as the commit hook ctx pointer.
* Holds the JavaVM and a global reference to the Java CommitHook object.
*/
typedef struct
{
JavaVM *jvm;
jobject hook_obj; /* global reference to CommitHook */
} java_hook_ctx_t;

/**
* C trampoline that bridges the tidesdb_commit_hook_fn callback to the Java CommitHook.onCommit
* method. Fires synchronously on the committing thread (which is always a Java thread).
*/
static int java_commit_hook_trampoline(const tidesdb_commit_op_t *ops, int num_ops,
uint64_t commit_seq, void *ctx)
{
java_hook_ctx_t *hctx = (java_hook_ctx_t *)ctx;
JNIEnv *env = NULL;
int need_detach = 0;

jint rc = (*hctx->jvm)->GetEnv(hctx->jvm, (void **)&env, JNI_VERSION_1_6);
if (rc == JNI_EDETACHED)
{
if ((*hctx->jvm)->AttachCurrentThread(hctx->jvm, (void **)&env, NULL) != 0)
return -1;
need_detach = 1;
}
else if (rc != JNI_OK)
{
return -1;
}

jint ret = -1;

/* Find CommitOp class and constructor: CommitOp(byte[], byte[], long, boolean) */
jclass commitOpClass = (*env)->FindClass(env, "com/tidesdb/CommitOp");
if (commitOpClass == NULL)
goto cleanup;

jmethodID ctor = (*env)->GetMethodID(env, commitOpClass, "<init>", "([B[BJZ)V");
if (ctor == NULL)
goto cleanup;

/* Create CommitOp[] array */
jobjectArray opsArray = (*env)->NewObjectArray(env, num_ops, commitOpClass, NULL);
if (opsArray == NULL)
goto cleanup;

for (int i = 0; i < num_ops; i++)
{
jbyteArray jkey = (*env)->NewByteArray(env, (jsize)ops[i].key_size);
(*env)->SetByteArrayRegion(env, jkey, 0, (jsize)ops[i].key_size, (jbyte *)ops[i].key);

jbyteArray jvalue = NULL;
if (ops[i].value != NULL && ops[i].value_size > 0)
{
jvalue = (*env)->NewByteArray(env, (jsize)ops[i].value_size);
(*env)->SetByteArrayRegion(env, jvalue, 0, (jsize)ops[i].value_size,
(jbyte *)ops[i].value);
}

jobject opObj = (*env)->NewObject(env, commitOpClass, ctor, jkey, jvalue,
(jlong)ops[i].ttl,
ops[i].is_delete ? JNI_TRUE : JNI_FALSE);
(*env)->SetObjectArrayElement(env, opsArray, i, opObj);

(*env)->DeleteLocalRef(env, opObj);
(*env)->DeleteLocalRef(env, jkey);
if (jvalue != NULL)
(*env)->DeleteLocalRef(env, jvalue);
}

/* Call CommitHook.onCommit(CommitOp[], long) */
jclass hookClass = (*env)->GetObjectClass(env, hctx->hook_obj);
jmethodID onCommit =
(*env)->GetMethodID(env, hookClass, "onCommit", "([Lcom/tidesdb/CommitOp;J)I");

ret = (*env)->CallIntMethod(env, hctx->hook_obj, onCommit, opsArray, (jlong)commit_seq);

if ((*env)->ExceptionCheck(env))
{
(*env)->ExceptionClear(env);
ret = -1;
}

(*env)->DeleteLocalRef(env, opsArray);
(*env)->DeleteLocalRef(env, commitOpClass);
(*env)->DeleteLocalRef(env, hookClass);

if (need_detach)
(*hctx->jvm)->DetachCurrentThread(hctx->jvm);

return (int)ret;

cleanup:
if ((*env)->ExceptionCheck(env))
(*env)->ExceptionClear(env);
if (need_detach)
(*hctx->jvm)->DetachCurrentThread(hctx->jvm);
return -1;
}

JNIEXPORT jlong JNICALL Java_com_tidesdb_ColumnFamily_nativeSetCommitHook(JNIEnv *env, jclass cls,
jlong cfHandle,
jobject hook,
jlong oldCtxHandle)
{
tidesdb_column_family_t *cf = (tidesdb_column_family_t *)(uintptr_t)cfHandle;

/* Free old context if present */
if (oldCtxHandle != 0)
{
java_hook_ctx_t *old_ctx = (java_hook_ctx_t *)(uintptr_t)oldCtxHandle;
(*env)->DeleteGlobalRef(env, old_ctx->hook_obj);
free(old_ctx);
}

/* If hook is NULL, clear the hook */
if (hook == NULL)
{
int result = tidesdb_cf_set_commit_hook(cf, NULL, NULL);
if (result != TDB_SUCCESS)
{
throwTidesDBException(env, result, getErrorMessage(result));
}
return 0;
}

/* Allocate new context */
java_hook_ctx_t *ctx = (java_hook_ctx_t *)malloc(sizeof(java_hook_ctx_t));
if (ctx == NULL)
{
throwTidesDBException(env, TDB_ERR_MEMORY, "Failed to allocate commit hook context");
return 0;
}

(*env)->GetJavaVM(env, &ctx->jvm);
ctx->hook_obj = (*env)->NewGlobalRef(env, hook);

int result = tidesdb_cf_set_commit_hook(cf, java_commit_hook_trampoline, ctx);
if (result != TDB_SUCCESS)
{
(*env)->DeleteGlobalRef(env, ctx->hook_obj);
free(ctx);
throwTidesDBException(env, result, getErrorMessage(result));
return 0;
}

return (jlong)(uintptr_t)ctx;
}

JNIEXPORT jdouble JNICALL Java_com_tidesdb_ColumnFamily_nativeRangeCost(JNIEnv *env, jclass cls,
jlong handle,
jbyteArray keyA,
Expand Down
31 changes: 31 additions & 0 deletions src/main/java/com/tidesdb/ColumnFamily.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class ColumnFamily {

private final long nativeHandle;
private final String name;
private long commitHookCtxHandle = 0;

ColumnFamily(long nativeHandle, String name) {
this.nativeHandle = nativeHandle;
Expand Down Expand Up @@ -146,6 +147,35 @@ public double rangeCost(byte[] keyA, byte[] keyB) throws TidesDBException {
return nativeRangeCost(nativeHandle, keyA, keyB);
}

/**
* Sets a commit hook (Change Data Capture) for this column family.
* The hook fires synchronously after every transaction commit, receiving the full
* batch of committed operations atomically. Keep the callback fast to avoid
* stalling writers.
*
* <p>Hooks are runtime-only and not persisted. After a database restart,
* hooks must be re-registered by the application.</p>
*
* @param hook the commit hook callback
* @throws TidesDBException if the hook cannot be set
*/
public void setCommitHook(CommitHook hook) throws TidesDBException {
if (hook == null) {
throw new IllegalArgumentException("Hook cannot be null, use clearCommitHook() instead");
}
commitHookCtxHandle = nativeSetCommitHook(nativeHandle, hook, commitHookCtxHandle);
}

/**
* Clears the commit hook for this column family.
* After this call, no further commit callbacks will fire.
*
* @throws TidesDBException if the hook cannot be cleared
*/
public void clearCommitHook() throws TidesDBException {
commitHookCtxHandle = nativeSetCommitHook(nativeHandle, null, commitHookCtxHandle);
}

long getNativeHandle() {
return nativeHandle;
}
Expand All @@ -159,4 +189,5 @@ private static native void nativeUpdateRuntimeConfig(long handle, long writeBuff
int skipListMaxLevel, float skipListProbability, double bloomFPR, int indexSampleRatio,
int syncMode, long syncIntervalUs, boolean persistToDisk) throws TidesDBException;
private static native double nativeRangeCost(long handle, byte[] keyA, byte[] keyB) throws TidesDBException;
private static native long nativeSetCommitHook(long handle, CommitHook hook, long oldCtxHandle) throws TidesDBException;
}
37 changes: 37 additions & 0 deletions src/main/java/com/tidesdb/CommitHook.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
*
* Copyright (C) TidesDB
*
* Original Author: Alex Gaetano Padula
*
* Licensed under the Mozilla Public License, v. 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.mozilla.org/en-US/MPL/2.0/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.tidesdb;

/**
* Callback interface for commit hooks (Change Data Capture).
* Invoked synchronously after a transaction commits to a column family.
* The hook receives the full batch of committed operations atomically.
*/
@FunctionalInterface
public interface CommitHook {

/**
* Called after a transaction commits to a column family.
*
* @param ops array of committed operations
* @param commitSeq monotonic commit sequence number
* @return 0 on success, non-zero on failure (logged as warning, does not roll back)
*/
int onCommit(CommitOp[] ops, long commitSeq);
}
82 changes: 82 additions & 0 deletions src/main/java/com/tidesdb/CommitOp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
*
* Copyright (C) TidesDB
*
* Original Author: Alex Gaetano Padula
*
* Licensed under the Mozilla Public License, v. 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.mozilla.org/en-US/MPL/2.0/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.tidesdb;

/**
* Represents a single operation in a committed transaction batch.
* Passed to the commit hook callback after a transaction commits.
*/
public class CommitOp {

private final byte[] key;
private final byte[] value;
private final long ttl;
private final boolean delete;

/**
* Creates a new CommitOp.
*
* @param key the key
* @param value the value (null for deletes)
* @param ttl time-to-live (-1 for no expiry)
* @param delete true if this is a delete operation
*/
public CommitOp(byte[] key, byte[] value, long ttl, boolean delete) {
this.key = key;
this.value = value;
this.ttl = ttl;
this.delete = delete;
}

/**
* Gets the key for this operation.
*
* @return the key bytes
*/
public byte[] getKey() {
return key;
}

/**
* Gets the value for this operation.
*
* @return the value bytes, or null for delete operations
*/
public byte[] getValue() {
return value;
}

/**
* Gets the TTL for this operation.
*
* @return the TTL in seconds since epoch, or -1 for no expiry
*/
public long getTtl() {
return ttl;
}

/**
* Returns whether this is a delete operation.
*
* @return true if delete, false if put
*/
public boolean isDelete() {
return delete;
}
}
Loading
Loading