Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies = [
"pymilvus", # with pandas, numpy, ujson
"ujson",
"hdrhistogram>=0.10.1",
"ir_datasets",
]
dynamic = ["version"]

Expand Down
2 changes: 1 addition & 1 deletion vectordb_bench/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class config:
LOG_FILE = env.str("LOG_FILE", "logs/vectordb_bench.log")

DEFAULT_DATASET_URL = env.str("DEFAULT_DATASET_URL", AWS_S3_URL)
DATASET_SOURCE = env.str("DATASET_SOURCE", "S3") # Options "S3" or "AliyunOSS"
DATASET_SOURCE = env.str("DATASET_SOURCE", "S3") # Options "S3", "AliyunOSS", or "IR_DATASETS"
DATASET_LOCAL_DIR = env.path("DATASET_LOCAL_DIR", "/tmp/vectordb_bench/dataset")
NUM_PER_BATCH = env.int("NUM_PER_BATCH", 100)
TIME_PER_BATCH = 1 # 1s. for streaming insertion.
Expand Down
25 changes: 23 additions & 2 deletions vectordb_bench/backend/assembler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from vectordb_bench.models import TaskConfig

from .cases import CaseLabel
from .dataset import FtsDatasetManager
from .task_runner import CaseRunner, RunningStatus, TaskRunner

log = logging.getLogger(__name__)
Expand All @@ -24,15 +25,23 @@ def assemble(cls, run_id: str, task: TaskConfig, source: DatasetSource) -> CaseR
c_cls = task.case_config.case_id.case_cls

c = c_cls(task.case_config.custom_case)
if type(task.db_case_config) is not EmptyDBCaseConfig:
# Auto-select data source based on dataset type
actual_source = DatasetSource.IR_DATASETS if isinstance(c.dataset, FtsDatasetManager) else source

if (
type(task.db_case_config) is not EmptyDBCaseConfig
and not isinstance(c.dataset, FtsDatasetManager)
and hasattr(c.dataset.data, "metric_type")
):
# FTS datasets don't have metric_type, skip setting it
task.db_case_config.metric_type = c.dataset.data.metric_type

return CaseRunner(
run_id=run_id,
config=task,
ca=c,
status=RunningStatus.PENDING,
dataset_source=source,
dataset_source=actual_source,
)

