diff --git a/test/collection/test_config.py b/test/collection/test_config.py index 7287c2ffd..93f399e51 100644 --- a/test/collection/test_config.py +++ b/test/collection/test_config.py @@ -5,9 +5,11 @@ from pydantic import ValidationError from weaviate.collections.classes.config import ( + _ReplicationConfigUpdate, Configure, DataType, Property, + Reconfigure, ReferenceProperty, Vectorizers, _CollectionConfigCreate, @@ -15,6 +17,8 @@ _ObjectTTLConfig, _RerankerProvider, _VectorizerConfigCreate, + _ReplicationConfigCreate, + ReplicationDeletionStrategy, ) from weaviate.collections.classes.config_named_vectors import _NamedVectorConfigCreate from weaviate.collections.classes.config_vectorizers import ( @@ -2682,3 +2686,137 @@ def test_config_with_vectors(vector_config: List[_VectorConfigCreate], expected: def test_object_ttl_config_to_dict(ttl_config: _ObjectTTLConfig, expected: dict) -> None: """Test that _ObjectTTLConfig.to_dict() properly converts timedelta to seconds.""" assert ttl_config.to_dict() == expected + + +TEST_CONFIGURE_WITH_REPLICATION_PARAMETERS = [ + (Configure.replication(), {}), + ( + Configure.replication( + factor=3, + async_enabled=True, + deletion_strategy=ReplicationDeletionStrategy.DELETE_ON_CONFLICT, + ), + { + "factor": 3, + "asyncEnabled": True, + "deletionStrategy": "DeleteOnConflict", + }, + ), + ( + Configure.replication( + async_config=Configure.Replication.async_config( + max_workers=10, + hashtree_height=5, + frequency=60, + frequency_while_propagating=30, + alive_nodes_checking_frequency=120, + logging_frequency=15, + diff_batch_size=100, + diff_per_node_timeout=10, + pre_propagation_timeout=20, + propagation_timeout=300, + propagation_limit=1000, + propagation_delay=5, + propagation_concurrency=4, + propagation_batch_size=50, + ) + ), + { + "asyncConfig": { + "maxWorkers": 10, + "hashtreeHeight": 5, + "frequency": 60, + "frequencyWhilePropagating": 30, + "aliveNodesCheckingFrequency": 120, + "loggingFrequency": 15, + "diffBatchSize": 100, + "diffPerNodeTimeout": 10, + "prePropagationTimeout": 20, + "propagationTimeout": 300, + "propagationLimit": 1000, + "propagationDelay": 5, + "propagationConcurrency": 4, + "propagationBatchSize": 50, + } + }, + ), +] + + +@pytest.mark.parametrize("config,expected", TEST_CONFIGURE_WITH_REPLICATION_PARAMETERS) +def test_configure_with_replication(config: _ReplicationConfigCreate, expected: dict) -> None: + """Test that _ReplicationConfig.to_dict() properly converts replication settings.""" + assert config._to_dict() == expected + + +TEST_RECONFIGURE_WITH_REPLICATION_PARAMETERS = [ + ( + Reconfigure.replication(), + { + "factor": None, + "asyncEnabled": None, + "deletionStrategy": None, + "asyncConfig": None, + }, + ), + ( + Reconfigure.replication( + factor=3, + async_enabled=True, + deletion_strategy=ReplicationDeletionStrategy.DELETE_ON_CONFLICT, + ), + { + "factor": 3, + "asyncEnabled": True, + "deletionStrategy": "DeleteOnConflict", + "asyncConfig": None, + }, + ), + ( + Reconfigure.replication( + async_config=Reconfigure.Replication.async_config( + max_workers=10, + hashtree_height=5, + frequency=60, + frequency_while_propagating=30, + alive_nodes_checking_frequency=120, + logging_frequency=15, + diff_batch_size=100, + diff_per_node_timeout=10, + pre_propagation_timeout=20, + propagation_timeout=300, + propagation_limit=1000, + propagation_delay=5, + propagation_concurrency=4, + propagation_batch_size=50, + ) + ), + { + "factor": None, + "asyncEnabled": None, + "deletionStrategy": None, + "asyncConfig": { + "maxWorkers": 10, + "hashtreeHeight": 5, + "frequency": 60, + "frequencyWhilePropagating": 30, + "aliveNodesCheckingFrequency": 120, + "loggingFrequency": 15, + "diffBatchSize": 100, + "diffPerNodeTimeout": 10, + "prePropagationTimeout": 20, + "propagationTimeout": 300, + "propagationLimit": 1000, + "propagationDelay": 5, + "propagationConcurrency": 4, + "propagationBatchSize": 50, + }, + }, + ), +] + + +@pytest.mark.parametrize("config,expected", TEST_RECONFIGURE_WITH_REPLICATION_PARAMETERS) +def test_reconfigure_with_replication(config: _ReplicationConfigUpdate, expected: dict) -> None: + """Test that _ReplicationConfig.to_dict() properly converts replication settings.""" + assert config.model_dump() == expected diff --git a/weaviate/collections/classes/config.py b/weaviate/collections/classes/config.py index c4b679362..44c4cec32 100644 --- a/weaviate/collections/classes/config.py +++ b/weaviate/collections/classes/config.py @@ -287,15 +287,51 @@ class _ShardingConfigCreate(_ConfigCreateModel): function: str = "murmur3" +class _AsyncReplicationConfigCreate(_ConfigCreateModel): + maxWorkers: Optional[int] + hashtreeHeight: Optional[int] + frequency: Optional[int] + frequencyWhilePropagating: Optional[int] + aliveNodesCheckingFrequency: Optional[int] + loggingFrequency: Optional[int] + diffBatchSize: Optional[int] + diffPerNodeTimeout: Optional[int] + prePropagationTimeout: Optional[int] + propagationTimeout: Optional[int] + propagationLimit: Optional[int] + propagationDelay: Optional[int] + propagationConcurrency: Optional[int] + propagationBatchSize: Optional[int] + + +class _AsyncReplicationConfigUpdate(_ConfigUpdateModel): + maxWorkers: Optional[int] + hashtreeHeight: Optional[int] + frequency: Optional[int] + frequencyWhilePropagating: Optional[int] + aliveNodesCheckingFrequency: Optional[int] + loggingFrequency: Optional[int] + diffBatchSize: Optional[int] + diffPerNodeTimeout: Optional[int] + prePropagationTimeout: Optional[int] + propagationTimeout: Optional[int] + propagationLimit: Optional[int] + propagationDelay: Optional[int] + propagationConcurrency: Optional[int] + propagationBatchSize: Optional[int] + + class _ReplicationConfigCreate(_ConfigCreateModel): factor: Optional[int] asyncEnabled: Optional[bool] + asyncConfig: Optional[_AsyncReplicationConfigCreate] deletionStrategy: Optional[ReplicationDeletionStrategy] class _ReplicationConfigUpdate(_ConfigUpdateModel): factor: Optional[int] asyncEnabled: Optional[bool] + asyncConfig: Optional[_AsyncReplicationConfigUpdate] deletionStrategy: Optional[ReplicationDeletionStrategy] @@ -2339,6 +2375,88 @@ def __add_props( ret_dict["properties"] = existing_props +class _Replication: + @staticmethod + def async_config( + *, + max_workers: Optional[int] = None, + hashtree_height: Optional[int] = None, + frequency: Optional[int] = None, + frequency_while_propagating: Optional[int] = None, + alive_nodes_checking_frequency: Optional[int] = None, + logging_frequency: Optional[int] = None, + diff_batch_size: Optional[int] = None, + diff_per_node_timeout: Optional[int] = None, + pre_propagation_timeout: Optional[int] = None, + propagation_timeout: Optional[int] = None, + propagation_limit: Optional[int] = None, + propagation_delay: Optional[int] = None, + propagation_concurrency: Optional[int] = None, + propagation_batch_size: Optional[int] = None, + ) -> _AsyncReplicationConfigCreate: + """Create a configuration object create for async replication settings when creating a collection. + + This is only available with WeaviateDB `>=v1.36.0`. + """ + return _AsyncReplicationConfigCreate( + maxWorkers=max_workers, + hashtreeHeight=hashtree_height, + frequency=frequency, + frequencyWhilePropagating=frequency_while_propagating, + aliveNodesCheckingFrequency=alive_nodes_checking_frequency, + loggingFrequency=logging_frequency, + diffBatchSize=diff_batch_size, + diffPerNodeTimeout=diff_per_node_timeout, + prePropagationTimeout=pre_propagation_timeout, + propagationTimeout=propagation_timeout, + propagationLimit=propagation_limit, + propagationDelay=propagation_delay, + propagationConcurrency=propagation_concurrency, + propagationBatchSize=propagation_batch_size, + ) + + +class _ReplicationUpdate: + @staticmethod + def async_config( + *, + max_workers: Optional[int] = None, + hashtree_height: Optional[int] = None, + frequency: Optional[int] = None, + frequency_while_propagating: Optional[int] = None, + alive_nodes_checking_frequency: Optional[int] = None, + logging_frequency: Optional[int] = None, + diff_batch_size: Optional[int] = None, + diff_per_node_timeout: Optional[int] = None, + pre_propagation_timeout: Optional[int] = None, + propagation_timeout: Optional[int] = None, + propagation_limit: Optional[int] = None, + propagation_delay: Optional[int] = None, + propagation_concurrency: Optional[int] = None, + propagation_batch_size: Optional[int] = None, + ) -> _AsyncReplicationConfigUpdate: + """Create a configuration object for async replication settings when updating a collection. + + This is only available with WeaviateDB `>=v1.36.0`. + """ + return _AsyncReplicationConfigUpdate( + maxWorkers=max_workers, + hashtreeHeight=hashtree_height, + frequency=frequency, + frequencyWhilePropagating=frequency_while_propagating, + aliveNodesCheckingFrequency=alive_nodes_checking_frequency, + loggingFrequency=logging_frequency, + diffBatchSize=diff_batch_size, + diffPerNodeTimeout=diff_per_node_timeout, + prePropagationTimeout=pre_propagation_timeout, + propagationTimeout=propagation_timeout, + propagationLimit=propagation_limit, + propagationDelay=propagation_delay, + propagationConcurrency=propagation_concurrency, + propagationBatchSize=propagation_batch_size, + ) + + class Configure: """Use this factory class to generate the correct object for use when using the `collections.create()` method. E.g., `.multi_tenancy()` will return a `MultiTenancyConfigCreate` object to be used in the `multi_tenancy_config` argument. @@ -2354,6 +2472,7 @@ class Configure: Vectors = _Vectors MultiVectors = _MultiVectors ObjectTTL = _ObjectTTL + Replication = _Replication @staticmethod def inverted_index( @@ -2416,6 +2535,7 @@ def replication( factor: Optional[int] = None, async_enabled: Optional[bool] = None, deletion_strategy: Optional[ReplicationDeletionStrategy] = None, + async_config: Optional[_AsyncReplicationConfigCreate] = None, ) -> _ReplicationConfigCreate: """Create a `ReplicationConfigCreate` object to be used when defining the replication configuration of Weaviate. @@ -2425,11 +2545,13 @@ def replication( factor: The replication factor. async_enabled: Enabled async replication. deletion_strategy: How conflicts between different nodes about deleted objects are resolved. + async_config: The configuration for async replication. This is only relevant if `async_enabled` is `True`. """ return _ReplicationConfigCreate( factor=factor, asyncEnabled=async_enabled, deletionStrategy=deletion_strategy, + asyncConfig=async_config, ) @staticmethod @@ -2632,6 +2754,7 @@ class Reconfigure: Generative = _Generative # config is the same for create and update Reranker = _Reranker # config is the same for create and update ObjectTTL = _ObjectTTLUpdate + Replication = _ReplicationUpdate @staticmethod def inverted_index( @@ -2664,6 +2787,7 @@ def replication( factor: Optional[int] = None, async_enabled: Optional[bool] = None, deletion_strategy: Optional[ReplicationDeletionStrategy] = None, + async_config: Optional[_AsyncReplicationConfigUpdate] = None, ) -> _ReplicationConfigUpdate: """Create a `ReplicationConfigUpdate` object. @@ -2673,11 +2797,13 @@ def replication( factor: The replication factor. async_enabled: Enable async replication. deletion_strategy: How conflicts between different nodes about deleted objects are resolved. + async_config: The async replication configuration. This is only applicable if `async_enabled` is set to `True`. """ return _ReplicationConfigUpdate( factor=factor, asyncEnabled=async_enabled, deletionStrategy=deletion_strategy, + asyncConfig=async_config, ) @staticmethod