Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "tidesdb"
version = "0.9.3"
version = "0.9.4"
description = "Official Python bindings for TidesDB - A high-performance embedded key-value storage engine"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
4 changes: 4 additions & 0 deletions src/tidesdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
default_column_family_config,
save_config_to_ini,
load_config_from_ini,
CommitOp,
COMPARATOR_FUNC,
COMMIT_HOOK_FUNC,
)

__version__ = "0.9.1"
Expand All @@ -47,5 +49,7 @@
"default_column_family_config",
"save_config_to_ini",
"load_config_from_ini",
"CommitOp",
"COMPARATOR_FUNC",
"COMMIT_HOOK_FUNC",
]
90 changes: 89 additions & 1 deletion src/tidesdb/tidesdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import os
import sys
from ctypes import (
CFUNCTYPE,
POINTER,
Structure,
c_char,
c_char_p,
c_double,
c_float,
c_int,
c_int64,
c_size_t,
c_uint8,
c_uint32,
Expand Down Expand Up @@ -198,9 +200,28 @@ class _CColumnFamilyConfig(Structure):
("l1_file_count_trigger", c_int),
("l0_queue_stall_threshold", c_int),
("use_btree", c_int),
("commit_hook_fn", c_void_p),
("commit_hook_ctx", c_void_p),
]


class _CCommitOp(Structure):
"""C structure for tidesdb_commit_op_t."""

_fields_ = [
("key", POINTER(c_uint8)),
("key_size", c_size_t),
("value", POINTER(c_uint8)),
("value_size", c_size_t),
("ttl", c_int64),
("is_delete", c_int),
]


# Commit hook callback: int (*)(const tidesdb_commit_op_t*, int, uint64_t, void*)
COMMIT_HOOK_FUNC = CFUNCTYPE(c_int, POINTER(_CCommitOp), c_int, c_uint64, c_void_p)


class _CConfig(Structure):
"""C structure for tidesdb_config_t."""

Expand Down Expand Up @@ -290,7 +311,7 @@ class _CCacheStats(Structure):
c_size_t,
POINTER(c_uint8),
c_size_t,
c_int,
c_int64,
]
_lib.tidesdb_txn_put.restype = c_int

Expand Down Expand Up @@ -413,6 +434,9 @@ class _CCacheStats(Structure):
_lib.tidesdb_cf_config_load_from_ini.argtypes = [c_char_p, c_char_p, POINTER(_CColumnFamilyConfig)]
_lib.tidesdb_cf_config_load_from_ini.restype = c_int

_lib.tidesdb_cf_set_commit_hook.argtypes = [c_void_p, COMMIT_HOOK_FUNC, c_void_p]
_lib.tidesdb_cf_set_commit_hook.restype = c_int

# Comparator function type: int (*)(const uint8_t*, size_t, const uint8_t*, size_t, void*)
COMPARATOR_FUNC = ctypes.CFUNCTYPE(c_int, POINTER(c_uint8), c_size_t, POINTER(c_uint8), c_size_t, c_void_p)
DESTROY_FUNC = ctypes.CFUNCTYPE(None, c_void_p)
Expand Down Expand Up @@ -533,6 +557,16 @@ class CacheStats:
num_partitions: int


@dataclass
class CommitOp:
"""A single operation from a committed transaction batch."""

key: bytes
value: bytes | None
ttl: int
is_delete: bool