@classmethod
Expand All @@ -48,6 +57,7 @@ def assemble_all(
load_runners = [r for r in runners if r.ca.label == CaseLabel.Load]
perf_runners = [r for r in runners if r.ca.label == CaseLabel.Performance]
streaming_runners = [r for r in runners if r.ca.label == CaseLabel.Streaming]
fts_runners = [r for r in runners if r.ca.label == CaseLabel.FullTextSearchPerformance]

# group by db
db2runner: dict[DB, list[CaseRunner]] = {}
Expand All @@ -57,6 +67,14 @@ def assemble_all(
db2runner[db] = []
db2runner[db].append(r)

# group FTS runners by db
db2fts_runner: dict[DB, list[CaseRunner]] = {}
for r in fts_runners:
db = r.config.db
if db not in db2fts_runner:
db2fts_runner[db] = []
db2fts_runner[db].append(r)

# check
for db, runners in db2runner.items():
db_instance = db.init_cls
Expand All @@ -73,6 +91,9 @@ def assemble_all(
all_runners.extend(streaming_runners)
for v in db2runner.values():
all_runners.extend(v)
# Add FTS runners
for v in db2fts_runner.values():
all_runners.extend(v)

return TaskRunner(
run_id=run_id,
Expand Down
51 changes: 50 additions & 1 deletion vectordb_bench/backend/cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from vectordb_bench.base import BaseModel
from vectordb_bench.frontend.components.custom.getCustomConfig import CustomDatasetConfig

from .dataset import CustomDataset, Dataset, DatasetManager, DatasetWithSizeType
from .dataset import CustomDataset, Dataset, DatasetManager, DatasetWithSizeType, FtsDataset, FtsDatasetManager

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -57,6 +57,8 @@ class CaseType(Enum):

NewIntFilterPerformanceCase = 400

FTSmsmarcoPerformance = 503

def case_cls(self, custom_configs: dict | None = None) -> type["Case"]:
if custom_configs is None:
return type2case.get(self)()
Expand All @@ -79,6 +81,7 @@ class CaseLabel(Enum):
Load = auto()
Performance = auto()
Streaming = auto()
FullTextSearchPerformance = auto()


class Case(BaseModel):
Expand Down Expand Up @@ -631,6 +634,51 @@ def filters(self) -> Filter:
return LabelFilter(label_percentage=self.label_percentage)


class FtsCase(Case):
    """Base class for FTS (full-text search) test cases.

    FTS cases exercise full-text search capabilities (BM25) rather than
    vector similarity search. Note: this subclasses ``Case`` (so the existing
    runner machinery accepts it) but narrows ``dataset`` to
    ``FtsDatasetManager``; FTS datasets carry no ``metric_type``.

    Fields:
        case_id: Unique case identifier from CaseType enum
        label: Case category (Performance, Load, etc.)
        name: Display name for the case
        description: Detailed description of what this case tests
        dataset: FtsDatasetManager providing the corpus/queries for this case
        load_timeout: Timeout for the data loading phase (None = no limit)
        optimize_timeout: Timeout for the index building phase (None = no limit)
    """

    case_id: CaseType
    label: CaseLabel
    name: str
    description: str
    dataset: FtsDatasetManager
    load_timeout: float | int | None = None
    optimize_timeout: float | int | None = None


class FtsPerformanceCase(FtsCase):
    """Base class for FTS performance testing cases.

    Fixes the label to ``FullTextSearchPerformance`` so the assembler can
    group these runners separately, and supplies the project-wide default
    timeouts; concrete subclasses may override the timeouts per dataset size.
    """

    label: CaseLabel = CaseLabel.FullTextSearchPerformance
    load_timeout: float | int = config.LOAD_TIMEOUT_DEFAULT
    optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_DEFAULT


class FTSmsmarcoPerformance(FtsPerformanceCase):
    """FTS performance case: BM25 full-text search over MS MARCO (100K docs)."""

    case_id: CaseType = CaseType.FTSmsmarcoPerformance
    name: str = "FTS Performance Test"
    # Implicit string concatenation: the first fragment must end with a space,
    # otherwise the rendered description reads "dataset.It measures".
    description: str = (
        "This case tests full-text search performance using BM25 on the MS MARCO dataset. "
        "It measures index building time, recall, NDCG, MRR, and search QPS."
    )
    dataset: FtsDatasetManager = FtsDataset.MSMARCO.manager(100_000)
    # Reuse the 768D/100K budgets — closest existing preset for a 100K-row corpus.
    load_timeout: float | int = config.LOAD_TIMEOUT_768D_100K
    optimize_timeout: float | int | None = config.OPTIMIZE_TIMEOUT_768D_100K


type2case = {
CaseType.CapacityDim960: CapacityDim960,
CaseType.CapacityDim128: CapacityDim128,
Expand All @@ -655,4 +703,5 @@ def filters(self) -> Filter:
CaseType.StreamingCustomDataset: StreamingCustomDataset,
CaseType.NewIntFilterPerformanceCase: NewIntFilterPerformanceCase,
CaseType.LabelFilterPerformanceCase: LabelFilterPerformanceCase,
CaseType.FTSmsmarcoPerformance: FTSmsmarcoPerformance,
}
3 changes: 2 additions & 1 deletion vectordb_bench/backend/clients/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class IndexType(str, Enum):
IVF_RABITQ = "IVF_RABITQ"
Flat = "FLAT"
AUTOINDEX = "AUTOINDEX"
FTS_AUTOINDEX = "FTS_AUTOINDEX"
ES_HNSW = "hnsw"
ES_HNSW_INT8 = "int8_hnsw"
ES_HNSW_INT4 = "int4_hnsw"
Expand Down Expand Up @@ -88,7 +89,7 @@ def common_long_configs() -> list[str]:
def to_dict(self) -> dict:
raise NotImplementedError

@validator("*")
@validator("*", allow_reuse=True)
def not_empty_field(cls, v: any, field: any):
if field.name in cls.common_short_configs() or field.name in cls.common_long_configs():
return v
Expand Down
2 changes: 1 addition & 1 deletion vectordb_bench/backend/clients/doris/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class DorisConfig(DBConfig):
db_name: str = "test"
ssl: bool = False

@validator("*")
@validator("*", allow_reuse=True)
def not_empty_field(cls, v: any, field: any):
return v

Expand Down
45 changes: 45 additions & 0 deletions vectordb_bench/backend/clients/milvus/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,3 +565,48 @@ def MilvusGPUCAGRA(**parameters: Unpack[MilvusGPUCAGRATypedDict]):
),
**parameters,
)


class MilvusFTSTypedDict(CommonTypedDict, MilvusTypedDict):
    """TypedDict for Milvus FTS command parameters.

    Combines the common CLI options with the Milvus-specific ones and adds
    the single FTS-only knob below; the click option metadata is consumed by
    ``click_parameter_decorators_from_typed_dict``.
    """

    # Optional BM25 search tuning: fraction of smallest sparse values ignored
    # at query time. None means "let the server use its default".
    drop_ratio_search: Annotated[
        float | None,
        click.option(
            "--drop-ratio-search",
            type=float,
            help="Drop ratio for search (optional, for performance tuning)",
            required=False,
            default=None,
        ),
    ]


@cli.command()
@click_parameter_decorators_from_typed_dict(MilvusFTSTypedDict)
def MilvusFTS(**parameters: Unpack[MilvusFTSTypedDict]):
    """Run FTS (Full-Text Search) benchmark on Milvus using BM25.

    This command uses the MS MARCO dev/small dataset for FTS testing.
    """
    # Local import keeps config classes out of the CLI module's import time.
    from .config import MilvusConfig, MilvusFtsConfig

    # Redirect the generic vector default case to the FTS case.
    # NOTE(review): this compares against the literal CommonTypedDict default,
    # so a user who explicitly passes --case-type Performance1536D50K is also
    # silently overridden — confirm that is the intended behavior.
    if parameters.get("case_type") == "Performance1536D50K":  # Default from CommonTypedDict
        parameters["case_type"] = "FTSmsmarcoPerformance"

    run(
        db=DBTYPE,
        db_config=MilvusConfig(
            db_label=parameters["db_label"],
            uri=SecretStr(parameters["uri"]),
            user=parameters["user_name"],
            password=SecretStr(parameters["password"]) if parameters["password"] else None,
            num_shards=int(parameters["num_shards"]),
            replica_number=int(parameters["replica_number"]),
        ),
        # Only the FTS-specific knob is forwarded; BM25/analyzer settings use
        # MilvusFtsConfig defaults.
        db_case_config=MilvusFtsConfig(
            drop_ratio_search=parameters.get("drop_ratio_search"),
        ),
        **parameters,
    )
93 changes: 92 additions & 1 deletion vectordb_bench/backend/clients/milvus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def to_dict(self) -> dict:
"replica_number": self.replica_number,
}

@validator("*")
@validator("*", allow_reuse=True)
def not_empty_field(cls, v: any, field: any):
if (
field.name in cls.common_short_configs()
Expand Down Expand Up @@ -441,8 +441,99 @@ def search_param(self) -> dict:
}


class MilvusFtsConfig(BaseModel, DBCaseConfig):
    """Milvus full-text search (BM25) case configuration.

    1. inverted_index_algo: Index algorithm selection
       - "DAAT_MAXSCORE" (default): Suitable for high k values or queries with many terms, balanced performance
       - "DAAT_WAND": Suitable for small k values or short queries, faster
       - "TAAT_NAIVE": Dynamically adapts to collection changes (e.g., avgdl), but slower
    2. bm25_k1: BM25 term frequency saturation control [1.2, 2.0], default 1.5
       - Higher values increase the importance of term frequency in ranking
    3. bm25_b: BM25 document length normalization control [0.0, 1.0], default 0.75
       - 1.0: no length normalization; 0.0: full normalization; 0.75: balanced default
    4. analyzer_tokenizer: Tokenizer type, default "standard"
       - "standard": standard tokenizer, suitable for English text
       - "whitespace": split by whitespace characters
       - "keyword": no tokenization, preserve original text
    5. analyzer_enable_lowercase: Enable lowercase conversion, default True
    6. analyzer_max_token_length: Maximum length of individual tokens, default 40
       (set to None to disable the limit)
    7. analyzer_stop_words: Comma-separated stop words, default None
       (e.g. "of,to,the,and,or"; filtered out of indexing/search)
    8. drop_ratio_search: Ratio of smallest values ignored during search [0.0, 1.0], default None
       - 0.0 keeps all values (highest recall); 0.1-0.3 trades slight recall for speed
    """

    index_type: str = "SPARSE_INVERTED_INDEX"
    metric_type: str = "BM25"
    inverted_index_algo: str = "DAAT_MAXSCORE"  # DAAT_MAXSCORE | DAAT_WAND | TAAT_NAIVE
    bm25_k1: float = 1.5
    bm25_b: float = 0.75
    analyzer_tokenizer: str = "standard"
    analyzer_enable_lowercase: bool = True
    analyzer_max_token_length: int | None = 40
    analyzer_stop_words: str | None = None
    drop_ratio_search: float | None = None

    def index_param(self) -> dict:
        """Return index-creation parameters: BM25 tuning plus the analyzer spec."""
        params: dict = {
            "inverted_index_algo": self.inverted_index_algo,
        }
        if self.bm25_k1 is not None:
            params["bm25_k1"] = self.bm25_k1
        if self.bm25_b is not None:
            params["bm25_b"] = self.bm25_b

        # Milvus analyzer_params must be EITHER a built-in analyzer
        # ({"type": "english"}) OR a custom spec built from "tokenizer" and
        # "filter" — the server rejects a dict that mixes "type" with
        # tokenizer/filter keys. Build a pure custom spec from the configured
        # fields and fall back to the built-in analyzer only when nothing
        # custom is set.
        analyzer_params: dict = {}
        if self.analyzer_tokenizer:
            analyzer_params["tokenizer"] = self.analyzer_tokenizer

        filters: list = []
        if self.analyzer_enable_lowercase:
            filters.append("lowercase")
        if self.analyzer_max_token_length:
            filters.append({"type": "length", "max": self.analyzer_max_token_length})
        if self.analyzer_stop_words:
            stop_words = [word.strip() for word in self.analyzer_stop_words.split(",") if word.strip()]
            if stop_words:
                filters.append({"type": "stop", "stop_words": stop_words})
        if filters:
            analyzer_params["filter"] = filters

        if not analyzer_params:
            analyzer_params = {"type": "english"}

        return {
            "index_type": self.index_type,
            "metric_type": self.metric_type,
            "params": params,
            "analyzer_params": analyzer_params,
        }

    def search_param(self) -> dict:
        """Return search-time parameters; drop_ratio_search is optional tuning."""
        params: dict = {}
        if self.drop_ratio_search is not None:
            params["drop_ratio_search"] = self.drop_ratio_search
        return {
            "metric_type": self.metric_type,
            "params": params,
        }


_milvus_case_config = {
IndexType.AUTOINDEX: AutoIndexConfig,
IndexType.FTS_AUTOINDEX: MilvusFtsConfig,
IndexType.HNSW: HNSWConfig,
IndexType.HNSW_SQ: HNSWSQConfig,
IndexType.HNSW_PQ: HNSWPQConfig,
Expand Down
Loading