Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 66 additions & 36 deletions src/sinks/topsql_data_deltalake/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ use crate::sources::topsql_v2::upstream::consts::{
};

use lazy_static::lazy_static;

const TABLE_TOPSQL_TIDB: &str = "topsql_tidb";
const TABLE_TOPSQL_TIKV: &str = "topsql_tikv";
lazy_static! {
static ref TOPSQL_SCHEMA: serde_json::Map<String, serde_json::Value> = {
let mut schema_info = serde_json::Map::new();
Expand Down Expand Up @@ -88,6 +91,13 @@ lazy_static! {
"is_nullable": true
}),
);
schema_info.insert(
LABEL_INSTANCE_KEY.into(),
serde_json::json!({
"mysql_type": "text",
"is_nullable": true
}),
);
schema_info.insert(
METRIC_NAME_CPU_TIME_MS.into(),
serde_json::json!({
Expand Down Expand Up @@ -455,17 +465,31 @@ impl TopSQLDeltaLakeSink {
}

/// Resolves the destination table name for a log event.
///
/// Resolution order:
/// 1. If the `LABEL_SOURCE_TABLE` field equals `SOURCE_TABLE_TOPRU`, that
///    value is returned unchanged.
/// 2. Otherwise the `LABEL_INSTANCE_KEY` field is grouped by component:
///    keys starting with `topsql_tidb_` map to `TABLE_TOPSQL_TIDB` and keys
///    starting with `topsql_tikv_` map to `TABLE_TOPSQL_TIKV`.
///
/// Returns `None` when the instance key is absent or not a string, or when it
/// has an unrecognized prefix (the latter is also logged as an error).
fn extract_table_name(log_event: &LogEvent) -> Option<String> {
    // TopRU events are routed by their source table, regardless of instance key.
    let topru_table = log_event
        .get(LABEL_SOURCE_TABLE)
        .and_then(|value| value.as_str())
        .filter(|value| *value == SOURCE_TABLE_TOPRU)
        .map(|value| value.to_string());
    if topru_table.is_some() {
        return topru_table;
    }

    let instance_key = log_event
        .get(LABEL_INSTANCE_KEY)
        .and_then(|value| value.as_str())?;

    // Per-instance keys collapse into one table per component.
    if instance_key.starts_with("topsql_tidb_") {
        Some(TABLE_TOPSQL_TIDB.to_string())
    } else if instance_key.starts_with("topsql_tikv_") {
        Some(TABLE_TOPSQL_TIKV.to_string())
    } else {
        error!("Unexpected TopSQL instance_key format: {}", instance_key);
        None
    }
}

async fn resolve_writer_key(
Expand Down Expand Up @@ -519,41 +543,34 @@ impl TopSQLDeltaLakeSink {
}

/// Builds the Delta Lake table path for `table_name` under `self.base_path`.
///
/// Layout: `[org={org_id}/cluster={cluster_id}/]type=topsql/component={component}`.
/// The `org=`/`cluster=` segments are emitted only when `route` is `Some`.
///
/// A table name that `component_name` does not recognize is logged as an
/// error and written under `component=unknown`. The org/cluster prefix is
/// still applied in that case so that unrecognized tables from different
/// routes never share a single path.
fn build_table_path(&self, table_name: &str, route: Option<&KeyspaceRoute>) -> PathBuf {
    let component = Self::component_name(table_name).unwrap_or_else(|| {
        error!(
            "Unexpected table_name format (expected grouped TopSQL/TiKV name or `topsql_topru`): {}",
            table_name
        );
        "unknown"
    });

    // At most four segments: org, cluster, type, component.
    let mut segments = Vec::with_capacity(4);
    if let Some(route) = route {
        segments.push(format!("org={}", route.org_id));
        segments.push(format!("cluster={}", route.cluster_id));
    }
    segments.push("type=topsql".to_string());
    segments.push(format!("component={}", component));

    let segment_refs: Vec<&str> = segments.iter().map(|segment| segment.as_str()).collect();
    Self::join_path(&self.base_path, &segment_refs)
}

fn table_partition_values(table_name: &str) -> (&str, &str) {
if table_name == SOURCE_TABLE_TOPRU {
("topru", "default")
} else {
match table_name
.strip_prefix("topsql_")
.and_then(|rest| rest.split_once('_'))
{
Some((table_type, table_instance))
if !table_type.is_empty() && !table_instance.is_empty() =>
{
(table_type, table_instance)
}
_ => {
error!(
"Unexpected table_name format (expected `topsql_{{type}}_{{instance}}` or `topsql_topru`): {}",
table_name
);
("unknown", "unknown")
}
}
fn component_name(table_name: &str) -> Option<&'static str> {
match table_name {
SOURCE_TABLE_TOPRU => Some("topru"),
TABLE_TOPSQL_TIDB => Some("tidb"),
TABLE_TOPSQL_TIKV => Some("tikv"),
_ if table_name.starts_with("topsql_tidb_") => Some("tidb"),
_ if table_name.starts_with("topsql_tikv_") => Some("tikv"),
_ => None,
}
}

Expand Down Expand Up @@ -758,13 +775,13 @@ mod tests {
assert_eq!(
table_path,
PathBuf::from(
"s3://o11y-prod-shared-us-west-2-premium/deltalake/org=1369847559692509642/cluster=10110362358366286743/type=topsql_tidb/instance=127.0.0.1:10080"
"s3://o11y-prod-shared-us-west-2-premium/deltalake/org=1369847559692509642/cluster=10110362358366286743/type=topsql/component=tidb"
)
);
}

#[test]
fn test_build_table_path_without_meta_route_preserves_existing_layout() {
fn test_build_table_path_without_meta_route_uses_component_layout() {
let (sink, _) = TopSQLDeltaLakeSink::new_for_test(
PathBuf::from("/tmp/deltalake"),
vec![],
Expand All @@ -781,7 +798,20 @@ mod tests {

assert_eq!(
table_path,
PathBuf::from("/tmp/deltalake/type=topsql_topru/instance=default")
PathBuf::from("/tmp/deltalake/type=topsql/component=topru")
);
}

#[test]
fn test_extract_table_name_groups_instance_key_by_component() {
    // A per-instance TiDB key must resolve to the grouped TiDB table name.
    let mut event = Event::Log(LogEvent::default());
    let log = event.as_mut_log();
    log.insert(LABEL_INSTANCE_KEY, "topsql_tidb_127.0.0.1:10080");

    let table_name = TopSQLDeltaLakeSink::extract_table_name(event.as_log());

    assert_eq!(table_name, Some(TABLE_TOPSQL_TIDB.to_string()));
}

Expand Down
30 changes: 24 additions & 6 deletions src/sinks/topsql_meta_deltalake/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,17 +359,31 @@ impl TopSQLDeltaLakeSink {
}

/// Builds the Delta Lake table path for a TopSQL meta table under
/// `self.base_path`.
///
/// Layout: `[org={org_id}/cluster={cluster_id}/]type=topsql/component={component}`.
/// The `org=`/`cluster=` segments are emitted only when `route` is `Some`.
///
/// A table name that `component_name` does not recognize is logged as an
/// error and written under `component=unknown`. The org/cluster prefix is
/// still applied in that case so that unrecognized tables from different
/// routes never share a single path.
fn build_table_path(&self, table_name: &str, route: Option<&KeyspaceRoute>) -> PathBuf {
    let component = Self::component_name(table_name).unwrap_or_else(|| {
        error!("Unknown TopSQL meta table_name: {}", table_name);
        "unknown"
    });

    // At most four segments: org, cluster, type, component.
    let mut segments = Vec::with_capacity(4);
    if let Some(route) = route {
        segments.push(format!("org={}", route.org_id));
        segments.push(format!("cluster={}", route.cluster_id));
    }
    segments.push("type=topsql".to_string());
    segments.push(format!("component={}", component));

    let segment_refs: Vec<&str> = segments.iter().map(|segment| segment.as_str()).collect();
    Self::join_path(&self.base_path, &segment_refs)
}

fn component_name(table_name: &str) -> Option<&'static str> {
match table_name {
SOURCE_TABLE_TOPSQL_SQL_META => Some("topsql_meta_sql"),
SOURCE_TABLE_TOPSQL_PLAN_META => Some("topsql_meta_plan"),
_ => None,
}
}

fn is_cloud_path(base_path: &PathBuf) -> bool {
let s = base_path.to_string_lossy();
s.starts_with("s3://") || s.starts_with("abfss://") || s.starts_with("gs://")
Expand Down Expand Up @@ -729,13 +743,13 @@ mod tests {
assert_eq!(
table_path,
PathBuf::from(
"s3://o11y-prod-shared-us-west-2-premium/deltalake/org=30018/cluster=10762701230946915645/type=topsql_sql_meta"
"s3://o11y-prod-shared-us-west-2-premium/deltalake/org=30018/cluster=10762701230946915645/type=topsql/component=topsql_meta_sql"
)
);
}

#[test]
fn test_build_table_path_without_meta_route_preserves_existing_layout() {
fn test_build_table_path_without_meta_route_uses_component_layout() {
let (sink, _) = TopSQLDeltaLakeSink::new_for_test(
PathBuf::from("/tmp/deltalake"),
vec![],
Expand All @@ -753,7 +767,7 @@ mod tests {

assert_eq!(
table_path,
PathBuf::from("/tmp/deltalake/type=topsql_plan_meta")
PathBuf::from("/tmp/deltalake/type=topsql/component=topsql_meta_plan")
);
}

Expand All @@ -774,11 +788,15 @@ mod tests {
let log_event = create_sql_meta_event("test_keyspace", "sql_digest_1", "2026-03-16");
let route_a = WriterKey {
table_name: SOURCE_TABLE_TOPSQL_SQL_META.to_string(),
table_path: PathBuf::from("/tmp/deltalake/org=30018/cluster=101/type=topsql_sql_meta"),
table_path: PathBuf::from(
"/tmp/deltalake/org=30018/cluster=101/type=topsql/component=topsql_meta_sql",
),
};
let route_b = WriterKey {
table_name: SOURCE_TABLE_TOPSQL_SQL_META.to_string(),
table_path: PathBuf::from("/tmp/deltalake/org=30019/cluster=102/type=topsql_sql_meta"),
table_path: PathBuf::from(
"/tmp/deltalake/org=30019/cluster=102/type=topsql/component=topsql_meta_sql",
),
};

let key_a = sink.extract_event_key(&log_event, &route_a);
Expand Down
6 changes: 3 additions & 3 deletions src/sources/file_list/arch.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ The `file_list` source lists and filters files (or Delta Lake table paths) from
| **raw_logs** | Gzip-compressed raw logs | `diagnosis/data/{cluster_id}/merged-logs/{YYYYMMDDHH}/tidb/*.log` |
| **slowlog** | Delta Lake slowlog table | `deltalake/{project_id}/{uuid}/slowlogs/` (discovered) |
| **sql_statement** | Delta Lake sqlstatement table | `deltalake/{project_id}/{uuid}/sqlstatement/` (discovered) |
| **top_sql** | Delta Lake TopSQL per instance | `deltalake/org={project_id}/cluster={cluster_id}/type=topsql_tidb/instance=*` |
| **top_sql** | Delta Lake TiDB TopSQL table | `deltalake/org={project_id}/cluster={cluster_id}/type=topsql/component=tidb/` |
| **conprof** | Pprof compressed files | `0/{project_id}/{conprof_org_id}/{cluster_id}/profiles/*.log.gz` |

Example URLs (for reference):

- Raw log: `.../diagnosis/data/10324983984131567830/merged-logs/2026010804/tidb/db-*-tidb-0.log`
- Slowlog: `.../deltalake/1372813089209061633/019aedbc-.../slowlogs/_delta_log/_last_checkpoint`
- TopSQL: `.../deltalake/org=1372813089209061633/cluster=10324983984131567830/type=topsql_tidb/instance=db.tidb-0/...`
- TopSQL: `.../deltalake/org=1372813089209061633/cluster=10324983984131567830/type=topsql/component=tidb/...`
- Conprof: `.../0/1372813089209061633/1372813089454544954/10324983984131567830/profiles/1767830400-pd-cpu-....log.gz`

## Architecture
Expand All @@ -39,7 +39,7 @@ file_list/
├── checkpoint.rs # Checkpoint load/save (completed prefix keys for OOM/restart recovery)
├── path_resolver.rs # DataTypeKind enum and path resolution (cluster_id + types + time → list requests)
├── controller.rs # Runs list (legacy or by-request) and emits events
├── file_lister.rs # list_files_at, list_delta_table_paths, list_topsql_instance_paths
├── file_lister.rs # list_files_at, list_delta_table_paths, list_topsql_table_paths
└── object_store_builder.rs # Multi-cloud ObjectStore builder
```

Expand Down
2 changes: 1 addition & 1 deletion src/sources/file_list/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ impl Controller {
}
let paths = self
.file_lister
.list_topsql_instance_paths(&t.list_prefix)
.list_topsql_table_paths(&t.list_prefix)
.await?;
let n = paths.len();
for path in &paths {
Expand Down
25 changes: 7 additions & 18 deletions src/sources/file_list/file_lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,33 +317,22 @@ impl FileLister {
Ok(out)
}

/// List TopSQL instance paths under list_prefix (deltalake/org=X/cluster=Y/type=topsql_tidb/).
/// Returns paths like "deltalake/org=X/cluster=Y/type=topsql_tidb/instance=db.tidb-0".
pub async fn list_topsql_instance_paths(&self, list_prefix: &str) -> vector::Result<Vec<String>> {
/// Return the TopSQL table root when any object exists under
/// `deltalake/org=X/cluster=Y/type=topsql/component=tidb/`.
pub async fn list_topsql_table_paths(&self, list_prefix: &str) -> vector::Result<Vec<String>> {
let prefix_path = ObjectStorePath::from(list_prefix.trim_end_matches('/'));
let mut instances = HashSet::new();
let mut stream = self.object_store.list(Some(&prefix_path));

while let Some(result) = stream.next().await {
match result {
Ok(meta) => {
let loc = meta.location.to_string();
// location is like "instance=db.tidb-0/_delta_log/..." or "instance=db.tidb-0/part.parquet"
if let Some(inst) = loc.split('/').next() {
if inst.starts_with("instance=") {
let path = format!("{}/{}", list_prefix.trim_end_matches('/'), inst);
instances.insert(path);
}
}
}
Ok(_) => return Ok(vec![list_prefix.trim_end_matches('/').to_string()]),
Err(e) => {
error!("Error listing TopSQL instances: {}", e);
error!("Error listing TopSQL table path: {}", e);
}
}
}
let mut out: Vec<_> = instances.into_iter().collect();
out.sort();
Ok(out)

Ok(Vec::new())
}

/// List immediate subdirectory names under `prefix` (e.g. prefix "diagnosis/data/o11y/merged-logs/2026020411/"
Expand Down
10 changes: 5 additions & 5 deletions src/sources/file_list/path_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub enum DataTypeKind {
/// Delta Lake sqlstatement table: deltalake/{project_id}/{uuid}/sqlstatement/
SqlStatement,

/// Delta Lake TopSQL per instance: deltalake/org={project_id}/cluster={cluster_id}/type=topsql_tidb/instance=*/
/// Delta Lake TopSQL table root: deltalake/org={project_id}/cluster={cluster_id}/type=topsql/component=tidb/
TopSql,

/// Conprof pprof compressed files: 0/{project_id}/{conprof_org_id}/{cluster_id}/profiles/*.log.gz
Expand Down Expand Up @@ -56,10 +56,10 @@ pub struct DeltaTableRequest {
pub table_subdir: String,
}

/// TopSQL: list the TiDB TopSQL table root under type=topsql/component=tidb.
#[derive(Debug, Clone)]
pub struct TopSqlListRequest {
/// Prefix: deltalake/org={project_id}/cluster={cluster_id}/type=topsql/component=tidb/
pub list_prefix: String,
}

Expand Down Expand Up @@ -161,7 +161,7 @@ pub fn resolve_requests(
.ok_or("top_sql requires project_id")?;
out.push(ListRequest::TopSql(TopSqlListRequest {
list_prefix: format!(
"deltalake/org={}/cluster={}/type=topsql_tidb/",
"deltalake/org={}/cluster={}/type=topsql/component=tidb/",
pid, cluster_id
),
}));
Expand Down Expand Up @@ -352,7 +352,7 @@ mod tests {
ListRequest::TopSql(t) => {
assert_eq!(
t.list_prefix,
"deltalake/org=1372813089209061633/cluster=10324983984131567830/type=topsql_tidb/"
"deltalake/org=1372813089209061633/cluster=10324983984131567830/type=topsql/component=tidb/"
);
}
_ => panic!("expected TopSql"),
Expand Down
Loading