From 1b6874c7755ead76e72423c264bc6a97b3d2fc13 Mon Sep 17 00:00:00 2001
From: Alex Gaetano Padula
Date: Fri, 20 Feb 2026 23:46:56 -0500
Subject: [PATCH 1/3] column family commit hook implementation; updated pyproject

---
 pyproject.toml          |   2 +-
 src/tidesdb/__init__.py |   4 +
 src/tidesdb/tidesdb.py  |  97 ++++++++++++++++++++++
 tests/test_tidesdb.py   | 176 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 278 insertions(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 825cf26..4ed9fa4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "tidesdb"
-version = "0.9.3"
+version = "0.9.4"
 description = "Official Python bindings for TidesDB - A high-performance embedded key-value storage engine"
 readme = "README.md"
 requires-python = ">=3.10"
diff --git a/src/tidesdb/__init__.py b/src/tidesdb/__init__.py
index 1636984..eaf072e 100644
--- a/src/tidesdb/__init__.py
+++ b/src/tidesdb/__init__.py
@@ -25,7 +25,9 @@
     default_column_family_config,
     save_config_to_ini,
     load_config_from_ini,
+    CommitOp,
     COMPARATOR_FUNC,
+    COMMIT_HOOK_FUNC,
 )
 
 __version__ = "0.9.1"
@@ -47,5 +49,7 @@
     "default_column_family_config",
     "save_config_to_ini",
     "load_config_from_ini",
+    "CommitOp",
     "COMPARATOR_FUNC",
+    "COMMIT_HOOK_FUNC",
 ]
diff --git a/src/tidesdb/tidesdb.py b/src/tidesdb/tidesdb.py
index c589634..5ef53ff 100644
--- a/src/tidesdb/tidesdb.py
+++ b/src/tidesdb/tidesdb.py
@@ -23,6 +23,7 @@
 import os
 import sys
 from ctypes import (
+    CFUNCTYPE,
     POINTER,
     Structure,
     c_char,
@@ -30,6 +31,7 @@
     c_double,
     c_float,
     c_int,
+    c_long,
     c_size_t,
     c_uint8,
     c_uint32,
@@ -198,9 +200,28 @@ class _CColumnFamilyConfig(Structure):
         ("l1_file_count_trigger", c_int),
         ("l0_queue_stall_threshold", c_int),
         ("use_btree", c_int),
+        ("commit_hook_fn", c_void_p),
+        ("commit_hook_ctx", c_void_p),
     ]
 
 
+class _CCommitOp(Structure):
+    """C structure for tidesdb_commit_op_t."""
+
+    _fields_ = [
+        ("key", POINTER(c_uint8)),
+        ("key_size", c_size_t),
+        ("value", POINTER(c_uint8)),
+        ("value_size", c_size_t),
+        ("ttl", c_long),
+        ("is_delete", c_int),
+    ]
+
+
+# Commit hook callback: int (*)(const tidesdb_commit_op_t*, int, uint64_t, void*)
+COMMIT_HOOK_FUNC = CFUNCTYPE(c_int, POINTER(_CCommitOp), c_int, c_uint64, c_void_p)
+
+
 class _CConfig(Structure):
     """C structure for tidesdb_config_t."""
 
@@ -413,6 +434,9 @@ class _CCacheStats(Structure):
 _lib.tidesdb_cf_config_load_from_ini.argtypes = [c_char_p, c_char_p, POINTER(_CColumnFamilyConfig)]
 _lib.tidesdb_cf_config_load_from_ini.restype = c_int
 
+_lib.tidesdb_cf_set_commit_hook.argtypes = [c_void_p, COMMIT_HOOK_FUNC, c_void_p]
+_lib.tidesdb_cf_set_commit_hook.restype = c_int
+
 # Comparator function type: int (*)(const uint8_t*, size_t, const uint8_t*, size_t, void*)
 COMPARATOR_FUNC = ctypes.CFUNCTYPE(c_int, POINTER(c_uint8), c_size_t, POINTER(c_uint8), c_size_t, c_void_p)
 DESTROY_FUNC = ctypes.CFUNCTYPE(None, c_void_p)
@@ -533,6 +557,16 @@ class CacheStats:
     num_partitions: int
 
 
+@dataclass
+class CommitOp:
+    """A single operation from a committed transaction batch."""
+
+    key: bytes
+    value: bytes | None
+    ttl: int
+    is_delete: bool
+
+
 def default_config() -> Config:
     """Get default database configuration."""
     return Config(db_path="")
