From e94ff7794bdb3b9f9a20d9f98324e48d3b54688a Mon Sep 17 00:00:00 2001 From: ajpotts Date: Thu, 11 Dec 2025 12:19:34 -0500 Subject: [PATCH] Closes #5141: Index pandas extension accessor add from_return_msg and other functions --- arkouda/pandas/__init__.py | 6 + arkouda/pandas/extension/_index_accessor.py | 167 ++++++++++++++++- arkouda/pandas/index.py | 15 +- tests/pandas/extension/index_accessor.py | 168 +++++++++++++++++- .../pandas/extension/io/index_accessor_io.py | 140 +++++++++++++++ 5 files changed, 491 insertions(+), 5 deletions(-) create mode 100644 tests/pandas/extension/io/index_accessor_io.py diff --git a/arkouda/pandas/__init__.py b/arkouda/pandas/__init__.py index 45886cd0756..7cb2263fa2a 100644 --- a/arkouda/pandas/__init__.py +++ b/arkouda/pandas/__init__.py @@ -12,6 +12,8 @@ ArkoudaStringDtype, ArkoudaUint8Dtype, ArkoudaUint64Dtype, + ArkoudaIndexAccessor, + ArkoudaDataFrameAccessor, ) from arkouda.pandas.join import compute_join_size, gen_ranges, join_on_eq_with_dt from arkouda.pandas.row import Row @@ -19,6 +21,7 @@ from arkouda.pandas.typing import ArkoudaArrayLike from arkouda.pandas.conversion import from_series + from arkouda.pandas.accessor import ( CachedAccessor, DatetimeAccessor, @@ -27,3 +30,6 @@ date_operators, string_operators, ) + +from arkouda.pandas import extension + diff --git a/arkouda/pandas/extension/_index_accessor.py b/arkouda/pandas/extension/_index_accessor.py index 1910fe8ad40..f1e37544e94 100644 --- a/arkouda/pandas/extension/_index_accessor.py +++ b/arkouda/pandas/extension/_index_accessor.py @@ -55,7 +55,7 @@ from __future__ import annotations -from typing import Union +from typing import Literal, Union import pandas as pd @@ -63,6 +63,7 @@ from arkouda.index import Index as ak_Index from arkouda.index import MultiIndex as ak_MultiIndex +from arkouda.numpy.pdarrayclass import pdarray from arkouda.pandas.extension import ArkoudaExtensionArray @@ -487,3 +488,167 @@ def is_arkouda(self) -> bool: return isinstance(arr, ArkoudaExtensionArray) else: return False + + # ------------------------------------------------------------------ + # Legacy delegation: thin wrappers over ak.Index / ak.MultiIndex + # ------------------------------------------------------------------ + + @staticmethod + def from_return_msg(rep_msg: str) -> Union[pd.Index, pd.MultiIndex]: + """ + Construct a pandas Index or MultiIndex from a legacy Arkouda + return message produced by index operations. + + This is a thin wrapper around ``ak.Index.from_return_msg`` that + immediately wraps the resulting Arkouda index back into a + pandas object backed by Arkouda ExtensionArrays. + """ + akidx = ak_Index.from_return_msg(rep_msg) + return ArkoudaIndexAccessor.from_ak_legacy(akidx) + + # --- Structural ops ------------------------------------------------------- + + def concat( + self, + other: Union[pd.Index, pd.MultiIndex], + ) -> Union[pd.Index, pd.MultiIndex]: + """ + Concatenate this index with another Arkouda-backed index. + + Both ``self._obj`` and ``other`` must be convertible to legacy + Arkouda :class:`ak_Index` / :class:`ak_MultiIndex`. The concatenation + is performed in Arkouda and the result is wrapped back into an + Arkouda-backed pandas Index or MultiIndex. + + Parameters + ---------- + other : Union[pd.Index, pd.MultiIndex] + The other index to concatenate with ``self._obj``. It must be a + :class:`pandas.Index` or :class:`pandas.MultiIndex`. + + Returns + ------- + Union[pd.Index, pd.MultiIndex] + A pandas Index or MultiIndex backed by Arkouda, containing the + concatenated values from ``self._obj`` and ``other``. + + Raises + ------ + TypeError + If ``other`` is not a :class:`pandas.Index` or + :class:`pandas.MultiIndex`. + """ + if not isinstance(other, (pd.Index, pd.MultiIndex)): + raise TypeError("`other` must be a pandas.Index or pandas.MultiIndex") + + # Lift both sides to legacy Arkouda Index / MultiIndex + left_ak = self.to_ak_legacy() + right_ak = ArkoudaIndexAccessor(other).to_ak_legacy() + + # Delegate to legacy Arkouda concat + out_ak = left_ak.concat(right_ak) + + # Wrap back into Arkouda-backed pandas Index/MultiIndex + return self.from_ak_legacy(out_ak) + + def lookup(self, key: object) -> pdarray: + """ + Perform a server-side lookup on the underlying Arkouda index. + + This is a thin convenience wrapper around the legacy + :meth:`arkouda.index.Index.lookup` / + :meth:`arkouda.index.MultiIndex.lookup` methods. It converts the + pandas index to a legacy Arkouda index, performs the lookup on the + server, and returns the resulting boolean mask. + + Parameters + ---------- + key : object + Lookup key or keys, interpreted in the same way as the legacy + Arkouda ``Index`` / ``MultiIndex`` ``lookup`` method. For a + single-level index this may be a scalar or an Arkouda ``pdarray``; + for MultiIndex it may be a tuple or sequence of values/arrays. + + Returns + ------- + pdarray + A boolean Arkouda array indicating which positions in the index + match the given ``key``. + """ + akidx = self.to_ak_legacy() + return akidx.lookup(key) + + # --- Serialization -------------------------------------------------------- + + def to_hdf( + self, + prefix_path: str, + dataset: str = "index", + mode: Literal["truncate", "append"] = "truncate", + file_type: Literal["single", "distribute"] = "distribute", + ) -> str: + """ + Save this index to HDF5 via the legacy ``to_hdf`` implementation + and return the server response message. + """ + akidx = self.to_ak_legacy() + return akidx.to_hdf(prefix_path, dataset=dataset, mode=mode, file_type=file_type) + + def update_hdf( + self, + prefix_path: str, + dataset: str = "index", + repack: bool = True, + ): + """ + Overwrite or append this index into an existing HDF5 dataset via + the legacy ``update_hdf`` implementation. + """ + akidx = self.to_ak_legacy() + return akidx.update_hdf(prefix_path, dataset=dataset, repack=repack) + + def to_parquet( + self, + prefix_path: str, + dataset: str = "index", + mode: Literal["truncate", "append"] = "truncate", + ) -> str: + """ + Save this index to Parquet via the legacy ``to_parquet`` implementation + and return the server response message. + """ + akidx = self.to_ak_legacy() + return akidx.to_parquet(prefix_path, dataset=dataset, mode=mode) + + def to_csv( + self, + prefix_path: str, + dataset: str = "index", + ) -> str: + """ + Save this index to CSV via the legacy ``to_csv`` implementation + and return the server response message. + """ + akidx = self.to_ak_legacy() + return akidx.to_csv(prefix_path, dataset=dataset) + + # --- Python/native representations ---------------------------------------- + + def to_dict(self, labels=None): + """ + Convert this index to a dictionary representation if supported. + + For MultiIndex, this delegates to ``MultiIndex.to_dict`` and returns + a mapping of label -> Index. For single-level Indexes, this will + raise a TypeError, since the legacy API only defines ``to_dict`` on + MultiIndex. + """ + from arkouda.pandas.index import MultiIndex + + akidx = self.to_ak_legacy() + if not hasattr(akidx, "to_dict"): + raise TypeError("to_dict is only defined for MultiIndex-backed indices") + if isinstance(akidx, MultiIndex): + return akidx.to_dict(labels=labels) + else: + return akidx.to_dict(label=labels) diff --git a/arkouda/pandas/index.py b/arkouda/pandas/index.py index bed4d0119cb..b4ad0e0d3a8 100644 --- a/arkouda/pandas/index.py +++ b/arkouda/pandas/index.py @@ -97,7 +97,7 @@ ] if TYPE_CHECKING: - from arkouda.numpy.pdarraycreation import array, ones + from arkouda.numpy.pdarraycreation import ones from arkouda.numpy.strings import Strings from arkouda.pandas.categorical import Categorical from arkouda.pandas.series import Series @@ -1197,7 +1197,11 @@ def concat(self, other): self._check_types(other) idx = generic_concat([self.values, other.values], ordered=True) - return Index(idx) + + other_name = getattr(other, "name", None) + name = self.name if self.name == other_name else None + + return Index(idx, name=name) def lookup(self, key): """ @@ -1222,6 +1226,7 @@ def lookup(self, key): """ from arkouda.numpy.pdarrayclass import pdarray + from arkouda.numpy.pdarraycreation import array if not isinstance(key, pdarray): # try to handle single value @@ -2148,7 +2153,11 @@ def concat(self, other): """ self._check_types(other) idx = [generic_concat([ix1, ix2], ordered=True) for ix1, ix2 in zip(self.index, other.index)] - return MultiIndex(idx) + + other_names = getattr(other, "names", None) + names = self.names if self.names == other_names else None + + return MultiIndex(idx, names=names) def lookup(self, key: list[Any] | tuple[Any, ...]) -> groupable: """ diff --git a/tests/pandas/extension/index_accessor.py b/tests/pandas/extension/index_accessor.py index 757af7bea50..d91ae8ea2d1 100644 --- a/tests/pandas/extension/index_accessor.py +++ b/tests/pandas/extension/index_accessor.py @@ -2,11 +2,16 @@ import pytest import arkouda as ak +import arkouda.pandas.extension._index_accessor as idx_mod from arkouda.index import Index as ak_Index from arkouda.index import MultiIndex as ak_MultiIndex from arkouda.pandas.extension import ArkoudaExtensionArray -from arkouda.pandas.extension._index_accessor import _ak_index_to_pandas_no_copy, _pandas_index_to_ak +from arkouda.pandas.extension._index_accessor import ( + ArkoudaIndexAccessor, + _ak_index_to_pandas_no_copy, + _pandas_index_to_ak, +) def _assert_index_equal_values(p_idx: pd.Index, values): @@ -210,3 +215,164 @@ def test_from_ak_legacy_roundtrip_index(self): assert isinstance(idx_back.array, ArkoudaExtensionArray) assert idx_back.name == "nums" _assert_index_equal_values(idx_back, [7, 8, 9]) + + def test_concat_simple_index_via_accessor(self): + # plain pandas indices + left = pd.Index([1, 2, 3], name="id") + right = pd.Index([4, 5], name="id") + + # Run through the .ak accessor (no need to call to_ak() explicitly; + # concat should lift to legacy and back internally). + out = left.ak.concat(right) + + # Result is a pandas Index, Arkouda-backed + assert isinstance(out, pd.Index) + assert out.ak.is_arkouda + assert out.name == "id" + + # Values are concatenated in order + _assert_index_equal_values(out, [1, 2, 3, 4, 5]) + + # Collect should give us a plain NumPy-backed Index with same values + collected = out.ak.collect() + assert isinstance(collected, pd.Index) + assert not collected.ak.is_arkouda + assert collected.equals(pd.Index([1, 2, 3, 4, 5], name="id")) + + def test_concat_multiindex_via_accessor(self): + arrays_left = [[1, 1, 2], ["red", "blue", "red"]] + arrays_right = [[3, 3], ["green", "red"]] + + left = pd.MultiIndex.from_arrays(arrays_left, names=["num", "color"]) + right = pd.MultiIndex.from_arrays(arrays_right, names=["num", "color"]) + + out = left.ak.concat(right) + + # Result is a pandas.MultiIndex, Arkouda-backed + assert isinstance(out, pd.MultiIndex) + assert out.ak.is_arkouda + assert out.names == ["num", "color"] + + # Order-preserving concat of the tuples + expected_tuples = list(left.tolist()) + list(right.tolist()) + assert list(out.tolist()) == expected_tuples + + # Collect should give a plain MultiIndex with same tuples / names + collected = out.ak.collect() + assert isinstance(collected, pd.MultiIndex) + assert not collected.ak.is_arkouda + assert list(collected.tolist()) == expected_tuples + assert collected.names == ["num", "color"] + + @pytest.mark.requires_chapel_module("In1dMsg") + def test_lookup_simple_index_scalar_and_array_via_accessor(self): + idx = pd.Index([10, 20, 30], name="nums") + ak_idx = idx.ak.to_ak() + + # Scalar lookup should be handled by wrapping it into a one-element pdarray + mask_scalar = ak_idx.ak.lookup(20) + import arkouda as ak # local import to avoid confusion with other modules + + assert isinstance(mask_scalar, ak.pdarray) + assert mask_scalar.tolist() == [False, True, False] + + # Array lookup should be passed through directly + keys = ak.array([5, 10, 30]) + mask_array = ak_idx.ak.lookup(keys) + assert isinstance(mask_array, ak.pdarray) + # in1d(self.values, [5, 10, 30]) → [10, 30] are present + assert mask_array.tolist() == [True, False, True] + + # We monkeypatch ak_Index.from_return_msg and ArkoudaIndexAccessor.from_ak_legacy + # because this test only verifies delegation logic. We don't want to invoke the + # real Arkouda client or start a server; we just need to confirm that the correct + # methods are called with the correct arguments and that their return value is + # passed through. + def test_from_return_msg_delegates_to_ak_index_and_wraps(self, monkeypatch): + # Arrange + rep_msg = "INDEX some return message" + dummy_akidx = object() + calls = {} + + def fake_from_return_msg(msg): + calls["msg"] = msg + return dummy_akidx + + def fake_from_ak_legacy(akidx): + calls["akidx"] = akidx + return "wrapped-index" + + # ak_Index.from_return_msg is a classmethod; patch as staticmethod so it + # doesn't receive the class as the first argument. + monkeypatch.setattr( + idx_mod.ak_Index, + "from_return_msg", + staticmethod(fake_from_return_msg), + ) + + # ArkoudaIndexAccessor.from_ak_legacy is a staticmethod as well. + monkeypatch.setattr( + ArkoudaIndexAccessor, + "from_ak_legacy", + staticmethod(fake_from_ak_legacy), + ) + + # Act + result = ArkoudaIndexAccessor.from_return_msg(rep_msg) + + # Assert: result is whatever from_ak_legacy returns + assert result == "wrapped-index" + # Assert: the legacy parser was called with the original message + assert calls["msg"] == rep_msg + # Assert: the wrapper was called with the object produced by ak_Index.from_return_msg + assert calls["akidx"] is dummy_akidx + + def test_to_dict_single_level_index(self): + # Create a single-level Arkouda Index + ak_idx = ak.Index(ak.arange(3)) + idx = ArkoudaIndexAccessor.from_ak_legacy(ak_idx) + + # 1) Default behavior: labels=None → key "idx" + out_default = idx.ak.to_dict() + assert list(out_default.keys()) == ["idx"] + + default_val = out_default["idx"] + # ak_Index.to_dict stores the underlying data; this should be an Arkouda array + assert isinstance(default_val, ak.pdarray) + assert default_val.tolist() == [0, 1, 2] + + # 2) Passing a list of labels → only the first element is used as the key + out_list = idx.ak.to_dict(labels=["foo", "bar"]) + assert list(out_list.keys()) == ["foo"] + assert "bar" not in out_list + + list_val = out_list["foo"] + assert isinstance(list_val, ak.pdarray) + assert list_val.tolist() == [0, 1, 2] + + def test_to_dict_multiindex(self): + # Create a real MultiIndex via Arkouda + level0 = ak.array([1, 1, 2]) + level1 = ak.array([10, 20, 30]) + ak_mi = ak.MultiIndex([level0, level1]) + + # Convert to pandas-backed Arkouda Index + mi = ArkoudaIndexAccessor.from_ak_legacy(ak_mi) + + labels = ["L0", "L1"] + out = mi.ak.to_dict(labels=labels) + + # Underlying MultiIndex.to_dict uses *all* labels when a list is passed + assert list(out.keys()) == labels + + # Each entry is an arkouda pdarray corresponding to a level + idx0 = out["L0"] + idx1 = out["L1"] + + # Type checks: these are Arkouda arrays, not pandas Index objects + assert isinstance(idx0, ak.pdarray) + assert isinstance(idx1, ak.pdarray) + + # Value checks: they match the original levels + assert idx0.tolist() == [1, 1, 2] + assert idx1.tolist() == [10, 20, 30] diff --git a/tests/pandas/extension/io/index_accessor_io.py b/tests/pandas/extension/io/index_accessor_io.py new file mode 100644 index 00000000000..e8a914d2424 --- /dev/null +++ b/tests/pandas/extension/io/index_accessor_io.py @@ -0,0 +1,140 @@ +""" +Minimal IO tests for **pd.Index backed by ArkoudaExtensionArray**. + +Goal +---- +Only verify that a pandas `Index` whose underlying data is an Arkouda-backed +ExtensionArray can round-trip through: + +- `idx.ak.to_csv(...)` +- `idx.ak.to_parquet(...)` +- `idx.ak.to_hdf(...)` + +These tests intentionally avoid `ak.Index` / `ak.MultiIndex` and focus on the +pandas surface area (Index + `.ak` accessor). +""" + +from __future__ import annotations + +import os +import tempfile + +from typing import Any + +import numpy as np +import pandas as pd +import pytest + +import arkouda as ak + +from arkouda.pandas import io_util +from arkouda.pandas.extension import ArkoudaArray + + +def _make_pd_index_with_arkouda_ea() -> pd.Index: + """ + Construct a pd.Index backed by ArkoudaExtensionArray. + + We intentionally go through `pd.array(ak_array)` because that is the + canonical way pandas creates an ExtensionArray, and Arkouda's integration + should return an Arkouda-backed EA here. + """ + ak_arr = ArkoudaArray(ak.arange(10)) + ea = pd.array(ak_arr) # should be ArkoudaExtensionArray (or equivalent) + idx = pd.Index(ea, name="my_index") + return idx + + +def _assert_is_arkouda_backed_index(idx: pd.Index) -> None: + # Must expose `.ak` accessor for IO calls. + assert hasattr(idx, "ak"), "Expected pandas Index to have an `.ak` accessor" + + # Must be ExtensionArray-backed (not numpy). + arr = idx.array + assert not isinstance(arr, np.ndarray), ( + "Expected Index to be backed by an ExtensionArray, not ndarray" + ) + + +def _read_index_from_csv(path: str): + data = ak.read_csv(path) + assert isinstance(data, dict) + assert len(data) >= 1 + # Prefer the conventional "Index" column name if present. + return data.get("Index", next(iter(data.values()))) + + +def _read_index_from_parquet(path_glob: str): + data = ak.read_parquet(path_glob) + assert isinstance(data, dict) + assert len(data) >= 1 + return data.get("Index", next(iter(data.values()))) + + +def _read_index_from_hdf(path_glob: str): + data = ak.read_hdf(path_glob) + assert isinstance(data, dict) + assert len(data) >= 1 + return data.get("Index", next(iter(data.values()))) + + +def _assert_roundtrip_equal(expected: pd.Index, got: Any) -> None: + # `ak.read_*` returns arkouda objects (pdarray/Strings/etc). Normalize to ndarray. + if hasattr(got, "to_ndarray"): + got_np = got.to_ndarray() + else: + got_np = np.asarray(got) + + np.testing.assert_array_equal(expected.to_numpy(), got_np) + + +@pytest.fixture +def index_io_tmp(request): + base = f"{pytest.temp_directory}/.pd_index_ea_io_test" + io_util.get_directory(base) + + def finalizer(): + io_util.delete_directory(base) + + request.addfinalizer(finalizer) + return base + + +@pytest.mark.requires_chapel_module("CSVMsg") +def test_pd_index_ea_to_csv_roundtrip(index_io_tmp): + idx = _make_pd_index_with_arkouda_ea() + assert idx.ak.is_arkouda() + _assert_is_arkouda_backed_index(idx) + + with tempfile.TemporaryDirectory(dir=index_io_tmp) as tmp: + out = os.path.join(tmp, "idx.csv") + idx.ak.to_csv(out) + + rd = _read_index_from_csv(out) + _assert_roundtrip_equal(idx, rd) + + +@pytest.mark.requires_chapel_module("ParquetMsg") +def test_pd_index_ea_to_parquet_roundtrip(index_io_tmp): + idx = _make_pd_index_with_arkouda_ea() + _assert_is_arkouda_backed_index(idx) + + with tempfile.TemporaryDirectory(dir=index_io_tmp) as tmp: + out = os.path.join(tmp, "idx_parquet") + idx.ak.to_parquet(out) + + rd = _read_index_from_parquet(f"{out}*") + _assert_roundtrip_equal(idx, rd) + + +@pytest.mark.requires_chapel_module("HDFMsg") +def test_pd_index_ea_to_hdf_roundtrip(index_io_tmp): + idx = _make_pd_index_with_arkouda_ea() + _assert_is_arkouda_backed_index(idx) + + with tempfile.TemporaryDirectory(dir=index_io_tmp) as tmp: + out = os.path.join(tmp, "idx_hdf") + idx.ak.to_hdf(out) + + rd = _read_index_from_hdf(f"{out}*") + _assert_roundtrip_equal(idx, rd)