Skip to content
Draft
11 changes: 7 additions & 4 deletions pgdog/src/frontend/router/parser/cache/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use std::{collections::HashSet, ops::Deref};
use parking_lot::Mutex;
use std::sync::Arc;

use super::super::{
comment::comment, Error, Route, Shard, StatementRewrite, StatementRewriteContext, Table,
};
use super::super::{Error, Route, Shard, StatementRewrite, StatementRewriteContext, Table};
use super::{Fingerprint, Stats};
use crate::backend::schema::Schema;
use crate::frontend::router::parser::rewrite::statement::RewritePlan;
Expand Down Expand Up @@ -72,6 +70,8 @@ impl Ast {
schema: &ShardingSchema,
db_schema: &Schema,
prepared_statements: &mut PreparedStatements,
comment_shard: Option<Shard>,
comment_role: Option<Role>,
user: &str,
search_path: Option<&ParameterValue>,
) -> Result<Self, Error> {
Expand All @@ -81,7 +81,6 @@ impl Ast {
QueryParserEngine::PgQueryRaw => parse_raw(query),
}
.map_err(Error::PgQuery)?;
let (comment_shard, comment_role) = comment(query, schema)?;
let fingerprint =
Fingerprint::new(query, schema.query_parser_engine).map_err(Error::PgQuery)?;

Expand Down Expand Up @@ -125,12 +124,16 @@ impl Ast {
query: &BufferedQuery,
ctx: &super::AstContext<'_>,
prepared_statements: &mut PreparedStatements,
shard: Option<Shard>,
role: Option<Role>,
) -> Result<Self, Error> {
Self::new(
query,
&ctx.sharding_schema,
&ctx.db_schema,
prepared_statements,
shard,
role,
ctx.user,
ctx.search_path,
)
Expand Down
41 changes: 38 additions & 3 deletions pgdog/src/frontend/router/parser/cache/cache_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use lru::LruCache;
use once_cell::sync::Lazy;
use pg_query::normalize;
use pgdog_config::QueryParserEngine;
use std::borrow::Cow;
use std::collections::HashMap;
use std::time::Duration;

Expand All @@ -11,6 +12,7 @@ use tracing::debug;

use super::super::{Error, Route};
use super::{Ast, AstContext};
use crate::frontend::router::parser::comment;
use crate::frontend::{BufferedQuery, PreparedStatements};

static CACHE: Lazy<Cache> = Lazy::new(Cache::new);
Expand Down Expand Up @@ -97,6 +99,10 @@ impl Cache {
/// Parse a statement by either getting it from cache
/// or using pg_query parser.
///
/// In the event of cache miss, we retry after removing all comments except
/// for pgdog metadata. We retain it for correctness, since a query with
/// that metadata must not map to an identical query without it.
///
/// N.B. There is a race here that allows multiple threads to
/// parse the same query. That's better imo than locking the data structure
/// while we parse the query.
Expand All @@ -118,12 +124,38 @@ impl Cache {
}
}

let (maybe_shard, maybe_role, maybe_filtered_query) =
comment::parse_comment(&query, &ctx.sharding_schema)?;

let query_to_cache: Cow<'_, str>;

if let Some(filtered_query) = maybe_filtered_query {
query_to_cache = Cow::Owned(filtered_query);

// Check cache again after removing comments from query
let mut guard = self.inner.lock();

let ast = guard.queries.get_mut(&*query_to_cache).map(|entry| {
entry.stats.lock().hits += 1;
entry.clone()
});

if let Some(ast) = ast {
guard.stats.hits += 1;
return Ok(ast);
}
} else {
query_to_cache = Cow::Borrowed(query.query());
}

// Parse query without holding lock.
let entry = Ast::with_context(query, ctx, prepared_statements)?;
let entry = Ast::with_context(query, ctx, prepared_statements, maybe_shard, maybe_role)?;
let parse_time = entry.stats.lock().parse_time;

let mut guard = self.inner.lock();
guard.queries.put(query.query().to_string(), entry.clone());
guard
.queries
.put(query_to_cache.into_owned(), entry.clone());
guard.stats.misses += 1;
guard.stats.parse_time += parse_time;

Expand All @@ -138,7 +170,10 @@ impl Cache {
ctx: &AstContext<'_>,
prepared_statements: &mut PreparedStatements,
) -> Result<Ast, Error> {
let mut entry = Ast::with_context(query, ctx, prepared_statements)?;
let (maybe_shard, maybe_role, _) = comment::parse_comment(&query, &ctx.sharding_schema)?;

let mut entry =
Ast::with_context(query, ctx, prepared_statements, maybe_shard, maybe_role)?;
entry.cached = false;

let parse_time = entry.stats.lock().parse_time;
Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/frontend/router/parser/cache/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ fn test_tables_list() {
"DELETE FROM private_schema.test",
"DROP TABLE private_schema.test",
] {
let ast = Ast::new(&BufferedQuery::Query(Query::new(q)), &ShardingSchema::default(), &db_schema, &mut prepared_statements, "", None).unwrap();
let ast = Ast::new(&BufferedQuery::Query(Query::new(q)), &ShardingSchema::default(), &db_schema, &mut prepared_statements, None, None, "", None).unwrap();
let tables = ast.tables();
println!("{:?}", tables);
}
Expand Down
146 changes: 118 additions & 28 deletions pgdog/src/frontend/router/parser/comment.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use once_cell::sync::Lazy;
use pg_query::protobuf::ScanToken;
use pg_query::scan_raw;
use pg_query::{protobuf::Token, scan};
use pgdog_config::QueryParserEngine;
Expand All @@ -24,23 +25,26 @@ fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> {
.map(|m| m.as_str())
}

/// Extract shard number from a comment.
/// Extract shard number from a comment. Additionally returns the entire
/// comment string if it exists.
///
/// Comment style uses the C-style comments (not SQL comments!)
/// Comment style for the shard metadata uses the C-style comments (not SQL comments!)
/// as to allow the comment to appear anywhere in the query.
///
/// See [`SHARD`] and [`SHARDING_KEY`] for the style of comment we expect.
///
pub fn comment(
pub fn parse_comment(
query: &str,
schema: &ShardingSchema,
) -> Result<(Option<Shard>, Option<Role>), Error> {
) -> Result<(Option<Shard>, Option<Role>, Option<String>), Error> {
let tokens = match schema.query_parser_engine {
QueryParserEngine::PgQueryProtobuf => scan(query),
QueryParserEngine::PgQueryRaw => scan_raw(query),
}
.map_err(Error::PgQuery)?;
let mut shard = None;
let mut role = None;
let mut filtered_query = None;

for token in tokens.tokens.iter() {
if token.token == Token::CComment as i32 {
Expand All @@ -57,33 +61,99 @@ pub fn comment(
if let Some(cap) = SHARDING_KEY.captures(comment) {
if let Some(sharding_key) = get_matched_value(&cap) {
if let Some(schema) = schema.schemas.get(Some(sharding_key.into())) {
return Ok((Some(schema.shard().into()), role));
shard = Some(schema.shard().into());
} else {
let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)?
.shards(schema.shards)
.build()?;
shard = Some(ctx.apply()?);
}
let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)?
.shards(schema.shards)
.build()?;
return Ok((Some(ctx.apply()?), role));
}
}
if let Some(cap) = SHARD.captures(comment) {
if let Some(shard) = cap.get(1) {
return Ok((
Some(
shard
.as_str()
.parse::<usize>()
.ok()
.map(Shard::Direct)
.unwrap_or(Shard::All),
),
role,
));
if let Some(s) = cap.get(1) {
shard = Some(
s.as_str()
.parse::<usize>()
.ok()
.map(Shard::Direct)
.unwrap_or(Shard::All),
);
}
}
}
}

Ok((None, role))
if has_comments(&tokens.tokens) {
filtered_query = Some(remove_comments(
query,
&tokens.tokens,
Some(&[&SHARD, &*SHARDING_KEY, &ROLE]),
)?);
}

Ok((shard, role, filtered_query))
}

pub fn has_comments(tokenized_query: &Vec<ScanToken>) -> bool {
tokenized_query
.iter()
.any(|st| st.token == Token::CComment as i32 || st.token == Token::SqlComment as i32)
}

pub fn remove_comments(
query: &str,
tokenized_query: &Vec<ScanToken>,
except: Option<&[&Regex]>,
) -> Result<String, Error> {
let mut cursor = 0;
let mut out = String::with_capacity(query.len());
let mut metadata = Vec::with_capacity(3);

for st in tokenized_query {
let start = st.start as usize;
let end = st.end as usize;

out.push_str(&query[cursor..start]);

match st.token {
t if t == Token::CComment as i32 => {
let comment = &query[start..end];

if let Some(except) = except {
let m = keep_only_matching(comment, except);

metadata.push(m.to_string());
}
}
_ => {
out.push_str(&query[start..end]);
}
}

cursor = end;
}

if cursor < query.len() {
out.push_str(&query[cursor..]);
}

metadata.sort_unstable();
out.insert_str(0, &metadata.join(""));

Ok(out)
}

fn keep_only_matching(comment: &str, regs: &[&Regex]) -> String {
let mut out = String::new();

for reg in regs {
for m in reg.find_iter(comment) {
out.push_str(m.as_str());
}
}

out
}

#[cfg(test)]
Expand Down Expand Up @@ -166,7 +236,7 @@ mod tests {
};

let query = "SELECT * FROM users /* pgdog_role: primary */";
let result = comment(query, &schema).unwrap();
let result = parse_comment(query, &schema).unwrap();
assert_eq!(result.1, Some(Role::Primary));
}

Expand All @@ -181,11 +251,31 @@ mod tests {
};

let query = "SELECT * FROM users /* pgdog_role: replica pgdog_shard: 2 */";
let result = comment(query, &schema).unwrap();
let result = parse_comment(query, &schema).unwrap();
assert_eq!(result.0, Some(Shard::Direct(2)));
assert_eq!(result.1, Some(Role::Replica));
}

#[test]
fn test_remove_comments_no_exceptions() {
let query = "SELECT * FROM table /* comment */ WHERE id = 1";
let tokens = scan(query).unwrap().tokens;

let result = remove_comments(query, &tokens, None).unwrap();

assert_eq!(result, "SELECT * FROM table WHERE id = 1");
}

#[test]
fn test_remove_comments_with_exceptions() {
let query = "SELECT /* comment */ * FROM table /* pgdog_shard: 4 comment */ WHERE id = 1";
let tokens = scan(query).unwrap().tokens;

let result = remove_comments(query, &tokens, Some(&[&SHARD])).unwrap();

assert_eq!(result, "pgdog_shard: 4SELECT * FROM table WHERE id = 1");
}

#[test]
fn test_replica_role_detection() {
use crate::backend::ShardedTables;
Expand All @@ -197,7 +287,7 @@ mod tests {
};

let query = "SELECT * FROM users /* pgdog_role: replica */";
let result = comment(query, &schema).unwrap();
let result = parse_comment(query, &schema).unwrap();
assert_eq!(result.1, Some(Role::Replica));
}

Expand All @@ -212,7 +302,7 @@ mod tests {
};

let query = "SELECT * FROM users /* pgdog_role: invalid */";
let result = comment(query, &schema).unwrap();
let result = parse_comment(query, &schema).unwrap();
assert_eq!(result.1, None);
}

Expand All @@ -227,7 +317,7 @@ mod tests {
};

let query = "SELECT * FROM users";
let result = comment(query, &schema).unwrap();
let result = parse_comment(query, &schema).unwrap();
assert_eq!(result.1, None);
}

Expand All @@ -252,7 +342,7 @@ mod tests {
};

let query = "SELECT * FROM users /* pgdog_sharding_key: sales */";
let result = comment(query, &schema).unwrap();
let result = parse_comment(query, &schema).unwrap();
assert_eq!(result.0, Some(Shard::Direct(1)));
}
}
Loading