Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7ce3e3e
fix topru proto
zhoucai-pingcap Mar 9, 2026
387215d
change type to component
zhoucai-pingcap Mar 16, 2026
6aaf6af
fix native jeprof issue
zhoucai-pingcap Mar 18, 2026
8667b91
remove arm vector first
zhoucai-pingcap Mar 19, 2026
ddbe3a1
native jeprof support heap_v2
zhoucai-pingcap Mar 19, 2026
71b0006
topsql: add manager discovery and topru keyspace routing
zeminzhou Mar 22, 2026
6dbf0fa
topsql: infer topru keyspace from user
zeminzhou Mar 22, 2026
b7cdb9c
Revert "topsql: infer topru keyspace from user"
zeminzhou Mar 22, 2026
f56120c
feat: shard manager tidb by keyspace name
zeminzhou Mar 23, 2026
167a2de
fix: address manager topology review feedback
zeminzhou Mar 23, 2026
b8f997f
feat: add tikv topsql collection switch
zeminzhou Mar 23, 2026
9e8807f
feat: update topru storage path layout
zeminzhou Mar 23, 2026
6c2ebe5
Revert "feat: update topru storage path layout"
zeminzhou Mar 24, 2026
f6c4670
topsql: restore component-based data path layout
zeminzhou Mar 24, 2026
835aad0
Merge remote-tracking branch 'origin/0.49' into HEAD
zeminzhou Mar 24, 2026
54c1fda
topru: address remaining review feedback
zeminzhou Mar 24, 2026
17a2eb7
topru: finish remaining review fixes
zeminzhou Mar 24, 2026
c212665
Add keyspace routing for TopSQL meta events
zeminzhou Mar 25, 2026
665ee2e
Validate keyspace routing path templates
zeminzhou Mar 25, 2026
dabaa59
refactor: simplify manager response parsing and address review feedback
zeminzhou Mar 26, 2026
da53898
fix: treat empty manager active TiDB response as valid
zeminzhou Mar 26, 2026
44d6dcb
fix: defer dedup LRU commit until flush succeeds to prevent data loss…
zeminzhou Mar 26, 2026
4d3e974
fix: commit meta dedup keys only after successful writes
zeminzhou Mar 26, 2026
1ab14e7
Merge remote-tracking branch 'tmp/0.49' into HEAD
zeminzhou Mar 30, 2026
a4e4233
Merge remote-tracking branch 'origin/0.49' into codex/topru-manager-k…
zeminzhou Mar 30, 2026
70c71e9
refactor: extract route_resolution_retry_delay to common keyspace_clu…
zeminzhou Mar 30, 2026
3610661
revert unused changes
zeminzhou Apr 8, 2026
b65bea7
fix: deduplicate concurrent PD keyspace lookups with per-keyspace loc…
zeminzhou Apr 8, 2026
a232fa9
fix: clean up keyspace_locks on all exit paths to prevent unbounded g…
zeminzhou Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
592 changes: 592 additions & 0 deletions src/common/keyspace_cluster.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ pub mod checkpointer;
pub mod deltalake_s3;
pub mod deltalake_writer;
pub mod features;
pub mod keyspace_cluster;
pub mod topology;
10 changes: 10 additions & 0 deletions src/common/topology/arch.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Components (Sources/Sinks)
- **TiKV**: Discovers TiKV store instances
- **TiFlash**: Discovers TiFlash instances
- **Store**: Discovers store information
- **Manager-based TiDB discovery**: In legacy mode, TiDB instances can be fetched from manager `/api/tidb/get_active_tidb`

### Next-Gen Support