@@ -798,6 +832,69 @@ def update_runtime_config(self, config: ColumnFamilyConfig, persist_to_disk: boo
         if result != TDB_SUCCESS:
             raise TidesDBError.from_code(result, "failed to update runtime config")
 
+    def set_commit_hook(self, callback: callable) -> None:
+        """
+        Set a commit hook (change data capture) callback for this column family.
+
+        The callback fires synchronously after every transaction commit on this
+        column family. It receives the full batch of committed operations
+        atomically, enabling real-time change data capture.
+
+        The callback signature is:
+            callback(ops: list[CommitOp], commit_seq: int) -> int
+
+        Return 0 (or None) from the callback on success. A non-zero return is
+        logged as a warning but does not roll back the commit. An exception
+        raised by the callback is caught and reported to the engine as -1.
+
+        Args:
+            callback: Python callable with the signature above
+
+        Raises:
+            TidesDBError: If the native library rejects the hook
+        """
+        def c_hook(ops_ptr, num_ops, commit_seq, ctx_ptr):
+            # Decode the batch inside the guard as well: nothing may propagate
+            # out of a ctypes callback into the native commit path.
+            try:
+                ops = []
+                for i in range(num_ops):
+                    c_op = ops_ptr[i]
+                    key = ctypes.string_at(c_op.key, c_op.key_size) if c_op.key and c_op.key_size > 0 else b""
+                    value = ctypes.string_at(c_op.value, c_op.value_size) if c_op.value and c_op.value_size > 0 else None
+                    ops.append(CommitOp(
+                        key=key,
+                        value=value,
+                        ttl=c_op.ttl,
+                        is_delete=bool(c_op.is_delete),
+                    ))
+                rc = callback(ops, commit_seq)
+                # A hook without an explicit return yields None, which is not a
+                # valid c_int result for a ctypes callback; treat it as success.
+                return 0 if rc is None else int(rc)
+            except Exception:
+                return -1
+
+        c_func = COMMIT_HOOK_FUNC(c_hook)
+
+        result = _lib.tidesdb_cf_set_commit_hook(self._cf, c_func, None)
+        if result != TDB_SUCCESS:
+            raise TidesDBError.from_code(result, "failed to set commit hook")
+
+        # ctypes does not keep callback thunks alive. Swap the reference only
+        # after the native call succeeded, so a previously installed hook is
+        # not freed while the library may still invoke it.
+        self._commit_hook_ref = c_func
+
+    def clear_commit_hook(self) -> None:
+        """Disable the commit hook for this column family."""
+        result = _lib.tidesdb_cf_set_commit_hook(self._cf, COMMIT_HOOK_FUNC(0), None)
+        if result != TDB_SUCCESS:
+            raise TidesDBError.from_code(result, "failed to clear commit hook")
+
+        # Release the thunk only once the library no longer references it.
+        self._commit_hook_ref = None
+
     def range_cost(self, key_a: bytes, key_b: bytes) -> float:
         """
         Estimate the computational cost of iterating between two keys.
diff --git a/tests/test_tidesdb.py b/tests/test_tidesdb.py index 6111c21..954e1ed 100644 --- a/tests/test_tidesdb.py +++ b/tests/test_tidesdb.py @@ -711,5 +711,181 @@ def test_load_preserves_all_fields(self, temp_db_path): assert loaded.l0_queue_stall_threshold == original.l0_queue_stall_threshold +class TestCommitHook: + """Tests for commit hook (change data capture) API.""" + + def test_hook_fires_on_commit(self, db, cf): + """Test that the commit hook fires when a transaction commits.""" + captured = [] + + def hook(ops, commit_seq): + captured.append({"ops": ops, "seq": commit_seq}) + return 0 + + cf.set_commit_hook(hook) + + with db.begin_txn() as txn: + txn.put(cf, b"key1", b"value1") + txn.commit() + + assert len(captured) == 1 + assert len(captured[0]["ops"]) == 1 + assert captured[0]["ops"][0].key == b"key1" + assert captured[0]["ops"][0].value == b"value1" + assert captured[0]["ops"][0].is_delete is False + assert captured[0]["seq"] > 0 + + cf.clear_commit_hook() + + def test_hook_captures_deletes(self, db, cf): + """Test that the hook correctly reports delete operations.""" + captured = [] + + with db.begin_txn() as txn: + txn.put(cf, b"del_key", b"del_val") + txn.commit() + + def hook(ops, commit_seq): + captured.append(ops) + return 0 + + cf.set_commit_hook(hook) + + with db.begin_txn() as txn: + txn.delete(cf, b"del_key") + txn.commit() + + assert len(captured) == 1 + delete_ops = [op for op in captured[0] if op.is_delete] + assert len(delete_ops) >= 1 + assert delete_ops[0].key == b"del_key" + assert delete_ops[0].value is None + + cf.clear_commit_hook() + + def test_hook_multi_op_batch(self, db, cf): + """Test that the hook receives all operations in a batch.""" + captured = [] + + def hook(ops, commit_seq): + captured.append(ops) + return 0 + + cf.set_commit_hook(hook) + + with db.begin_txn() as txn: + txn.put(cf, b"batch1", b"v1") + txn.put(cf, b"batch2", b"v2") + txn.put(cf, b"batch3", b"v3") + txn.commit() + + assert len(captured) == 1 + assert 
len(captured[0]) == 3 + keys = {op.key for op in captured[0]} + assert keys == {b"batch1", b"batch2", b"batch3"} + + cf.clear_commit_hook() + + def test_hook_commit_seq_increases(self, db, cf): + """Test that commit_seq is monotonically increasing.""" + seqs = [] + + def hook(ops, commit_seq): + seqs.append(commit_seq) + return 0 + + cf.set_commit_hook(hook) + + for i in range(3): + with db.begin_txn() as txn: + txn.put(cf, f"seq_key_{i}".encode(), f"seq_val_{i}".encode()) + txn.commit() + + assert len(seqs) == 3 + assert seqs[0] < seqs[1] < seqs[2] + + cf.clear_commit_hook() + + def test_clear_hook_stops_firing(self, db, cf): + """Test that clearing the hook stops callbacks.""" + captured = [] + + def hook(ops, commit_seq): + captured.append(ops) + return 0 + + cf.set_commit_hook(hook) + + with db.begin_txn() as txn: + txn.put(cf, b"before_clear", b"v") + txn.commit() + + assert len(captured) == 1 + + cf.clear_commit_hook() + + with db.begin_txn() as txn: + txn.put(cf, b"after_clear", b"v") + txn.commit() + + # No new captures after clearing + assert len(captured) == 1 + + def test_hook_failure_does_not_rollback(self, db, cf): + """Test that a hook returning non-zero does not affect the commit.""" + def failing_hook(ops, commit_seq): + return -1 # Simulate failure + + cf.set_commit_hook(failing_hook) + + with db.begin_txn() as txn: + txn.put(cf, b"survive", b"value") + txn.commit() + + cf.clear_commit_hook() + + # Data should still be committed despite hook failure + with db.begin_txn() as txn: + assert txn.get(cf, b"survive") == b"value" + + def test_hook_with_ttl(self, db, cf): + """Test that the hook reports TTL values.""" + captured = [] + + def hook(ops, commit_seq): + captured.append(ops) + return 0 + + cf.set_commit_hook(hook) + + ttl_val = int(time.time()) + 3600 # 1 hour from now + with db.begin_txn() as txn: + txn.put(cf, b"ttl_key", b"ttl_val", ttl=ttl_val) + txn.commit() + + assert len(captured) == 1 + assert captured[0][0].ttl == ttl_val + + 
cf.clear_commit_hook() + + def test_hook_exception_handled(self, db, cf): + """Test that Python exceptions in the hook don't crash the process.""" + def crashing_hook(ops, commit_seq): + raise RuntimeError("hook crashed!") + + cf.set_commit_hook(crashing_hook) + + # Should not raise - exception is caught internally + with db.begin_txn() as txn: + txn.put(cf, b"crash_key", b"crash_val") + txn.commit() + + cf.clear_commit_hook() + + # Data should still be committed + with db.begin_txn() as txn: + assert txn.get(cf, b"crash_key") == b"crash_val" + + if __name__ == "__main__": pytest.main([__file__, "-v"]) From f51b96451aa2c725dda71ab9523cda918ba1e6b4 Mon Sep 17 00:00:00 2001 From: Alex Gaetano Padula Date: Sat, 21 Feb 2026 00:00:36 -0500 Subject: [PATCH 2/3] correct argtypes for ttl and adjust flaky ttl expiration test --- src/tidesdb/tidesdb.py | 2 +- tests/test_tidesdb.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tidesdb/tidesdb.py b/src/tidesdb/tidesdb.py index 5ef53ff..6f3a650 100644 --- a/src/tidesdb/tidesdb.py +++ b/src/tidesdb/tidesdb.py @@ -311,7 +311,7 @@ class _CCacheStats(Structure): c_size_t, POINTER(c_uint8), c_size_t, - c_int, + c_long, ] _lib.tidesdb_txn_put.restype = c_int diff --git a/tests/test_tidesdb.py b/tests/test_tidesdb.py index 954e1ed..9b35725 100644 --- a/tests/test_tidesdb.py +++ b/tests/test_tidesdb.py @@ -301,7 +301,7 @@ def test_ttl_expiration(self, db, cf): txn.commit() cf.flush_memtable() - time.sleep(0.5) + time.sleep(2) with db.begin_txn() as txn: try: From b2ab2d8b5f9297ac8d01127f9e18f4163c327591 Mon Sep 17 00:00:00 2001 From: Alex Gaetano Padula Date: Sat, 21 Feb 2026 00:09:21 -0500 Subject: [PATCH 3/3] c_long-c_int64 for all time_t fields (always 8 bytes on every platform) --- src/tidesdb/tidesdb.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tidesdb/tidesdb.py b/src/tidesdb/tidesdb.py index 6f3a650..e3bf34c 100644 --- a/src/tidesdb/tidesdb.py +++ 
b/src/tidesdb/tidesdb.py @@ -31,7 +31,7 @@ c_double, c_float, c_int, - c_long, + c_int64, c_size_t, c_uint8, c_uint32, @@ -213,7 +213,7 @@ class _CCommitOp(Structure): ("key_size", c_size_t), ("value", POINTER(c_uint8)), ("value_size", c_size_t), - ("ttl", c_long), + ("ttl", c_int64), ("is_delete", c_int), ] @@ -311,7 +311,7 @@ class _CCacheStats(Structure): c_size_t, POINTER(c_uint8), c_size_t, - c_long, + c_int64, ] _lib.tidesdb_txn_put.restype = c_int