diff --git a/pgdog/src/frontend/router/parser/cache/ast.rs b/pgdog/src/frontend/router/parser/cache/ast.rs index 2be0e0fb..c80c560e 100644 --- a/pgdog/src/frontend/router/parser/cache/ast.rs +++ b/pgdog/src/frontend/router/parser/cache/ast.rs @@ -7,9 +7,7 @@ use std::{collections::HashSet, ops::Deref}; use parking_lot::Mutex; use std::sync::Arc; -use super::super::{ - comment::comment, Error, Route, Shard, StatementRewrite, StatementRewriteContext, Table, -}; +use super::super::{Error, Route, Shard, StatementRewrite, StatementRewriteContext, Table}; use super::{Fingerprint, Stats}; use crate::backend::schema::Schema; use crate::frontend::router::parser::rewrite::statement::RewritePlan; @@ -72,6 +70,8 @@ impl Ast { schema: &ShardingSchema, db_schema: &Schema, prepared_statements: &mut PreparedStatements, + comment_shard: Option, + comment_role: Option, user: &str, search_path: Option<&ParameterValue>, ) -> Result { @@ -81,7 +81,6 @@ impl Ast { QueryParserEngine::PgQueryRaw => parse_raw(query), } .map_err(Error::PgQuery)?; - let (comment_shard, comment_role) = comment(query, schema)?; let fingerprint = Fingerprint::new(query, schema.query_parser_engine).map_err(Error::PgQuery)?; @@ -125,12 +124,16 @@ impl Ast { query: &BufferedQuery, ctx: &super::AstContext<'_>, prepared_statements: &mut PreparedStatements, + shard: Option, + role: Option, ) -> Result { Self::new( query, &ctx.sharding_schema, &ctx.db_schema, prepared_statements, + shard, + role, ctx.user, ctx.search_path, ) diff --git a/pgdog/src/frontend/router/parser/cache/cache_impl.rs b/pgdog/src/frontend/router/parser/cache/cache_impl.rs index 4c50e61f..4b28343b 100644 --- a/pgdog/src/frontend/router/parser/cache/cache_impl.rs +++ b/pgdog/src/frontend/router/parser/cache/cache_impl.rs @@ -2,6 +2,7 @@ use lru::LruCache; use once_cell::sync::Lazy; use pg_query::normalize; use pgdog_config::QueryParserEngine; +use std::borrow::Cow; use std::collections::HashMap; use std::time::Duration; @@ -11,6 +12,7 @@ use tracing::debug; use super::super::{Error, Route}; use super::{Ast, AstContext}; +use crate::frontend::router::parser::comment; use crate::frontend::{BufferedQuery, PreparedStatements}; static CACHE: Lazy = Lazy::new(Cache::new); @@ -97,6 +99,10 @@ impl Cache { /// Parse a statement by either getting it from cache /// or using pg_query parser. /// + /// In the event of cache miss, we retry after removing all comments except + /// for pgdog metadata. We retain it for correctness, since a query with + /// that metadata must not map to an identical query without it. + /// /// N.B. There is a race here that allows multiple threads to /// parse the same query. That's better imo than locking the data structure /// while we parse the query. @@ -118,12 +124,38 @@ impl Cache { } } + let (maybe_shard, maybe_role, maybe_filtered_query) = + comment::parse_comment(&query, &ctx.sharding_schema)?; + + let query_to_cache: Cow<'_, str>; + + if let Some(filtered_query) = maybe_filtered_query { + query_to_cache = Cow::Owned(filtered_query); + + // Check cache again after removing comments from query + let mut guard = self.inner.lock(); + + let ast = guard.queries.get_mut(&*query_to_cache).map(|entry| { + entry.stats.lock().hits += 1; + entry.clone() + }); + + if let Some(ast) = ast { + guard.stats.hits += 1; + return Ok(ast); + } + } else { + query_to_cache = Cow::Borrowed(query.query()); + } + // Parse query without holding lock. - let entry = Ast::with_context(query, ctx, prepared_statements)?; + let entry = Ast::with_context(query, ctx, prepared_statements, maybe_shard, maybe_role)?; let parse_time = entry.stats.lock().parse_time; let mut guard = self.inner.lock(); - guard.queries.put(query.query().to_string(), entry.clone()); + guard + .queries + .put(query_to_cache.into_owned(), entry.clone()); guard.stats.misses += 1; guard.stats.parse_time += parse_time; @@ -138,7 +170,10 @@ impl Cache { ctx: &AstContext<'_>, prepared_statements: &mut PreparedStatements, ) -> Result { - let mut entry = Ast::with_context(query, ctx, prepared_statements)?; + let (maybe_shard, maybe_role, _) = comment::parse_comment(&query, &ctx.sharding_schema)?; + + let mut entry = + Ast::with_context(query, ctx, prepared_statements, maybe_shard, maybe_role)?; entry.cached = false; let parse_time = entry.stats.lock().parse_time; diff --git a/pgdog/src/frontend/router/parser/cache/test.rs b/pgdog/src/frontend/router/parser/cache/test.rs index f996b916..c15641c8 100644 --- a/pgdog/src/frontend/router/parser/cache/test.rs +++ b/pgdog/src/frontend/router/parser/cache/test.rs @@ -121,7 +121,7 @@ fn test_tables_list() { "DELETE FROM private_schema.test", "DROP TABLE private_schema.test", ] { - let ast = Ast::new(&BufferedQuery::Query(Query::new(q)), &ShardingSchema::default(), &db_schema, &mut prepared_statements, "", None).unwrap(); + let ast = Ast::new(&BufferedQuery::Query(Query::new(q)), &ShardingSchema::default(), &db_schema, &mut prepared_statements, None, None, "", None).unwrap(); let tables = ast.tables(); println!("{:?}", tables); } diff --git a/pgdog/src/frontend/router/parser/comment.rs b/pgdog/src/frontend/router/parser/comment.rs index a87883ad..a6ad9f2c 100644 --- a/pgdog/src/frontend/router/parser/comment.rs +++ b/pgdog/src/frontend/router/parser/comment.rs @@ -1,4 +1,5 @@ use once_cell::sync::Lazy; +use pg_query::protobuf::ScanToken; use pg_query::scan_raw; use pg_query::{protobuf::Token, scan}; use pgdog_config::QueryParserEngine; @@ -24,23 +25,26 @@ fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> { .map(|m| m.as_str()) } -/// Extract shard number from a comment. +/// Extract shard number from a comment. Additionally returns the entire +/// comment string if it exists. /// -/// Comment style uses the C-style comments (not SQL comments!) +/// Comment style for the shard metadata uses the C-style comments (not SQL comments!) /// as to allow the comment to appear anywhere in the query. /// /// See [`SHARD`] and [`SHARDING_KEY`] for the style of comment we expect. /// -pub fn comment( +pub fn parse_comment( query: &str, schema: &ShardingSchema, -) -> Result<(Option, Option), Error> { +) -> Result<(Option, Option, Option), Error> { let tokens = match schema.query_parser_engine { QueryParserEngine::PgQueryProtobuf => scan(query), QueryParserEngine::PgQueryRaw => scan_raw(query), } .map_err(Error::PgQuery)?; + let mut shard = None; let mut role = None; + let mut filtered_query = None; for token in tokens.tokens.iter() { if token.token == Token::CComment as i32 { @@ -57,33 +61,99 @@ pub fn comment( if let Some(cap) = SHARDING_KEY.captures(comment) { if let Some(sharding_key) = get_matched_value(&cap) { if let Some(schema) = schema.schemas.get(Some(sharding_key.into())) { - return Ok((Some(schema.shard().into()), role)); + shard = Some(schema.shard().into()); + } else { + let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)? + .shards(schema.shards) + .build()?; + shard = Some(ctx.apply()?); } - let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)? - .shards(schema.shards) - .build()?; - return Ok((Some(ctx.apply()?), role)); } } if let Some(cap) = SHARD.captures(comment) { - if let Some(shard) = cap.get(1) { - return Ok(( - Some( - shard - .as_str() - .parse::() - .ok() - .map(Shard::Direct) - .unwrap_or(Shard::All), - ), - role, - )); + if let Some(s) = cap.get(1) { + shard = Some( + s.as_str() + .parse::() + .ok() + .map(Shard::Direct) + .unwrap_or(Shard::All), + ); } } } } - Ok((None, role)) + if has_comments(&tokens.tokens) { + filtered_query = Some(remove_comments( + query, + &tokens.tokens, + Some(&[&SHARD, &*SHARDING_KEY, &ROLE]), + )?); + } + + Ok((shard, role, filtered_query)) +} + +pub fn has_comments(tokenized_query: &Vec) -> bool { + tokenized_query + .iter() + .any(|st| st.token == Token::CComment as i32 || st.token == Token::SqlComment as i32) +} + +pub fn remove_comments( + query: &str, + tokenized_query: &Vec, + except: Option<&[&Regex]>, +) -> Result { + let mut cursor = 0; + let mut out = String::with_capacity(query.len()); + let mut metadata = Vec::with_capacity(3); + + for st in tokenized_query { + let start = st.start as usize; + let end = st.end as usize; + + out.push_str(&query[cursor..start]); + + match st.token { + t if t == Token::CComment as i32 => { + let comment = &query[start..end]; + + if let Some(except) = except { + let m = keep_only_matching(comment, except); + + metadata.push(m.to_string()); + } + } + _ => { + out.push_str(&query[start..end]); + } + } + + cursor = end; + } + + if cursor < query.len() { + out.push_str(&query[cursor..]); + } + + metadata.sort_unstable(); + out.insert_str(0, &metadata.join("")); + + Ok(out) +} + +fn keep_only_matching(comment: &str, regs: &[&Regex]) -> String { + let mut out = String::new(); + + for reg in regs { + for m in reg.find_iter(comment) { + out.push_str(m.as_str()); + } + } + + out } #[cfg(test)] @@ -166,7 +236,7 @@ mod tests { }; let query = "SELECT * FROM users /* pgdog_role: primary */"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.1, Some(Role::Primary)); } @@ -181,11 +251,31 @@ mod tests { }; let query = "SELECT * FROM users /* pgdog_role: replica pgdog_shard: 2 */"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.0, Some(Shard::Direct(2))); assert_eq!(result.1, Some(Role::Replica)); } + #[test] + fn test_remove_comments_no_exceptions() { + let query = "SELECT * FROM table /* comment */ WHERE id = 1"; + let tokens = scan(query).unwrap().tokens; + + let result = remove_comments(query, &tokens, None).unwrap(); + + assert_eq!(result, "SELECT * FROM table WHERE id = 1"); + } + + #[test] + fn test_remove_comments_with_exceptions() { + let query = "SELECT /* comment */ * FROM table /* pgdog_shard: 4 comment */ WHERE id = 1"; + let tokens = scan(query).unwrap().tokens; + + let result = remove_comments(query, &tokens, Some(&[&SHARD])).unwrap(); + + assert_eq!(result, "pgdog_shard: 4SELECT * FROM table WHERE id = 1"); + } + #[test] fn test_replica_role_detection() { use crate::backend::ShardedTables; @@ -197,7 +287,7 @@ mod tests { }; let query = "SELECT * FROM users /* pgdog_role: replica */"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.1, Some(Role::Replica)); } @@ -212,7 +302,7 @@ mod tests { }; let query = "SELECT * FROM users /* pgdog_role: invalid */"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.1, None); } @@ -227,7 +317,7 @@ mod tests { }; let query = "SELECT * FROM users"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.1, None); } @@ -252,7 +342,7 @@ mod tests { }; let query = "SELECT * FROM users /* pgdog_sharding_key: sales */"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.0, Some(Shard::Direct(1))); } } diff --git a/pgdog/src/frontend/router/parser/query/explain.rs b/pgdog/src/frontend/router/parser/query/explain.rs index 6e7b382d..bf1e0f5f 100644 --- a/pgdog/src/frontend/router/parser/query/explain.rs +++ b/pgdog/src/frontend/router/parser/query/explain.rs @@ -81,6 +81,38 @@ mod tests { &cluster.sharding_schema(), &cluster.schema(), &mut stmts, + None, + None, + "", + None, + ) + .unwrap(); + let mut buffer = ClientRequest::from(vec![Query::new(sql).into()]); + buffer.ast = Some(ast); + + let params = Parameters::default(); + let ctx = RouterContext::new(&buffer, &cluster, ¶ms, None, Sticky::new()).unwrap(); + + match QueryParser::default().parse(ctx).unwrap().clone() { + Command::Query(route) => route, + _ => panic!("expected Query command"), + } + } + + fn route_comment(sql: &str) -> Route { + enable_expanded_explain(); + let cluster = Cluster::new_test(&config()); + let mut stmts = PreparedStatements::default(); + + let (shard, role, _) = comment::parse_comment(sql, &cluster.sharding_schema()).unwrap(); + + let ast = Ast::new( + &BufferedQuery::Query(Query::new(sql)), + &cluster.sharding_schema(), + &cluster.schema(), + &mut stmts, + shard, + role, "", None, ) @@ -119,6 +151,8 @@ mod tests { &cluster.sharding_schema(), &cluster.schema(), &mut stmts, + None, + None, "", None, ) @@ -244,7 +278,7 @@ mod tests { #[test] fn test_explain_with_comment_override() { - let r = route("/* pgdog_shard: 5 */ EXPLAIN SELECT * FROM sharded"); + let r = route_comment("/* pgdog_shard: 5 */ EXPLAIN SELECT * FROM sharded"); assert_eq!(r.shard(), &Shard::Direct(5)); let lines = r.explain().unwrap().render_lines(); assert_eq!(lines[3], " Shard 5: manual override to shard=5"); diff --git a/pgdog/src/frontend/router/parser/query/show.rs b/pgdog/src/frontend/router/parser/query/show.rs index f59f1c6b..7e7be98a 100644 --- a/pgdog/src/frontend/router/parser/query/show.rs +++ b/pgdog/src/frontend/router/parser/query/show.rs @@ -51,6 +51,8 @@ mod test_show { &c.sharding_schema(), &c.schema(), &mut PreparedStatements::default(), + None, + None, "", None, ) @@ -72,6 +74,8 @@ mod test_show { &c.sharding_schema(), &c.schema(), &mut PreparedStatements::default(), + None, + None, "", None, )