Expand All @@ -71,6 +72,15 @@ pub struct TopologyFetcher {
}
```

Legacy manager-based TiDB discovery also supports keyspace-aware sharding:

- `manager_server_address`: fetch active TiDB instances from manager instead of etcd
- `tidb_namespace`: namespace list sent to `/api/tidb/get_active_tidb?namespace=...`; required when `manager_server_address` is set
- `VECTOR_STS_REPLICA_COUNT`: total number of Vector StatefulSet replicas
- `VECTOR_STS_ID`: current Vector StatefulSet ordinal

When both shard envs are set, the manager response must include `keyspace_name`. The topology fetcher hashes `keyspace_name`, applies modulo `VECTOR_STS_REPLICA_COUNT`, and only keeps entries whose shard matches `VECTOR_STS_ID`. Entries without `keyspace_name` are skipped in this mode.

## Topology Data

### Component Information
Expand Down
124 changes: 121 additions & 3 deletions src/common/topology/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod models;
mod pd;
mod store;
mod tidb;
mod tidb_manager;
mod utils;

pub mod tidb_nextgen;
Expand All @@ -21,6 +22,20 @@ use vector::config::ProxyConfig;
use vector::http::HttpClient;
use vector::tls::{MaybeTlsSettings, TlsConfig};

pub(super) fn normalize_namespace_list(namespaces: Option<&str>) -> Option<String> {
let namespaces = namespaces?;
let normalized = namespaces
.split(',')
.map(str::trim)
.filter(|namespace| !namespace.is_empty())
.collect::<Vec<_>>();
if normalized.is_empty() {
None
} else {
Some(normalized.join(","))
}
}

#[derive(Debug, Snafu)]
pub enum FetchError {
#[snafu(display("Failed to build TLS settings: {}", source))]
Expand All @@ -43,6 +58,10 @@ pub enum FetchError {
FetchPDTopology { source: pd::FetchError },
#[snafu(display("Failed to fetch tidb topology: {}", source))]
FetchTiDBTopology { source: tidb::FetchError },
#[snafu(display("Failed to fetch tidb topology from manager server: {}", source))]
FetchTiDBFromManagerServerTopology { source: tidb_manager::FetchError },
#[snafu(display("Failed to read manager shard config: {}", source))]
ReadManagerShardConfig { source: tidb_manager::FetchError },
#[snafu(display("Failed to fetch store topology: {}", source))]
FetchStoreTopology { source: store::FetchError },
#[snafu(display("Failed to fetch tidb nextgen topology: {}", source))]
Expand All @@ -56,22 +75,42 @@ pub enum FetchError {
// Legacy topology fetcher
pub struct LegacyTopologyFetcher {
pd_address: String,
manager_server_address: Option<String>,
tidb_namespace: Option<String>,
manager_shard_config: Option<tidb_manager::ManagerShardConfig>,
http_client: HttpClient<hyper::Body>,
pub etcd_client: etcd_client::Client,
}

impl LegacyTopologyFetcher {
pub async fn new(
pd_address: String,
manager_server_address: Option<String>,
tidb_namespace: Option<String>,
tls_config: Option<TlsConfig>,
proxy_config: &ProxyConfig,
) -> Result<Self, FetchError> {
let pd_address = Self::polish_address(pd_address, &tls_config)?;
let manager_server_address = manager_server_address
.map(Self::polish_manager_server_address)
.transpose()?;
let tidb_namespace =
Self::normalize_tidb_namespace(manager_server_address.as_deref(), tidb_namespace)?;
let manager_shard_config = if manager_server_address.is_some() {
// Shard env vars are process-scoped, so we parse them once during fetcher init.
tidb_manager::read_manager_shard_config_from_env()
.context(ReadManagerShardConfigSnafu)?
} else {
None
};
let http_client = Self::build_http_client(tls_config.as_ref(), proxy_config)?;
let etcd_client = Self::build_etcd_client(&pd_address, &tls_config).await?;

Ok(Self {
pd_address,
manager_server_address,
tidb_namespace,
manager_shard_config,
http_client,
etcd_client,
})
Expand All @@ -85,10 +124,22 @@ impl LegacyTopologyFetcher {
.get_up_pds(components)
.await
.context(FetchPDTopologySnafu)?;
tidb::TiDBTopologyFetcher::new(&mut self.etcd_client)
if let Some(manager_server_address) = self.manager_server_address.as_deref() {
tidb_manager::TiDBManagerTopologyFetcher::new(
manager_server_address,
self.tidb_namespace.as_deref(),
&self.http_client,
self.manager_shard_config,
)
.get_up_tidbs(components)
.await
.context(FetchTiDBTopologySnafu)?;
.context(FetchTiDBFromManagerServerTopologySnafu)?;
} else {
tidb::TiDBTopologyFetcher::new(&mut self.etcd_client)
.get_up_tidbs(components)
.await
.context(FetchTiDBTopologySnafu)?;
}
store::StoreTopologyFetcher::new(&self.pd_address, &self.http_client)
.get_up_stores(components)
.await
Expand All @@ -114,6 +165,33 @@ impl LegacyTopologyFetcher {
Ok(address)
}

fn normalize_tidb_namespace(
manager_server_address: Option<&str>,
tidb_namespace: Option<String>,
) -> Result<Option<String>, FetchError> {
let tidb_namespace = normalize_namespace_list(tidb_namespace.as_deref());

if manager_server_address.is_some() && tidb_namespace.is_none() {
return Err(FetchError::ConfigurationError {
message: "tidb_namespace is required when manager_server_address is configured"
.to_string(),
});
}

Ok(tidb_namespace)
}

fn polish_manager_server_address(mut address: String) -> Result<String, FetchError> {
let uri: hyper::Uri = address.parse().context(ParseAddressSnafu)?;
if uri.scheme().is_none() {
address = format!("http://{address}");
}
if address.ends_with('/') {
address.pop();
}
Ok(address)
}

fn build_http_client(
tls_config: Option<&TlsConfig>,
proxy_config: &ProxyConfig,
Expand Down Expand Up @@ -234,6 +312,8 @@ impl TopologyFetcher {
/// Create a new topology fetcher based on the current feature configuration
pub async fn new(
pd_address: Option<String>,
manager_server_address: Option<String>,
tidb_namespace: Option<String>,
tls_config: Option<TlsConfig>,
proxy_config: &ProxyConfig,
tidb_group: Option<String>,
Expand All @@ -252,7 +332,14 @@ impl TopologyFetcher {
let pd_address = pd_address.ok_or_else(|| FetchError::ConfigurationError {
message: "PD address is required in legacy mode".to_string(),
})?;
let fetcher = LegacyTopologyFetcher::new(pd_address, tls_config, proxy_config).await?;
let fetcher = LegacyTopologyFetcher::new(
pd_address,
manager_server_address,
tidb_namespace,
tls_config,
proxy_config,
)
.await?;
Ok(Self {
inner: TopologyFetcherImpl::Legacy(Box::new(fetcher)),
})
Expand Down Expand Up @@ -313,3 +400,34 @@ impl TopologyFetcher {
// println!("{:?}", components);
// }
// }

#[cfg(test)]
mod tests {
use super::LegacyTopologyFetcher;

#[test]
fn normalize_tidb_namespace_requires_value_when_manager_is_configured() {
let err =
LegacyTopologyFetcher::normalize_tidb_namespace(Some("http://manager:8080"), None)
.expect_err("expected missing namespace to fail");

assert!(matches!(err, super::FetchError::ConfigurationError { .. }));
}

#[test]
fn normalize_tidb_namespace_trims_and_joins_values() {
let normalized = LegacyTopologyFetcher::normalize_tidb_namespace(
Some("http://manager:8080"),
Some(" ns-a, ns-b , ".to_string()),
)
.unwrap();

assert_eq!(normalized.as_deref(), Some("ns-a,ns-b"));
}

#[test]
fn normalize_tidb_namespace_allows_missing_value_without_manager() {
let normalized = LegacyTopologyFetcher::normalize_tidb_namespace(None, None).unwrap();
assert_eq!(normalized, None);
}
}
Loading
Loading