Skip to content

Commit fe8232a

Browse files
antonlin1 and claude committed
fix: Extract ADLS account_name from URI hostname in FsspecFileIO
When tables created by Spark/Hadoop store fully-qualified ABFSS URIs (e.g. abfss://container@account.dfs.core.windows.net/path) but catalog properties don't include adls.account-name, FsspecFileIO would create AzureBlobFileSystem with account_name=None. adlfs then strips the URI to container/path via _strip_protocol(), losing the storage account, resulting in FileNotFoundError.

The fix extracts account_name from the URI hostname as a fallback in _adls(), after SAS token extraction and explicit property checks, so existing configuration always takes priority.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7e66ccb commit fe8232a

File tree

2 files changed

+84
-11
lines changed

2 files changed

+84
-11
lines changed

pyiceberg/io/fsspec.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
TYPE_CHECKING,
3030
Any,
3131
)
32-
from urllib.parse import urlparse
32+
from urllib.parse import ParseResult, urlparse
3333

3434
import requests
3535
from fsspec import AbstractFileSystem
@@ -244,7 +244,7 @@ def _gs(properties: Properties) -> AbstractFileSystem:
244244
)
245245

246246

247-
def _adls(properties: Properties) -> AbstractFileSystem:
247+
def _adls(properties: Properties, hostname: str | None = None) -> AbstractFileSystem:
248248
# https://fsspec.github.io/adlfs/api/
249249

250250
from adlfs import AzureBlobFileSystem
@@ -259,6 +259,10 @@ def _adls(properties: Properties) -> AbstractFileSystem:
259259
if ADLS_SAS_TOKEN not in properties:
260260
properties[ADLS_SAS_TOKEN] = sas_token
261261

262+
# Fallback: extract account_name from URI hostname (e.g. "account.dfs.core.windows.net" -> "account")
263+
if hostname and ADLS_ACCOUNT_NAME not in properties:
264+
properties[ADLS_ACCOUNT_NAME] = hostname.split(".")[0]
265+
262266
class StaticTokenCredential(AsyncTokenCredential):
263267
_DEFAULT_EXPIRY_SECONDS = 3600
264268

@@ -300,7 +304,7 @@ def _hf(properties: Properties) -> AbstractFileSystem:
300304
)
301305

302306

