diff --git a/pyathena/pandas/result_set.py b/pyathena/pandas/result_set.py index 592fdb2d..3e49383b 100644 --- a/pyathena/pandas/result_set.py +++ b/pyathena/pandas/result_set.py @@ -320,38 +320,33 @@ def _get_csv_engine( """Determine the appropriate CSV engine based on configuration and compatibility. Args: - file_size_bytes: Size of the CSV file in bytes. + file_size_bytes: Size of the CSV file in bytes. Only used for PyArrow + compatibility checks (minimum file size threshold). chunksize: Chunksize parameter (overrides self._chunksize if provided). Returns: CSV engine name ('pyarrow', 'c', or 'python'). """ - effective_chunksize = chunksize if chunksize is not None else self._chunksize + if self._engine == "python": + return "python" + # Use PyArrow only when explicitly requested and all compatibility + # checks pass; otherwise fall through to the C engine default. if self._engine == "pyarrow": - return self._get_pyarrow_engine(file_size_bytes, effective_chunksize) - - if self._engine in ("c", "python"): - return self._engine - - # Auto-selection for "auto" or unknown engine values - return self._get_optimal_csv_engine(file_size_bytes) - - def _get_pyarrow_engine(self, file_size_bytes: int | None, chunksize: int | None) -> str: - """Get PyArrow engine if compatible, otherwise return optimal engine.""" - # Check parameter compatibility - if chunksize is not None or self._quoting != 1 or self.converters: - return self._get_optimal_csv_engine(file_size_bytes) - - # Check file size compatibility - if file_size_bytes is not None and file_size_bytes < self.PYARROW_MIN_FILE_SIZE_BYTES: - return self._get_optimal_csv_engine(file_size_bytes) + effective_chunksize = chunksize if chunksize is not None else self._chunksize + is_compatible = ( + effective_chunksize is None + and self._quoting == 1 + and not self.converters + and (file_size_bytes is None or file_size_bytes >= self.PYARROW_MIN_FILE_SIZE_BYTES) + ) + if is_compatible: + try: + return self._get_available_engine(["pyarrow"]) + except ImportError: + pass - # Check availability - try: - return self._get_available_engine(["pyarrow"]) - except ImportError: - return self._get_optimal_csv_engine(file_size_bytes) + return "c" def _get_available_engine(self, engine_candidates: list[str]) -> str: """Get the first available engine from a list of candidates. @@ -382,19 +377,6 @@ def _get_available_engine(self, engine_candidates: list[str]) -> str: f"{error_msgs}" ) - def _get_optimal_csv_engine(self, file_size_bytes: int | None = None) -> str: - """Get the optimal CSV engine based on file size. - - Args: - file_size_bytes: Size of the CSV file in bytes. - - Returns: - 'python' for large files (>50MB) to avoid C parser limits, otherwise 'c'. - """ - if file_size_bytes and file_size_bytes > self.LARGE_FILE_THRESHOLD_BYTES: - return "python" - return "c" - def _auto_determine_chunksize(self, file_size_bytes: int) -> int | None: """Determine appropriate chunksize for large files based on file size. @@ -594,26 +576,6 @@ def _read_csv(self) -> TextFileReader | DataFrame: except Exception as e: _logger.exception("Failed to read %s.", self.output_location) - error_msg = str(e).lower() - if any( - phrase in error_msg - for phrase in ["signed integer", "maximum", "overflow", "int32", "c parser"] - ): - # Enhanced error message with specific recommendations - file_mb = (length or 0) // (1024 * 1024) - detailed_msg = ( - f"Large dataset processing error ({file_mb}MB file): {e}. " - "This is likely due to pandas C parser limitations. " - "Recommended solutions:\n" - "1. Set chunksize: cursor = connection.cursor(PandasCursor, chunksize=50000)\n" - "2. Enable auto-optimization: " - "cursor = connection.cursor(PandasCursor, auto_optimize_chunksize=True)\n" - "3. Use PyArrow engine: " - "cursor = connection.cursor(PandasCursor, engine='pyarrow')\n" - "4. Use Python engine: " - "cursor = connection.cursor(PandasCursor, engine='python')" - ) - raise OperationalError(detailed_msg) from e raise OperationalError(*e.args) from e def _read_parquet(self, engine) -> DataFrame: diff --git a/tests/pyathena/pandas/test_cursor.py b/tests/pyathena/pandas/test_cursor.py index 944dc025..d5e6d51c 100644 --- a/tests/pyathena/pandas/test_cursor.py +++ b/tests/pyathena/pandas/test_cursor.py @@ -658,23 +658,6 @@ def test_no_ops(self): cursor.close() conn.close() - def test_get_optimal_csv_engine(self): - """Test _get_optimal_csv_engine method behavior.""" - - # Mock the parent class initialization - with patch("pyathena.pandas.result_set.AthenaResultSet.__init__"): - result_set = AthenaPandasResultSet.__new__(AthenaPandasResultSet) - result_set._engine = "auto" - result_set._chunksize = None # No chunking by default - - # Small file should prefer C engine (compatibility-first approach) - engine = result_set._get_optimal_csv_engine(1024) # 1KB - assert engine == "c" - - # Large file should prefer Python engine (avoid C parser int32 limits) - engine = result_set._get_optimal_csv_engine(100 * 1024 * 1024) # 100MB - assert engine == "python" - def test_auto_determine_chunksize(self): """Test _auto_determine_chunksize method behavior.""" @@ -747,58 +730,46 @@ def test_get_csv_engine_explicit_specification(self): # Test PyArrow with incompatible chunksize (via parameter) with ( - patch.object(result_set, "_get_available_engine", return_value="pyarrow"), patch.object( type(result_set), "converters", new_callable=PropertyMock, return_value={} ), - patch.object(result_set, "_get_optimal_csv_engine", return_value="c") as mock_opt, ): engine = result_set._get_csv_engine(chunksize=1000) assert engine == "c" - mock_opt.assert_called_once() # Test PyArrow with incompatible chunksize (via instance variable) result_set._chunksize = 1000 with ( - patch.object(result_set, "_get_available_engine", return_value="pyarrow"), patch.object( type(result_set), "converters", new_callable=PropertyMock, return_value={} ), - patch.object(result_set, "_get_optimal_csv_engine", return_value="c") as mock_opt, ): engine = result_set._get_csv_engine() assert engine == "c" - mock_opt.assert_called_once() # Test PyArrow with incompatible quoting result_set._chunksize = None result_set._quoting = 0 # Non-default quoting with ( - patch.object(result_set, "_get_available_engine", return_value="pyarrow"), patch.object( type(result_set), "converters", new_callable=PropertyMock, return_value={} ), - patch.object(result_set, "_get_optimal_csv_engine", return_value="c") as mock_opt, ): engine = result_set._get_csv_engine() assert engine == "c" - mock_opt.assert_called_once() # Test PyArrow with incompatible converters result_set._quoting = 1 # Reset to default with ( - patch.object(result_set, "_get_available_engine", return_value="pyarrow"), patch.object( type(result_set), "converters", new_callable=PropertyMock, return_value={"col1": str}, ), - patch.object(result_set, "_get_optimal_csv_engine", return_value="c") as mock_opt, ): engine = result_set._get_csv_engine() assert engine == "c" - mock_opt.assert_called_once() # Test PyArrow specification (when unavailable) with ( @@ -806,11 +777,9 @@ def test_get_csv_engine_explicit_specification(self): patch.object( type(result_set), "converters", new_callable=PropertyMock, return_value={} ), - patch.object(result_set, "_get_optimal_csv_engine", return_value="c") as mock_opt, ): engine = result_set._get_csv_engine() assert engine == "c" - mock_opt.assert_called_once() @pytest.mark.parametrize( ("pandas_cursor", "parquet_engine", "chunksize"),