diff --git a/Cargo.lock b/Cargo.lock index ee89c8bda..63c95ddca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -139,7 +139,7 @@ dependencies = [ "snap", "strum", "strum_macros", - "thiserror", + "thiserror 2.0.18", "uuid", "zstd", ] @@ -660,8 +660,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -1684,6 +1686,7 @@ dependencies = [ "pyo3-async-runtimes", "pyo3-build-config", "pyo3-log", + "pyo3-object_store", "serde_json", "tokio", "url", @@ -2702,7 +2705,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "thiserror", + "thiserror 2.0.18", "tokio", "tracing", "url", @@ -3007,6 +3010,8 @@ version = "0.28.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf85e27e86080aafd5a22eae58a162e133a589551542b3e5cee4beb27e54f8e1" dependencies = [ + "chrono", + "indexmap", "libc", "once_cell", "portable-atomic", @@ -3084,6 +3089,29 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "pyo3-object_store" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3056dc8a77db5d44d32e7f740db0b01e5b65a43d181487a6917576959c6eb9a" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", + "http", + "humantime", + "itertools", + "object_store", + "percent-encoding", + "pyo3", + "pyo3-async-runtimes", + "serde", + "thiserror 1.0.69", + "tokio", + "url", +] + [[package]] name = "quad-rand" version = "0.2.3" @@ -3114,7 +3142,7 @@ dependencies = [ "rustc-hash", "rustls", "socket2", - "thiserror", + "thiserror 2.0.18", "tokio", "tracing", "web-time", @@ -3135,7 +3163,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror", + "thiserror 2.0.18", "tinyvec", "tracing", "web-time", @@ -3857,13 +3885,33 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = 
"thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.18", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", ] [[package]] @@ -4130,7 +4178,7 @@ dependencies = [ "serde", "serde_json", "syn 2.0.117", - "thiserror", + "thiserror 2.0.18", "unicode-ident", ] diff --git a/Cargo.toml b/Cargo.toml index 346f6da3e..89d23bda6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ async-trait = "0.1.89" futures = "0.3" cstr = "0.2" object_store = { version = "0.13.1" } +pyo3-object_store = { version = "0.9" } url = "2" log = "0.4.29" parking_lot = "0.12" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 3e2b01c8e..7e9b1f274 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -63,6 +63,7 @@ async-trait = { workspace = true } futures = { workspace = true } cstr = { workspace = true } object_store = { workspace = true, features = ["aws", "gcp", "azure", "http"] } +pyo3-object_store = { workspace = true } url = { workspace = true } log = { workspace = true } parking_lot = { workspace = true } diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 53994d2f5..7f233ca14 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -59,10 +59,15 @@ use datafusion_python_util::{ get_tokio_runtime, spawn_future, wait_for_future, }; use object_store::ObjectStore; +use 
object_store::aws::AmazonS3Builder; +use object_store::azure::MicrosoftAzureBuilder; +use object_store::gcp::GoogleCloudStorageBuilder; +use object_store::http::HttpBuilder; use pyo3::IntoPyObjectExt; use pyo3::exceptions::{PyKeyError, PyValueError}; use pyo3::prelude::*; use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple}; +use pyo3_object_store::PyObjectStore; use url::Url; use uuid::Uuid; @@ -81,7 +86,6 @@ use crate::physical_plan::PyExecutionPlan; use crate::record_batch::PyRecordBatchStream; use crate::sql::logical::PyLogicalPlan; use crate::sql::util::replace_placeholders_with_strings; -use crate::store::StorageContexts; use crate::table::{PyTable, RustWrappedPyTableProviderFactory}; use crate::udaf::PyAggregateUDF; use crate::udf::PyScalarUDF; @@ -410,32 +414,19 @@ impl PySessionContext { Ok(Self { ctx, logical_codec }) } - /// Register an object store with the given name - #[pyo3(signature = (scheme, store, host=None))] - pub fn register_object_store( - &self, - scheme: &str, - store: StorageContexts, - host: Option<&str>, - ) -> PyResult<()> { - // for most stores the "host" is the bucket name and can be inferred from the store - let (store, upstream_host): (Arc, String) = match store { - StorageContexts::AmazonS3(s3) => (s3.inner, s3.bucket_name), - StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner, gcs.bucket_name), - StorageContexts::MicrosoftAzure(azure) => (azure.inner, azure.container_name), - StorageContexts::LocalFileSystem(local) => (local.inner, "".to_string()), - StorageContexts::HTTP(http) => (http.store, http.url), - }; - - // let users override the host to match the api signature from upstream - let derived_host = if let Some(host) = host { - host - } else { - &upstream_host - }; - let url_string = format!("{scheme}{derived_host}"); - let url = Url::parse(&url_string).unwrap(); - self.ctx.runtime_env().register_object_store(&url, store); + /// Register an object store for a given URL prefix. 
+ /// + /// `url` is any URL whose scheme+host identifies the store prefix, e.g. + /// ``"s3://my-bucket"`` or ``"https://my-account.blob.core.windows.net"``. + /// The ``store`` must be a :py:class:`datafusion.object_store.ObjectStore` + /// instance, such as :py:class:`~datafusion.object_store.S3Store`. + #[pyo3(signature = (url, store))] + pub fn register_object_store(&self, url: &str, store: PyObjectStore) -> PyResult<()> { + let url = Url::parse(url) + .map_err(|e| pyo3::exceptions::PyValueError::new_err(format!("invalid URL: {e}")))?; + self.ctx + .runtime_env() + .register_object_store(&url, store.into_dyn()); Ok(()) } @@ -818,7 +809,8 @@ impl PySessionContext { file_extension=".parquet", skip_metadata=true, schema=None, - file_sort_order=None))] + file_sort_order=None, + object_store=None))] pub fn register_parquet( &self, name: &str, @@ -829,8 +821,10 @@ impl PySessionContext { skip_metadata: bool, schema: Option>, file_sort_order: Option>>, + object_store: Option, py: Python, ) -> PyDataFusionResult<()> { + self.prepare_store_for_path(path, object_store); let mut options = ParquetReadOptions::default() .table_partition_cols( table_partition_cols @@ -855,12 +849,14 @@ impl PySessionContext { #[pyo3(signature = (name, path, - options=None))] + options=None, + object_store=None))] pub fn register_csv( &self, name: &str, path: &Bound<'_, PyAny>, options: Option<&PyCsvReadOptions>, + object_store: Option, py: Python, ) -> PyDataFusionResult<()> { let options = options @@ -870,10 +866,14 @@ impl PySessionContext { if path.is_instance_of::() { let paths = path.extract::>()?; + for p in &paths { + self.prepare_store_for_path(p, object_store.clone()); + } let result = self.register_csv_from_multiple_paths(name, paths, options); wait_for_future(py, result)??; } else { let path = path.extract::()?; + self.prepare_store_for_path(&path, object_store); let result = self.ctx.register_csv(name, &path, options); wait_for_future(py, result)??; } @@ -888,7 +888,8 @@ impl 
PySessionContext { schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], - file_compression_type=None))] + file_compression_type=None, + object_store=None))] pub fn register_json( &self, name: &str, @@ -898,11 +899,13 @@ impl PySessionContext { file_extension: &str, table_partition_cols: Vec<(String, PyArrowType)>, file_compression_type: Option, + object_store: Option, py: Python, ) -> PyDataFusionResult<()> { let path = path .to_str() .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; + self.prepare_store_for_path(path, object_store); let mut options = JsonReadOptions::default() .file_compression_type(parse_file_compression_type(file_compression_type)?) @@ -927,7 +930,8 @@ impl PySessionContext { path, schema=None, file_extension=".avro", - table_partition_cols=vec![]))] + table_partition_cols=vec![], + object_store=None))] pub fn register_avro( &self, name: &str, @@ -935,12 +939,14 @@ impl PySessionContext { schema: Option>, file_extension: &str, table_partition_cols: Vec<(String, PyArrowType)>, + object_store: Option, py: Python, ) -> PyDataFusionResult<()> { let path = path .to_str() .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; + self.prepare_store_for_path(path, object_store); let mut options = AvroReadOptions::default().table_partition_cols( table_partition_cols .into_iter() @@ -1051,7 +1057,7 @@ impl PySessionContext { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))] + #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None, object_store=None))] pub fn read_json( &self, path: PathBuf, @@ -1060,11 +1066,13 @@ impl PySessionContext { file_extension: &str, table_partition_cols: Vec<(String, PyArrowType)>, file_compression_type: Option, + 
object_store: Option, py: Python, ) -> PyDataFusionResult { let path = path .to_str() .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; + self.prepare_store_for_path(path, object_store); let mut options = JsonReadOptions::default() .table_partition_cols( table_partition_cols @@ -1088,11 +1096,13 @@ impl PySessionContext { #[pyo3(signature = ( path, - options=None))] + options=None, + object_store=None))] pub fn read_csv( &self, path: &Bound<'_, PyAny>, options: Option<&PyCsvReadOptions>, + object_store: Option, py: Python, ) -> PyDataFusionResult { let options = options @@ -1102,12 +1112,16 @@ impl PySessionContext { if path.is_instance_of::() { let paths = path.extract::>()?; + for p in &paths { + self.prepare_store_for_path(p, object_store.clone()); + } let paths = paths.iter().map(|p| p as &str).collect::>(); let result = self.ctx.read_csv(paths, options); let df = PyDataFrame::new(wait_for_future(py, result)??); Ok(df) } else { let path = path.extract::()?; + self.prepare_store_for_path(&path, object_store); let result = self.ctx.read_csv(path, options); let df = PyDataFrame::new(wait_for_future(py, result)??); Ok(df) @@ -1122,7 +1136,8 @@ impl PySessionContext { file_extension=".parquet", skip_metadata=true, schema=None, - file_sort_order=None))] + file_sort_order=None, + object_store=None))] pub fn read_parquet( &self, path: &str, @@ -1132,8 +1147,10 @@ impl PySessionContext { skip_metadata: bool, schema: Option>, file_sort_order: Option>>, + object_store: Option, py: Python, ) -> PyDataFusionResult { + self.prepare_store_for_path(path, object_store); let mut options = ParquetReadOptions::default() .table_partition_cols( table_partition_cols @@ -1157,15 +1174,17 @@ impl PySessionContext { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (path, schema=None, table_partition_cols=vec![], file_extension=".avro"))] + #[pyo3(signature = (path, schema=None, table_partition_cols=vec![], file_extension=".avro", 
object_store=None))] pub fn read_avro( &self, path: &str, schema: Option>, table_partition_cols: Vec<(String, PyArrowType)>, file_extension: &str, + object_store: Option, py: Python, ) -> PyDataFusionResult { + self.prepare_store_for_path(path, object_store); let mut options = AvroReadOptions::default().table_partition_cols( table_partition_cols .into_iter() @@ -1260,6 +1279,100 @@ impl PySessionContext { self.ctx.table(name).await } + /// Auto-detect and register the appropriate [`ObjectStore`] for a URL. + /// + /// Parses `url_str` and, based on its scheme, constructs and registers an + /// object store so callers do not have to call `register_object_store` + /// explicitly. Supported schemes: + /// + /// * `http` / `https` – registers an [`HttpStore`] for the URL origin. + /// * `s3` – registers an anonymous [`AmazonS3`] store (no credentials / signature). + /// Users requiring authenticated access should pass an explicit store via the + /// `object_store` parameter instead. + /// * `gs` / `gcs` – registers a [`GoogleCloudStorage`] store seeded from + /// environment variables. + /// * `az` / `abfss` – registers a [`MicrosoftAzure`] store seeded from + /// environment variables. + /// + /// Errors from building cloud stores (e.g. missing credentials) are silently + /// ignored so that the subsequent DataFusion operation surfaces a meaningful + /// diagnostic. Non-URL strings (local paths) are silently skipped. + fn try_register_url_store(&self, url_str: &str) { + let Ok(url) = Url::parse(url_str) else { + return; // local path or unparsable – nothing to do + }; + // Skip auto-registration if a store is already registered for this URL + // (e.g. the user called register_object_store() explicitly beforehand). 
+ if self + .ctx + .runtime_env() + .object_store_registry + .get_store(&url) + .is_ok() + { + return; + } + let store: Arc = match url.scheme() { + "http" | "https" => match HttpBuilder::new() + .with_url(url.origin().ascii_serialization()) + .build() + { + Ok(s) => Arc::new(s), + Err(_) => return, + }, + "s3" => { + let bucket = url.host_str().unwrap_or_default(); + match AmazonS3Builder::new() + .with_bucket_name(bucket) + .with_skip_signature(true) + .build() + { + Ok(s) => Arc::new(s), + Err(_) => return, + } + } + "gs" | "gcs" => { + let bucket = url.host_str().unwrap_or_default(); + match GoogleCloudStorageBuilder::from_env() + .with_bucket_name(bucket) + .build() + { + Ok(s) => Arc::new(s), + Err(_) => return, + } + } + "az" | "abfss" => { + let container = url.host_str().unwrap_or_default(); + match MicrosoftAzureBuilder::from_env() + .with_container_name(container) + .build() + { + Ok(s) => Arc::new(s), + Err(_) => return, + } + } + _ => return, // unsupported scheme (e.g. "file://") + }; + self.ctx.runtime_env().register_object_store(&url, store); + } + + /// Register an object store for `path`, either using a caller-supplied + /// store or by auto-detecting one from the URL scheme via + /// [`try_register_url_store`][Self::try_register_url_store]. + fn prepare_store_for_path(&self, path: &str, store: Option) { + if let Some(store) = store { + // Caller supplied a store: register it directly and skip auto-detection. 
+ let Ok(url) = Url::parse(path) else { + return; + }; + self.ctx + .runtime_env() + .register_object_store(&url, store.into_dyn()); + } else { + self.try_register_url_store(path); + } + } + async fn register_csv_from_multiple_paths( &self, name: &str, diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index fc2d006d3..d6071a39c 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -49,7 +49,6 @@ mod pyarrow_filter_expression; pub mod pyarrow_util; mod record_batch; pub mod sql; -pub mod store; pub mod table; pub mod unparser; @@ -119,9 +118,7 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { functions::init_module(&funcs)?; m.add_submodule(&funcs)?; - let store = PyModule::new(py, "object_store")?; - store::init_module(&store)?; - m.add_submodule(&store)?; + pyo3_object_store::register_store_module(py, &m, "datafusion._internal", "object_store")?; let options = PyModule::new(py, "options")?; options::init_module(&options)?; diff --git a/crates/core/src/store.rs b/crates/core/src/store.rs index 8535e83b7..923bc72b4 100644 --- a/crates/core/src/store.rs +++ b/crates/core/src/store.rs @@ -202,7 +202,7 @@ pub struct PyAmazonS3Context { #[pymethods] impl PyAmazonS3Context { #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (bucket_name, region=None, access_key_id=None, secret_access_key=None, session_token=None, endpoint=None, allow_http=false, imdsv1_fallback=false))] + #[pyo3(signature = (bucket_name, region=None, access_key_id=None, secret_access_key=None, session_token=None, endpoint=None, allow_http=false, imdsv1_fallback=false, skip_signature=false))] #[new] fn new( bucket_name: String, @@ -214,9 +214,16 @@ impl PyAmazonS3Context { //retry_config: RetryConfig, allow_http: bool, imdsv1_fallback: bool, + skip_signature: bool, ) -> Self { - // start w/ the options that come directly from the environment - let mut builder = AmazonS3Builder::from_env(); + // When skip_signature is set (anonymous access to public 
def register_object_store(self, url: str, store: Any) -> None:
    """Attach an object store to this session under a URL prefix.

    Every path that starts with ``url`` is served by ``store``. Include the
    scheme and, where the backend has one, the bucket or container name so
    that DataFusion can route requests to the right store.

    Example::

        from datafusion import SessionContext
        from datafusion.object_store import S3Store

        ctx = SessionContext()
        store = S3Store("my-bucket", region="us-east-1")
        ctx.register_object_store("s3://my-bucket", store)
        df = ctx.read_parquet("s3://my-bucket/path/to/file.parquet")

    Args:
        url: URL prefix that identifies the object store, e.g.
            ``"s3://my-bucket"`` or
            ``"https://my-account.blob.core.windows.net"``.
        store: A :py:class:`~datafusion.object_store.ObjectStore` instance.
    """
    session = self.ctx
    session.register_object_store(url, store)
@@ -1051,6 +1076,9 @@ def register_json( selected for data input. table_partition_cols: Partition columns. file_compression_type: File compression type. + object_store: An optional + :py:class:`~datafusion.object_store.ObjectStore` instance to + use for accessing the file. """ if table_partition_cols is None: table_partition_cols = [] @@ -1063,6 +1091,7 @@ def register_json( file_extension, table_partition_cols, file_compression_type, + object_store, ) def register_avro( @@ -1072,6 +1101,7 @@ def register_avro( schema: pa.Schema | None = None, file_extension: str = ".avro", table_partition_cols: list[tuple[str, str | pa.DataType]] | None = None, + object_store: Any | None = None, ) -> None: """Register an Avro file as a table. @@ -1084,12 +1114,15 @@ def register_avro( schema: The data source schema. file_extension: File extension to select. table_partition_cols: Partition columns. + object_store: An optional + :py:class:`~datafusion.object_store.ObjectStore` instance to + use for accessing the file. """ if table_partition_cols is None: table_partition_cols = [] table_partition_cols = _convert_table_partition_cols(table_partition_cols) self.ctx.register_avro( - name, str(path), schema, file_extension, table_partition_cols + name, str(path), schema, file_extension, table_partition_cols, object_store ) def register_dataset(self, name: str, dataset: pa.dataset.Dataset) -> None: @@ -1149,6 +1182,7 @@ def read_json( file_extension: str = ".json", table_partition_cols: list[tuple[str, str | pa.DataType]] | None = None, file_compression_type: str | None = None, + object_store: Any | None = None, ) -> DataFrame: """Read a line-delimited JSON data source. @@ -1161,6 +1195,9 @@ def read_json( selected for data input. table_partition_cols: Partition columns. file_compression_type: File compression type. + object_store: An optional + :py:class:`~datafusion.object_store.ObjectStore` instance to + use for accessing the file. 
Returns: DataFrame representation of the read JSON files. @@ -1176,6 +1213,7 @@ def read_json( file_extension, table_partition_cols, file_compression_type, + object_store, ) ) @@ -1190,6 +1228,7 @@ def read_csv( table_partition_cols: list[tuple[str, str | pa.DataType]] | None = None, file_compression_type: str | None = None, options: CsvReadOptions | None = None, + object_store: Any | None = None, ) -> DataFrame: """Read a CSV data source. @@ -1209,6 +1248,9 @@ def read_csv( file_compression_type: File compression type. options: Set advanced options for CSV reading. This cannot be combined with any of the other options in this method. + object_store: An optional + :py:class:`~datafusion.object_store.ObjectStore` instance to + use for accessing the file. Returns: DataFrame representation of the read CSV files @@ -1252,6 +1294,7 @@ def read_csv( self.ctx.read_csv( path_arg, options.to_inner(), + object_store, ) ) @@ -1264,6 +1307,7 @@ def read_parquet( skip_metadata: bool = True, schema: pa.Schema | None = None, file_sort_order: Sequence[Sequence[SortKey]] | None = None, + object_store: Any | None = None, ) -> DataFrame: """Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`. @@ -1283,6 +1327,9 @@ def read_parquet( file_sort_order: Sort order for the file. Each sort key can be specified as a column name (``str``), an expression (``Expr``), or a ``SortExpr``. + object_store: An optional + :py:class:`~datafusion.object_store.ObjectStore` instance to + use for accessing the file. Returns: DataFrame representation of the read Parquet files @@ -1300,6 +1347,7 @@ def read_parquet( skip_metadata, schema, file_sort_order, + object_store, ) ) @@ -1309,6 +1357,7 @@ def read_avro( schema: pa.Schema | None = None, file_partition_cols: list[tuple[str, str | pa.DataType]] | None = None, file_extension: str = ".avro", + object_store: Any | None = None, ) -> DataFrame: """Create a :py:class:`DataFrame` for reading Avro data source. 
"""Object store functionality.

The store classes in this module are provided by `pyo3-object-store`_ and give
Python-level builders for every backend supported by the Rust ``object_store``
crate. Pass any of these to
:py:meth:`~datafusion.context.SessionContext.register_object_store` to make a
remote prefix available to DataFusion queries.

.. _pyo3-object-store: https://github.com/developmentseed/obstore
"""

# The native submodule is bound to a private name so that only the re-exports
# below form the public surface of this module.
from ._internal import object_store as _os

# Primary API — names match pyo3-object-store conventions.
S3Store = _os.S3Store
GCSStore = _os.GCSStore
AzureStore = _os.AzureStore
HTTPStore = _os.HTTPStore
LocalStore = _os.LocalStore
MemoryStore = _os.MemoryStore

# Convenience factory that constructs the right store from a URL string.
from_url = _os.from_url

# Aliases for the class names exported by older datafusion-python releases.
# New code should prefer the names above.
# NOTE(review): only the names are aliased. Constructor arguments are whatever
# pyo3-object-store defines and presumably differ from the removed classes
# (e.g. the old ``AmazonS3(bucket_name=...)``) — confirm before describing
# these as drop-in replacements.
AmazonS3 = S3Store
GoogleCloud = GCSStore
MicrosoftAzure = AzureStore
Http = HTTPStore
LocalFileSystem = LocalStore

# Sorted alphabetically; lists both the current names and the legacy aliases.
__all__ = [
    "AmazonS3",
    "AzureStore",
    "GCSStore",
    "GoogleCloud",
    "HTTPStore",
    "Http",
    "LocalFileSystem",
    "LocalStore",
    "MemoryStore",
    "MicrosoftAzure",
    "S3Store",
    "from_url",
]
def test_read_https_parquet(ctx):
    """An HTTPS Parquet URL is readable without registering a store first.

    The source is the Apache parquet-testing reference file
    ``alltypes_plain.parquet``, which is expected to hold 8 rows.
    """
    parquet_url = "https://raw.githubusercontent.com/apache/parquet-testing/master/data/alltypes_plain.parquet"
    frame = ctx.read_parquet(parquet_url)
    assert frame.count() == 8
    assert "id" in frame.schema().names


def test_read_s3_parquet_explicit(ctx):
    """A store passed via ``object_store`` is used for a public S3 read.

    ``S3Store`` goes straight into ``read_parquet()``, so there is no separate
    ``register_object_store()`` call. The target is a single part file in the
    public ``coiled-datasets`` bucket (region us-east-2), read anonymously.
    """
    from datafusion.object_store import S3Store

    anonymous_store = S3Store(
        "coiled-datasets", region="us-east-2", skip_signature=True
    )
    part_url = "s3://coiled-datasets/airbnb-monogo/description-and-ratings.parquet/part.0.parquet"
    frame = ctx.read_parquet(part_url, object_store=anonymous_store)
    assert frame.count() == 221
    assert "description" in frame.schema().names