Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 6 additions & 20 deletions rust/examples/json/batch/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::error::Error;

use databricks_zerobus_ingest_sdk::{
databricks::zerobus::RecordType, JsonString, JsonValue, StreamConfigurationOptions,
TableProperties, ZerobusSdk, ZerobusStream,
};
use databricks_zerobus_ingest_sdk::{JsonString, JsonValue, ZerobusSdk, ZerobusStream};
use serde::Serialize;

/// Order struct that can be automatically serialized to JSON using JsonValue wrapper.
Expand Down Expand Up @@ -36,28 +33,17 @@ const SERVER_ENDPOINT: &str = "https://<your-shard-id>.zerobus.<region>.cloud.da

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
// Not needed for JSON.
descriptor_proto: None,
};
let stream_configuration_options = StreamConfigurationOptions {
max_inflight_requests: 100,
record_type: RecordType::Json,
..Default::default()
};
let sdk_handle = ZerobusSdk::builder()
.endpoint(SERVER_ENDPOINT)
.unity_catalog_url(DATABRICKS_WORKSPACE_URL)
.build()?;

let mut stream = sdk_handle
.create_stream(
table_properties.clone(),
DATABRICKS_CLIENT_ID.to_string(),
DATABRICKS_CLIENT_SECRET.to_string(),
Some(stream_configuration_options),
)
.stream_builder(TABLE_NAME)
.oauth(DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET)
.json()
.max_inflight_requests(100)
.build()
.await?;

ingest_with_offset_api(&mut stream).await?;
Expand Down
26 changes: 6 additions & 20 deletions rust/examples/json/single/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::error::Error;

use databricks_zerobus_ingest_sdk::{
databricks::zerobus::RecordType, JsonString, JsonValue, StreamConfigurationOptions,
TableProperties, ZerobusSdk, ZerobusStream,
};
use databricks_zerobus_ingest_sdk::{JsonString, JsonValue, ZerobusSdk, ZerobusStream};
use serde::Serialize;

/// Order struct that can be automatically serialized to JSON using JsonValue wrapper.
Expand Down Expand Up @@ -36,28 +33,17 @@ const SERVER_ENDPOINT: &str = "https://<your-shard-id>.zerobus.<region>.cloud.da

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
// Not needed for JSON.
descriptor_proto: None,
};
let stream_configuration_options = StreamConfigurationOptions {
max_inflight_requests: 100,
record_type: RecordType::Json,
..Default::default()
};
let sdk_handle = ZerobusSdk::builder()
.endpoint(SERVER_ENDPOINT)
.unity_catalog_url(DATABRICKS_WORKSPACE_URL)
.build()?;

let mut stream = sdk_handle
.create_stream(
table_properties.clone(),
DATABRICKS_CLIENT_ID.to_string(),
DATABRICKS_CLIENT_SECRET.to_string(),
Some(stream_configuration_options),
)
.stream_builder(TABLE_NAME)
.oauth(DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET)
.json()
.max_inflight_requests(100)
.build()
.await?;

ingest_with_offset_api(&mut stream).await?;
Expand Down
25 changes: 6 additions & 19 deletions rust/examples/proto/batch/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use std::fs;
use prost::Message;
use prost_reflect::prost_types;

use databricks_zerobus_ingest_sdk::{
ProtoBytes, ProtoMessage, StreamConfigurationOptions, TableProperties, ZerobusSdk,
ZerobusStream,
};
use databricks_zerobus_ingest_sdk::{ProtoBytes, ProtoMessage, ZerobusSdk, ZerobusStream};

