diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp index 323bbfbe231775..3abb836d87de67 100644 --- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp +++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.cpp @@ -117,6 +117,9 @@ std::map VMCTableWriter::_build_base_writer_params() { if (_mc_sink.__isset.retry_count) { params["retry_count"] = std::to_string(_mc_sink.retry_count); } + if (_mc_sink.__isset.max_write_batch_rows) { + params["max_write_batch_rows"] = std::to_string(_mc_sink.max_write_batch_rows); + } return params; } @@ -156,13 +159,10 @@ Status VMCTableWriter::write(RuntimeState* state, Block& block) { it = _partitions_to_writers.find(_static_partition_spec); } output_block.erase(_non_write_columns_indices); - return it->second->write(output_block); + return _write_block_in_chunks(it->second, output_block); } // Case 2: Dynamic partition or non-partitioned table - // For dynamic partitions, MaxCompute Storage API (with DynamicPartitionOptions) expects - // partition column values in the Arrow data and handles routing internally. - // So we send the full block including partition columns to a single writer. std::string partition_key = ""; auto it = _partitions_to_writers.find(partition_key); if (it == _partitions_to_writers.end()) { @@ -171,7 +171,40 @@ Status VMCTableWriter::write(RuntimeState* state, Block& block) { _partitions_to_writers.insert({partition_key, writer}); it = _partitions_to_writers.find(partition_key); } - return it->second->write(output_block); + return _write_block_in_chunks(it->second, output_block); +} + +Status VMCTableWriter::_write_block_in_chunks(const std::shared_ptr& writer, + Block& output_block) { + // Limit per-JNI data to MAX_WRITE_BLOCK_BYTES. When data source is not MC scanner + // (e.g. Doris internal table, Hive, JDBC), the upstream batch_size controls Block + // row count but not byte size. With large rows (585KB/row), a 4096-row Block is + // ~2.4GB. Splitting ensures each JNI call processes bounded data, limiting Arrow + // and SDK native memory per call. + static constexpr size_t MAX_WRITE_BLOCK_BYTES = 256 * 1024 * 1024; // 256MB + + const size_t block_bytes = output_block.allocated_bytes(); + const size_t rows = output_block.rows(); + + if (block_bytes <= MAX_WRITE_BLOCK_BYTES || rows <= 1) { + return writer->write(output_block); + } + + const size_t bytes_per_row = block_bytes / rows; + const size_t max_rows = std::max(size_t(1), MAX_WRITE_BLOCK_BYTES / bytes_per_row); + + for (size_t offset = 0; offset < rows; offset += max_rows) { + const size_t num_rows = std::min(max_rows, rows - offset); + Block sub_block = output_block.clone_empty(); + auto columns = sub_block.mutate_columns(); + for (size_t i = 0; i < columns.size(); i++) { + columns[i]->insert_range_from(*output_block.get_by_position(i).column, offset, + num_rows); + } + sub_block.set_columns(std::move(columns)); + RETURN_IF_ERROR(writer->write(sub_block)); + } + return Status::OK(); } Status VMCTableWriter::close(Status status) { diff --git a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h index 24075e47a062ec..643b28f1cc0d1a 100644 --- a/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h +++ b/be/src/exec/sink/writer/maxcompute/vmc_table_writer.h @@ -54,6 +54,12 @@ class VMCTableWriter final : public AsyncResultWriter { private: std::shared_ptr _create_partition_writer(const std::string& partition_spec); + // Split large blocks into sub-blocks before JNI to limit Arrow and SDK + // native memory. Needed when data source is not MC scanner and blocks + // may exceed 256MB (e.g. batch_size=4096 with 585KB/row = 2.4GB). + Status _write_block_in_chunks(const std::shared_ptr& writer, + Block& output_block); + std::map _build_base_writer_params(); TDataSink _t_sink; diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index ce1b3941dca629..336991f3802726 100644 --- a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -33,6 +33,7 @@ import com.aliyun.odps.table.read.split.impl.IndexedInputSplit; import com.aliyun.odps.table.read.split.impl.RowRangeInputSplit; import com.google.common.base.Strings; +import org.apache.arrow.vector.BaseVariableWidthVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.log4j.Logger; @@ -60,6 +61,13 @@ public class MaxComputeJniScanner extends JniScanner { private static final Logger LOG = Logger.getLogger(MaxComputeJniScanner.class); + // 256MB byte budget per scanner batch — limits the C++ Block size at the source. + // With large rows (e.g. 585KB/row STRING), batch_size=4096 would create ~2.4GB Blocks. + // The pipeline's AsyncResultWriter queues up to 3 Blocks per instance, and with + // parallel_pipeline_task_num instances, total queue memory = instances * 3 * block_size. + // 256MB keeps queue memory manageable: 5 instances * 3 * 256MB = 3.8GB. + private static final long MAX_BATCH_BYTES = 256 * 1024 * 1024L; + private static final String ACCESS_KEY = "access_key"; private static final String SECRET_KEY = "secret_key"; private static final String ENDPOINT = "endpoint"; @@ -90,6 +98,8 @@ private enum SplitType { private String table; private SplitReader currentSplitReader; + private VectorSchemaRoot currentBatch = null; + private int currentBatchRowOffset = 0; private MaxComputeColumnValue columnValue; private Map readColumnsToId; @@ -215,8 +225,17 @@ public void open() throws IOException { @Override public void close() throws IOException { + if (currentSplitReader != null) { + try { + currentSplitReader.close(); + } catch (Exception e) { + LOG.warn("Failed to close MaxCompute split reader for table " + project + "." + table, e); + } + } startOffset = -1; splitSize = -1; + currentBatch = null; + currentBatchRowOffset = 0; currentSplitReader = null; settings = null; scan = null; @@ -234,45 +253,74 @@ protected int getNext() throws IOException { return readVectors(expectedRows); } + private VectorSchemaRoot getNextBatch() throws IOException { + try { + if (!currentSplitReader.hasNext()) { + currentSplitReader.close(); + currentSplitReader = null; + return null; + } + return currentSplitReader.get(); + } catch (Exception e) { + String errorMsg = "MaxComputeJniScanner readVectors get batch fail"; + LOG.warn(errorMsg, e); + throw new IOException(e.getMessage(), e); + } + } + private int readVectors(int expectedRows) throws IOException { int curReadRows = 0; + long accumulatedBytes = 0; while (curReadRows < expectedRows) { - try { - if (!currentSplitReader.hasNext()) { - currentSplitReader.close(); - currentSplitReader = null; + // Stop early if accumulated variable-width bytes approach int32 limit + if (accumulatedBytes >= MAX_BATCH_BYTES) { + break; + } + if (currentBatch == null) { + currentBatch = getNextBatch(); + if (currentBatch == null || currentBatch.getRowCount() == 0) { + currentBatch = null; break; } - } catch (Exception e) { - String errorMsg = "MaxComputeJniScanner readVectors hasNext fail"; - LOG.warn(errorMsg, e); - throw new IOException(e.getMessage(), e); + currentBatchRowOffset = 0; } - try { - VectorSchemaRoot data = currentSplitReader.get(); - if (data.getRowCount() == 0) { + int rowsToAppend = Math.min(expectedRows - curReadRows, + currentBatch.getRowCount() - currentBatchRowOffset); + List fieldVectors = currentBatch.getFieldVectors(); + + // Limit rows to avoid int32 overflow in VectorColumn's String byte buffer + rowsToAppend = limitRowsByVarWidthBytes( + fieldVectors, currentBatchRowOffset, rowsToAppend, + MAX_BATCH_BYTES - accumulatedBytes); + if (rowsToAppend <= 0) { break; } - List fieldVectors = data.getFieldVectors(); - int batchRows = 0; long startTime = System.nanoTime(); for (FieldVector column : fieldVectors) { Integer readColumnId = readColumnsToId.get(column.getName()); - batchRows = column.getValueCount(); if (readColumnId == null) { continue; } columnValue.reset(column); - for (int j = 0; j < batchRows; j++) { + for (int j = currentBatchRowOffset; j < currentBatchRowOffset + rowsToAppend; j++) { columnValue.setColumnIdx(j); appendData(readColumnId, columnValue); } } appendDataTime += System.nanoTime() - startTime; - curReadRows += batchRows; + // Track bytes for the rows just appended + accumulatedBytes += estimateVarWidthBytes( + fieldVectors, currentBatchRowOffset, rowsToAppend); + + currentBatchRowOffset += rowsToAppend; + curReadRows += rowsToAppend; + if (currentBatchRowOffset >= currentBatch.getRowCount()) { + currentBatch = null; + currentBatchRowOffset = 0; + } } catch (Exception e) { String errorMsg = String.format("MaxComputeJniScanner Fail to read arrow data. " + "curReadRows = {}, expectedRows = {}", curReadRows, expectedRows); @@ -280,9 +328,91 @@ private int readVectors(int expectedRows) throws IOException { throw new RuntimeException(errorMsg, e); } } + if (LOG.isDebugEnabled() && curReadRows > 0 && curReadRows < expectedRows) { + LOG.debug("readVectors: returning " + curReadRows + " rows (limited by byte budget)" + + ", totalVarWidthBytes=" + accumulatedBytes + + ", expectedRows=" + expectedRows); + } return curReadRows; } + /** + * Limit the number of rows to append so that no single variable-width column + * exceeds the remaining byte budget. This prevents int32 overflow in + * VectorColumn's appendIndex for String/Binary child byte arrays. + * + * Uses Arrow's offset buffer for O(1)-per-row byte size calculation — + * no data copying involved. + */ + private int limitRowsByVarWidthBytes(List fieldVectors, + int offset, int maxRows, long remainingBudget) { + if (remainingBudget <= 0) { + return 0; + } + int safeRows = maxRows; + for (FieldVector fv : fieldVectors) { + if (fv instanceof BaseVariableWidthVector) { + BaseVariableWidthVector vec = (BaseVariableWidthVector) fv; + // Find how many rows fit within the budget for THIS column + int rows = findMaxRowsWithinBudget(vec, offset, maxRows, remainingBudget); + safeRows = Math.min(safeRows, rows); + } + } + // Always allow at least 1 row to make progress, even if it exceeds budget + return Math.max(1, safeRows); + } + + /** + * Binary search for the maximum number of rows starting at 'offset' + * whose total bytes in the variable-width vector fit within 'budget'. + */ + private int findMaxRowsWithinBudget(BaseVariableWidthVector vec, + int offset, int maxRows, long budget) { + if (maxRows <= 0) { + return 0; + } + // Total bytes for all maxRows + long totalBytes = (long) vec.getOffsetBuffer().getInt((long) (offset + maxRows) * 4) + - (long) vec.getOffsetBuffer().getInt((long) offset * 4); + if (totalBytes <= budget) { + return maxRows; + } + // Binary search for the cutoff point + int lo = 1; + int hi = maxRows - 1; + int startOff = vec.getOffsetBuffer().getInt((long) offset * 4); + while (lo <= hi) { + int mid = lo + (hi - lo) / 2; + long bytes = (long) vec.getOffsetBuffer().getInt((long) (offset + mid) * 4) - startOff; + if (bytes <= budget) { + lo = mid + 1; + } else { + hi = mid - 1; + } + } + // 'hi' is the largest count whose bytes <= budget (could be 0) + return hi; + } + + /** + * Estimate total variable-width bytes for the given row range across all columns. + * Returns the max bytes of any single column (since each column has its own + * VectorColumn child buffer and the overflow is per-column). + */ + private long estimateVarWidthBytes(List fieldVectors, + int offset, int rows) { + long maxColumnBytes = 0; + for (FieldVector fv : fieldVectors) { + if (fv instanceof BaseVariableWidthVector) { + BaseVariableWidthVector vec = (BaseVariableWidthVector) fv; + long bytes = (long) vec.getOffsetBuffer().getInt((long) (offset + rows) * 4) + - (long) vec.getOffsetBuffer().getInt((long) offset * 4); + maxColumnBytes = Math.max(maxColumnBytes, bytes); + } + } + return maxColumnBytes; + } + private static Object deserialize(String serializedString) throws IOException, ClassNotFoundException { byte[] serializedBytes = Base64.getDecoder().decode(serializedString); ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(serializedBytes); diff --git a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java index 86b849cd6279e7..4e0f2570a82d59 100644 --- a/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java +++ b/fe/be-java-extensions/max-compute-connector/src/main/java/org/apache/doris/maxcompute/MaxComputeJniWriter.java @@ -18,6 +18,7 @@ package org.apache.doris.maxcompute; import org.apache.doris.common.jni.JniWriter; +import org.apache.doris.common.jni.vec.VectorColumn; import org.apache.doris.common.jni.vec.VectorTable; import org.apache.doris.common.maxcompute.MCUtils; @@ -25,6 +26,7 @@ import com.aliyun.odps.OdpsType; import com.aliyun.odps.table.configuration.ArrowOptions; import com.aliyun.odps.table.configuration.ArrowOptions.TimestampUnit; +import com.aliyun.odps.table.configuration.CompressionCodec; import com.aliyun.odps.table.configuration.RestOptions; import com.aliyun.odps.table.configuration.WriterOptions; import com.aliyun.odps.table.enviroment.Credentials; @@ -92,18 +94,35 @@ public class MaxComputeJniWriter extends JniWriter { private static final String CONNECT_TIMEOUT = "connect_timeout"; private static final String READ_TIMEOUT = "read_timeout"; private static final String RETRY_COUNT = "retry_count"; + private static final String MAX_WRITE_BATCH_ROWS = "max_write_batch_rows"; private final Map params; + + // 128MB batch threshold — controls peak Arrow native memory per batch. + // Arrow uses sun.misc.Unsafe.allocateMemory() which is invisible to JVM metrics. + // Each batch temporarily holds ~batchDataBytes of native memory. + // With 3 concurrent writers, total Arrow native = 3 * 128MB = ~384MB. + // Using 1GB was too large: 3 writers * 1GB = 3GB invisible native memory. + private static final long MAX_ARROW_BATCH_BYTES = 128 * 1024 * 1024L; + + // Segmented commit: commit and recreate batchWriter every N rows to prevent + // MaxCompute SDK native memory accumulation. Without this, the SDK buffers + // all written data internally, causing process RSS to grow linearly with + // total data volume until SIGSEGV. + private static final long ROWS_PER_SEGMENT = 5000; + private final String endpoint; private final String project; private final String tableName; private final String quota; private String writeSessionId; private long blockId; + private long nextBlockId; // For creating new segments with unique blockIds private String partitionSpec; private int connectTimeout; private int readTimeout; private int retryCount; + private int maxWriteBatchRows; // Storage API objects private TableBatchWriteSession writeSession; @@ -111,7 +130,14 @@ public class MaxComputeJniWriter extends JniWriter { private BufferAllocator allocator; private List columnTypeInfos; private List columnNames; - private WriterCommitMessage commitMessage; + // Collect commit messages from all segments (each batchWriter commit produces one) + private final List commitMessages = new java.util.ArrayList<>(); + + // Per-segment row counter (resets after each segment commit) + private long segmentRows = 0; + + // Writer options cached for creating new batchWriters + private WriterOptions writerOptions; // Statistics private long writtenRows = 0; @@ -127,10 +153,12 @@ public MaxComputeJniWriter(int batchSize, Map params) { this.writeSessionId = Objects.requireNonNull(params.get(WRITE_SESSION_ID), "required property '" + WRITE_SESSION_ID + "'."); this.blockId = Long.parseLong(params.getOrDefault(BLOCK_ID, "0")); + this.nextBlockId = this.blockId + 1; // Reserve blockId for first writer, increment for segments this.partitionSpec = params.getOrDefault(PARTITION_SPEC, ""); this.connectTimeout = Integer.parseInt(params.getOrDefault(CONNECT_TIMEOUT, "10")); this.readTimeout = Integer.parseInt(params.getOrDefault(READ_TIMEOUT, "120")); this.retryCount = Integer.parseInt(params.getOrDefault(RETRY_COUNT, "4")); + this.maxWriteBatchRows = Integer.parseInt(params.getOrDefault(MAX_WRITE_BATCH_ROWS, "4096")); } @Override @@ -184,9 +212,10 @@ public void open() throws IOException { allocator = new RootAllocator(Long.MAX_VALUE); - // Create Arrow writer for this block - WriterOptions writerOptions = WriterOptions.newBuilder() + // Cache writer options for creating new batchWriters in segments + writerOptions = WriterOptions.newBuilder() .withSettings(settings) + .withCompressionCodec(CompressionCodec.ZSTD) .build(); batchWriter = writeSession.createArrowWriter(blockId, WriterAttemptId.of(0), writerOptions); @@ -210,19 +239,44 @@ protected void writeInternal(VectorTable inputTable) throws IOException { } try { - Object[][] data = inputTable.getMaterializedData(); + // Stream data directly from off-heap VectorColumn to Arrow vectors. + // Unlike the previous getMaterializedData() approach that created + // Object[][] (with String objects for STRING columns causing 3x memory + // amplification), this reads bytes directly from VectorColumn and writes + // to Arrow, keeping peak heap usage per batch to O(batch_rows * row_size) + // instead of O(2 * batch_rows * row_size). + int rowOffset = 0; + while (rowOffset < numRows) { + int batchRows = Math.min(maxWriteBatchRows, numRows - rowOffset); - // Get a pre-allocated VectorSchemaRoot from the batch writer - VectorSchemaRoot root = batchWriter.newElement(); - root.setRowCount(numRows); + // For variable-width columns, check byte budget to avoid Arrow int32 overflow + batchRows = limitWriteBatchByBytesStreaming(inputTable, numCols, + rowOffset, batchRows); - for (int col = 0; col < numCols && col < columnTypeInfos.size(); col++) { - OdpsType odpsType = columnTypeInfos.get(col).getOdpsType(); - fillArrowVector(root, col, odpsType, data[col], numRows); - } + VectorSchemaRoot root = batchWriter.newElement(); + try { + root.setRowCount(batchRows); - batchWriter.write(root); - writtenRows += numRows; + for (int col = 0; col < numCols && col < columnTypeInfos.size(); col++) { + OdpsType odpsType = columnTypeInfos.get(col).getOdpsType(); + fillArrowVectorStreaming(root, col, odpsType, + inputTable.getColumn(col), rowOffset, batchRows); + } + + batchWriter.write(root); + } finally { + root.close(); + } + + writtenRows += batchRows; + segmentRows += batchRows; + rowOffset += batchRows; + + // Segmented commit: rotate batchWriter to release SDK native memory + if (segmentRows >= ROWS_PER_SEGMENT) { + rotateBatchWriter(); + } + } } catch (Exception e) { String errorMsg = "Failed to write data to MaxCompute table " + project + "." + tableName; LOG.error(errorMsg, e); @@ -230,17 +284,315 @@ protected void writeInternal(VectorTable inputTable) throws IOException { } } + /** + * Commit current batchWriter and create a new one with a fresh blockId. + * This forces the MaxCompute SDK to flush and release internal native memory + * buffers that accumulate during writes. Without rotation, the SDK holds all + * serialized Arrow data in native memory until close(), causing process RSS + * to grow linearly with total data volume. + */ + private void rotateBatchWriter() throws IOException { + try { + // 1. Commit current batchWriter and save its commit message + WriterCommitMessage msg = batchWriter.commit(); + commitMessages.add(msg); + batchWriter = null; + + // 2. Close current allocator to release Arrow native memory + allocator.close(); + allocator = null; + + // 3. Create new allocator and batchWriter with a new blockId + long newBlockId = nextBlockId++; + allocator = new RootAllocator(Long.MAX_VALUE); + batchWriter = writeSession.createArrowWriter(newBlockId, + WriterAttemptId.of(0), writerOptions); + + LOG.info("Rotated batchWriter: oldBlockId=" + blockId + ", newBlockId=" + newBlockId + + ", totalCommitMessages=" + commitMessages.size() + + ", totalWrittenRows=" + writtenRows); + + blockId = newBlockId; + segmentRows = 0; + } catch (Exception e) { + throw new IOException("Failed to rotate batchWriter for table " + + project + "." + tableName, e); + } + } + + + private boolean isVariableWidthType(OdpsType type) { + return type == OdpsType.STRING || type == OdpsType.VARCHAR + || type == OdpsType.CHAR || type == OdpsType.BINARY; + } + + /** + * Limit write batch size by estimating variable-width column bytes directly + * from the off-heap VectorColumn, without materializing data to Java heap. + */ + private int limitWriteBatchByBytesStreaming(VectorTable inputTable, int numCols, + int rowOffset, int batchRows) { + for (int col = 0; col < numCols && col < columnTypeInfos.size(); col++) { + OdpsType odpsType = columnTypeInfos.get(col).getOdpsType(); + if (!isVariableWidthType(odpsType)) { + continue; + } + VectorColumn vc = inputTable.getColumn(col); + batchRows = findMaxRowsForColumnStreaming(vc, rowOffset, batchRows, MAX_ARROW_BATCH_BYTES); + if (batchRows <= 1) { + return Math.max(1, batchRows); + } + } + return batchRows; + } + + /** + * Find the maximum number of rows (from rowOffset) whose total byte size + * fits within budget, by reading offset metadata directly from VectorColumn. + */ + private int findMaxRowsForColumnStreaming(VectorColumn vc, int rowOffset, int maxRows, long budget) { + long totalBytes = estimateColumnBytesStreaming(vc, rowOffset, maxRows); + if (totalBytes <= budget) { + return maxRows; + } + int rows = maxRows; + while (rows > 1) { + rows = rows / 2; + totalBytes = estimateColumnBytesStreaming(vc, rowOffset, rows); + if (totalBytes <= budget) { + int lo = rows; + int hi = Math.min(rows * 2, maxRows); + while (lo < hi) { + int mid = lo + (hi - lo + 1) / 2; + if (estimateColumnBytesStreaming(vc, rowOffset, mid) <= budget) { + lo = mid; + } else { + hi = mid - 1; + } + } + return lo; + } + } + return 1; + } + + /** + * Estimate total bytes for a range of rows in a VectorColumn by reading + * the offset array directly from off-heap memory, without creating any + * byte[] objects. This is O(1) per row (just offset subtraction). + */ + private long estimateColumnBytesStreaming(VectorColumn vc, int rowOffset, int rows) { + long total = 0; + long offsetAddr = vc.offsetAddress(); + for (int i = rowOffset; i < rowOffset + rows; i++) { + if (!vc.isNullAt(i)) { + // String offsets are stored as int32 in VectorColumn + int startOff = i == 0 ? 0 + : org.apache.doris.common.jni.utils.OffHeap.getInt(null, offsetAddr + 4L * (i - 1)); + int endOff = org.apache.doris.common.jni.utils.OffHeap.getInt(null, offsetAddr + 4L * i); + total += (endOff - startOff); + } + } + return total; + } + + /** + * Fill an Arrow vector by reading data directly from a VectorColumn, + * one row at a time. For STRING columns, this reads bytes directly + * (getBytesWithOffset) instead of creating String objects, eliminating + * the String -> byte[] double-copy that caused heap exhaustion. + */ + private void fillArrowVectorStreaming(VectorSchemaRoot root, int colIdx, OdpsType odpsType, + VectorColumn vc, int rowOffset, int numRows) { + switch (odpsType) { + case BOOLEAN: { + BitVector vec = (BitVector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + vec.set(i, vc.getBoolean(rowOffset + i) ? 1 : 0); + } + } + vec.setValueCount(numRows); + break; + } + case TINYINT: { + TinyIntVector vec = (TinyIntVector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + vec.set(i, vc.getByte(rowOffset + i)); + } + } + vec.setValueCount(numRows); + break; + } + case SMALLINT: { + SmallIntVector vec = (SmallIntVector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + vec.set(i, vc.getShort(rowOffset + i)); + } + } + vec.setValueCount(numRows); + break; + } + case INT: { + IntVector vec = (IntVector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + vec.set(i, vc.getInt(rowOffset + i)); + } + } + vec.setValueCount(numRows); + break; + } + case BIGINT: { + BigIntVector vec = (BigIntVector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + vec.set(i, vc.getLong(rowOffset + i)); + } + } + vec.setValueCount(numRows); + break; + } + case FLOAT: { + Float4Vector vec = (Float4Vector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + vec.set(i, vc.getFloat(rowOffset + i)); + } + } + vec.setValueCount(numRows); + break; + } + case DOUBLE: { + Float8Vector vec = (Float8Vector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + vec.set(i, vc.getDouble(rowOffset + i)); + } + } + vec.setValueCount(numRows); + break; + } + case DECIMAL: { + DecimalVector vec = (DecimalVector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + vec.set(i, vc.getDecimal(rowOffset + i)); + } + } + vec.setValueCount(numRows); + break; + } + case STRING: + case VARCHAR: + case CHAR: { + // KEY FIX: Read bytes directly from off-heap, no String creation. + // Previously: getMaterializedData -> String[] -> toString().getBytes() -> Arrow + // Now: getBytesWithOffset() -> byte[] -> Arrow (1 copy instead of 3) + VarCharVector vec = (VarCharVector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + byte[] bytes = vc.getBytesWithOffset(rowOffset + i); + vec.setSafe(i, bytes); + } + } + vec.setValueCount(numRows); + break; + } + case DATE: { + DateDayVector vec = (DateDayVector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + LocalDate date = vc.getDate(rowOffset + i); + vec.set(i, (int) date.toEpochDay()); + } + } + vec.setValueCount(numRows); + break; + } + case DATETIME: + case TIMESTAMP: { + TimeStampMilliVector vec = (TimeStampMilliVector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + LocalDateTime dt = vc.getDateTime(rowOffset + i); + long millis = dt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + vec.set(i, millis); + } + } + vec.setValueCount(numRows); + break; + } + case BINARY: { + VarBinaryVector vec = (VarBinaryVector) root.getVector(colIdx); + vec.allocateNew(numRows); + for (int i = 0; i < numRows; i++) { + if (vc.isNullAt(rowOffset + i)) { + vec.setNull(i); + } else { + byte[] bytes = vc.getBytesWithOffset(rowOffset + i); + vec.setSafe(i, bytes); + } + } + vec.setValueCount(numRows); + break; + } + default: { + // For complex types (ARRAY, MAP, STRUCT) and other types, + // fall back to object-based materialization for this column only. + Object[] colData = vc.getObjectColumn(rowOffset, rowOffset + numRows); + fillArrowVector(root, colIdx, odpsType, colData, 0, numRows); + break; + } + } + } + private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsType, - Object[] colData, int numRows) { + Object[] colData, int rowOffset, int numRows) { switch (odpsType) { case BOOLEAN: { BitVector vec = (BitVector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); } else { - vec.set(i, (Boolean) colData[i] ? 1 : 0); + vec.set(i, (Boolean) colData[rowOffset + i] ? 1 : 0); } } vec.setValueCount(numRows); @@ -250,10 +602,10 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp TinyIntVector vec = (TinyIntVector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); } else { - vec.set(i, ((Number) colData[i]).byteValue()); + vec.set(i, ((Number) colData[rowOffset + i]).byteValue()); } } vec.setValueCount(numRows); @@ -263,10 +615,10 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp SmallIntVector vec = (SmallIntVector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); } else { - vec.set(i, ((Number) colData[i]).shortValue()); + vec.set(i, ((Number) colData[rowOffset + i]).shortValue()); } } vec.setValueCount(numRows); @@ -276,10 +628,10 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp IntVector vec = (IntVector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); } else { - vec.set(i, ((Number) colData[i]).intValue()); + vec.set(i, ((Number) colData[rowOffset + i]).intValue()); } } vec.setValueCount(numRows); @@ -289,10 +641,10 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp BigIntVector vec = (BigIntVector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); } else { - vec.set(i, ((Number) colData[i]).longValue()); + vec.set(i, ((Number) colData[rowOffset + i]).longValue()); } } vec.setValueCount(numRows); @@ -302,10 +654,10 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp Float4Vector vec = (Float4Vector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); } else { - vec.set(i, ((Number) colData[i]).floatValue()); + vec.set(i, ((Number) colData[rowOffset + i]).floatValue()); } } vec.setValueCount(numRows); @@ -315,10 +667,10 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp Float8Vector vec = (Float8Vector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); } else { - vec.set(i, ((Number) colData[i]).doubleValue()); + vec.set(i, ((Number) colData[rowOffset + i]).doubleValue()); } } vec.setValueCount(numRows); @@ -328,12 +680,12 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp DecimalVector vec = (DecimalVector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); } else { - BigDecimal bd = (colData[i] instanceof BigDecimal) - ? (BigDecimal) colData[i] - : new BigDecimal(colData[i].toString()); + BigDecimal bd = (colData[rowOffset + i] instanceof BigDecimal) + ? (BigDecimal) colData[rowOffset + i] + : new BigDecimal(colData[rowOffset + i].toString()); vec.set(i, bd); } } @@ -346,14 +698,14 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp VarCharVector vec = (VarCharVector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); } else { byte[] bytes; - if (colData[i] instanceof byte[]) { - bytes = (byte[]) colData[i]; + if (colData[rowOffset + i] instanceof byte[]) { + bytes = (byte[]) colData[rowOffset + i]; } else { - bytes = colData[i].toString().getBytes(StandardCharsets.UTF_8); + bytes = colData[rowOffset + i].toString().getBytes(StandardCharsets.UTF_8); } vec.setSafe(i, bytes); } @@ -365,12 +717,12 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp DateDayVector vec = (DateDayVector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); - } else if (colData[i] instanceof LocalDate) { - vec.set(i, (int) ((LocalDate) colData[i]).toEpochDay()); + } else if (colData[rowOffset + i] instanceof LocalDate) { + vec.set(i, (int) ((LocalDate) colData[rowOffset + i]).toEpochDay()); } else { - vec.set(i, (int) LocalDate.parse(colData[i].toString()).toEpochDay()); + vec.set(i, (int) LocalDate.parse(colData[rowOffset + i].toString()).toEpochDay()); } } vec.setValueCount(numRows); @@ -381,16 +733,16 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp TimeStampMilliVector vec = (TimeStampMilliVector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); - } else if (colData[i] instanceof LocalDateTime) { - long millis = ((LocalDateTime) colData[i]) + } else if (colData[rowOffset + i] instanceof LocalDateTime) { + long millis = ((LocalDateTime) colData[rowOffset + i]) .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); vec.set(i, millis); - } else if (colData[i] instanceof java.sql.Timestamp) { - vec.set(i, ((java.sql.Timestamp) colData[i]).getTime()); + } else if (colData[rowOffset + i] instanceof java.sql.Timestamp) { + vec.set(i, ((java.sql.Timestamp) colData[rowOffset + i]).getTime()); } else { - long millis = LocalDateTime.parse(colData[i].toString()) + long millis = LocalDateTime.parse(colData[rowOffset + i].toString()) .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); vec.set(i, millis); } @@ -402,12 +754,12 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp VarBinaryVector vec = (VarBinaryVector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); - } else if (colData[i] instanceof byte[]) { - vec.setSafe(i, (byte[]) colData[i]); + } else if (colData[rowOffset + i] instanceof byte[]) { + vec.setSafe(i, (byte[]) colData[rowOffset + i]); } else { - vec.setSafe(i, colData[i].toString().getBytes(StandardCharsets.UTF_8)); + vec.setSafe(i, colData[rowOffset + i].toString().getBytes(StandardCharsets.UTF_8)); } } vec.setValueCount(numRows); @@ -419,10 +771,10 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp FieldVector dataVec = listVec.getDataVector(); int elemIdx = 0; for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { listVec.setNull(i); } else { - List list = (List) colData[i]; + List list = (List) colData[rowOffset + i]; listVec.startNewValue(i); for (Object elem : list) { writeListElement(dataVec, elemIdx++, elem); @@ -442,10 +794,10 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp FieldVector valVec = structVec.getChildrenFromFields().get(1); int elemIdx = 0; for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { mapVec.setNull(i); } else { - Map map = (Map) colData[i]; + Map map = (Map) colData[rowOffset + i]; mapVec.startNewValue(i); for (Map.Entry entry : map.entrySet()) { structVec.setIndexDefined(elemIdx); @@ -466,11 +818,11 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp StructVector structVec = (StructVector) root.getVector(colIdx); structVec.allocateNew(); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { structVec.setNull(i); } else { structVec.setIndexDefined(i); - Map struct = (Map) colData[i]; + Map struct = (Map) colData[rowOffset + i]; for (FieldVector childVec : structVec.getChildrenFromFields()) { Object val = struct.get(childVec.getName()); writeListElement(childVec, i, val); @@ -488,10 +840,10 @@ private void fillArrowVector(VectorSchemaRoot root, int colIdx, OdpsType odpsTyp VarCharVector vec = (VarCharVector) root.getVector(colIdx); vec.allocateNew(numRows); for (int i = 0; i < numRows; i++) { - if (colData[i] == null) { + if (colData[rowOffset + i] == null) { vec.setNull(i); } else { - vec.setSafe(i, colData[i].toString().getBytes(StandardCharsets.UTF_8)); + vec.setSafe(i, colData[rowOffset + i].toString().getBytes(StandardCharsets.UTF_8)); } } vec.setValueCount(numRows); @@ -580,23 +932,41 @@ private void writeListElement(FieldVector vec, int idx, Object elem) { @Override public void close() throws IOException { + Exception firstException = null; try { + // Commit the final segment's batchWriter if (batchWriter != null) { - commitMessage = batchWriter.commit(); - batchWriter = null; + try { + WriterCommitMessage msg = batchWriter.commit(); + commitMessages.add(msg); + } catch (Exception e) { + firstException = e; + LOG.warn("Failed to commit batch writer for table " + project + "." + tableName, e); + } finally { + batchWriter = null; + } } + } finally { if (allocator != null) { - allocator.close(); - allocator = null; + try { + allocator.close(); + } catch (Exception e) { + LOG.warn("Failed to close Arrow allocator (possible memory leak)", e); + if (firstException == null) { + firstException = e; + } + } finally { + allocator = null; + } } - LOG.info("MaxComputeJniWriter closed: writeSessionId=" + writeSessionId - + ", partitionSpec=" + partitionSpec - + ", writtenRows=" + writtenRows - + ", blockId=" + blockId); - } catch (Exception e) { - String errorMsg = "Failed to close MaxCompute arrow writer"; - LOG.error(errorMsg, e); - throw new IOException(errorMsg, e); + } + LOG.info("MaxComputeJniWriter closed: writeSessionId=" + writeSessionId + + ", partitionSpec=" + partitionSpec + + ", writtenRows=" + writtenRows + + ", totalSegments=" + commitMessages.size() + + ", blockId=" + blockId); + if (firstException != null) { + throw new IOException("Failed to close MaxCompute arrow writer", firstException); } } @@ -605,16 +975,18 @@ public Map getStatistics() { Map stats = new HashMap<>(); stats.put("mc_partition_spec", partitionSpec != null ? partitionSpec : ""); - // Serialize WriterCommitMessage to Base64 - if (commitMessage != null) { + // Serialize all WriterCommitMessages (one per segment) as a List object. + if (!commitMessages.isEmpty()) { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(commitMessage); + // Serialize the entire list as one object to avoid mixing + // writeInt/writeObject which causes OptionalDataException + oos.writeObject(new java.util.ArrayList<>(commitMessages)); oos.close(); stats.put("mc_commit_message", Base64.getEncoder().encodeToString(baos.toByteArray())); } catch (IOException e) { - LOG.error("Failed to serialize WriterCommitMessage", e); + LOG.error("Failed to serialize WriterCommitMessages", e); } } @@ -625,3 +997,4 @@ public Map getStatistics() { return stats; } } + diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java index 627f3bc03e2f82..9ca45b19ea86bd 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/maxcompute/MCProperties.java @@ -57,6 +57,12 @@ public class MCProperties { public static final String DEFAULT_READ_TIMEOUT = "120"; // 120s public static final String DEFAULT_RETRY_COUNT = "4"; // 4 times + public static final String MAX_FIELD_SIZE = "mc.max_field_size_bytes"; + public static final String DEFAULT_MAX_FIELD_SIZE = "8388608"; // 8 * 1024 * 1024 = 8MB + + public static final String MAX_WRITE_BATCH_ROWS = "mc.max_write_batch_rows"; + public static final String DEFAULT_MAX_WRITE_BATCH_ROWS = "4096"; + //withCrossPartition(true): // Very friendly to scenarios where there are many partitions but each partition is very small. //withCrossPartition(false): diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java index 871312c476c7a0..77200d47cec6c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MCTransaction.java @@ -95,6 +95,7 @@ public void beginInsert(ExternalTable dorisTable, Optional TableWriteSessionBuilder builder = new TableWriteSessionBuilder() .identifier(tableId) .withSettings(catalog.getSettings()) + .withMaxFieldSize(catalog.getMaxFieldSize()) .withArrowOptions(ArrowOptions.newBuilder() .withDatetimeUnit(TimestampUnit.MILLI) .withTimestampUnit(TimestampUnit.MILLI) @@ -136,9 +137,13 @@ public void finishInsert() throws UserException { byte[] bytes = Base64.getDecoder().decode(data.getCommitMessage()); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bais); - WriterCommitMessage msg = (WriterCommitMessage) ois.readObject(); + // Deserialized as List — supports segmented + // commit where one writer produces multiple commit messages + @SuppressWarnings("unchecked") + List msgs = + (List) ois.readObject(); + allMessages.addAll(msgs); ois.close(); - allMessages.add(msg); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java index 73eb62f5dc2577..b3a15355b14aae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java @@ -69,6 +69,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { private int connectTimeout; private int readTimeout; private int retryTimes; + private long maxFieldSize; + private int maxWriteBatchRows; public boolean dateTimePredicatePushDown; @@ -191,6 +193,10 @@ protected void initLocalObjectsImpl() { props.getOrDefault(MCProperties.READ_TIMEOUT, MCProperties.DEFAULT_READ_TIMEOUT)); retryTimes = Integer.parseInt( props.getOrDefault(MCProperties.RETRY_COUNT, MCProperties.DEFAULT_RETRY_COUNT)); + maxFieldSize = Long.parseLong( + props.getOrDefault(MCProperties.MAX_FIELD_SIZE, MCProperties.DEFAULT_MAX_FIELD_SIZE)); + maxWriteBatchRows = Integer.parseInt( + props.getOrDefault(MCProperties.MAX_WRITE_BATCH_ROWS, MCProperties.DEFAULT_MAX_WRITE_BATCH_ROWS)); RestOptions restOptions = RestOptions.newBuilder() .withConnectTimeout(connectTimeout) @@ -320,6 +326,16 @@ public int getReadTimeout() { return readTimeout; } + public long getMaxFieldSize() { + makeSureInitialized(); + return maxFieldSize; + } + + public int getMaxWriteBatchRows() { + makeSureInitialized(); + return maxWriteBatchRows; + } + public boolean getDateTimePredicatePushDown() { return dateTimePredicatePushDown; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java index 3c59f737eb77bd..bc0f6fc7f31f14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalMaxComputeTableSink.java @@ -113,6 +113,19 @@ public PhysicalProperties getRequirePhysicalProperties() { .map(Column::getName) .collect(Collectors.toSet()); if (!partitionNames.isEmpty()) { + // Check if any partition column is present in cols (the bound columns from SELECT). + // Static partition columns are excluded from cols by BindSink.bindMaxComputeTableSink(), + // so if no partition column remains in cols, all partitions are statically specified + // and we don't need sort/shuffle — all data goes to a single known partition. + Set colNames = cols.stream() + .map(Column::getName) + .collect(Collectors.toSet()); + boolean hasDynamicPartition = partitionNames.stream().anyMatch(colNames::contains); + if (!hasDynamicPartition) { + // All partition columns are statically specified, no sort needed + return PhysicalProperties.SINK_RANDOM_PARTITIONED; + } + List columnIdx = new ArrayList<>(); List fullSchema = targetTable.getFullSchema(); for (int i = 0; i < fullSchema.size(); i++) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java index fdb50245a8e25a..bf52e894628cd7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MaxComputeTableSink.java @@ -74,6 +74,7 @@ public void bindDataSink(Optional insertCtx) throws Analys tSink.setConnectTimeout(catalog.getConnectTimeout()); tSink.setReadTimeout(catalog.getReadTimeout()); tSink.setRetryCount(catalog.getRetryTimes()); + tSink.setMaxWriteBatchRows(catalog.getMaxWriteBatchRows()); // Partition columns List partitionColumnNames = targetTable.getPartitionColumns().stream() diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 842765273e057d..9cb9c0925b635c 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -514,6 +514,7 @@ struct TMaxComputeTableSink { 14: optional list partition_columns // partition column names for dynamic partition 15: optional string write_session_id // Storage API write session ID 16: optional map properties // contains authentication properties + 17: optional i32 max_write_batch_rows // max rows per Arrow batch for write } struct TDataSink { diff --git a/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out b/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out index 50b2cb76282ca7..4b1de864f613bb 100644 --- a/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out +++ b/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_large_data.out @@ -44,3 +44,13 @@ 8 name_8 0.08 20250101 r3 9 name_9 0.09 20250102 r1 +-- !props_count -- +2000 + +-- !props_top5 -- +0 name_0 +1 name_1 +2 name_2 +3 name_3 +4 name_4 + diff --git a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy index 2d6e89c594a637..6dc849c97c3455 100644 --- a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy @@ -140,4 +140,53 @@ suite("test_mc_write_large_data", "p2,external,maxcompute,external_remote,extern sql """DROP TABLE IF EXISTS internal.${internal_db}.${internal_tb}""" sql """DROP DATABASE IF EXISTS internal.${internal_db}""" } + + // Test: mc.max_write_batch_rows and mc.max_field_size_bytes catalog properties + String mc_catalog_props = "test_mc_write_large_data_props" + sql """drop catalog if exists ${mc_catalog_props}""" + sql """ + CREATE CATALOG IF NOT EXISTS ${mc_catalog_props} PROPERTIES ( + "type" = "max_compute", + "mc.default.project" = "${defaultProject}", + "mc.access_key" = "${ak}", + "mc.secret_key" = "${sk}", + "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api", + "mc.quota" = "pay-as-you-go", + "mc.enable.namespace.schema" = "true", + "mc.max_write_batch_rows" = "512", + "mc.max_field_size_bytes" = "4194304" + ); + """ + + sql """switch ${mc_catalog_props}""" + String db_props = "mc_props_test_${uuid}" + sql """drop database if exists ${db_props}""" + sql """create database ${db_props}""" + sql """use ${db_props}""" + + try { + String tb_props = "props_verify_${uuid}" + sql """DROP TABLE IF EXISTS ${tb_props}""" + sql """ + CREATE TABLE ${tb_props} ( + id INT, + name STRING + ) + """ + + // Insert 2000 rows to exceed max_write_batch_rows=512 (will be split into 4 batches) + sql """ + INSERT INTO ${tb_props} + SELECT + number AS id, + concat('name_', cast(number AS STRING)) AS name + FROM numbers("number"="2000") + """ + + qt_props_count """ SELECT count(*) FROM ${tb_props} """ + order_qt_props_top5 """ SELECT * FROM ${tb_props} ORDER BY id LIMIT 5 """ + } finally { + sql """drop database if exists ${mc_catalog_props}.${db_props}""" + sql """drop catalog if exists ${mc_catalog_props}""" + } } diff --git a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy index 0436577336b25c..f280714839800f 100644 --- a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy @@ -66,6 +66,17 @@ suite("test_mc_write_static_partitions", "p2,external,maxcompute,external_remote order_qt_static_single_p1 """ SELECT * FROM ${tb1} WHERE ds = '20250101' """ order_qt_static_single_p2 """ SELECT * FROM ${tb1} WHERE ds = '20250102' """ + // Explain: static partition INSERT should NOT have SORT node + explain { + sql("INSERT INTO ${tb1} PARTITION(ds='20250103') VALUES (4, 'd')") + notContains "SORT" + } + // Explain: static partition INSERT OVERWRITE should NOT have SORT node + explain { + sql("INSERT OVERWRITE TABLE ${tb1} PARTITION(ds='20250101') VALUES (5, 'e')") + notContains "SORT" + } + // Test 2: Multi-level partition columns static partition INSERT INTO String tb2 = "static_multi_${uuid}" sql """DROP TABLE IF EXISTS ${tb2}""" @@ -82,6 +93,12 @@ suite("test_mc_write_static_partitions", "p2,external,maxcompute,external_remote order_qt_static_multi_all """ SELECT * FROM ${tb2} """ order_qt_static_multi_bj """ SELECT * FROM ${tb2} WHERE region = 'bj' """ + // Explain: all partition columns statically specified should NOT have SORT node + explain { + sql("INSERT INTO ${tb2} PARTITION(ds='20250102', region='gz') VALUES (4, 'v4')") + notContains "SORT" + } + test { sql """ INSERT INTO ${tb2} PARTITION(ds='20250101', region='bj', ds='20250102') VALUES (1, 'v1'), (2, 'v2');""" exception "Duplicate partition column: ds" @@ -111,6 +128,12 @@ suite("test_mc_write_static_partitions", "p2,external,maxcompute,external_remote sql """INSERT INTO ${tb3_dst} PARTITION(ds='20250201') SELECT id, name FROM ${tb3_src}""" order_qt_static_select """ SELECT * FROM ${tb3_dst} """ + // Explain: static partition INSERT INTO SELECT should NOT have SORT node + explain { + sql("INSERT INTO ${tb3_dst} PARTITION(ds='20250202') SELECT id, name FROM ${tb3_src}") + notContains "SORT" + } + // Test 4: INSERT OVERWRITE static partition String tb4 = "overwrite_part_${uuid}" sql """DROP TABLE IF EXISTS ${tb4}""" @@ -129,6 +152,7 @@ suite("test_mc_write_static_partitions", "p2,external,maxcompute,external_remote order_qt_overwrite_p2 """ SELECT * FROM ${tb4} WHERE ds = '20250102' """ // Test 5: Dynamic partition regression (ensure not broken) + // Dynamic partition: partition column 'ds' is in the data, SORT node IS expected String tb5 = "dynamic_reg_${uuid}" sql """DROP TABLE IF EXISTS ${tb5}""" sql """ @@ -141,6 +165,17 @@ suite("test_mc_write_static_partitions", "p2,external,maxcompute,external_remote sql """INSERT INTO ${tb5} VALUES (1, 'a', '20250101'), (2, 'b', '20250102')""" order_qt_dynamic_regression """ SELECT * FROM ${tb5} """ + // Explain: dynamic partition INSERT should HAVE SORT node (partition col in data) + explain { + sql("INSERT INTO ${tb5} VALUES (3, 'c', '20250103')") + contains "SORT" + } + // Explain: dynamic partition INSERT INTO SELECT should HAVE SORT node + explain { + sql("INSERT INTO ${tb5} SELECT * FROM ${tb3_src}") + contains "SORT" + } + // Test 6: INSERT OVERWRITE non-partitioned table String tb6 = "overwrite_nopart_${uuid}" sql """DROP TABLE IF EXISTS ${tb6}""" @@ -153,6 +188,41 @@ suite("test_mc_write_static_partitions", "p2,external,maxcompute,external_remote sql """INSERT INTO ${tb6} VALUES (1, 'old')""" sql """INSERT OVERWRITE TABLE ${tb6} VALUES (2, 'new')""" order_qt_overwrite_no_part """ SELECT * FROM ${tb6} """ + + // Explain: non-partitioned table INSERT should NOT have SORT node + explain { + sql("INSERT INTO ${tb6} VALUES (3, 'val')") + notContains "SORT" + } + // Explain: non-partitioned table INSERT OVERWRITE should NOT have SORT node + explain { + sql("INSERT OVERWRITE TABLE ${tb6} VALUES (4, 'val2')") + notContains "SORT" + } + + // Test 7: Multi-level partition with partial static (only some partition cols specified) + // When only some partition columns are statically specified (partial static), + // the remaining partition columns are dynamic, so SORT node IS expected + String tb7 = "partial_static_${uuid}" + sql """DROP TABLE IF EXISTS ${tb7}""" + sql """ + CREATE TABLE ${tb7} ( + id INT, + val STRING, + ds STRING, + region STRING + ) PARTITION BY (ds, region)() + """ + // Explain: partial static partition (ds static, region dynamic) should HAVE SORT node + explain { + sql("INSERT INTO ${tb7} PARTITION(ds='20250101') VALUES (1, 'v1', 'bj')") + contains "SORT" + } + // Explain: all static partition should NOT have SORT node + explain { + sql("INSERT INTO ${tb7} PARTITION(ds='20250101', region='bj') VALUES (1, 'v1')") + notContains "SORT" + } } finally { sql """drop database if exists ${mc_catalog_name}.${db}""" }