74 changes: 74 additions & 0 deletions paimon-python/README.md
@@ -31,3 +31,77 @@ pip3 install dist/*.tar.gz

The command will install the package and core dependencies to your local Python environment.

# HDFS without a local Hadoop install

`pypaimon` supports HDFS through a pure-protocol client based on
[`hdfs-native`](https://github.com/Kimahriman/hdfs-native) (Rust + PyO3).
Use it when you want HDFS access **without** installing Hadoop, a JDK,
`libhdfs`, or wrestling with `CLASSPATH` / `LD_LIBRARY_PATH`.

Install with the optional extra:

```commandline
pip install 'pypaimon[hdfs]'
```

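For `hdfs://` and `viewfs://` URIs this backend is now the default.
Switch back to the legacy `libhdfs` (JNI) path by setting
`hdfs.client.impl` (or, when that option is unset or empty, the
`PYPAIMON_HDFS_IMPL` environment variable) to `pyarrow`: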

```python
catalog = CatalogFactory.create({
    "warehouse": "hdfs://ns1/warehouse",
    "hdfs.client.impl": "pyarrow",  # default: "native"
})
```
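
The lookup order used by `FileIO.get` (catalog option, then the
`PYPAIMON_HDFS_IMPL` environment variable, then the default `native`, with an
empty value counting as unset) can be sketched on its own. This is a
standalone illustration rather than the library code; `resolve_hdfs_impl` is
a hypothetical helper name.

```python
import os
from typing import Mapping, Optional, Tuple

SUPPORTED_IMPLS = ("native", "pyarrow")


def resolve_hdfs_impl(
    options: Mapping[str, str],
    environ: Optional[Mapping[str, str]] = None,
) -> Tuple[str, str]:
    """Return (impl, source) using option -> env var -> default order."""
    environ = os.environ if environ is None else environ
    value, source = options.get("hdfs.client.impl"), "hdfs.client.impl option"
    if not value:  # an empty string is treated the same as unset
        value = environ.get("PYPAIMON_HDFS_IMPL")
        source = "PYPAIMON_HDFS_IMPL env var"
    if not value:
        value, source = "native", "default"
    impl = value.lower()
    if impl not in SUPPORTED_IMPLS:
        raise ValueError(
            f"Unsupported hdfs.client.impl '{value}' (from {source})."
        )
    return impl, source
```

Returning the source alongside the value makes it easier to report where a
bad setting came from.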

## Sourcing the cluster wiring

The client still needs to know about NameNode addresses, HA failover
groups, and `viewfs` mount tables. Three options:

1. **Local XML**: set `HADOOP_CONF_DIR` (or the `hdfs.conf-dir` option)
   to a directory containing `core-site.xml` / `hdfs-site.xml`. Only the
   XML files are required; no Hadoop binaries or JDK.

2. **Catalog options (REST-friendly)**: pass the original Hadoop
   key/values directly in catalog options. Keys with prefixes `dfs.`,
   `fs.`, `hadoop.`, `ipc.`, `io.` are forwarded as-is. A REST catalog
   can deliver these in its response, giving a fully zero-file client
   experience:

   ```python
   CatalogFactory.create({
       "warehouse": "viewfs://cluster/warehouse",
       "dfs.nameservices": "ns1",
       "dfs.ha.namenodes.ns1": "nn1,nn2",
       "dfs.namenode.rpc-address.ns1.nn1": "host-1:8020",
       "dfs.namenode.rpc-address.ns1.nn2": "host-2:8020",
       "fs.viewfs.mounttable.cluster.link./prod": "hdfs://ns1/prod",
   })
   ```

3. **Namespaced overrides**: use `hdfs.config.<key>` to forward any
   other Hadoop key not covered by the prefix whitelist.

The three sources can be combined; catalog options take precedence over
xml.
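
To make the precedence concrete, here is a minimal sketch of how the three
sources could be merged. It is not the `pypaimon` implementation:
`parse_hadoop_xml` and `merge_hdfs_config` are hypothetical names, and
applying the `hdfs.config.` overrides last is an assumption, since only
"catalog options over XML" is stated above.

```python
import xml.etree.ElementTree as ET
from typing import Dict, Mapping

FORWARDED_PREFIXES = ("dfs.", "fs.", "hadoop.", "ipc.", "io.")
OVERRIDE_PREFIX = "hdfs.config."


def parse_hadoop_xml(text: str) -> Dict[str, str]:
    """Read <property><name>/<value> pairs from a Hadoop *-site.xml string."""
    conf: Dict[str, str] = {}
    for prop in ET.fromstring(text).iter("property"):
        name, value = prop.findtext("name"), prop.findtext("value")
        if name and value is not None:
            conf[name.strip()] = value.strip()
    return conf


def merge_hdfs_config(
    xml_conf: Mapping[str, str],
    catalog_options: Mapping[str, str],
) -> Dict[str, str]:
    """Merge XML-derived settings with catalog options (options win)."""
    merged = dict(xml_conf)  # lowest precedence
    for key, value in catalog_options.items():
        if key.startswith(FORWARDED_PREFIXES):
            merged[key] = value  # whitelisted Hadoop keys pass through as-is
    # Assumption: namespaced overrides are applied last and strip the prefix.
    for key, value in catalog_options.items():
        if key.startswith(OVERRIDE_PREFIX):
            merged[key[len(OVERRIDE_PREFIX):]] = value
    return merged
```

Non-Hadoop options such as `warehouse` or `hdfs.client.impl` match neither
prefix rule, so they never reach the HDFS client configuration.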

## Kerberos

A secured cluster still needs the GSSAPI system library
(`libgssapi-krb5-2` on Debian/Ubuntu, `krb5` via Homebrew on macOS,
`krb5-libs` on RHEL) plus a `krb5.conf`. Provide credentials by either:

- Running `kinit` yourself and pointing `KRB5CCNAME` at the cache, or
- Setting `security.kerberos.login.principal` and
  `security.kerberos.login.keytab` in catalog options, in which case
  `pypaimon` runs `kinit -kt <keytab> <principal>` for you.
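
The two credential paths can be sketched as plain functions. This is
illustrative only: `kinit_argv` and `ticket_cache_file` are hypothetical
helper names, and requiring principal and keytab to be set together is an
assumption rather than documented behaviour.

```python
from typing import List, Mapping, Optional

PRINCIPAL_KEY = "security.kerberos.login.principal"
KEYTAB_KEY = "security.kerberos.login.keytab"


def kinit_argv(options: Mapping[str, str]) -> Optional[List[str]]:
    """Build the keytab login command, or None to rely on an existing ticket."""
    principal = options.get(PRINCIPAL_KEY)
    keytab = options.get(KEYTAB_KEY)
    if not principal and not keytab:
        return None  # caller is expected to have run kinit already
    if not (principal and keytab):
        # Assumption: a half-configured login is treated as an error.
        raise ValueError("principal and keytab must be set together")
    return ["kinit", "-kt", keytab, principal]


def ticket_cache_file(environ: Mapping[str, str]) -> Optional[str]:
    """Return the ticket cache named by KRB5CCNAME, minus a FILE: prefix."""
    cc = environ.get("KRB5CCNAME")
    if not cc:
        return None
    return cc[len("FILE:"):] if cc.startswith("FILE:") else cc
```

The returned list can be passed to `subprocess.run(argv, check=True)` so
that a failed login raises instead of passing silently.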

## Fallback behaviour

If the native backend fails to initialise with an `ImportError` or
`RuntimeError` (e.g. wheel missing on an unsupported platform such as
Windows), `pypaimon` automatically falls back to the `pyarrow`
(`libhdfs`/JVM) path and logs a warning. Set
`hdfs.client.fallback-to-pyarrow=false` if you want the original error
raised instead.
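
The fallback rule can be sketched independently of HDFS. The factories below
are stand-ins supplied by the caller, and `open_with_fallback` is a
hypothetical name; only the exception types and the warning text mirror the
change in `file_io.py`.

```python
import logging
from typing import Callable, TypeVar

T = TypeVar("T")
log = logging.getLogger("hdfs_backend_sketch")


def open_with_fallback(
    native_factory: Callable[[], T],
    pyarrow_factory: Callable[[], T],
    fallback_to_pyarrow: bool = True,
) -> T:
    """Try the native backend first; optionally fall back to pyarrow."""
    try:
        return native_factory()
    except (ImportError, RuntimeError) as exc:
        if not fallback_to_pyarrow:
            raise  # hard failure requested by the caller
        log.warning(
            "Native HDFS backend init failed, falling back to pyarrow: %s", exc
        )
        return pyarrow_factory()
```

Other exception types are deliberately not caught here, so they surface
instead of triggering a silent backend switch.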

168 changes: 168 additions & 0 deletions paimon-python/pypaimon/benchmark/hdfs_io_bench.py
@@ -0,0 +1,168 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""HDFS FileIO benchmark: native (hdfs-native) vs pyarrow (libhdfs/JVM).

Compares throughput of common FileIO operations between the two backends
against the same HDFS cluster. Each backend is exercised via the FileIO
factory by toggling the `hdfs.client.impl` option.

Usage:
    python -m pypaimon.benchmark.hdfs_io_bench \\
        --warehouse hdfs://localhost:8020/bench \\
        [--backend native|pyarrow|both] \\
        [--write-size-mb 128] \\
        [--list-files 1000] \\
        [--read-iters 3]

Notes:
    - `pyarrow` backend requires HADOOP_HOME + HADOOP_CONF_DIR + libhdfs.
    - `native` backend requires `pip install pypaimon[hdfs]`.
    - The benchmark writes/reads scratch files under <warehouse>/bench_<uuid>/
      and removes them on exit.
"""

import argparse
import os
import sys
import time
import uuid
from pathlib import Path

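# Allow running this file directly as a script: put the directory that
# contains the `pypaimon` package (paimon-python/) on sys.path.
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))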

from pypaimon.common.file_io import FileIO # noqa: E402
from pypaimon.common.options import Options # noqa: E402


def _build_file_io(warehouse: str, backend: str) -> FileIO:
    opts = Options({"hdfs.client.impl": backend})
    return FileIO.get(warehouse, opts)


def _human(seconds: float) -> str:
    if seconds < 1e-3:
        return f"{seconds * 1e6:.0f}us"
    if seconds < 1:
        return f"{seconds * 1e3:.1f}ms"
    return f"{seconds:.2f}s"


def _bench_write(file_io: FileIO, root: str, size_mb: int) -> float:
    payload = os.urandom(min(size_mb, 16) * 1024 * 1024)
    path = f"{root}/write-{uuid.uuid4().hex[:8]}.bin"
    t0 = time.perf_counter()
    with file_io.new_output_stream(path) as stream:
        written = 0
        target = size_mb * 1024 * 1024
        while written < target:
            chunk = payload[: min(len(payload), target - written)]
            n = stream.write(chunk)
            written += n if isinstance(n, int) and n > 0 else len(chunk)
    return time.perf_counter() - t0


def _bench_read(file_io: FileIO, path: str) -> float:
    t0 = time.perf_counter()
    with file_io.new_input_stream(path) as stream:
        while True:
            chunk = stream.read(8 * 1024 * 1024)
            if not chunk:
                break
    return time.perf_counter() - t0


def _bench_list(file_io: FileIO, root: str, num_files: int) -> float:
    scratch = f"{root}/list-{uuid.uuid4().hex[:8]}"
    file_io.mkdirs(scratch)
    try:
        for i in range(num_files):
            with file_io.new_output_stream(f"{scratch}/f-{i:06d}.txt") as s:
                s.write(b"x")
        t0 = time.perf_counter()
        results = file_io.list_status(scratch)
        _ = list(results)
        return time.perf_counter() - t0
    finally:
        file_io.delete(scratch, recursive=True)


def run_one(backend: str, args) -> None:
    print(f"\n=== backend={backend} ===")
    try:
        file_io = _build_file_io(args.warehouse, backend)
    except Exception as e:
        print(f" init failed: {e}")
        return

    bench_root = f"{args.warehouse.rstrip('/')}/bench_{uuid.uuid4().hex[:8]}"
    file_io.mkdirs(bench_root)
    try:
        # Write
        sample_path = f"{bench_root}/write-sample.bin"
        with file_io.new_output_stream(sample_path) as stream:
            payload = os.urandom(min(args.write_size_mb, 16) * 1024 * 1024)
            written = 0
            target = args.write_size_mb * 1024 * 1024
            t0 = time.perf_counter()
            while written < target:
                chunk = payload[: min(len(payload), target - written)]
                n = stream.write(chunk)
                written += n if isinstance(n, int) and n > 0 else len(chunk)
        # Stop the clock after the stream is closed so the final flush counts.
        write_elapsed = time.perf_counter() - t0
        mb_per_s = args.write_size_mb / write_elapsed if write_elapsed else 0
        print(f" write {args.write_size_mb}MB: "
              f"{_human(write_elapsed)} ({mb_per_s:.1f} MB/s)")

        # Read (warm)
        read_times = []
        for _ in range(args.read_iters):
            read_times.append(_bench_read(file_io, sample_path))
        avg_read = sum(read_times) / len(read_times)
        rmb_per_s = args.write_size_mb / avg_read if avg_read else 0
        print(f" read {args.write_size_mb}MB (avg of {args.read_iters}): "
              f"{_human(avg_read)} ({rmb_per_s:.1f} MB/s)")

        # List
        list_elapsed = _bench_list(file_io, bench_root, args.list_files)
        print(f" list {args.list_files} files: {_human(list_elapsed)}")

    finally:
        try:
            file_io.delete(bench_root, recursive=True)
        except Exception as e:
            print(f" cleanup failed: {e}")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--warehouse", required=True,
                        help="HDFS URI under which scratch files are created")
    parser.add_argument("--backend", default="both",
                        choices=["native", "pyarrow", "both"])
    parser.add_argument("--write-size-mb", type=int, default=128)
    parser.add_argument("--list-files", type=int, default=1000)
    parser.add_argument("--read-iters", type=int, default=3)
    args = parser.parse_args()

    backends = ["native", "pyarrow"] if args.backend == "both" else [args.backend]
    for backend in backends:
        run_one(backend, args)


if __name__ == "__main__":
    main()
41 changes: 39 additions & 2 deletions paimon-python/pypaimon/common/file_io.py
@@ -265,8 +265,11 @@ def get(path: str, catalog_options: Optional[Options] = None) -> 'FileIO':
"""
        Returns a FileIO instance for accessing the file system identified by the given path.
        - LocalFileIO for local file system (file:// or no scheme)
-        - PyArrowFileIO for remote file systems (oss://, s3://, hdfs://, etc.)
+        - HdfsNativeFileIO for HDFS/ViewFS (default; pure protocol client, no Hadoop install)
+        - PyArrowFileIO for other remote file systems (oss://, s3://, gs://, ...),
+          and for HDFS when explicitly requested via hdfs.client.impl=pyarrow
        """
        import os as _os
        from urllib.parse import urlparse

        uri = urlparse(path)
@@ -276,5 +279,39 @@ def get(path: str, catalog_options: Optional[Options] = None) -> 'FileIO':
            from pypaimon.filesystem.local_file_io import LocalFileIO
            return LocalFileIO(path, catalog_options)

        opts = catalog_options or Options({})

        if scheme in ("hdfs", "viewfs"):
            from pypaimon.common.options.config import HdfsOptions
            impl_source = "hdfs.client.impl option"
            # Treat an empty option value the same as "unset" so callers can
            # blank it out (common in templated configs) without tripping
            # the unsupported-impl branch.
            impl_value = opts.to_map().get(HdfsOptions.HDFS_CLIENT_IMPL.key())
            if not impl_value:
                impl_value = _os.environ.get("PYPAIMON_HDFS_IMPL")
                impl_source = "PYPAIMON_HDFS_IMPL env var"
            if not impl_value:
                impl_value = HdfsOptions.HDFS_CLIENT_IMPL.default_value()
                impl_source = "default"
            impl = impl_value.lower()
            if impl == "native":
                try:
                    from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
                    return HdfsNativeFileIO(path, opts)
                except (ImportError, RuntimeError) as e:
                    fallback = opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW)
                    if not fallback:
                        raise
                    logging.getLogger(__name__).warning(
                        "Native HDFS backend init failed, falling back to "
                        "pyarrow: %s", e,
                    )
            elif impl != "pyarrow":
                raise ValueError(
                    f"Unsupported hdfs.client.impl '{impl_value}' "
                    f"(from {impl_source}). Supported: 'native', 'pyarrow'."
                )

        from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
-        return PyArrowFileIO(path, catalog_options or Options({}))
+        return PyArrowFileIO(path, opts)
35 changes: 35 additions & 0 deletions paimon-python/pypaimon/common/options/config.py
@@ -104,6 +104,41 @@ class CatalogOptions:
    BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1


class HdfsOptions:
    HDFS_CLIENT_IMPL = (
        ConfigOptions.key("hdfs.client.impl")
        .string_type()
        .default_value("native")
        .with_description(
            "HDFS FileIO backend. Supported values: 'native' (default, uses "
            "hdfs-native protocol client, no Hadoop install required), "
            "'pyarrow' (legacy, requires HADOOP_HOME / libhdfs / JVM)."
        )
    )
    HDFS_CLIENT_FALLBACK_TO_PYARROW = (
        ConfigOptions.key("hdfs.client.fallback-to-pyarrow")
        .boolean_type()
        .default_value(True)
        .with_description(
            "When the native backend fails to initialise (e.g. missing wheel "
            "or unsupported platform), fall back to the pyarrow backend "
            "instead of raising."
        )
    )
    HDFS_CONF_DIR = (
        ConfigOptions.key("hdfs.conf-dir")
        .string_type()
        .no_default_value()
        .with_description(
            "Directory containing core-site.xml / hdfs-site.xml that the "
            "native client should load. Defaults to $HADOOP_CONF_DIR."
        )
    )

    HDFS_CONFIG_PREFIX = "hdfs.config."
    HDFS_NATIVE_CONFIG_KEY_PREFIXES = ("dfs.", "fs.", "hadoop.", "ipc.", "io.")


class SecurityOptions:
    KERBEROS_PRINCIPAL = (
        ConfigOptions.key("security.kerberos.login.principal")
45 changes: 45 additions & 0 deletions paimon-python/pypaimon/filesystem/_kerberos.py
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Shared Kerberos helpers used by HDFS FileIO backends."""

import os
import subprocess
from typing import Optional


def kerberos_login_from_keytab(principal: str, keytab: str) -> None:
    """Obtain a ticket by running `kinit -kt <keytab> <principal>`."""
    if not os.path.isfile(keytab):
        raise FileNotFoundError(f"Kerberos keytab file not found: {keytab}")
    if not os.access(keytab, os.R_OK):
        raise PermissionError(f"Kerberos keytab file is not readable: {keytab}")
    subprocess.run(
        ['kinit', '-kt', keytab, principal],
        check=True, capture_output=True, text=True,
    )


def get_ticket_cache_path() -> Optional[str]:
    """Return the ticket cache from KRB5CCNAME, else the default /tmp cache."""
    cc = os.environ.get('KRB5CCNAME')
    if cc:
        if cc.startswith('FILE:'):
            return cc[5:]
        return cc
    if hasattr(os, 'getuid'):  # os.getuid() is not available on Windows
        default_path = f'/tmp/krb5cc_{os.getuid()}'
        if os.path.exists(default_path):
            return default_path
    return None