diff --git a/paimon-python/README.md b/paimon-python/README.md index e216dc00197c..d919aacb8c53 100644 --- a/paimon-python/README.md +++ b/paimon-python/README.md @@ -31,3 +31,77 @@ pip3 install dist/*.tar.gz The command will install the package and core dependencies to your local Python environment. +# HDFS without a local Hadoop install + +`pypaimon` supports HDFS through a pure-protocol client based on +[`hdfs-native`](https://github.com/Kimahriman/hdfs-native) (Rust + PyO3). +Use it when you want HDFS access **without** installing Hadoop, a JDK, +`libhdfs`, or wrestling with `CLASSPATH` / `LD_LIBRARY_PATH`. + +Install with the optional extra: + +```commandline +pip install 'pypaimon[hdfs]' +``` + +For `hdfs://` and `viewfs://` URIs this backend is now the default. +Switch back to the legacy `libhdfs` (JNI) path with: + +```python +catalog = CatalogFactory.create({ + "warehouse": "hdfs://ns1/warehouse", + "hdfs.client.impl": "pyarrow", # default: "native" +}) +``` + +## Sourcing the cluster wiring + +The client still needs to know about NameNode addresses, HA failover +groups, and `viewfs` mount tables. Three options: + +1. **Local xml** — set `HADOOP_CONF_DIR` (or the `hdfs.conf-dir` option) + to a directory containing `core-site.xml` / `hdfs-site.xml`. Only the + xml is required; no Hadoop binaries or JDK. + +2. **Catalog options (REST-friendly)** — pass the original Hadoop + key/values directly in catalog options. Keys with prefixes `dfs.`, + `fs.`, `hadoop.`, `ipc.`, `io.` are forwarded as-is. A REST catalog + can deliver these in its response, giving a fully zero-file client + experience: + + ```python + CatalogFactory.create({ + "warehouse": "viewfs://cluster/warehouse", + "dfs.nameservices": "ns1", + "dfs.ha.namenodes.ns1": "nn1,nn2", + "dfs.namenode.rpc-address.ns1.nn1": "host-1:8020", + "dfs.namenode.rpc-address.ns1.nn2": "host-2:8020", + "fs.viewfs.mounttable.cluster.link./prod": "hdfs://ns1/prod", + }) + ``` + +3. 
**Namespaced overrides** — use `hdfs.config.<key>` to forward any + other Hadoop key not covered by the prefix whitelist. + +The three sources can be combined; catalog options take precedence over +xml. + +## Kerberos + +A secured cluster still needs the GSSAPI system library +(`libgssapi-krb5-2` on Debian/Ubuntu, `krb5` via Homebrew on macOS, +`krb5-libs` on RHEL) plus a `krb5.conf`. Provide credentials by either: + +- Running `kinit` yourself and pointing `KRB5CCNAME` at the cache, or +- Setting `security.kerberos.login.principal` and + `security.kerberos.login.keytab` in catalog options — `pypaimon` will + run `kinit` for you. + +## Fallback behaviour + +If the native backend fails to initialise (e.g. wheel missing on an +unsupported platform such as Windows), `pypaimon` automatically falls +back to the `pyarrow` (`libhdfs`/JVM) path and logs a warning. Disable +the fallback with `hdfs.client.fallback-to-pyarrow=false` if you want +hard failures instead. + diff --git a/paimon-python/pypaimon/benchmark/hdfs_io_bench.py b/paimon-python/pypaimon/benchmark/hdfs_io_bench.py new file mode 100644 index 000000000000..5b6b12dd18de --- /dev/null +++ b/paimon-python/pypaimon/benchmark/hdfs_io_bench.py @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
# See the License for the
# specific language governing permissions and limitations
# under the License.

"""HDFS FileIO benchmark: native (hdfs-native) vs pyarrow (libhdfs/JVM).

Compares throughput of common FileIO operations between the two backends
against the same HDFS cluster. Each backend is exercised via the FileIO
factory by toggling the `hdfs.client.impl` option.

Usage:
    python -m pypaimon.benchmark.hdfs_io_bench \\
        --warehouse hdfs://localhost:8020/bench \\
        [--backend native|pyarrow|both] \\
        [--write-size-mb 128] \\
        [--list-files 1000] \\
        [--read-iters 3]

Notes:
- `pyarrow` backend requires HADOOP_HOME + HADOOP_CONF_DIR + libhdfs.
- `native` backend requires `pip install pypaimon[hdfs]`.
- The benchmark writes/reads scratch files under <warehouse>/bench_<id>/
  and removes them on exit.
"""

import argparse
import os
import sys
import time
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Sequence

# Allow running this file directly (`python hdfs_io_bench.py`): the directory
# that must be importable is the one *containing* the `pypaimon` package,
# i.e. three levels up from this file (benchmark/ -> pypaimon/ -> repo dir).
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))

if TYPE_CHECKING:  # runtime import is deferred to _build_file_io
    from pypaimon.common.file_io import FileIO

_MIB = 1024 * 1024
# Random payload is capped so huge --write-size-mb values do not need a
# matching amount of RAM; the same buffer is written repeatedly.
_MAX_PAYLOAD_MIB = 16
_READ_CHUNK_BYTES = 8 * _MIB


def _build_file_io(warehouse: str, backend: str) -> "FileIO":
    """Create the FileIO for `warehouse`, forcing the requested backend.

    The pyarrow fallback is switched off: otherwise a broken native install
    would silently fall back and the "native" figures would really be
    pyarrow figures.
    """
    # Imported lazily so `--help` and argument validation work even when
    # pypaimon's optional dependencies are not installed.
    from pypaimon.common.file_io import FileIO
    from pypaimon.common.options import Options

    opts = Options({
        "hdfs.client.impl": backend,
        "hdfs.client.fallback-to-pyarrow": "false",
    })
    return FileIO.get(warehouse, opts)


def _human(seconds: float) -> str:
    """Format a duration with a unit suited to its magnitude (us / ms / s)."""
    if seconds < 1e-3:
        return f"{seconds * 1e6:.0f}us"
    if seconds < 1:
        return f"{seconds * 1e3:.1f}ms"
    return f"{seconds:.2f}s"


def _bench_write(file_io: "FileIO", root: str, size_mb: int,
                 path: Optional[str] = None) -> float:
    """Write `size_mb` MiB of random data and return the elapsed seconds.

    The timer covers open, all writes and close; the payload is generated
    before the timer starts.

    Args:
        file_io: backend under test.
        root: directory used when `path` is not given.
        size_mb: amount of data to write, in MiB.
        path: explicit target file; a random name under `root` by default.
    """
    payload = os.urandom(min(size_mb, _MAX_PAYLOAD_MIB) * _MIB)
    if path is None:
        path = f"{root}/write-{uuid.uuid4().hex[:8]}.bin"
    target = size_mb * _MIB
    t0 = time.perf_counter()
    with file_io.new_output_stream(path) as stream:
        written = 0
        while written < target:
            chunk = payload[: min(len(payload), target - written)]
            n = stream.write(chunk)
            # Some streams return None instead of the byte count.
            written += n if isinstance(n, int) and n > 0 else len(chunk)
    # Measured after the `with` block so the close is part of the figure.
    return time.perf_counter() - t0


def _bench_read(file_io: "FileIO", path: str) -> float:
    """Read `path` to EOF in 8 MiB chunks and return the elapsed seconds."""
    t0 = time.perf_counter()
    with file_io.new_input_stream(path) as stream:
        while stream.read(_READ_CHUNK_BYTES):
            pass
    return time.perf_counter() - t0


def _bench_list(file_io: "FileIO", root: str, num_files: int) -> float:
    """Create `num_files` one-byte files and time one listing of them.

    Only the `list_status` call (fully materialised) is timed; the scratch
    directory is removed afterwards even if the benchmark fails.
    """
    scratch = f"{root}/list-{uuid.uuid4().hex[:8]}"
    file_io.mkdirs(scratch)
    try:
        for i in range(num_files):
            with file_io.new_output_stream(f"{scratch}/f-{i:06d}.txt") as s:
                s.write(b"x")
        t0 = time.perf_counter()
        list(file_io.list_status(scratch))
        return time.perf_counter() - t0
    finally:
        file_io.delete(scratch, recursive=True)


def run_one(backend: str, args) -> None:
    """Run the write / read / list benchmarks for one backend and print them.

    Args:
        backend: value for `hdfs.client.impl` ("native" or "pyarrow").
        args: namespace with `warehouse`, `write_size_mb`, `read_iters`
            and `list_files` attributes.
    """
    print(f"\n=== backend={backend} ===")
    try:
        file_io = _build_file_io(args.warehouse, backend)
    except Exception as e:  # report and move on to the next backend
        print(f"  init failed: {e}")
        return

    bench_root = f"{args.warehouse.rstrip('/')}/bench_{uuid.uuid4().hex[:8]}"
    file_io.mkdirs(bench_root)
    try:
        # Write
        sample_path = f"{bench_root}/write-sample.bin"
        write_elapsed = _bench_write(
            file_io, bench_root, args.write_size_mb, path=sample_path)
        mb_per_s = args.write_size_mb / write_elapsed if write_elapsed else 0
        print(f"  write {args.write_size_mb}MB: "
              f"{_human(write_elapsed)} ({mb_per_s:.1f} MB/s)")

        # Read (warm)
        read_times = [
            _bench_read(file_io, sample_path)
            for _ in range(args.read_iters)
        ]
        if read_times:
            avg_read = sum(read_times) / len(read_times)
            rmb_per_s = args.write_size_mb / avg_read if avg_read else 0
            print(f"  read {args.write_size_mb}MB "
                  f"(avg of {args.read_iters}): "
                  f"{_human(avg_read)} ({rmb_per_s:.1f} MB/s)")
        else:
            print("  read skipped (0 iterations)")

        # List
        list_elapsed = _bench_list(file_io, bench_root, args.list_files)
        print(f"  list {args.list_files} files: {_human(list_elapsed)}")
    finally:
        try:
            file_io.delete(bench_root, recursive=True)
        except Exception as e:  # cleanup is best effort
            print(f"  cleanup failed: {e}")


def _positive_int(text: str) -> int:
    """argparse type: integer >= 1."""
    value = int(text)
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1, got {value}")
    return value


def _non_negative_int(text: str) -> int:
    """argparse type: integer >= 0."""
    value = int(text)
    if value < 0:
        raise argparse.ArgumentTypeError(f"must be >= 0, got {value}")
    return value


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Parse the command line (`argv`, default `sys.argv[1:]`) and run."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--warehouse", required=True,
                        help="HDFS URI under which scratch files are created")
    parser.add_argument("--backend", default="both",
                        choices=["native", "pyarrow", "both"])
    parser.add_argument("--write-size-mb", type=_positive_int, default=128)
    parser.add_argument("--list-files", type=_non_negative_int, default=1000)
    parser.add_argument("--read-iters", type=_positive_int, default=3)
    args = parser.parse_args(argv)

    backends = (["native", "pyarrow"] if args.backend == "both"
                else [args.backend])
    for backend in backends:
        run_one(backend, args)


if __name__ == "__main__":
    main()
+ - HdfsNativeFileIO for HDFS/ViewFS (default; pure protocol client, no Hadoop install) + - PyArrowFileIO for other remote file systems (oss://, s3://, gs://, ...), + and for HDFS when explicitly requested via hdfs.client.impl=pyarrow """ + import os as _os from urllib.parse import urlparse uri = urlparse(path) @@ -276,5 +279,39 @@ def get(path: str, catalog_options: Optional[Options] = None) -> 'FileIO': from pypaimon.filesystem.local_file_io import LocalFileIO return LocalFileIO(path, catalog_options) + opts = catalog_options or Options({}) + + if scheme in ("hdfs", "viewfs"): + from pypaimon.common.options.config import HdfsOptions + impl_source = "hdfs.client.impl option" + # Treat an empty option value the same as "unset" so callers can + # blank it out (common in templated configs) without tripping + # the unsupported-impl branch. + impl_value = opts.to_map().get(HdfsOptions.HDFS_CLIENT_IMPL.key()) + if not impl_value: + impl_value = _os.environ.get("PYPAIMON_HDFS_IMPL") + impl_source = "PYPAIMON_HDFS_IMPL env var" + if not impl_value: + impl_value = HdfsOptions.HDFS_CLIENT_IMPL.default_value() + impl_source = "default" + impl = impl_value.lower() + if impl == "native": + try: + from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO + return HdfsNativeFileIO(path, opts) + except (ImportError, RuntimeError) as e: + fallback = opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW) + if not fallback: + raise + logging.getLogger(__name__).warning( + "Native HDFS backend init failed, falling back to " + "pyarrow: %s", e, + ) + elif impl != "pyarrow": + raise ValueError( + f"Unsupported hdfs.client.impl '{impl_value}' " + f"(from {impl_source}). Supported: 'native', 'pyarrow'." 
+ ) + from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO - return PyArrowFileIO(path, catalog_options or Options({})) + return PyArrowFileIO(path, opts) diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py index 465671725d40..6189ebaf899d 100644 --- a/paimon-python/pypaimon/common/options/config.py +++ b/paimon-python/pypaimon/common/options/config.py @@ -104,6 +104,41 @@ class CatalogOptions: BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1 +class HdfsOptions: + HDFS_CLIENT_IMPL = ( + ConfigOptions.key("hdfs.client.impl") + .string_type() + .default_value("native") + .with_description( + "HDFS FileIO backend. Supported values: 'native' (default, uses " + "hdfs-native protocol client, no Hadoop install required), " + "'pyarrow' (legacy, requires HADOOP_HOME / libhdfs / JVM)." + ) + ) + HDFS_CLIENT_FALLBACK_TO_PYARROW = ( + ConfigOptions.key("hdfs.client.fallback-to-pyarrow") + .boolean_type() + .default_value(True) + .with_description( + "When the native backend fails to initialise (e.g. missing wheel " + "or unsupported platform), fall back to the pyarrow backend " + "instead of raising." + ) + ) + HDFS_CONF_DIR = ( + ConfigOptions.key("hdfs.conf-dir") + .string_type() + .no_default_value() + .with_description( + "Directory containing core-site.xml / hdfs-site.xml that the " + "native client should load. Defaults to $HADOOP_CONF_DIR." + ) + ) + + HDFS_CONFIG_PREFIX = "hdfs.config." 
+ HDFS_NATIVE_CONFIG_KEY_PREFIXES = ("dfs.", "fs.", "hadoop.", "ipc.", "io.") + + class SecurityOptions: KERBEROS_PRINCIPAL = ( ConfigOptions.key("security.kerberos.login.principal") diff --git a/paimon-python/pypaimon/filesystem/_kerberos.py b/paimon-python/pypaimon/filesystem/_kerberos.py new file mode 100644 index 000000000000..00a31ba975eb --- /dev/null +++ b/paimon-python/pypaimon/filesystem/_kerberos.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
"""Shared Kerberos helpers used by HDFS FileIO backends."""

import os
import subprocess
from typing import Optional


class KinitError(subprocess.CalledProcessError):
    """`kinit` exited with a non-zero status.

    Still a `subprocess.CalledProcessError`, so existing handlers keep
    working; the only difference is that `str()` also carries what kinit
    printed, which the base class leaves out of its message.
    """

    def __str__(self) -> str:
        summary = super().__str__()
        detail = (self.stderr or self.output or "").strip()
        return f"{summary} {detail}" if detail else summary


def kerberos_login_from_keytab(principal: str, keytab: str) -> None:
    """Obtain a ticket for `principal` by running `kinit -kt keytab principal`.

    Raises:
        FileNotFoundError: the keytab does not exist, or no `kinit`
            executable could be started.
        PermissionError: the keytab exists but is not readable.
        subprocess.CalledProcessError: kinit returned a non-zero status
            (raised as `KinitError`, whose message includes kinit's output).
    """
    if not os.path.isfile(keytab):
        raise FileNotFoundError(f"Kerberos keytab file not found: {keytab}")
    if not os.access(keytab, os.R_OK):
        raise PermissionError(f"Kerberos keytab file is not readable: {keytab}")
    try:
        subprocess.run(
            ['kinit', '-kt', keytab, principal],
            check=True, capture_output=True, text=True,
        )
    except FileNotFoundError as e:
        # The keytab was verified above, so this is the executable itself.
        raise FileNotFoundError(
            "kinit executable not found on PATH; install the Kerberos "
            "client tools"
        ) from e
    except subprocess.CalledProcessError as e:
        raise KinitError(e.returncode, e.cmd, e.output, e.stderr) from None


def get_ticket_cache_path() -> Optional[str]:
    """Return the ticket cache location, or None when none can be found.

    `KRB5CCNAME` wins when set and non-empty; a leading `FILE:` qualifier is
    stripped and any other value is returned unchanged. Otherwise the
    conventional `/tmp/krb5cc_<uid>` file is returned if it exists.
    """
    cc = os.environ.get('KRB5CCNAME')
    if cc:
        return cc[len('FILE:'):] if cc.startswith('FILE:') else cc
    # os.getuid is not available on every platform (e.g. Windows); there is
    # no per-uid default cache file to probe in that case.
    getuid = getattr(os, 'getuid', None)
    if getuid is None:
        return None
    default_path = f'/tmp/krb5cc_{getuid()}'
    return default_path if os.path.exists(default_path) else None
"""HDFS FileIO backed by the hdfs-native protocol client (no JVM, no libhdfs)."""

import logging
import os
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from pathlib import PurePosixPath
from typing import Dict, Optional
from urllib.parse import urlparse

import pyarrow
import pyarrow.fs as pafs

from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
from pypaimon.common.options.config import HdfsOptions, SecurityOptions
from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.filesystem import _kerberos
from pypaimon.schema.data_types import AtomicType, DataField, PyarrowFieldParser
from pypaimon.write.blob_format_writer import BlobFormatWriter


class _HdfsFileInfo:
    """pafs.FileInfo-shaped adapter built from hdfs_native.FileStatus."""
    __slots__ = ('path', 'size', 'type', 'mtime', 'base_name')

    def __init__(self, path: str, size: Optional[int], file_type, mtime):
        self.path = path
        self.size = size
        self.type = file_type
        self.mtime = mtime
        # Last path component; empty when no path is known.
        self.base_name = path.rsplit('/', 1)[-1] if path else ''


class _HdfsWriterAdapter:
    """File-like wrapper over hdfs_native.FileWriter.

    Tracks the number of bytes written so `tell()` works, and closes the
    underlying writer exactly once.
    """

    def __init__(self, fw):
        self._fw = fw
        self._pos = 0
        self._closed = False

    def write(self, buf) -> int:
        n = self._fw.write(buf)
        if n is None:
            # Writer did not report a count: assume the whole buffer went out.
            n = len(buf) if hasattr(buf, '__len__') else 0
        self._pos += n
        return n

    def tell(self) -> int:
        return self._pos

    def flush(self):
        # Nothing is buffered in this adapter.
        pass

    def close(self):
        if not self._closed:
            try:
                self._fw.close()
            finally:
                # Never retry a close that raised.
                self._closed = True

    @property
    def closed(self) -> bool:
        return self._closed

    def writable(self) -> bool:
        return True

    def readable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return False

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
        return False


class _HdfsReaderAdapter:
    """File-like wrapper over hdfs_native.FileReader.

    Delegates read/seek/tell straight to the underlying reader (which is an
    io.RawIOBase subclass with full seek/tell support). The wrapper only
    exists so that exiting a `with` block guarantees the underlying handle
    is closed — hdfs-native's own FileReader.__exit__ is a no-op.
    """

    def __init__(self, fr):
        self._fr = fr
        self._closed = False

    def read(self, size: int = -1) -> bytes:
        # None means "read everything", same as -1.
        return self._fr.read(-1 if size is None else size)

    def read1(self, size: int = -1) -> bytes:
        return self.read(size)

    def seek(self, pos: int, whence: int = 0) -> int:
        self._fr.seek(pos, whence)
        return self._fr.tell()

    def tell(self) -> int:
        return self._fr.tell()

    def close(self):
        if self._closed:
            return
        try:
            close = getattr(self._fr, 'close', None)
            if close is not None:
                close()
        finally:
            self._closed = True

    @property
    def closed(self) -> bool:
        return self._closed

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return True

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
        return False


class HdfsNativeFileIO(FileIO):
    """HDFS FileIO that speaks the HDFS RPC protocol directly.

    No JVM, no libhdfs, no Hadoop install required. Hadoop xml is still
    consumed if present (HADOOP_CONF_DIR or `hdfs.conf-dir` option) for
    viewfs mount tables and HA NameNode lists; alternatively the same
    key/values can be delivered via the catalog options channel (a REST
    catalog can therefore push the cluster wiring with the response).
    """

    NATIVE_KEY_PREFIXES = HdfsOptions.HDFS_NATIVE_CONFIG_KEY_PREFIXES
    NS_PREFIX = HdfsOptions.HDFS_CONFIG_PREFIX

    # Ticket cache path -> principal that last ran kinit into it from this
    # process. Deliberately shared by all instances: it is what lets
    # _setup_kerberos notice two principals fighting over one cache.
    _kinit_principals: Dict[str, str] = {}

    def __init__(self, path: str, catalog_options: Options):
        """Connect to the cluster named by `path`.

        Raises:
            ValueError: `path` is not an hdfs:// or viewfs:// URI, or only
                one of the Kerberos principal/keytab options is set.
            ImportError: the hdfs-native package is not installed.
        """
        self.properties = catalog_options or Options({})
        self.logger = logging.getLogger(__name__)
        self.uri_reader_factory = UriReaderFactory(self.properties)

        scheme, netloc, _ = self.parse_location(path)
        if scheme not in {"hdfs", "viewfs"}:
            raise ValueError(
                f"HdfsNativeFileIO does not support scheme '{scheme}'"
            )
        self._scheme = scheme
        self._netloc = netloc

        try:
            from hdfs_native import Client, WriteOptions
        except ImportError as e:
            raise ImportError(
                "hdfs-native is not installed. "
                "Install with: pip install 'pypaimon[hdfs]'"
            ) from e
        self._WriteOptions = WriteOptions

        self._setup_kerberos()

        config_dir = (
            self.properties.get(HdfsOptions.HDFS_CONF_DIR)
            or os.environ.get("HADOOP_CONF_DIR")
        )
        hadoop_xml = self._load_hadoop_xml(config_dir)

        config = self._build_config_dict()
        self._maybe_inject_viewfs_fallback(scheme, netloc, config, hadoop_xml)

        # Stash for the lazy `filesystem` property (the fsspec/pyarrow facade
        # is only built if a caller asks for it).
        self._config = config
        self._hadoop_xml = hadoop_xml
        self._config_dir = config_dir
        self._filesystem = None

        client_kwargs = {}
        url = self._build_url(scheme, netloc)
        if url:
            client_kwargs["url"] = url
        if config:
            client_kwargs["config"] = config
        if config_dir:
            client_kwargs["config_dir"] = config_dir

        self._client = Client(**client_kwargs)

    def __reduce__(self):
        """Pickle support for Ray / multiprocessing.

        hdfs_native.Client is a Rust binding that can't be pickled; rather
        than try to serialise live handles, we serialise the constructor
        inputs and let workers re-init their own Client.

        Pin the resolved config_dir into the carried options. If the
        driver resolved it from $HADOOP_CONF_DIR, a worker on a host with
        a different env var would otherwise pick up the worker's value
        and silently talk to a different cluster.
        """
        netloc = self._netloc or ""
        path = f"{self._scheme}://{netloc}"
        props_map = dict(self.properties.to_map())
        conf_dir_key = HdfsOptions.HDFS_CONF_DIR.key()
        if self._config_dir and not props_map.get(conf_dir_key):
            props_map[conf_dir_key] = self._config_dir
        return (type(self), (path, Options(props_map)))

    @property
    def filesystem(self):
        """pyarrow.fs.FileSystem facade backed by hdfs_native.fsspec.

        Lazily constructed: FileIO-only call paths
        (exists/list_status/new_input_stream/...) never pay the fsspec init
        cost; only ds.dataset / open_input_file callers do.

        Raises:
            RuntimeError: the installed hdfs-native has no fsspec adapter.
        """
        if self._filesystem is None:
            try:
                from hdfs_native.fsspec import (
                    HdfsFileSystem,
                    ViewfsFileSystem,
                )
            except ImportError as e:
                raise RuntimeError(
                    "hdfs-native fsspec adapter is required to bridge "
                    "HdfsNativeFileIO to a pyarrow.fs filesystem; upgrade "
                    "hdfs-native (>=0.13)."
                ) from e
            cls = (ViewfsFileSystem if self._scheme == "viewfs"
                   else HdfsFileSystem)
            # Merge xml + overrides so the fsspec instance can connect
            # without relying on HADOOP_CONF_DIR (BaseFileSystem.__init__
            # only forwards storage_options to Client, not config_dir).
            merged_config = {**self._hadoop_xml, **self._config}
            fsspec_fs = cls(host=self._netloc, **merged_config)
            self._filesystem = pafs.PyFileSystem(
                pafs.FSSpecHandler(fsspec_fs))
        return self._filesystem

    @staticmethod
    def parse_location(location: str):
        """Split `location` into (scheme, netloc, path).

        A location without a scheme is reported as scheme "file" with its
        absolute local path.
        """
        uri = urlparse(location)
        if not uri.scheme:
            return "file", uri.netloc, os.path.abspath(location)
        return uri.scheme, uri.netloc, uri.path

    @staticmethod
    def _build_url(scheme: str, netloc: Optional[str]) -> Optional[str]:
        """Return `scheme://netloc`, or None when there is no authority."""
        if not netloc:
            return None
        return f"{scheme}://{netloc}"

    @staticmethod
    def _load_hadoop_xml(config_dir: Optional[str]) -> Dict[str, str]:
        """Parse core-site.xml + hdfs-site.xml from a Hadoop config dir into a
        flat {name: value} dict. Returns empty dict if the dir is missing or
        unreadable.

        Used only to discover viewfs mount-table state so we can polyfill the
        linkFallback mount that hdfs-native requires but libhdfs tolerates.
        The final config dir is still handed to hdfs-native for its own
        (more complete) xml parsing.
        """
        result: Dict[str, str] = {}
        if not config_dir or not os.path.isdir(config_dir):
            return result
        for fname in ("core-site.xml", "hdfs-site.xml"):
            path = os.path.join(config_dir, fname)
            if not os.path.isfile(path):
                continue
            try:
                tree = ET.parse(path)
            except (ET.ParseError, OSError):
                # Best effort: an unreadable file contributes nothing.
                continue
            for prop in tree.getroot().findall("property"):
                name_el = prop.find("name")
                value_el = prop.find("value")
                if name_el is None or name_el.text is None:
                    continue
                value = (
                    value_el.text.strip()
                    if value_el is not None and value_el.text
                    else ""
                )
                # hdfs-site.xml is read second, so it wins on duplicates.
                result[name_el.text.strip()] = value
        return result

    @staticmethod
    def _maybe_inject_viewfs_fallback(
        scheme: str,
        netloc: Optional[str],
        overrides: Dict[str, str],
        hadoop_xml: Dict[str, str],
    ) -> None:
        """If we're opening a viewfs URI and no linkFallback is configured for
        the cluster, pick a usable nameservice URI from existing link.*
        targets or dfs.nameservices and inject one into `overrides`.

        hdfs-native rejects viewfs init without a fallback mount; libhdfs
        tolerates it. This bridges the gap without touching cluster xml.

        Both sources are consulted: a mount table delivered purely through
        catalog options (no xml on disk) gets the same polyfill as one read
        from xml. On duplicate keys the catalog value is used.
        """
        if scheme != "viewfs" or not netloc:
            return
        cluster = netloc
        fallback_key = f"fs.viewfs.mounttable.{cluster}.linkFallback"
        effective = {**hadoop_xml, **overrides}
        if fallback_key in effective:
            return

        link_prefix = f"fs.viewfs.mounttable.{cluster}.link."
        for key, value in effective.items():
            if key.startswith(link_prefix) and value:
                parsed = urlparse(value)
                if parsed.scheme == "hdfs" and parsed.netloc:
                    overrides[fallback_key] = f"hdfs://{parsed.netloc}/"
                    return

        nameservices = [
            ns.strip()
            for ns in effective.get("dfs.nameservices", "").split(",")
            if ns.strip()
        ]
        if nameservices:
            overrides[fallback_key] = f"hdfs://{nameservices[0]}/"

    def _setup_kerberos(self):
        """Run kinit from the configured keytab, if one is configured.

        Raises:
            ValueError: only one of principal / keytab is set.
            RuntimeError: kinit succeeded but no ticket cache was found.
        """
        props = self.properties.to_map()
        principal = (
            self.properties.get(SecurityOptions.KERBEROS_PRINCIPAL)
            or props.get("security.principal")
        )
        keytab = (
            self.properties.get(SecurityOptions.KERBEROS_KEYTAB)
            or props.get("security.keytab")
        )
        if bool(principal) != bool(keytab):
            raise ValueError(
                "security.kerberos.login.principal and "
                "security.kerberos.login.keytab "
                "must be both set or both unset"
            )
        if not principal:
            return

        _kerberos.kerberos_login_from_keytab(principal, keytab)
        cache_path = _kerberos.get_ticket_cache_path()
        if not cache_path:
            raise RuntimeError(
                "kinit succeeded but no ticket cache path could be "
                "determined. Set the KRB5CCNAME environment variable "
                "to specify the cache location."
            )

        # The ticket cache is process-global state. A second instance that
        # logs in with a different principal re-initialises the very same
        # cache, so earlier instances start using the new ticket. The path
        # itself does not change in that case, so the clash can only be
        # detected by remembering which principal used the cache last.
        previous = self._kinit_principals.get(cache_path)
        if previous and previous != principal:
            self.logger.warning(
                "Kerberos ticket cache %r was initialised for principal %r "
                "and has now been re-initialised for %r; concurrent "
                "HdfsNativeFileIO instances with different Kerberos "
                "principals share the cache and will clobber each other's "
                "tickets.",
                cache_path, previous, principal,
            )
        self._kinit_principals[cache_path] = principal

        # hdfs-native's GSSAPI layer reads KRB5CCNAME from the process env.
        # A value that is already set is what get_ticket_cache_path() just
        # returned, so it is left untouched (keeping any `FILE:` qualifier);
        # only an unset/empty variable needs to be pointed at the cache.
        if not os.environ.get("KRB5CCNAME"):
            os.environ["KRB5CCNAME"] = cache_path

    def _build_config_dict(self) -> Dict[str, str]:
        """Collect the Hadoop key/values carried in the catalog options.

        Keys with one of NATIVE_KEY_PREFIXES are forwarded unchanged; keys
        under NS_PREFIX are forwarded with that prefix removed. Values that
        are None are skipped.
        """
        config: Dict[str, str] = {}
        for key, value in self.properties.to_map().items():
            if value is None:
                continue
            if any(key.startswith(p) for p in self.NATIVE_KEY_PREFIXES):
                config[key] = str(value)
            elif key.startswith(self.NS_PREFIX):
                config[key[len(self.NS_PREFIX):]] = str(value)
        return config

    def to_filesystem_path(self, path: str) -> str:
        """Reduce a URI for this cluster to the plain absolute path.

        A URI with a different scheme or authority, and anything that is
        not an hdfs/viewfs URI, is returned unchanged.
        """
        # hdfs-native expects an absolute path within the cluster the Client is
        # bound to; passing a full URI makes its Rust-side MountTable::resolve
        # treat the string as a relative path (since it doesn't start with '/')
        # and prepend the user's home dir, producing nonsense like
        # `/user/foo/viewfs://cluster/...`. Strip the matching scheme+authority
        # so a plain absolute path reaches the client.
        parsed = urlparse(path)
        if parsed.scheme in ("hdfs", "viewfs"):
            if parsed.scheme == self._scheme and (
                not parsed.netloc or parsed.netloc == self._netloc
            ):
                return parsed.path or "/"
        return path

    def _adapt_status(self, status, fallback_path: str = '') -> _HdfsFileInfo:
        """Convert an hdfs-native status object into an `_HdfsFileInfo`.

        Directories get size None; a missing or zero modification time
        becomes mtime None. `fallback_path` is used when the status has no
        path of its own.
        """
        path = getattr(status, 'path', None) or fallback_path
        is_dir = bool(getattr(status, 'isdir', False))
        length = getattr(status, 'length', 0)
        mtime_ms = getattr(status, 'modification_time', None)
        mtime = (
            datetime.fromtimestamp(mtime_ms / 1000.0, tz=timezone.utc)
            if mtime_ms else None
        )
        size = None if is_dir else int(length or 0)
        ftype = pafs.FileType.Directory if is_dir else pafs.FileType.File
        return _HdfsFileInfo(path, size, ftype, mtime)

    def new_input_stream(self, path: str):
        """Open `path` for reading; the result is a context manager."""
        path_str = self.to_filesystem_path(path)
        reader = self._client.read(path_str)
        return _HdfsReaderAdapter(reader)

    def new_output_stream(self, path: str):
        """Open `path` for writing, creating parents and overwriting."""
        path_str = self.to_filesystem_path(path)
        writer = self._client.create(
            path_str,
            self._WriteOptions(create_parent=True, overwrite=True),
        )
        return _HdfsWriterAdapter(writer)

    def get_file_status(self, path: str):
        """Return the status of `path`.

        Raises:
            FileNotFoundError: `path` does not exist.
        """
        path_str = self.to_filesystem_path(path)
        try:
            status = self._client.get_file_info(path_str)
        except FileNotFoundError:
            raise FileNotFoundError(f"File {path} does not exist")
        return self._adapt_status(status, path_str)

    def list_status(self, path: str):
        """Return the statuses of the entries under `path` as a list."""
        path_str = self.to_filesystem_path(path)
        return [self._adapt_status(s) for s in self._client.list_status(path_str)]

    def exists(self, path: str) -> bool:
        """Return True when `path` exists."""
        path_str = self.to_filesystem_path(path)
        try:
            self._client.get_file_info(path_str)
            return True
        except FileNotFoundError:
            return False

    def delete(self, path: str, recursive: bool = False) -> bool:
        """Delete `path`; return False when it does not exist.

        Raises:
            OSError: `path` is a non-empty directory and `recursive` is
                False.
        """
        path_str = self.to_filesystem_path(path)
        try:
            status = self._client.get_file_info(path_str)
        except FileNotFoundError:
            return False
        if bool(getattr(status, 'isdir', False)) and not recursive:
            # Only the first entry is needed to know the dir is non-empty.
            if next(iter(self._client.list_status(path_str)), None) is not None:
                raise OSError(f"Directory {path} is not empty")
        return bool(self._client.delete(path_str, recursive))

    def mkdirs(self, path: str) -> bool:
        """Create `path` (and parents) unless it is already a directory.

        Raises:
            FileExistsError: `path` exists and is not a directory.
        """
        path_str = self.to_filesystem_path(path)
        try:
            status = self._client.get_file_info(path_str)
        except FileNotFoundError:
            self._client.mkdirs(path_str, create_parent=True)
            return True
        if bool(getattr(status, 'isdir', False)):
            return True
        raise FileExistsError(f"Path exists but is not a directory: {path}")

    def rename(self, src: str, dst: str) -> bool:
        """Rename `src` to `dst`; return False instead of raising.

        The destination's parent is created when missing. An existing file
        at `dst` is never overwritten; an existing directory at `dst`
        receives `src` under its own name unless that name is taken.
        """
        src_str = self.to_filesystem_path(src)
        dst_str = self.to_filesystem_path(dst)
        dst_parent = str(PurePosixPath(dst_str).parent)
        if dst_parent and dst_parent != '.':
            try:
                self._client.get_file_info(dst_parent)
            except FileNotFoundError:
                self._client.mkdirs(dst_parent, create_parent=True)
        try:
            dst_status = self._client.get_file_info(dst_str)
            if not getattr(dst_status, 'isdir', False):
                return False
            src_name = PurePosixPath(src_str).name
            dst_str = str(PurePosixPath(dst_str) / src_name)
            try:
                self._client.get_file_info(dst_str)
                return False
            except FileNotFoundError:
                pass
        except FileNotFoundError:
            pass
        try:
            self._client.rename(src_str, dst_str)
            return True
        except OSError:
            # Covers FileNotFoundError and PermissionError as well.
            return False

    def write_parquet(self, path: str, data: pyarrow.Table,
                      compression: str = 'zstd', zstd_level: int = 1, **kwargs):
        """Write `data` as Parquet; the partial file is removed on failure.

        Raises:
            RuntimeError: wrapping whatever made the write fail.
        """
        try:
            import pyarrow.parquet as pq
            if compression.lower() == 'zstd':
                kwargs['compression_level'] = zstd_level
            with self.new_output_stream(path) as raw_stream:
                stream = pyarrow.PythonFile(raw_stream, mode='wb')
                try:
                    pq.write_table(
                        data, stream, compression=compression, **kwargs)
                finally:
                    stream.close()
        except Exception as e:
            self.delete_quietly(path)
            raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e

    def write_orc(self, path: str, data: pyarrow.Table,
                  compression: str = 'zstd', zstd_level: int = 1, **kwargs):
        """Write `data` as ORC; the partial file is removed on failure.

        Raises:
            RuntimeError: wrapping whatever made the write fail.
        """
        try:
            import sys
            import pyarrow.orc as orc
            data = self._cast_time_columns_for_orc(data)
            with self.new_output_stream(path) as raw_stream:
                stream = pyarrow.PythonFile(raw_stream, mode='wb')
                try:
                    if sys.version_info[:2] == (3, 6):
                        # No compression argument is passed on Python 3.6.
                        orc.write_table(data, stream, **kwargs)
                    else:
                        orc.write_table(
                            data, stream, compression=compression, **kwargs)
                finally:
                    stream.close()
        except Exception as e:
            self.delete_quietly(path)
            raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e

    def write_avro(self, path: str, data: pyarrow.Table,
                   avro_schema=None, compression: str = 'zstd',
                   zstd_level: int = 1, **kwargs):
        """Write `data` as Avro; the partial file is removed on failure.

        Raises:
            ValueError: `compression` is not a supported Avro codec.
        """
        import fastavro
        if avro_schema is None:
            avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)

        records_dict = data.to_pydict()
        # Taken from the table itself so a table without columns does not
        # fail on an index into an empty list.
        num_rows = data.num_rows

        def record_generator():
            for i in range(num_rows):
                record = {}
                for col, values in records_dict.items():
                    value = values[i]
                    # Naive datetimes are written as UTC.
                    if isinstance(value, datetime) and value.tzinfo is None:
                        value = value.replace(tzinfo=timezone.utc)
                    record[col] = value
                yield record

        codec_map = {
            'null': 'null', 'deflate': 'deflate', 'snappy': 'snappy',
            'bzip2': 'bzip2', 'xz': 'xz', 'zstandard': 'zstandard',
            'zstd': 'zstandard',
        }
        codec = codec_map.get(compression.lower())
        if codec is None:
            raise ValueError(
                f"Unsupported compression '{compression}' for Avro format. "
                f"Supported compressions: {', '.join(sorted(codec_map.keys()))}."
            )
        if codec == 'zstandard':
            kwargs['codec_compression_level'] = zstd_level
        try:
            with self.new_output_stream(path) as output_stream:
                fastavro.writer(output_stream, avro_schema,
                                record_generator(), codec=codec, **kwargs)
        except Exception:
            # Same cleanup as the other writers; the original exception is
            # re-raised unchanged so callers see the same types as before.
            self.delete_quietly(path)
            raise

    def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
        """Write a single-column table in blob format.

        Raises:
            RuntimeError: the table has more than one column, or the write
                failed (the partial file is removed).
        """
        try:
            if data.num_columns != 1:
                raise RuntimeError(
                    f"Blob format only supports a single column, "
                    f"got {data.num_columns} columns")
            field = data.schema[0]
            if pyarrow.types.is_large_binary(field.type):
                fields = [DataField(0, field.name, AtomicType("BLOB"))]
            else:
                paimon_type = PyarrowFieldParser.to_paimon_type(
                    field.type, field.nullable)
                fields = [DataField(0, field.name, paimon_type)]
            records_dict = data.to_pydict()
            num_rows = data.num_rows
            field_name = fields[0].name
            with self.new_output_stream(path) as output_stream:
                writer = BlobFormatWriter(output_stream)
                for i in range(num_rows):
                    writer.write_value(records_dict[field_name][i],
                                       fields, self.uri_reader_factory)
                writer.close()
        except Exception as e:
            self.delete_quietly(path)
            raise RuntimeError(f"Failed to write blob file {path}: {e}") from e

    def close(self):
        """Drop the client and the cached pyarrow filesystem facade."""
        self._client = None
        self._filesystem = None
_kerberos.kerberos_login_from_keytab(principal, keytab) @staticmethod def _get_ticket_cache_path() -> Optional[str]: - cc = os.environ.get('KRB5CCNAME') - if cc: - if cc.startswith('FILE:'): - return cc[5:] - return cc - default_path = f'/tmp/krb5cc_{os.getuid()}' - if os.path.exists(default_path): - return default_path - return None + from pypaimon.filesystem import _kerberos + return _kerberos.get_ticket_cache_path() def new_input_stream(self, path: str): path_str = self.to_filesystem_path(path) diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/README.md b/paimon-python/pypaimon/tests/e2e/hdfs/README.md new file mode 100644 index 000000000000..47f63701ed12 --- /dev/null +++ b/paimon-python/pypaimon/tests/e2e/hdfs/README.md @@ -0,0 +1,75 @@ + + +# HDFS End-to-End Tests + +Verifies the native HDFS FileIO backend (`HdfsNativeFileIO`) against a live HDFS +cluster. No local Hadoop install or JVM required on the client side. + +## Quick start (Docker) + +```sh +# 1. Bring up a single-NameNode + single-DataNode cluster. +docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml up -d + +# Wait ~30s for the cluster to become healthy; check with: +docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml ps + +# 2. Install the package with the hdfs extra. +pip install -e '.[hdfs]' + +# 3. Run the tests. +PYPAIMON_HDFS_E2E_URL=hdfs://localhost:8020 \ + python -m pytest pypaimon/tests/e2e/hdfs/ -v + +# 4. Teardown. +docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml down -v +``` + +## REST-catalog config delivery mode (no local xml) + +The native backend accepts Hadoop key/values directly via catalog options. +Skip the `core-site.xml` / `hdfs-site.xml` dance entirely by configuring the +cluster wiring as options — exactly what a REST catalog would push to the +client in its response. 
Example: + +```python +catalog = CatalogFactory.create({ + "warehouse": "viewfs://cluster/warehouse", + "hdfs.client.impl": "native", + # Forwarded as-is to the underlying client: + "dfs.nameservices": "ns1,ns2", + "dfs.ha.namenodes.ns1": "nn1,nn2", + "dfs.namenode.rpc-address.ns1.nn1": "host-1:8020", + "dfs.namenode.rpc-address.ns1.nn2": "host-2:8020", + "fs.viewfs.mounttable.cluster.link./prod": "hdfs://ns1/prod", +}) +``` + +Keys matching the prefixes `dfs.` / `fs.` / `hadoop.` / `ipc.` / `io.` are +forwarded automatically. Use the `hdfs.config.` namespace for any other +key you want passed through. + +## Kerberos + +The cluster in `docker-compose.yml` runs without security to keep the +smoke test simple. For a Kerberized e2e: provision a krb5 + HDFS compose +separately, install `libgssapi-krb5-2` (or platform equivalent) on the +client, set `KRB5_CONFIG` and `KRB5CCNAME`, then either run `kinit` +yourself or pass `security.kerberos.login.principal` + `.keytab` as +catalog options (pypaimon will run `kinit` for you). diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/__init__.py b/paimon-python/pypaimon/tests/e2e/hdfs/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/paimon-python/pypaimon/tests/e2e/hdfs/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/docker-compose.yml b/paimon-python/pypaimon/tests/e2e/hdfs/docker-compose.yml new file mode 100644 index 000000000000..145708c3e629 --- /dev/null +++ b/paimon-python/pypaimon/tests/e2e/hdfs/docker-compose.yml @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Single-NameNode HDFS cluster for integration testing the native HDFS backend. +# Brings up one NameNode (RPC 8020) and one DataNode (data 9866). 
+# +# Run: +# docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml up -d +# PYPAIMON_HDFS_E2E_URL=hdfs://localhost:8020 \ +# python -m pytest pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py -v +# docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml down -v + +services: + namenode: + image: apache/hadoop:3.3.6 + hostname: namenode + user: root + command: ["/bin/bash", "-c", "hdfs namenode -format -force -nonInteractive || true; hdfs namenode"] + environment: + HADOOP_HOME: /opt/hadoop + CORE-SITE.XML_fs.defaultFS: hdfs://namenode:8020 + HDFS-SITE.XML_dfs.namenode.rpc-address: namenode:8020 + HDFS-SITE.XML_dfs.replication: "1" + HDFS-SITE.XML_dfs.permissions.enabled: "false" + ports: + - "8020:8020" + - "9870:9870" + + datanode: + image: apache/hadoop:3.3.6 + user: root + command: ["hdfs", "datanode"] + environment: + HADOOP_HOME: /opt/hadoop + CORE-SITE.XML_fs.defaultFS: hdfs://namenode:8020 + HDFS-SITE.XML_dfs.datanode.use.datanode.hostname: "false" + HDFS-SITE.XML_dfs.permissions.enabled: "false" + depends_on: + - namenode + ports: + - "9866:9866" diff --git a/paimon-python/pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py b/paimon-python/pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py new file mode 100644 index 000000000000..5989b815faf7 --- /dev/null +++ b/paimon-python/pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""End-to-end tests for the native HDFS FileIO backend. + +Disabled by default. To run: + 1. Start an HDFS cluster — see docker-compose.yml in this directory. + 2. Install pypaimon with the hdfs extra: + pip install -e '.[hdfs]' + 3. Point the tests at the cluster and run: + PYPAIMON_HDFS_E2E_URL=hdfs://localhost:8020 \\ + python -m pytest pypaimon/tests/e2e/hdfs/hdfs_native_e2e_test.py -v + +To exercise the REST-catalog config-delivery path (no local xml), put the +Hadoop config k/v in catalog options under the `dfs.*` / `fs.*` namespaces +or via `hdfs.config.*` — both are forwarded to the underlying client. +""" + +import os +import unittest +import uuid + +import pandas as pd +import pyarrow as pa + +E2E_URL = os.environ.get("PYPAIMON_HDFS_E2E_URL") +SKIP_REASON = ("PYPAIMON_HDFS_E2E_URL not set; skipping HDFS e2e. " + "See docker-compose.yml in this directory.") + + +@unittest.skipIf(not E2E_URL, SKIP_REASON) +class HdfsNativeE2ETest(unittest.TestCase): + """Smoke-test the native HDFS backend end-to-end against a live cluster.""" + + @classmethod + def setUpClass(cls): + try: + import hdfs_native # noqa: F401 + except ImportError as e: + raise unittest.SkipTest( + "hdfs-native not installed. 
pip install 'pypaimon[hdfs]'" + ) from e + + from pypaimon.catalog.catalog_factory import CatalogFactory + cls.warehouse = ( + f"{E2E_URL}/pypaimon-e2e/warehouse-{uuid.uuid4().hex[:8]}" + ) + cls.catalog = CatalogFactory.create({ + "warehouse": cls.warehouse, + "hdfs.client.impl": "native", + }) + cls.catalog.create_database("default", True) + + def _make_table(self, name, schema): + from pypaimon.schema.schema import Schema + fqn = f"default.{name}" + s = Schema.from_pyarrow_schema( + schema, + options={"file.format": "parquet"}, + ) + self.catalog.create_table(fqn, s, False) + return self.catalog.get_table(fqn) + + def test_write_then_read_back(self): + pa_schema = pa.schema([ + ("id", pa.int64()), + ("payload", pa.string()), + ]) + table = self._make_table(f"t_{uuid.uuid4().hex[:8]}", pa_schema) + + data = pd.DataFrame({ + "id": list(range(100)), + "payload": [f"row-{i}" for i in range(100)], + }) + + writer = table.new_batch_write_builder().new_write() + writer.write_pandas(data) + commit_msgs = writer.prepare_commit() + committer = table.new_batch_write_builder().new_commit() + committer.commit(commit_msgs) + writer.close() + committer.close() + + scan = table.new_read_builder().new_scan() + reader = table.new_read_builder().new_read() + splits = scan.plan().splits() + result = reader.to_arrow(splits) + + self.assertEqual(result.num_rows, 100) + + +if __name__ == "__main__": + unittest.main() diff --git a/paimon-python/pypaimon/tests/hdfs_native_test.py b/paimon-python/pypaimon/tests/hdfs_native_test.py new file mode 100644 index 000000000000..0c818d58336f --- /dev/null +++ b/paimon-python/pypaimon/tests/hdfs_native_test.py @@ -0,0 +1,832 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +import sys +import tempfile +import types +import unittest +from unittest.mock import MagicMock, patch + +import pyarrow.fs as pafs + +from pypaimon.common.file_io import FileIO +from pypaimon.common.options import Options +from pypaimon.common.options.config import HdfsOptions + + +def _install_fake_hdfs_native(): + """Install a fake hdfs_native module (with .fsspec submodule) into + sys.modules. + + Returns (fake_module, mock_client_cls, mock_write_options_cls). 
+ """ + fake = types.ModuleType("hdfs_native") + fake.Client = MagicMock(name="Client") + fake.WriteOptions = MagicMock(name="WriteOptions") + fsspec_mod = types.ModuleType("hdfs_native.fsspec") + fsspec_mod.HdfsFileSystem = MagicMock(name="HdfsFileSystem") + fsspec_mod.ViewfsFileSystem = MagicMock(name="ViewfsFileSystem") + fake.fsspec = fsspec_mod + sys.modules["hdfs_native"] = fake + sys.modules["hdfs_native.fsspec"] = fsspec_mod + return fake, fake.Client, fake.WriteOptions + + +def _uninstall_fake_hdfs_native(): + sys.modules.pop("hdfs_native", None) + sys.modules.pop("hdfs_native.fsspec", None) + # Also drop the cached HdfsNativeFileIO so a re-import sees the new fake + sys.modules.pop( + "pypaimon.filesystem.hdfs_native_file_io", None) + + +class HdfsOptionsTest(unittest.TestCase): + + def test_defaults(self): + opts = Options({}) + self.assertEqual(opts.get(HdfsOptions.HDFS_CLIENT_IMPL), "native") + self.assertTrue(opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW)) + self.assertIsNone(opts.get(HdfsOptions.HDFS_CONF_DIR)) + + def test_explicit_pyarrow(self): + opts = Options({"hdfs.client.impl": "pyarrow"}) + self.assertEqual(opts.get(HdfsOptions.HDFS_CLIENT_IMPL), "pyarrow") + + def test_explicit_fallback_false(self): + opts = Options({"hdfs.client.fallback-to-pyarrow": "false"}) + self.assertFalse(opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW)) + + +class HdfsNativeFileIORoutingTest(unittest.TestCase): + + def setUp(self): + _uninstall_fake_hdfs_native() + + def tearDown(self): + _uninstall_fake_hdfs_native() + + def test_local_paths_unaffected(self): + fio = FileIO.get("file:///tmp/foo") + self.assertEqual(type(fio).__name__, "LocalFileIO") + + def test_default_hdfs_routes_to_native(self): + _install_fake_hdfs_native() + fio = FileIO.get("hdfs://ns/foo", Options({})) + self.assertEqual(type(fio).__name__, "HdfsNativeFileIO") + + def test_explicit_pyarrow_routes_to_pyarrow(self): + # No hdfs-native module needed; should go straight to pyarrow. 
+ with patch( + "pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__", + return_value=None, + ): + fio = FileIO.get( + "hdfs://ns/foo", + Options({"hdfs.client.impl": "pyarrow"}), + ) + self.assertEqual(type(fio).__name__, "PyArrowFileIO") + + def test_native_init_failure_falls_back_to_pyarrow(self): + # hdfs_native not installed; default fallback enabled. + _uninstall_fake_hdfs_native() + with patch( + "pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__", + return_value=None, + ): + fio = FileIO.get("hdfs://ns/foo", Options({})) + self.assertEqual(type(fio).__name__, "PyArrowFileIO") + + def test_native_init_failure_no_fallback_raises(self): + _uninstall_fake_hdfs_native() + with self.assertRaises(ImportError): + FileIO.get( + "hdfs://ns/foo", + Options({"hdfs.client.fallback-to-pyarrow": "false"}), + ) + + def test_unsupported_impl_raises(self): + with self.assertRaises(ValueError) as ctx: + FileIO.get( + "hdfs://ns/foo", + Options({"hdfs.client.impl": "bogus"}), + ) + self.assertIn("Unsupported hdfs.client.impl", str(ctx.exception)) + + def test_env_var_override_when_option_absent(self): + _install_fake_hdfs_native() + with patch( + "pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__", + return_value=None, + ): + with patch.dict(os.environ, {"PYPAIMON_HDFS_IMPL": "pyarrow"}): + fio = FileIO.get("hdfs://ns/foo", Options({})) + self.assertEqual(type(fio).__name__, "PyArrowFileIO") + + def test_option_wins_over_env_var(self): + _install_fake_hdfs_native() + with patch.dict(os.environ, {"PYPAIMON_HDFS_IMPL": "pyarrow"}): + fio = FileIO.get( + "hdfs://ns/foo", + Options({"hdfs.client.impl": "native"}), + ) + self.assertEqual(type(fio).__name__, "HdfsNativeFileIO") + + def test_viewfs_scheme_routes_to_native(self): + _install_fake_hdfs_native() + fio = FileIO.get("viewfs://cluster/foo", Options({})) + self.assertEqual(type(fio).__name__, "HdfsNativeFileIO") + + def test_empty_impl_option_treated_as_unset(self): + # Templated configs sometimes 
blank the option to opt out — that + # should fall through to the default ("native"), not raise + # "Unsupported hdfs.client.impl ''". + _install_fake_hdfs_native() + fio = FileIO.get("hdfs://ns/foo", Options({"hdfs.client.impl": ""})) + self.assertEqual(type(fio).__name__, "HdfsNativeFileIO") + + +class HdfsNativeFileIOInitTest(unittest.TestCase): + + def setUp(self): + self._fake, self._client_cls, self._wo_cls = _install_fake_hdfs_native() + + def tearDown(self): + _uninstall_fake_hdfs_native() + + def _make(self, path, props=None): + from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO + return HdfsNativeFileIO(path, Options(props or {})) + + def test_constructs_client_with_url(self): + self._make("hdfs://ns1/warehouse") + self._client_cls.assert_called_once() + _, kwargs = self._client_cls.call_args + self.assertEqual(kwargs.get("url"), "hdfs://ns1") + self.assertNotIn("config", kwargs) + + def test_viewfs_scheme_passes_through(self): + self._make("viewfs://cluster1/") + _, kwargs = self._client_cls.call_args + self.assertEqual(kwargs.get("url"), "viewfs://cluster1") + + def test_native_hadoop_keys_forwarded_as_config(self): + self._make("hdfs://ns1/foo", { + "dfs.nameservices": "ns1", + "dfs.ha.namenodes.ns1": "nn1,nn2", + "dfs.namenode.rpc-address.ns1.nn1": "host1:8020", + "fs.viewfs.mounttable.cluster.link./prod": "hdfs://ns1/prod", + "warehouse": "hdfs://ns1/warehouse", # should NOT be forwarded + }) + _, kwargs = self._client_cls.call_args + config = kwargs.get("config", {}) + self.assertEqual(config.get("dfs.nameservices"), "ns1") + self.assertEqual(config.get("dfs.ha.namenodes.ns1"), "nn1,nn2") + self.assertEqual( + config.get("dfs.namenode.rpc-address.ns1.nn1"), "host1:8020") + self.assertEqual( + config.get("fs.viewfs.mounttable.cluster.link./prod"), + "hdfs://ns1/prod") + self.assertNotIn("warehouse", config) + + def test_namespaced_overrides_forwarded(self): + self._make("hdfs://ns1/foo", { + 
"hdfs.config.dfs.client.read.shortcircuit": "true", + }) + _, kwargs = self._client_cls.call_args + config = kwargs.get("config", {}) + self.assertEqual( + config.get("dfs.client.read.shortcircuit"), "true") + + def test_conf_dir_from_option(self): + self._make("hdfs://ns1/foo", { + "hdfs.conf-dir": "/tmp/conf", + }) + _, kwargs = self._client_cls.call_args + self.assertEqual(kwargs.get("config_dir"), "/tmp/conf") + + def test_conf_dir_from_env(self): + env = dict(os.environ) + env["HADOOP_CONF_DIR"] = "/env/conf" + with patch.dict(os.environ, env, clear=True): + self._make("hdfs://ns1/foo") + _, kwargs = self._client_cls.call_args + self.assertEqual(kwargs.get("config_dir"), "/env/conf") + + def test_option_conf_dir_overrides_env(self): + env = dict(os.environ) + env["HADOOP_CONF_DIR"] = "/env/conf" + with patch.dict(os.environ, env, clear=True): + self._make("hdfs://ns1/foo", {"hdfs.conf-dir": "/opt/conf"}) + _, kwargs = self._client_cls.call_args + self.assertEqual(kwargs.get("config_dir"), "/opt/conf") + + @patch("pypaimon.filesystem._kerberos.subprocess.run") + def test_kerberos_principal_keytab_triggers_kinit(self, mock_kinit): + mock_kinit.return_value = MagicMock() + with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file: + with patch.dict(os.environ, {"KRB5CCNAME": "/tmp/kc_test"}): + self._make("hdfs://ns1/foo", { + "security.kerberos.login.principal": "user@REALM", + "security.kerberos.login.keytab": keytab_file.name, + }) + kinit_calls = [ + c for c in mock_kinit.call_args_list + if c[0][0][0] == "kinit" + ] + self.assertEqual(len(kinit_calls), 1) + self.assertEqual( + kinit_calls[0][0][0], + ["kinit", "-kt", keytab_file.name, "user@REALM"], + ) + + def test_principal_without_keytab_raises(self): + with self.assertRaises(ValueError) as ctx: + self._make("hdfs://ns1/foo", { + "security.kerberos.login.principal": "user@REALM", + }) + self.assertIn("must be both set or both unset", str(ctx.exception)) + + 
@patch("pypaimon.filesystem._kerberos.subprocess.run") + def test_kerberos_preserves_FILE_prefix_on_krb5ccname(self, mock_kinit): + # If KRB5CCNAME had a `FILE:` qualifier, the rewrite after kinit + # must keep it so GSSAPI cache-type detection isn't perturbed. + mock_kinit.return_value = MagicMock() + with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file: + with patch.dict(os.environ, + {"KRB5CCNAME": "FILE:/tmp/kc_test"}, + clear=True): + self._make("hdfs://ns1/foo", { + "security.kerberos.login.principal": "user@REALM", + "security.kerberos.login.keytab": keytab_file.name, + }) + self.assertEqual( + os.environ["KRB5CCNAME"], "FILE:/tmp/kc_test") + + @patch("pypaimon.filesystem._kerberos.get_ticket_cache_path", + return_value="/tmp/freshly_kinit_cache") + @patch("pypaimon.filesystem._kerberos.subprocess.run") + def test_kerberos_warns_when_overwriting_different_cache( + self, mock_kinit, _mock_cache, + ): + # Multi-principal in the same process clobbers KRB5CCNAME; we warn + # so the operator sees the race instead of silently mis-routing + # earlier instances' RPCs. 
+ mock_kinit.return_value = MagicMock() + with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file: + with patch.dict(os.environ, + {"KRB5CCNAME": "/tmp/some_other_cache"}, + clear=True): + with self.assertLogs( + "pypaimon.filesystem.hdfs_native_file_io", + level="WARNING", + ) as log_ctx: + self._make("hdfs://ns1/foo", { + "security.kerberos.login.principal": "user@REALM", + "security.kerberos.login.keytab": keytab_file.name, + }) + self.assertTrue( + any("Overwriting process-global KRB5CCNAME" in m + for m in log_ctx.output), + log_ctx.output, + ) + + @patch("pypaimon.filesystem._kerberos.get_ticket_cache_path", + return_value="/tmp/kc_test") + @patch("pypaimon.filesystem._kerberos.subprocess.run") + def test_kerberos_no_warning_when_cache_unchanged( + self, mock_kinit, _mock_cache, + ): + import logging as _logging + mock_kinit.return_value = MagicMock() + with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file: + # Pre-existing value matches the kinit-resolved cache → no warn. + with patch.dict(os.environ, + {"KRB5CCNAME": "/tmp/kc_test"}, + clear=True): + # assertNoLogs is 3.10+; patch warning explicitly so older + # interpreters keep working too. 
+ logger = _logging.getLogger( + "pypaimon.filesystem.hdfs_native_file_io") + with patch.object(logger, "warning") as warn: + self._make("hdfs://ns1/foo", { + "security.kerberos.login.principal": "user@REALM", + "security.kerberos.login.keytab": keytab_file.name, + }) + warn.assert_not_called() + + def test_unsupported_scheme_raises(self): + with self.assertRaises(ValueError): + self._make("s3://bucket/key") + + +class HdfsNativeFileIOOpsTest(unittest.TestCase): + + def setUp(self): + self._fake, self._client_cls, self._wo_cls = _install_fake_hdfs_native() + self._mock_client = MagicMock(name="ClientInstance") + self._client_cls.return_value = self._mock_client + from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO + self.fio = HdfsNativeFileIO("hdfs://ns/", Options({})) + + def tearDown(self): + _uninstall_fake_hdfs_native() + + def _file_status(self, path, isdir=False, length=0, mtime=0): + s = MagicMock() + s.path = path + s.isdir = isdir + s.length = length + s.modification_time = mtime + return s + + def test_exists_true(self): + self._mock_client.get_file_info.return_value = self._file_status("/x") + self.assertTrue(self.fio.exists("/x")) + + def test_exists_false(self): + self._mock_client.get_file_info.side_effect = FileNotFoundError("nope") + self.assertFalse(self.fio.exists("/missing")) + + def test_get_file_status_adapts_to_pafs_filetype(self): + self._mock_client.get_file_info.return_value = self._file_status( + "/x", isdir=False, length=42, mtime=1700000000000, + ) + info = self.fio.get_file_status("/x") + self.assertEqual(info.path, "/x") + self.assertEqual(info.size, 42) + self.assertEqual(info.type, pafs.FileType.File) + self.assertIsNotNone(info.mtime) + + def test_list_status(self): + self._mock_client.list_status.return_value = iter([ + self._file_status("/x/a", isdir=False, length=10), + self._file_status("/x/b", isdir=True), + ]) + infos = self.fio.list_status("/x") + self.assertEqual(len(infos), 2) + 
self.assertEqual(infos[0].type, pafs.FileType.File) + self.assertEqual(infos[1].type, pafs.FileType.Directory) + self.assertIsNone(infos[1].size) + + def test_delete_missing_returns_false(self): + self._mock_client.get_file_info.side_effect = FileNotFoundError("nope") + self.assertFalse(self.fio.delete("/missing")) + self._mock_client.delete.assert_not_called() + + def test_delete_file(self): + self._mock_client.get_file_info.return_value = self._file_status("/x") + self._mock_client.delete.return_value = True + self.assertTrue(self.fio.delete("/x")) + self._mock_client.delete.assert_called_once_with("/x", False) + + def test_delete_nonempty_dir_without_recursive_raises(self): + self._mock_client.get_file_info.return_value = self._file_status( + "/x", isdir=True) + self._mock_client.list_status.return_value = iter([ + self._file_status("/x/a")]) + with self.assertRaises(OSError): + self.fio.delete("/x", recursive=False) + + def test_mkdirs_creates_when_missing(self): + self._mock_client.get_file_info.side_effect = FileNotFoundError("nope") + self.assertTrue(self.fio.mkdirs("/new")) + self._mock_client.mkdirs.assert_called_once_with( + "/new", create_parent=True) + + def test_mkdirs_idempotent_on_existing_dir(self): + self._mock_client.get_file_info.return_value = self._file_status( + "/x", isdir=True) + self.assertTrue(self.fio.mkdirs("/x")) + self._mock_client.mkdirs.assert_not_called() + + def test_mkdirs_existing_file_raises(self): + self._mock_client.get_file_info.return_value = self._file_status( + "/x", isdir=False) + with self.assertRaises(FileExistsError): + self.fio.mkdirs("/x") + + +class HdfsNativeAdaptersTest(unittest.TestCase): + + def setUp(self): + _install_fake_hdfs_native() + + def tearDown(self): + _uninstall_fake_hdfs_native() + + def test_writer_adapter_tracks_position_and_closes_once(self): + from pypaimon.filesystem.hdfs_native_file_io import _HdfsWriterAdapter + fw = MagicMock() + fw.write.side_effect = lambda buf: len(buf) + adapter = 
_HdfsWriterAdapter(fw) + adapter.write(b"abc") + adapter.write(b"defg") + self.assertEqual(adapter.tell(), 7) + adapter.close() + adapter.close() # idempotent + fw.close.assert_called_once() + + def test_reader_adapter_seek_and_read(self): + from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter + fr = MagicMock() + fr.tell.side_effect = [20, 30] + fr.read.return_value = b"x" * 10 + adapter = _HdfsReaderAdapter(fr) + self.assertEqual(adapter.seek(20), 20) + fr.seek.assert_called_once_with(20, 0) + data = adapter.read(10) + self.assertEqual(data, b"x" * 10) + fr.read.assert_called_once_with(10) + self.assertEqual(adapter.tell(), 30) + + def test_reader_adapter_read_negative_reads_all(self): + from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter + fr = MagicMock() + fr.read.return_value = b"all-content" + adapter = _HdfsReaderAdapter(fr) + self.assertEqual(adapter.read(), b"all-content") + fr.read.assert_called_once_with(-1) + + def test_reader_adapter_close_releases_underlying(self): + from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter + fr = MagicMock() + adapter = _HdfsReaderAdapter(fr) + adapter.close() + adapter.close() # idempotent + fr.close.assert_called_once() + self.assertTrue(adapter.closed) + + +def _write_hadoop_xml(path, entries): + """Write a minimal Hadoop-style xml file at `path`.""" + body = ['', ""] + for name, value in entries.items(): + body.append( + f" {name}{value}" + ) + body.append("") + with open(path, "w") as f: + f.write("\n".join(body)) + + +class ViewFsFallbackTest(unittest.TestCase): + """Cover _load_hadoop_xml + _maybe_inject_viewfs_fallback polyfill.""" + + def setUp(self): + _install_fake_hdfs_native() + from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO + self.Fio = HdfsNativeFileIO + + def tearDown(self): + _uninstall_fake_hdfs_native() + + def test_load_hadoop_xml_merges_two_files(self): + with tempfile.TemporaryDirectory() as d: + 
_write_hadoop_xml(os.path.join(d, "core-site.xml"), + {"fs.defaultFS": "viewfs://c1"}) + _write_hadoop_xml(os.path.join(d, "hdfs-site.xml"), + {"dfs.nameservices": "ns1"}) + cfg = self.Fio._load_hadoop_xml(d) + self.assertEqual(cfg.get("fs.defaultFS"), "viewfs://c1") + self.assertEqual(cfg.get("dfs.nameservices"), "ns1") + + def test_load_hadoop_xml_missing_dir_returns_empty(self): + self.assertEqual(self.Fio._load_hadoop_xml(None), {}) + self.assertEqual(self.Fio._load_hadoop_xml("/no/such/dir/xyz"), {}) + + def test_load_hadoop_xml_malformed_file_skipped(self): + with tempfile.TemporaryDirectory() as d: + with open(os.path.join(d, "core-site.xml"), "w") as f: + f.write(" absolute-path normalisation for hdfs-native.""" + + def setUp(self): + _install_fake_hdfs_native() + + def tearDown(self): + _uninstall_fake_hdfs_native() + + def _make(self, root): + from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO + return HdfsNativeFileIO(root, Options({})) + + def test_viewfs_uri_same_cluster_returns_path(self): + fio = self._make("viewfs://cluster1/") + self.assertEqual( + fio.to_filesystem_path("viewfs://cluster1/home/hudi/x"), + "/home/hudi/x", + ) + + def test_viewfs_uri_no_path_returns_root(self): + fio = self._make("viewfs://cluster1/") + self.assertEqual(fio.to_filesystem_path("viewfs://cluster1"), "/") + + def test_viewfs_absolute_path_unchanged(self): + fio = self._make("viewfs://cluster1/") + self.assertEqual(fio.to_filesystem_path("/foo/bar"), "/foo/bar") + + def test_hdfs_uri_same_ns_returns_path(self): + fio = self._make("hdfs://ns1/") + self.assertEqual( + fio.to_filesystem_path("hdfs://ns1/foo/bar"), + "/foo/bar", + ) + + def test_hdfs_uri_different_ns_unchanged(self): + fio = self._make("hdfs://ns1/") + self.assertEqual( + fio.to_filesystem_path("hdfs://nsX/foo"), + "hdfs://nsX/foo", + ) + + def test_hdfs_client_with_viewfs_uri_unchanged(self): + fio = self._make("hdfs://ns1/") + # Different scheme; let hdfs-native error rather than 
class FilesystemPropertyTest(unittest.TestCase):
    """Cover the lazy pyarrow.fs facade backed by hdfs_native.fsspec.

    ``pyarrow.fs.PyFileSystem`` and ``pyarrow.fs.FSSpecHandler`` are patched
    for every test, so reading ``HdfsNativeFileIO.filesystem`` is checked
    against the stand-in ``hdfs_native`` modules installed in ``setUp``.
    """

    def setUp(self):
        _install_fake_hdfs_native()
        # Register each teardown step right after its setup step: cleanups
        # run (in reverse order) even if a later line of setUp raises, which
        # a tearDown method would not guarantee.
        self.addCleanup(_uninstall_fake_hdfs_native)
        self._patcher = patch("pyarrow.fs.PyFileSystem")
        self.MockPyFs = self._patcher.start()
        self.addCleanup(self._patcher.stop)
        self._handler_patcher = patch("pyarrow.fs.FSSpecHandler")
        self.MockHandler = self._handler_patcher.start()
        self.addCleanup(self._handler_patcher.stop)

    def _make(self, root, props=None, xml_entries=None):
        """Build an ``HdfsNativeFileIO`` for `root`.

        When `xml_entries` is given, it is written to ``hdfs-site.xml`` in a
        temporary directory that is passed via the ``hdfs.conf-dir`` option
        and removed on test cleanup. Keys in `props` take precedence over
        that generated option.
        """
        from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
        if xml_entries:
            conf_dir = tempfile.TemporaryDirectory()
            self.addCleanup(conf_dir.cleanup)
            _write_hadoop_xml(
                os.path.join(conf_dir.name, "hdfs-site.xml"), xml_entries)
            props = {"hdfs.conf-dir": conf_dir.name, **(props or {})}
        return HdfsNativeFileIO(root, Options(props or {}))

    def test_viewfs_uses_viewfs_fsspec_class(self):
        fio = self._make("viewfs://cluster1/")
        fs_instance = fio.filesystem  # trigger lazy construction
        VFs = sys.modules["hdfs_native.fsspec"].ViewfsFileSystem
        HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
        VFs.assert_called_once()
        HFs.assert_not_called()
        _, kwargs = VFs.call_args
        self.assertEqual(kwargs.get("host"), "cluster1")
        self.assertIs(fs_instance, self.MockPyFs.return_value)

    def test_hdfs_uses_hdfs_fsspec_class(self):
        _ = self._make("hdfs://ns1/").filesystem  # trigger lazy construction
        HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
        VFs = sys.modules["hdfs_native.fsspec"].ViewfsFileSystem
        HFs.assert_called_once()
        VFs.assert_not_called()
        _, kwargs = HFs.call_args
        self.assertEqual(kwargs.get("host"), "ns1")

    def test_lazy_caches_after_first_access(self):
        fio = self._make("hdfs://ns1/")
        first = fio.filesystem
        second = fio.filesystem
        self.assertIs(first, second)
        HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
        self.assertEqual(HFs.call_count, 1)

    def test_xml_and_catalog_options_merged_into_fsspec_storage_options(self):
        fio = self._make(
            "hdfs://ns1/",
            props={"dfs.client.read.shortcircuit": "true"},
            xml_entries={"dfs.nameservices": "ns1"},
        )
        _ = fio.filesystem  # trigger lazy construction
        HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
        _, kwargs = HFs.call_args
        # Both xml and option keys should land in the fsspec kwargs.
        self.assertEqual(kwargs.get("dfs.nameservices"), "ns1")
        self.assertEqual(kwargs.get("dfs.client.read.shortcircuit"), "true")

    def test_catalog_option_overrides_xml(self):
        fio = self._make(
            "hdfs://ns1/",
            props={"dfs.foo": "v_user"},
            xml_entries={"dfs.foo": "v_xml"},
        )
        _ = fio.filesystem  # trigger lazy construction
        HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
        _, kwargs = HFs.call_args
        self.assertEqual(kwargs.get("dfs.foo"), "v_user")

    def test_missing_fsspec_raises_clear_error(self):
        fio = self._make("hdfs://ns1/")
        # Remove the fsspec submodule but keep hdfs_native itself, to
        # simulate an old/partial install.
        sys.modules.pop("hdfs_native.fsspec", None)
        sys.modules["hdfs_native"].fsspec = None
        with self.assertRaises(RuntimeError) as ctx:
            _ = fio.filesystem
        self.assertIn("hdfs-native fsspec adapter", str(ctx.exception))
+ with tempfile.TemporaryDirectory() as d: + with patch.dict(os.environ, {"HADOOP_CONF_DIR": d}, clear=True): + fio = self._make("hdfs://ns1/foo") + _, (_, options) = fio.__reduce__() + self.assertEqual(options.to_map().get("hdfs.conf-dir"), d) + + def test_reduce_does_not_override_explicit_conf_dir_option(self): + with tempfile.TemporaryDirectory() as opt_dir: + with patch.dict(os.environ, + {"HADOOP_CONF_DIR": "/env/dir"}, clear=True): + fio = self._make("hdfs://ns1/foo", + {"hdfs.conf-dir": opt_dir}) + _, (_, options) = fio.__reduce__() + self.assertEqual(options.to_map().get("hdfs.conf-dir"), opt_dir) + + def test_pickle_roundtrip_preserves_type_and_options(self): + import pickle + fio = self._make("hdfs://ns1/foo", + {"dfs.foo": "bar", "fs.viewfs.x": "y"}) + client_cls = sys.modules["hdfs_native"].Client + client_cls.reset_mock() + # Roundtrip via the highest pickle protocol. + blob = pickle.dumps(fio, protocol=pickle.HIGHEST_PROTOCOL) + restored = pickle.loads(blob) + from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO + self.assertIsInstance(restored, HdfsNativeFileIO) + self.assertEqual(restored.properties.to_map(), + {"dfs.foo": "bar", "fs.viewfs.x": "y"}) + # The original __init__ ran once; the unpickle ran __init__ again. + self.assertEqual(client_cls.call_count, 1) + + def test_pickle_with_viewfs_scheme(self): + import pickle + fio = self._make("viewfs://cluster1/") + restored = pickle.loads(pickle.dumps(fio)) + self.assertEqual(restored._scheme, "viewfs") + self.assertEqual(restored._netloc, "cluster1") + + def test_pickle_does_not_serialise_live_client(self): + # If the live _client were pickled, the call would fail (MagicMocks + # are picklable but the real RawClient would not be). This test + # documents the contract: __reduce__ MUST sidestep _client. 
+ import pickle + fio = self._make("hdfs://ns1/") + blob = pickle.dumps(fio) + # The pickled blob should reference the constructor inputs only; + # specifically it should not embed the literal mock _client. + self.assertNotIn(b"_client", blob) + + +if __name__ == "__main__": + unittest.main() diff --git a/paimon-python/pypaimon/tests/kerberos_test.py b/paimon-python/pypaimon/tests/kerberos_test.py index 06d98d90d6ad..722043d77899 100644 --- a/paimon-python/pypaimon/tests/kerberos_test.py +++ b/paimon-python/pypaimon/tests/kerberos_test.py @@ -235,7 +235,7 @@ def test_get_ticket_cache_no_cache(self): @patch("pypaimon.filesystem.pyarrow_file_io.subprocess.run") @patch("pypaimon.filesystem.pyarrow_file_io.pafs.HadoopFileSystem") def test_hdfs_with_fallback_keys(self, mock_hdfs_fs, mock_subprocess_run): - """Verify that Java-compatible fallback keys security.principal / security.keytab work.""" + """Verify that the secondary fallback keys security.principal / security.keytab work.""" mock_subprocess_run.return_value = MagicMock(stdout="/some/classpath") with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file: diff --git a/paimon-python/setup.py b/paimon-python/setup.py index aaba800fecb5..5e7b86c36fa7 100644 --- a/paimon-python/setup.py +++ b/paimon-python/setup.py @@ -181,6 +181,9 @@ def read_requirements(): 'pypaimon-rust; python_version>="3.10"', 'datafusion>=52; python_version>="3.10"', ], + 'hdfs': [ + 'hdfs-native>=0.13,<1; platform_system!="Windows"', + ], }, description="Apache Paimon Python API", long_description=long_description,