pub mod orders {
include!("../output/orders.rs");
Expand All @@ -33,27 +30,17 @@ const SERVER_ENDPOINT: &str = "https://<your-shard-id>.zerobus.<region>.cloud.da
async fn main() -> Result<(), Box<dyn Error>> {
let descriptor_proto =
load_descriptor_proto("output/orders.descriptor", "orders.proto", "table_Orders");
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
descriptor_proto: Some(descriptor_proto),
};
let stream_configuration_options = StreamConfigurationOptions {
max_inflight_requests: 100,
// RecordType::Proto is the default.
..Default::default()
};
let sdk_handle = ZerobusSdk::builder()
.endpoint(SERVER_ENDPOINT)
.unity_catalog_url(DATABRICKS_WORKSPACE_URL)
.build()?;

let mut stream = sdk_handle
.create_stream(
table_properties.clone(),
DATABRICKS_CLIENT_ID.to_string(),
DATABRICKS_CLIENT_SECRET.to_string(),
Some(stream_configuration_options),
)
.stream_builder(TABLE_NAME)
.oauth(DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET)
.compiled_proto(descriptor_proto)
.max_inflight_requests(100)
.build()
.await?;

ingest_with_offset_api(&mut stream).await?;
Expand Down
25 changes: 6 additions & 19 deletions rust/examples/proto/single/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use std::fs;
use prost::Message;
use prost_reflect::prost_types;

use databricks_zerobus_ingest_sdk::{
ProtoBytes, ProtoMessage, StreamConfigurationOptions, TableProperties, ZerobusSdk,
ZerobusStream,
};
use databricks_zerobus_ingest_sdk::{ProtoBytes, ProtoMessage, ZerobusSdk, ZerobusStream};

pub mod orders {
include!("../output/orders.rs");
Expand All @@ -33,27 +30,17 @@ const SERVER_ENDPOINT: &str = "https://<your-shard-id>.zerobus.<region>.cloud.da
async fn main() -> Result<(), Box<dyn Error>> {
let descriptor_proto =
load_descriptor_proto("output/orders.descriptor", "orders.proto", "table_Orders");
let table_properties = TableProperties {
table_name: TABLE_NAME.to_string(),
descriptor_proto: Some(descriptor_proto),
};
let stream_configuration_options = StreamConfigurationOptions {
max_inflight_requests: 100,
// RecordType::Proto is the default.
..Default::default()
};
let sdk_handle = ZerobusSdk::builder()
.endpoint(SERVER_ENDPOINT)
.unity_catalog_url(DATABRICKS_WORKSPACE_URL)
.build()?;

let mut stream = sdk_handle
.create_stream(
table_properties.clone(),
DATABRICKS_CLIENT_ID.to_string(),
DATABRICKS_CLIENT_SECRET.to_string(),
Some(stream_configuration_options),
)
.stream_builder(TABLE_NAME)
.oauth(DATABRICKS_CLIENT_ID, DATABRICKS_CLIENT_SECRET)
.compiled_proto(descriptor_proto)
.max_inflight_requests(100)
.build()
.await?;

ingest_with_offset_api(&mut stream).await?;
Expand Down
28 changes: 25 additions & 3 deletions rust/sdk/src/builder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
//! Builder API for creating Zerobus SDK instances.
//! Builder API for creating Zerobus SDK instances and ingestion streams.
//!
//! This module provides a fluent builder pattern for configuring and creating
//! SDK instances.
//! This module provides fluent builder patterns for configuring and creating
//! SDK instances and streams.
//!
//! # Examples
//!
//! ## SDK Builder
//!
//! ```no_run
//! use databricks_zerobus_ingest_sdk::ZerobusSdkBuilder;
//!
Expand All @@ -14,7 +16,27 @@
//! .build()?;
//! # Ok::<(), databricks_zerobus_ingest_sdk::ZerobusError>(())
//! ```
//!
//! ## Stream Builder
//!
//! ```rust,ignore
//! let stream = sdk
//! .stream_builder("catalog.schema.table")
//! .oauth("client-id", "client-secret")
//! .json()
//! .build()
//! .await?;
//! ```

mod sdk_builder;
mod stream_builder;
pub(crate) mod stream_format;

pub use sdk_builder::ZerobusSdkBuilder;
pub use stream_builder::StreamBuilder;
pub use stream_format::{
AuthReady, CompiledProto, GrpcFormat, HasAuth, Json, NoAuth, NoFormat, StreamFormat,
};

#[cfg(feature = "arrow-flight")]
pub use stream_format::Arrow;
Loading
Loading