Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ tokio = { version = "^1.43", default-features = false, features = [
tokio-stream = { version = "0.1.17", features = ["fs"] }
tokio-util = { version = "0.7" }

# perf
hotpath = { version = "0.16.0", optional = true, features = [
"hotpath",
"hotpath-cpu",
"hotpath-alloc",
"tokio"
] }
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# # Logging and Metrics
# opentelemetry-proto = { version = "0.30.0", features = [
# "gen-tonic",
Expand Down Expand Up @@ -211,6 +219,15 @@ assets-sha1 = "a7523ef819d38678275ae165c443564b2f9a3fc1"

[features]
debug = []
hotpath = [
"dep:hotpath",
"hotpath/hotpath",
"hotpath/hotpath-cpu",
"hotpath/hotpath-alloc",
"hotpath/tokio",
]
hotpath-alloc = ["hotpath"]
hotpath-cpu = ["hotpath"]
kafka = [
"rdkafka",
"rdkafka/ssl-vendored",
Expand Down
9 changes: 9 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,15 @@ pub struct Options {
)]
pub max_field_statistics: usize,

// collect statistics for dataset fields
#[arg(
long,
env = "P_COLLECT_DATASET_STATS",
default_value = "true",
help = "Collect statistics for dataset fields"
)]
pub collect_dataset_stats: bool,

#[arg(
long,
env = "P_MAX_EVENT_PAYLOAD_SIZE",
Expand Down
7 changes: 6 additions & 1 deletion src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl EventFormat for Event {

// convert the incoming json to a vector of json values
// also extract the arrow schema, tags and metadata from the incoming json
#[cfg_attr(feature = "hotpath", hotpath::measure)]
fn to_data(
self,
schema: &HashMap<String, Arc<Field>>,
Expand Down Expand Up @@ -156,7 +157,7 @@ impl EventFormat for Event {
infer_schema.clone(),
]).map_err(|err| anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err))?;
is_first = true;
let schema = infer_schema
let schema: Vec<Arc<Field>> = infer_schema
.fields
.iter()
.filter(|field| !field.data_type().is_null())
Expand Down Expand Up @@ -327,6 +328,10 @@ fn rename_json_keys(values: Vec<Value>) -> Result<Vec<Value>, anyhow::Error> {
.into_iter()
.map(|value| {
if let Value::Object(map) = value {
if !map.keys().any(|key| key.starts_with('@')) {
return Ok(Value::Object(map));
}

// Collect original keys to check for collisions
let original_keys: HashSet<String> = map.keys().cloned().collect();

Expand Down
23 changes: 23 additions & 0 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ pub trait EventFormat: Sized {
/// Returns the UTC time at ingestion
fn get_p_timestamp(&self) -> DateTime<Utc>;

#[cfg_attr(feature = "hotpath", hotpath::measure)]
fn into_recordbatch(
self,
storage_schema: &HashMap<String, Arc<Field>>,
Expand Down Expand Up @@ -606,6 +607,28 @@ pub fn rename_per_record_type_mismatches(
let Value::Object(map) = value else {
return value;
};
let needs_rename = map.iter().any(|(key, val)| {
if val.is_null() {
return false;
}
let target_type = existing_schema
.get(key)
.map(|f| f.data_type())
.or_else(|| inferred_types.get(key.as_str()).copied());
let Some(target_type) = target_type else {
return false;
};
if (val.is_array() || val.is_object())
&& (target_type.is_list()
|| matches!(target_type, DataType::Struct(_) | DataType::Map(_, _)))
{
return false;
}
!value_compatible_with_type(val, target_type, schema_version)
});
if !needs_rename {
return Value::Object(map);
}
let new_map: serde_json::Map<String, Value> = map
.into_iter()
.map(|(key, val)| {
Expand Down
1 change: 1 addition & 0 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl Event {
is_first_event = self.is_first_event
)
)]
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub fn process(self) -> Result<(), EventError> {
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
Expand Down
16 changes: 2 additions & 14 deletions src/handlers/http/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use once_cell::sync::Lazy;
use tokio::{sync::Mutex, task::JoinSet};
use tracing::{error, info};

use crate::sync::shutdown_local_sync_flush_and_convert;
use crate::utils::get_tenant_id_from_request;
use crate::{parseable::PARSEABLE, storage::object_storage::sync_all_streams};

Expand Down Expand Up @@ -84,20 +85,7 @@ async fn perform_sync_operations() {
}

async fn perform_local_sync() {
let mut local_sync_joinset = JoinSet::new();

// Sync staging
PARSEABLE
.streams
.flush_and_convert(&mut local_sync_joinset, false, true);

while let Some(res) = local_sync_joinset.join_next().await {
match res {
Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
Ok(Err(err)) => error!("Failed to convert arrow files to parquet. {err:?}"),
Err(err) => error!("Failed to join async task: {err}"),
}
}
shutdown_local_sync_flush_and_convert().await;
}

async fn perform_object_store_sync() {
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ pub async fn flatten_and_push_logs(
skip(json, log_source, p_custom_fields, time_partition, telemetry_type, tenant_id),
fields(stream_name, record_count = tracing::field::Empty)
)]
#[cfg_attr(feature = "hotpath", hotpath::measure)]
pub fn push_logs(
stream_name: &str,
json: Value,
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub use datafusion;
pub use handlers::http::modal::{
ParseableServer, ingest_server::IngestServer, query_server::QueryServer, server::Server,
};
#[cfg(feature = "hotpath")]
pub use hotpath;
use once_cell::sync::Lazy;
pub use openid;
use parseable::PARSEABLE;
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Registry, fmt};

#[actix_web::main]
#[cfg_attr(feature = "hotpath", hotpath::main)]
async fn main() -> anyhow::Result<()> {
init_logger();
// Install the rustls crypto provider before any TLS operations.
Expand Down
2 changes: 1 addition & 1 deletion src/otel/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {

if log_record.body.is_some() {
let body = &log_record.body;
let body_json = collect_json_from_values(body, &"body".to_string());
let body_json = collect_json_from_values(body, "body");
for (key, value) in &body_json {
// Always insert the original body field as is
log_record_json.insert(key.clone(), value.clone());
Expand Down
Loading
Loading