Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 19 additions & 57 deletions pyathena/pandas/result_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,38 +320,33 @@ def _get_csv_engine(
"""Determine the appropriate CSV engine based on configuration and compatibility.

Args:
file_size_bytes: Size of the CSV file in bytes.
file_size_bytes: Size of the CSV file in bytes. Only used for PyArrow
compatibility checks (minimum file size threshold).
chunksize: Chunksize parameter (overrides self._chunksize if provided).

Returns:
CSV engine name ('pyarrow', 'c', or 'python').
"""
effective_chunksize = chunksize if chunksize is not None else self._chunksize
if self._engine == "python":
return "python"

# Use PyArrow only when explicitly requested and all compatibility
# checks pass; otherwise fall through to the C engine default.
if self._engine == "pyarrow":
return self._get_pyarrow_engine(file_size_bytes, effective_chunksize)

if self._engine in ("c", "python"):
return self._engine

# Auto-selection for "auto" or unknown engine values
return self._get_optimal_csv_engine(file_size_bytes)

def _get_pyarrow_engine(self, file_size_bytes: int | None, chunksize: int | None) -> str:
"""Get PyArrow engine if compatible, otherwise return optimal engine."""
# Check parameter compatibility
if chunksize is not None or self._quoting != 1 or self.converters:
return self._get_optimal_csv_engine(file_size_bytes)

# Check file size compatibility
if file_size_bytes is not None and file_size_bytes < self.PYARROW_MIN_FILE_SIZE_BYTES:
return self._get_optimal_csv_engine(file_size_bytes)
effective_chunksize = chunksize if chunksize is not None else self._chunksize
is_compatible = (
effective_chunksize is None
and self._quoting == 1
and not self.converters
and (file_size_bytes is None or file_size_bytes >= self.PYARROW_MIN_FILE_SIZE_BYTES)
)
if is_compatible:
try:
return self._get_available_engine(["pyarrow"])
except ImportError:
pass

# Check availability
try:
return self._get_available_engine(["pyarrow"])
except ImportError:
return self._get_optimal_csv_engine(file_size_bytes)
return "c"

def _get_available_engine(self, engine_candidates: list[str]) -> str:
"""Get the first available engine from a list of candidates.
Expand Down Expand Up @@ -382,19 +377,6 @@ def _get_available_engine(self, engine_candidates: list[str]) -> str:
f"{error_msgs}"
)

def _get_optimal_csv_engine(self, file_size_bytes: int | None = None) -> str:
"""Get the optimal CSV engine based on file size.

Args:
file_size_bytes: Size of the CSV file in bytes.

Returns:
'python' for large files (>50MB) to avoid C parser limits, otherwise 'c'.
"""
if file_size_bytes and file_size_bytes > self.LARGE_FILE_THRESHOLD_BYTES:
return "python"
return "c"

def _auto_determine_chunksize(self, file_size_bytes: int) -> int | None:
"""Determine appropriate chunksize for large files based on file size.

Expand Down Expand Up @@ -594,26 +576,6 @@ def _read_csv(self) -> TextFileReader | DataFrame:

except Exception as e:
_logger.exception("Failed to read %s.", self.output_location)
error_msg = str(e).lower()
if any(
phrase in error_msg
for phrase in ["signed integer", "maximum", "overflow", "int32", "c parser"]
):
# Enhanced error message with specific recommendations
file_mb = (length or 0) // (1024 * 1024)
detailed_msg = (
f"Large dataset processing error ({file_mb}MB file): {e}. "
"This is likely due to pandas C parser limitations. "
"Recommended solutions:\n"
"1. Set chunksize: cursor = connection.cursor(PandasCursor, chunksize=50000)\n"
"2. Enable auto-optimization: "
"cursor = connection.cursor(PandasCursor, auto_optimize_chunksize=True)\n"
"3. Use PyArrow engine: "
"cursor = connection.cursor(PandasCursor, engine='pyarrow')\n"
"4. Use Python engine: "
"cursor = connection.cursor(PandasCursor, engine='python')"
)
raise OperationalError(detailed_msg) from e
raise OperationalError(*e.args) from e

def _read_parquet(self, engine) -> DataFrame:
Expand Down
31 changes: 0 additions & 31 deletions tests/pyathena/pandas/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,23 +658,6 @@ def test_no_ops(self):
cursor.close()
conn.close()

def test_get_optimal_csv_engine(self):
    """Verify file-size-based engine selection in _get_optimal_csv_engine."""

    # Bypass the parent initializer so no real Athena connection is required.
    with patch("pyathena.pandas.result_set.AthenaResultSet.__init__"):
        rs = AthenaPandasResultSet.__new__(AthenaPandasResultSet)
        rs._engine = "auto"
        rs._chunksize = None  # No chunking by default

        # A 1KB file stays on the C engine (compatibility-first approach).
        assert rs._get_optimal_csv_engine(1024) == "c"

        # A 100MB file switches to the Python engine (avoid C parser int32 limits).
        assert rs._get_optimal_csv_engine(100 * 1024 * 1024) == "python"

def test_auto_determine_chunksize(self):
"""Test _auto_determine_chunksize method behavior."""

Expand Down Expand Up @@ -747,70 +730,56 @@ def test_get_csv_engine_explicit_specification(self):

# Test PyArrow with incompatible chunksize (via parameter)
with (
patch.object(result_set, "_get_available_engine", return_value="pyarrow"),
patch.object(
type(result_set), "converters", new_callable=PropertyMock, return_value={}
),
patch.object(result_set, "_get_optimal_csv_engine", return_value="c") as mock_opt,
):
engine = result_set._get_csv_engine(chunksize=1000)
assert engine == "c"
mock_opt.assert_called_once()

# Test PyArrow with incompatible chunksize (via instance variable)
result_set._chunksize = 1000
with (
patch.object(result_set, "_get_available_engine", return_value="pyarrow"),
patch.object(
type(result_set), "converters", new_callable=PropertyMock, return_value={}
),
patch.object(result_set, "_get_optimal_csv_engine", return_value="c") as mock_opt,
):
engine = result_set._get_csv_engine()
assert engine == "c"
mock_opt.assert_called_once()

# Test PyArrow with incompatible quoting
result_set._chunksize = None
result_set._quoting = 0 # Non-default quoting
with (
patch.object(result_set, "_get_available_engine", return_value="pyarrow"),
patch.object(
type(result_set), "converters", new_callable=PropertyMock, return_value={}
),
patch.object(result_set, "_get_optimal_csv_engine", return_value="c") as mock_opt,
):
engine = result_set._get_csv_engine()
assert engine == "c"
mock_opt.assert_called_once()

# Test PyArrow with incompatible converters
result_set._quoting = 1 # Reset to default
with (
patch.object(result_set, "_get_available_engine", return_value="pyarrow"),
patch.object(
type(result_set),
"converters",
new_callable=PropertyMock,
return_value={"col1": str},
),
patch.object(result_set, "_get_optimal_csv_engine", return_value="c") as mock_opt,
):
engine = result_set._get_csv_engine()
assert engine == "c"
mock_opt.assert_called_once()

# Test PyArrow specification (when unavailable)
with (
patch.object(result_set, "_get_available_engine", side_effect=ImportError),
patch.object(
type(result_set), "converters", new_callable=PropertyMock, return_value={}
),
patch.object(result_set, "_get_optimal_csv_engine", return_value="c") as mock_opt,
):
engine = result_set._get_csv_engine()
assert engine == "c"
mock_opt.assert_called_once()

@pytest.mark.parametrize(
("pandas_cursor", "parquet_engine", "chunksize"),
Expand Down
Loading