303-
SCHEME_TO_FS = {
307+
SCHEME_TO_FS: dict[str, Callable[..., AbstractFileSystem]] = {
304308
"": _file,
305309
"file": _file,
306310
"s3": _s3,
@@ -313,6 +317,8 @@ def _hf(properties: Properties) -> AbstractFileSystem:
313317
"hf": _hf,
314318
}
315319

320+
_ADLS_SCHEMES = frozenset({"abfs", "abfss", "wasb", "wasbs"})
321+
316322

317323
class FsspecInputFile(InputFile):
318324
"""An input file implementation for the FsspecFileIO.
@@ -414,8 +420,7 @@ class FsspecFileIO(FileIO):
414420
"""A FileIO implementation that uses fsspec."""
415421

416422
def __init__(self, properties: Properties):
417-
self._scheme_to_fs = {}
418-
self._scheme_to_fs.update(SCHEME_TO_FS)
423+
self._scheme_to_fs: dict[str, Callable[..., AbstractFileSystem]] = dict(SCHEME_TO_FS)
419424
self._thread_locals = threading.local()
420425
super().__init__(properties=properties)
421426

@@ -429,7 +434,7 @@ def new_input(self, location: str) -> FsspecInputFile:
429434
FsspecInputFile: An FsspecInputFile instance for the given location.
430435
"""
431436
uri = urlparse(location)
432-
fs = self.get_fs(uri.scheme)
437+
fs = self._get_fs_from_uri(uri)
433438
return FsspecInputFile(location=location, fs=fs)
434439

435440
def new_output(self, location: str) -> FsspecOutputFile:
@@ -442,7 +447,7 @@ def new_output(self, location: str) -> FsspecOutputFile:
442447
FsspecOutputFile: An FsspecOutputFile instance for the given location.
443448
"""
444449
uri = urlparse(location)
445-
fs = self.get_fs(uri.scheme)
450+
fs = self._get_fs_from_uri(uri)
446451
return FsspecOutputFile(location=location, fs=fs)
447452

448453
def delete(self, location: str | InputFile | OutputFile) -> None:
@@ -459,20 +464,30 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
459464
str_location = location
460465

461466
uri = urlparse(str_location)
462-
fs = self.get_fs(uri.scheme)
467+
fs = self._get_fs_from_uri(uri)
463468
fs.rm(str_location)
464469

465-
def get_fs(self, scheme: str) -> AbstractFileSystem:
470+
def _get_fs_from_uri(self, uri: "ParseResult") -> AbstractFileSystem:
471+
"""Get a filesystem from a parsed URI, using hostname for ADLS account resolution."""
472+
if uri.scheme in _ADLS_SCHEMES:
473+
return self.get_fs(uri.scheme, uri.hostname)
474+
return self.get_fs(uri.scheme)
475+
476+
def get_fs(self, scheme: str, hostname: str | None = None) -> AbstractFileSystem:
466477
"""Get a filesystem for a specific scheme, cached per thread."""
467478
if not hasattr(self._thread_locals, "get_fs_cached"):
468479
self._thread_locals.get_fs_cached = lru_cache(self._get_fs)
469480

470-
return self._thread_locals.get_fs_cached(scheme)
481+
return self._thread_locals.get_fs_cached(scheme, hostname)
471482

472-
def _get_fs(self, scheme: str) -> AbstractFileSystem:
483+
def _get_fs(self, scheme: str, hostname: str | None = None) -> AbstractFileSystem:
473484
"""Get a filesystem for a specific scheme."""
474485
if scheme not in self._scheme_to_fs:
475486
raise ValueError(f"No registered filesystem for scheme: {scheme}")
487+
488+
if scheme in _ADLS_SCHEMES:
489+
return _adls(self.properties, hostname)
490+
476491
return self._scheme_to_fs[scheme](self.properties)
477492

478493
def __getstate__(self) -> dict[str, Any]:

tests/io/test_fsspec.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,64 @@ def test_adls_account_name_sas_token_extraction() -> None:
606606
)
607607

608608

609+
def test_adls_account_name_extracted_from_uri_hostname() -> None:
610+
"""Test that account_name is extracted from the ABFSS URI hostname when not in properties."""
611+
session_properties: Properties = {
612+
"adls.tenant-id": "test-tenant-id",
613+
"adls.client-id": "test-client-id",
614+
"adls.client-secret": "test-client-secret",
615+
}
616+
617+
with mock.patch("adlfs.AzureBlobFileSystem") as mock_adlfs:
618+
adls_fileio = FsspecFileIO(properties=session_properties)
619+
620+
adls_fileio.new_input(
621+
location="abfss://dd-michelada-us3-prod-dog@usagestorageprod.dfs.core.windows.net"
622+
"/unified_datasets/aggregated/data/file.parquet"
623+
)
624+
625+
mock_adlfs.assert_called_with(
626+
connection_string=None,
627+
credential=None,
628+
account_name="usagestorageprod",
629+
account_key=None,
630+
sas_token=None,
631+
tenant_id="test-tenant-id",
632+
client_id="test-client-id",
633+
client_secret="test-client-secret",
634+
account_host=None,
635+
anon=None,
636+
)
637+
638+
639+
def test_adls_account_name_not_overridden_when_in_properties() -> None:
640+
"""Test that explicit adls.account-name in properties is not overridden by URI hostname."""
641+
session_properties: Properties = {
642+
"adls.account-name": "explicitly-configured-account",
643+
"adls.tenant-id": "test-tenant-id",
644+
"adls.client-id": "test-client-id",
645+
"adls.client-secret": "test-client-secret",
646+
}
647+
648+
with mock.patch("adlfs.AzureBlobFileSystem") as mock_adlfs:
649+
adls_fileio = FsspecFileIO(properties=session_properties)
650+
651+
adls_fileio.new_input(location="abfss://container@usagestorageprod.dfs.core.windows.net/path/file.parquet")
652+
653+
mock_adlfs.assert_called_with(
654+
connection_string=None,
655+
credential=None,
656+
account_name="explicitly-configured-account",
657+
account_key=None,
658+
sas_token=None,
659+
tenant_id="test-tenant-id",
660+
client_id="test-client-id",
661+
client_secret="test-client-secret",
662+
account_host=None,
663+
anon=None,
664+
)
665+
666+
609667
@pytest.mark.gcs
610668
def test_fsspec_new_input_file_gcs(fsspec_fileio_gcs: FsspecFileIO) -> None:
611669
"""Test creating a new input file from a fsspec file-io"""

0 commit comments

Comments
 (0)