Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions test/collection/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
from pydantic import ValidationError

from weaviate.collections.classes.config import (
_ReplicationConfigUpdate,
Configure,
DataType,
Property,
Reconfigure,
ReferenceProperty,
Vectorizers,
_CollectionConfigCreate,
_GenerativeProvider,
_ObjectTTLConfig,
_RerankerProvider,
_VectorizerConfigCreate,
_ReplicationConfigCreate,
ReplicationDeletionStrategy,
)
from weaviate.collections.classes.config_named_vectors import _NamedVectorConfigCreate
from weaviate.collections.classes.config_vectorizers import (
Expand Down Expand Up @@ -2682,3 +2686,137 @@ def test_config_with_vectors(vector_config: List[_VectorConfigCreate], expected:
def test_object_ttl_config_to_dict(ttl_config: _ObjectTTLConfig, expected: dict) -> None:
"""Test that _ObjectTTLConfig.to_dict() properly converts timedelta to seconds."""
assert ttl_config.to_dict() == expected


TEST_CONFIGURE_WITH_REPLICATION_PARAMETERS = [
(Configure.replication(), {}),
(
Configure.replication(
factor=3,
async_enabled=True,
deletion_strategy=ReplicationDeletionStrategy.DELETE_ON_CONFLICT,
),
{
"factor": 3,
"asyncEnabled": True,
"deletionStrategy": "DeleteOnConflict",
},
),
(
Configure.replication(
async_config=Configure.Replication.async_config(
max_workers=10,
hashtree_height=5,
frequency=60,
frequency_while_propagating=30,
alive_nodes_checking_frequency=120,
logging_frequency=15,
diff_batch_size=100,
diff_per_node_timeout=10,
pre_propagation_timeout=20,
propagation_timeout=300,
propagation_limit=1000,
propagation_delay=5,
propagation_concurrency=4,
propagation_batch_size=50,
)
),
{
"asyncConfig": {
"maxWorkers": 10,
"hashtreeHeight": 5,
"frequency": 60,
"frequencyWhilePropagating": 30,
"aliveNodesCheckingFrequency": 120,
"loggingFrequency": 15,
"diffBatchSize": 100,
"diffPerNodeTimeout": 10,
"prePropagationTimeout": 20,
"propagationTimeout": 300,
"propagationLimit": 1000,
"propagationDelay": 5,
"propagationConcurrency": 4,
"propagationBatchSize": 50,
}
},
),
]


@pytest.mark.parametrize("config,expected", TEST_CONFIGURE_WITH_REPLICATION_PARAMETERS)
def test_configure_with_replication(config: _ReplicationConfigCreate, expected: dict) -> None:
"""Test that _ReplicationConfig.to_dict() properly converts replication settings."""
assert config._to_dict() == expected


TEST_RECONFIGURE_WITH_REPLICATION_PARAMETERS = [
(
Reconfigure.replication(),
{
"factor": None,
"asyncEnabled": None,
"deletionStrategy": None,
"asyncConfig": None,
},
),
(
Reconfigure.replication(
factor=3,
async_enabled=True,
deletion_strategy=ReplicationDeletionStrategy.DELETE_ON_CONFLICT,
),
{
"factor": 3,
"asyncEnabled": True,
"deletionStrategy": "DeleteOnConflict",
"asyncConfig": None,
},
),
(
Reconfigure.replication(
async_config=Reconfigure.Replication.async_config(
max_workers=10,
hashtree_height=5,
frequency=60,
frequency_while_propagating=30,
alive_nodes_checking_frequency=120,
logging_frequency=15,
diff_batch_size=100,
diff_per_node_timeout=10,
pre_propagation_timeout=20,
propagation_timeout=300,
propagation_limit=1000,
propagation_delay=5,
propagation_concurrency=4,
propagation_batch_size=50,
)
),
{
"factor": None,
"asyncEnabled": None,
"deletionStrategy": None,
"asyncConfig": {
"maxWorkers": 10,
"hashtreeHeight": 5,
"frequency": 60,
"frequencyWhilePropagating": 30,
"aliveNodesCheckingFrequency": 120,
"loggingFrequency": 15,
"diffBatchSize": 100,
"diffPerNodeTimeout": 10,
"prePropagationTimeout": 20,
"propagationTimeout": 300,
"propagationLimit": 1000,
"propagationDelay": 5,
"propagationConcurrency": 4,
"propagationBatchSize": 50,
},
},
),
]


@pytest.mark.parametrize("config,expected", TEST_RECONFIGURE_WITH_REPLICATION_PARAMETERS)
def test_reconfigure_with_replication(config: _ReplicationConfigUpdate, expected: dict) -> None:
"""Test that _ReplicationConfig.to_dict() properly converts replication settings."""
assert config.model_dump() == expected
126 changes: 126 additions & 0 deletions weaviate/collections/classes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,51 @@ class _ShardingConfigCreate(_ConfigCreateModel):
function: str = "murmur3"


class _AsyncReplicationConfigCreate(_ConfigCreateModel):
maxWorkers: Optional[int]
hashtreeHeight: Optional[int]
frequency: Optional[int]
frequencyWhilePropagating: Optional[int]
aliveNodesCheckingFrequency: Optional[int]
loggingFrequency: Optional[int]
diffBatchSize: Optional[int]
diffPerNodeTimeout: Optional[int]
prePropagationTimeout: Optional[int]
propagationTimeout: Optional[int]
propagationLimit: Optional[int]
propagationDelay: Optional[int]
propagationConcurrency: Optional[int]
propagationBatchSize: Optional[int]


class _AsyncReplicationConfigUpdate(_ConfigUpdateModel):
maxWorkers: Optional[int]
hashtreeHeight: Optional[int]
frequency: Optional[int]
frequencyWhilePropagating: Optional[int]
aliveNodesCheckingFrequency: Optional[int]
loggingFrequency: Optional[int]
diffBatchSize: Optional[int]
diffPerNodeTimeout: Optional[int]
prePropagationTimeout: Optional[int]
propagationTimeout: Optional[int]
propagationLimit: Optional[int]
propagationDelay: Optional[int]
propagationConcurrency: Optional[int]
propagationBatchSize: Optional[int]


class _ReplicationConfigCreate(_ConfigCreateModel):
factor: Optional[int]
asyncEnabled: Optional[bool]
asyncConfig: Optional[_AsyncReplicationConfigCreate]
deletionStrategy: Optional[ReplicationDeletionStrategy]


class _ReplicationConfigUpdate(_ConfigUpdateModel):
factor: Optional[int]
asyncEnabled: Optional[bool]
asyncConfig: Optional[_AsyncReplicationConfigUpdate]
deletionStrategy: Optional[ReplicationDeletionStrategy]


Expand Down Expand Up @@ -2339,6 +2375,88 @@ def __add_props(
ret_dict["properties"] = existing_props


class _Replication:
@staticmethod
def async_config(
*,
max_workers: Optional[int] = None,
hashtree_height: Optional[int] = None,
frequency: Optional[int] = None,
frequency_while_propagating: Optional[int] = None,
alive_nodes_checking_frequency: Optional[int] = None,
logging_frequency: Optional[int] = None,
diff_batch_size: Optional[int] = None,
diff_per_node_timeout: Optional[int] = None,
pre_propagation_timeout: Optional[int] = None,
propagation_timeout: Optional[int] = None,
propagation_limit: Optional[int] = None,
propagation_delay: Optional[int] = None,
propagation_concurrency: Optional[int] = None,
propagation_batch_size: Optional[int] = None,
) -> _AsyncReplicationConfigCreate:
"""Create a configuration object create for async replication settings when creating a collection.

This is only available with WeaviateDB `>=v1.36.0`.
"""
return _AsyncReplicationConfigCreate(
maxWorkers=max_workers,
hashtreeHeight=hashtree_height,
frequency=frequency,
frequencyWhilePropagating=frequency_while_propagating,
aliveNodesCheckingFrequency=alive_nodes_checking_frequency,
loggingFrequency=logging_frequency,
diffBatchSize=diff_batch_size,
diffPerNodeTimeout=diff_per_node_timeout,
prePropagationTimeout=pre_propagation_timeout,
propagationTimeout=propagation_timeout,
propagationLimit=propagation_limit,
propagationDelay=propagation_delay,
propagationConcurrency=propagation_concurrency,
propagationBatchSize=propagation_batch_size,
)


class _ReplicationUpdate:
@staticmethod
def async_config(
*,
max_workers: Optional[int] = None,
hashtree_height: Optional[int] = None,
frequency: Optional[int] = None,
frequency_while_propagating: Optional[int] = None,
alive_nodes_checking_frequency: Optional[int] = None,
logging_frequency: Optional[int] = None,
diff_batch_size: Optional[int] = None,
diff_per_node_timeout: Optional[int] = None,
pre_propagation_timeout: Optional[int] = None,
propagation_timeout: Optional[int] = None,
propagation_limit: Optional[int] = None,
propagation_delay: Optional[int] = None,
propagation_concurrency: Optional[int] = None,
propagation_batch_size: Optional[int] = None,
) -> _AsyncReplicationConfigUpdate:
"""Create a configuration object for async replication settings when updating a collection.

This is only available with WeaviateDB `>=v1.36.0`.
"""
return _AsyncReplicationConfigUpdate(
maxWorkers=max_workers,
hashtreeHeight=hashtree_height,
frequency=frequency,
frequencyWhilePropagating=frequency_while_propagating,
aliveNodesCheckingFrequency=alive_nodes_checking_frequency,
loggingFrequency=logging_frequency,
diffBatchSize=diff_batch_size,
diffPerNodeTimeout=diff_per_node_timeout,
prePropagationTimeout=pre_propagation_timeout,
propagationTimeout=propagation_timeout,
propagationLimit=propagation_limit,
propagationDelay=propagation_delay,
propagationConcurrency=propagation_concurrency,
propagationBatchSize=propagation_batch_size,
)


class Configure:
"""Use this factory class to generate the correct object for use when using the `collections.create()` method. E.g., `.multi_tenancy()` will return a `MultiTenancyConfigCreate` object to be used in the `multi_tenancy_config` argument.

Expand All @@ -2354,6 +2472,7 @@ class Configure:
Vectors = _Vectors
MultiVectors = _MultiVectors
ObjectTTL = _ObjectTTL
Replication = _Replication

@staticmethod
def inverted_index(
Expand Down Expand Up @@ -2416,6 +2535,7 @@ def replication(
factor: Optional[int] = None,
async_enabled: Optional[bool] = None,
deletion_strategy: Optional[ReplicationDeletionStrategy] = None,
async_config: Optional[_AsyncReplicationConfigCreate] = None,
) -> _ReplicationConfigCreate:
"""Create a `ReplicationConfigCreate` object to be used when defining the replication configuration of Weaviate.

Expand All @@ -2425,11 +2545,13 @@ def replication(
factor: The replication factor.
async_enabled: Enabled async replication.
deletion_strategy: How conflicts between different nodes about deleted objects are resolved.
async_config: The configuration for async replication. This is only relevant if `async_enabled` is `True`.
"""
return _ReplicationConfigCreate(
factor=factor,
asyncEnabled=async_enabled,
deletionStrategy=deletion_strategy,
asyncConfig=async_config,
)

@staticmethod
Expand Down Expand Up @@ -2632,6 +2754,7 @@ class Reconfigure:
Generative = _Generative # config is the same for create and update
Reranker = _Reranker # config is the same for create and update
ObjectTTL = _ObjectTTLUpdate
Replication = _ReplicationUpdate

@staticmethod
def inverted_index(
Expand Down Expand Up @@ -2664,6 +2787,7 @@ def replication(
factor: Optional[int] = None,
async_enabled: Optional[bool] = None,
deletion_strategy: Optional[ReplicationDeletionStrategy] = None,
async_config: Optional[_AsyncReplicationConfigUpdate] = None,
) -> _ReplicationConfigUpdate:
"""Create a `ReplicationConfigUpdate` object.

Expand All @@ -2673,11 +2797,13 @@ def replication(
factor: The replication factor.
async_enabled: Enable async replication.
deletion_strategy: How conflicts between different nodes about deleted objects are resolved.
async_config: The async replication configuration. This is only applicable if `async_enabled` is set to `True`.
"""
return _ReplicationConfigUpdate(
factor=factor,
asyncEnabled=async_enabled,
deletionStrategy=deletion_strategy,
asyncConfig=async_config,
)

@staticmethod
Expand Down
Loading