def default_config() -> Config:
"""Get default database configuration."""
return Config(db_path="")
Expand Down Expand Up @@ -798,6 +832,60 @@ def update_runtime_config(self, config: ColumnFamilyConfig, persist_to_disk: boo
if result != TDB_SUCCESS:
raise TidesDBError.from_code(result, "failed to update runtime config")

def set_commit_hook(self, callback: callable) -> None:
"""
Set a commit hook (change data capture) callback for this column family.

The callback fires synchronously after every transaction commit on this
column family. It receives the full batch of committed operations
atomically, enabling real-time change data capture.

The callback signature is:
callback(ops: list[CommitOp], commit_seq: int) -> int

Return 0 from the callback on success. A non-zero return is logged as a
warning but does not roll back the commit.

Args:
callback: Python callable with the signature above
"""
def c_hook(ops_ptr, num_ops, commit_seq, ctx_ptr):
ops = []
for i in range(num_ops):
c_op = ops_ptr[i]
key = ctypes.string_at(c_op.key, c_op.key_size) if c_op.key and c_op.key_size > 0 else b""
value = ctypes.string_at(c_op.value, c_op.value_size) if c_op.value and c_op.value_size > 0 else None
ops.append(CommitOp(
key=key,
value=value,
ttl=c_op.ttl,
is_delete=bool(c_op.is_delete),
))
try:
return callback(ops, commit_seq)
except Exception:
return -1

c_func = COMMIT_HOOK_FUNC(c_hook)

# Store reference to prevent garbage collection
if not hasattr(self, "_commit_hook_ref"):
self._commit_hook_ref = None
self._commit_hook_ref = c_func

result = _lib.tidesdb_cf_set_commit_hook(self._cf, c_func, None)
if result != TDB_SUCCESS:
raise TidesDBError.from_code(result, "failed to set commit hook")

def clear_commit_hook(self) -> None:
"""Disable the commit hook for this column family."""
result = _lib.tidesdb_cf_set_commit_hook(self._cf, COMMIT_HOOK_FUNC(0), None)
if result != TDB_SUCCESS:
raise TidesDBError.from_code(result, "failed to clear commit hook")

if hasattr(self, "_commit_hook_ref"):
self._commit_hook_ref = None

def range_cost(self, key_a: bytes, key_b: bytes) -> float:
"""
Estimate the computational cost of iterating between two keys.
Expand Down
178 changes: 177 additions & 1 deletion tests/test_tidesdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def test_ttl_expiration(self, db, cf):
txn.commit()

cf.flush_memtable()
time.sleep(0.5)
time.sleep(2)

with db.begin_txn() as txn:
try:
Expand Down Expand Up @@ -711,5 +711,181 @@ def test_load_preserves_all_fields(self, temp_db_path):
assert loaded.l0_queue_stall_threshold == original.l0_queue_stall_threshold


class TestCommitHook:
"""Tests for commit hook (change data capture) API."""

def test_hook_fires_on_commit(self, db, cf):
"""Test that the commit hook fires when a transaction commits."""
captured = []

def hook(ops, commit_seq):
captured.append({"ops": ops, "seq": commit_seq})
return 0

cf.set_commit_hook(hook)

with db.begin_txn() as txn:
txn.put(cf, b"key1", b"value1")
txn.commit()

assert len(captured) == 1
assert len(captured[0]["ops"]) == 1
assert captured[0]["ops"][0].key == b"key1"
assert captured[0]["ops"][0].value == b"value1"
assert captured[0]["ops"][0].is_delete is False
assert captured[0]["seq"] > 0

cf.clear_commit_hook()

def test_hook_captures_deletes(self, db, cf):
"""Test that the hook correctly reports delete operations."""
captured = []

with db.begin_txn() as txn:
txn.put(cf, b"del_key", b"del_val")
txn.commit()

def hook(ops, commit_seq):
captured.append(ops)
return 0

cf.set_commit_hook(hook)

with db.begin_txn() as txn:
txn.delete(cf, b"del_key")
txn.commit()

assert len(captured) == 1
delete_ops = [op for op in captured[0] if op.is_delete]
assert len(delete_ops) >= 1
assert delete_ops[0].key == b"del_key"
assert delete_ops[0].value is None

cf.clear_commit_hook()

def test_hook_multi_op_batch(self, db, cf):
"""Test that the hook receives all operations in a batch."""
captured = []

def hook(ops, commit_seq):
captured.append(ops)
return 0

cf.set_commit_hook(hook)

with db.begin_txn() as txn:
txn.put(cf, b"batch1", b"v1")
txn.put(cf, b"batch2", b"v2")
txn.put(cf, b"batch3", b"v3")
txn.commit()

assert len(captured) == 1
assert len(captured[0]) == 3
keys = {op.key for op in captured[0]}
assert keys == {b"batch1", b"batch2", b"batch3"}

cf.clear_commit_hook()

def test_hook_commit_seq_increases(self, db, cf):
"""Test that commit_seq is monotonically increasing."""
seqs = []

def hook(ops, commit_seq):
seqs.append(commit_seq)
return 0

cf.set_commit_hook(hook)

for i in range(3):
with db.begin_txn() as txn:
txn.put(cf, f"seq_key_{i}".encode(), f"seq_val_{i}".encode())
txn.commit()

assert len(seqs) == 3
assert seqs[0] < seqs[1] < seqs[2]

cf.clear_commit_hook()

def test_clear_hook_stops_firing(self, db, cf):
"""Test that clearing the hook stops callbacks."""
captured = []

def hook(ops, commit_seq):
captured.append(ops)
return 0

cf.set_commit_hook(hook)

with db.begin_txn() as txn:
txn.put(cf, b"before_clear", b"v")
txn.commit()

assert len(captured) == 1

cf.clear_commit_hook()

with db.begin_txn() as txn:
txn.put(cf, b"after_clear", b"v")
txn.commit()

# No new captures after clearing
assert len(captured) == 1

def test_hook_failure_does_not_rollback(self, db, cf):
"""Test that a hook returning non-zero does not affect the commit."""
def failing_hook(ops, commit_seq):
return -1 # Simulate failure

cf.set_commit_hook(failing_hook)

with db.begin_txn() as txn:
txn.put(cf, b"survive", b"value")
txn.commit()

cf.clear_commit_hook()

# Data should still be committed despite hook failure
with db.begin_txn() as txn:
assert txn.get(cf, b"survive") == b"value"

def test_hook_with_ttl(self, db, cf):
"""Test that the hook reports TTL values."""
captured = []

def hook(ops, commit_seq):
captured.append(ops)
return 0

cf.set_commit_hook(hook)

ttl_val = int(time.time()) + 3600 # 1 hour from now
with db.begin_txn() as txn:
txn.put(cf, b"ttl_key", b"ttl_val", ttl=ttl_val)
txn.commit()

assert len(captured) == 1
assert captured[0][0].ttl == ttl_val

cf.clear_commit_hook()

def test_hook_exception_handled(self, db, cf):
"""Test that Python exceptions in the hook don't crash the process."""
def crashing_hook(ops, commit_seq):
raise RuntimeError("hook crashed!")

cf.set_commit_hook(crashing_hook)

# Should not raise - exception is caught internally
with db.begin_txn() as txn:
txn.put(cf, b"crash_key", b"crash_val")
txn.commit()

cf.clear_commit_hook()

# Data should still be committed
with db.begin_txn() as txn:
assert txn.get(cf, b"crash_key") == b"crash_val"


if __name__ == "__main__":
pytest.main([__file__, "-v"])
Loading