From 2e9394d360ee1334705f6cb95e23ff5ff50df1dd Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 18 Dec 2025 12:25:56 +0000 Subject: [PATCH 01/12] add more flushing stats: partitions/rows, bytes rate, CPU and heap allocation for the flushing thread --- .../unified/ShardedMultiWriter.java | 8 +++ .../cassandra/db/memtable/Flushing.java | 33 +++++++++- .../io/sstable/RangeAwareSSTableWriter.java | 6 ++ .../io/sstable/SSTableMultiWriter.java | 1 + .../sstable/SSTableTxnSingleStreamWriter.java | 6 ++ .../io/sstable/SSTableZeroCopyWriter.java | 6 ++ .../io/sstable/SimpleSSTableMultiWriter.java | 5 ++ .../io/sstable/format/SSTableWriter.java | 5 ++ .../sstable/metadata/MetadataCollector.java | 5 ++ .../apache/cassandra/utils/ThreadStats.java | 60 +++++++++++++++++++ 10 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 src/java/org/apache/cassandra/utils/ThreadStats.java diff --git a/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java b/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java index 79efadbfa37f..b0d3f6ec9c8d 100644 --- a/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java @@ -205,6 +205,14 @@ public long getOnDiskBytesWritten() return bytesWritten; } + public long getTotalRows() + { + long totalRows = 0; + for (int i = 0; i <= currentWriter; ++i) + totalRows += writers[i].getTotalRows(); + return totalRows; + } + @Override public TableId getTableId() { diff --git a/src/java/org/apache/cassandra/db/memtable/Flushing.java b/src/java/org/apache/cassandra/db/memtable/Flushing.java index 3fc856858294..054f50a362a1 100644 --- a/src/java/org/apache/cassandra/db/memtable/Flushing.java +++ b/src/java/org/apache/cassandra/db/memtable/Flushing.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -44,7 +45,9 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.ThreadStats; public class Flushing { @@ -151,6 +154,9 @@ public FlushRunnable(Memtable.FlushablePartitionSet flushSet, private void writeSortedContents() { logger.info("Writing {}, flushed range = [{}, {})", toFlush.memtable(), toFlush.from(), toFlush.to()); + long startTimeNs = Clock.Global.nanoTime(); + long startCpuTime = ThreadStats.getCurrentThreadCpuTimeNano(); + long startAllocatedBytes = ThreadStats.getCurrentThreadAllocatedBytes(); // (we can't clear out the map as-we-go to free up memory, // since the memtable is being used for queries in the "pending flush" category) @@ -175,11 +181,34 @@ private void writeSortedContents() if (logCompletion) { + long endTimeNs = Clock.Global.nanoTime(); + long endCpuTime = ThreadStats.getCurrentThreadCpuTimeNano(); + long endAllocatedBytes = ThreadStats.getCurrentThreadAllocatedBytes(); + + long durationMs = TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs); + long durationSec = TimeUnit.MILLISECONDS.toSeconds(durationMs); + durationSec = durationSec == 0 ? 1 : durationSec; long bytesFlushed = writer.getBytesWritten(); - logger.info("Completed flushing {} ({}) for commitlog position {}", + long byteFlushedPerSec = bytesFlushed / durationSec; + long partitionsPerSec = toFlush.partitionCount() / durationSec; + long rowsPerSec = writer.getTotalRows() / durationSec; + + logger.info("Completed flushing {} ({}) for commitlog position {}, " + + "time spent: {} ms, " + + "bytes flushed: {}/({} per sec), " + + "partitions flushed: {}/({} per sec), " + + "rows: {}/({} per sec), " + + "cpu time: {} ms, allocated: {}", writer.getFilename(), FBUtilities.prettyPrintMemory(bytesFlushed), - toFlush.memtable().getFinalCommitLogUpperBound()); + toFlush.memtable().getFinalCommitLogUpperBound(), + durationMs, + bytesFlushed, byteFlushedPerSec, + toFlush.partitionCount(), partitionsPerSec, + writer.getTotalRows(), rowsPerSec, + startCpuTime < 0 ? "n/a": TimeUnit.NANOSECONDS.toMillis(endCpuTime - startCpuTime), + endAllocatedBytes < 0 ? "n/a" : FBUtilities.prettyPrintMemory(endAllocatedBytes - startAllocatedBytes) + ); // Update the metrics metrics.bytesFlushed.inc(bytesFlushed); } diff --git a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java index 1b4fa8e8febc..1db8ef679dbb 100644 --- a/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/RangeAwareSSTableWriter.java @@ -154,6 +154,12 @@ public long getBytesWritten() return currentWriter != null ? currentWriter.getBytesWritten() : 0L; } + @Override + public long getTotalRows() + { + return currentWriter != null ? currentWriter.getTotalRows() : 0L; + } + @Override public long getOnDiskBytesWritten() { diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java index 9a7968071b25..c5ea26af98f0 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java @@ -44,6 +44,7 @@ public interface SSTableMultiWriter extends Transactional String getFilename(); long getBytesWritten(); long getOnDiskBytesWritten(); + long getTotalRows(); TableId getTableId(); static void abortOrDie(SSTableMultiWriter writer) diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java index 390e63858f02..35a31ded44d8 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnSingleStreamWriter.java @@ -106,6 +106,12 @@ public long getOnDiskBytesWritten() return writer.getOnDiskBytesWritten(); } + @Override + public long getTotalRows() + { + return writer.getTotalRows(); + } + @Override public TableId getTableId() { diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java index 47566ee55062..de30fa8def0e 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableZeroCopyWriter.java @@ -163,6 +163,12 @@ public long getOnDiskBytesWritten() return 0; } + @Override + public long getTotalRows() + { + return 0; + } + @Override public TableId getTableId() { diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java index fa541d075c1d..4b7d046a84f4 100644 --- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java @@ -81,6 +81,11 @@ public long getOnDiskBytesWritten() return writer.getEstimatedOnDiskBytesWritten(); } + public long getTotalRows() + { + return writer.getEstimatedOnDiskBytesWritten(); + } + public TableId getTableId() { return writer.metadata().id; diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 8543da23e2e4..a0fd3306906c 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -213,6 +213,11 @@ public long getEstimatedOnDiskBytesWritten() return getOnDiskFilePointer(); } + public long getTotalRows() + { + return metadataCollector.getTotalRows(); + } + /** * Reset the data file to the marked position (see {@link #mark()}) and truncate the rest of the file. */ diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index a137b653a4e8..ee9082c68919 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -405,6 +405,11 @@ public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards) this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards; } + public long getTotalRows() + { + return totalRows; + } + public Map finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, TimeUUID pendingRepair, boolean isTransient, SerializationHeader header, ByteBuffer firstKey, ByteBuffer lastKey) { assert minClustering.kind() == ClusteringPrefix.Kind.CLUSTERING || minClustering.kind().isStart(); diff --git a/src/java/org/apache/cassandra/utils/ThreadStats.java b/src/java/org/apache/cassandra/utils/ThreadStats.java new file mode 100644 index 000000000000..f162618db653 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/ThreadStats.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +import java.lang.management.ManagementFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.management.ThreadMXBean; + +public class ThreadStats +{ + private final static Logger logger = LoggerFactory.getLogger(ThreadStats.class); + + private static final ThreadMXBean threadMxBean; + static + { + ThreadMXBean threadMxBeanToInit; + try + { + threadMxBeanToInit = (com.sun.management.ThreadMXBean) ManagementFactory.getThreadMXBean(); + threadMxBeanToInit.setThreadAllocatedMemoryEnabled(true); + } + catch (Throwable e) + { + threadMxBeanToInit = null; + logger.debug("Per thread stats are not available: {}", e.getMessage()); + } + threadMxBean = threadMxBeanToInit; + } + + public static long getCurrentThreadCpuTimeNano() + { + return threadMxBean == null || !threadMxBean.isCurrentThreadCpuTimeSupported() + ? -1 : threadMxBean.getCurrentThreadCpuTime(); + } + + public static long getCurrentThreadAllocatedBytes() + { + return threadMxBean == null || !threadMxBean.isThreadAllocatedMemorySupported() + ? -1 : threadMxBean.getCurrentThreadAllocatedBytes(); + } +} From ab3ee745a8b67eea8626e1437df007653d9e409e Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 18 Dec 2025 14:16:03 +0000 Subject: [PATCH 02/12] invoke metadataCollector.updateClusteringValues only for first and last clustering key in a partition --- .../io/sstable/format/SortedTableWriter.java | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java index d600ee665d34..919fe1634c85 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java @@ -145,8 +145,20 @@ public final AbstractRowIndexEntry append(UnfilteredRowIterator partition) if (header.hasStatic()) addStaticRow(partition.partitionKey(), partition.staticRow()); - while (partition.hasNext()) - addUnfiltered(partition.partitionKey(), partition.next()); + int i = 0; + boolean hasNext = false; + while (hasNext || partition.hasNext()) + { + Unfiltered current = partition.next(); + hasNext = partition.hasNext(); + boolean isRowFirstOrLast; + if (i == 0) + isRowFirstOrLast = true; + else + isRowFirstOrLast = !hasNext; + addUnfiltered(partition.partitionKey(), current, isRowFirstOrLast); + i++; + } indexEntry = endPartition(partition.partitionKey(), partition.partitionLevelDeletion()); @@ -197,20 +209,21 @@ private void addStaticRow(DecoratedKey key, Row row) throws IOException onStaticRow(row); } - private void addUnfiltered(DecoratedKey key, Unfiltered unfiltered) throws IOException + private void addUnfiltered(DecoratedKey key, Unfiltered unfiltered, boolean isRowFirstOrLast) throws IOException { if (unfiltered.isRow()) - addRow(key, (Row) unfiltered); + addRow(key, (Row) unfiltered, isRowFirstOrLast); else addRangeTomstoneMarker((RangeTombstoneMarker) unfiltered); } - private void addRow(DecoratedKey key, Row row) throws IOException + private void addRow(DecoratedKey key, Row row, boolean isRowFirstOrLast) throws IOException { guardCollectionSize(key, row); partitionWriter.addUnfiltered(row); - metadataCollector.updateClusteringValues(row.clustering()); + if (isRowFirstOrLast) + metadataCollector.updateClusteringValues(row.clustering()); Rows.collectStats(row, metadataCollector); onRow(row); From fff8e1af1b8e911f6e3b8893afc153509420d538 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 18 Dec 2025 14:36:40 +0000 Subject: [PATCH 03/12] split call sites for in Cell serialize logic, make isCounterCell cheaper (avoid megamorphic call + cache isCounterColumn) --- .../cassandra/db/rows/AbstractCell.java | 2 +- .../org/apache/cassandra/db/rows/Cell.java | 54 +++++++++++++++---- .../cassandra/schema/ColumnMetadata.java | 8 +++ 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java index aa5536593dcc..61c3d32be3e0 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java @@ -50,7 +50,7 @@ protected AbstractCell(ColumnMetadata column) public boolean isCounterCell() { - return !isTombstone() && column.isCounterColumn(); + return column.isCounterColumn() && !isTombstone(); } public boolean isLive(long nowInSec) diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java index d4c015b0cccc..b09f5485ed26 100644 --- a/src/java/org/apache/cassandra/db/rows/Cell.java +++ b/src/java/org/apache/cassandra/db/rows/Cell.java @@ -280,11 +280,47 @@ public static class Serializer public void serialize(Cell cell, ColumnMetadata column, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException { assert cell != null; - boolean hasValue = cell.valueSize() > 0; - boolean isDeleted = cell.isTombstone(); - boolean isExpiring = cell.isExpiring(); - boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp(); - boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime(); + int valueSize; + boolean hasValue; + boolean isDeleted; + boolean isExpiring; + long cellTimestamp; + long localDeletionTime; + int ttl; + T value; + ValueAccessor accessor; + // to avoid megamorphic calls we split call sites + // we have ArrayCell, BufferCell and NativeCell and all of them can be here in different scenarios + if (cell.getClass() == NativeCell.class) + { + valueSize = cell.valueSize(); + hasValue = valueSize > 0; + isDeleted = cell.isTombstone(); + isExpiring = cell.isExpiring(); + cellTimestamp = cell.timestamp(); + localDeletionTime = cell.localDeletionTime(); + ttl = cell.ttl(); + value = cell.value(); + accessor = cell.accessor(); + } + else + { + valueSize = cell.valueSize(); + hasValue = valueSize > 0; + isDeleted = cell.isTombstone(); + isExpiring = cell.isExpiring(); + cellTimestamp = cell.timestamp(); + localDeletionTime = cell.localDeletionTime(); + ttl = cell.ttl(); + value = cell.value(); + accessor = cell.accessor(); + } + + boolean useRowTimestamp = !rowLiveness.isEmpty() && cellTimestamp == rowLiveness.timestamp(); + boolean useRowTTL = isExpiring + && rowLiveness.isExpiring() + && ttl == rowLiveness.ttl() + && localDeletionTime == rowLiveness.localExpirationTime(); int flags = 0; if (!hasValue) flags |= HAS_EMPTY_VALUE_MASK; @@ -302,18 +338,18 @@ else if (isExpiring) out.writeByte((byte)flags); if (!useRowTimestamp) - header.writeTimestamp(cell.timestamp(), out); + header.writeTimestamp(cellTimestamp, out); if ((isDeleted || isExpiring) && !useRowTTL) - header.writeLocalDeletionTime(cell.localDeletionTime(), out); + header.writeLocalDeletionTime(localDeletionTime, out); if (isExpiring && !useRowTTL) - header.writeTTL(cell.ttl(), out); + header.writeTTL(ttl, out); if (column.isComplex()) column.cellPathSerializer().serialize(cell.path(), out); if (hasValue) - header.getType(column).writeValue(cell.value(), cell.accessor(), out); + header.getType(column).writeValue(value, accessor, out); } public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnMetadata column, SerializationHeader header, DeserializationHelper helper, ValueAccessor accessor) throws IOException diff --git a/src/java/org/apache/cassandra/schema/ColumnMetadata.java b/src/java/org/apache/cassandra/schema/ColumnMetadata.java index d6ffa1c1cac9..4bcbdcd7ad43 100644 --- a/src/java/org/apache/cassandra/schema/ColumnMetadata.java +++ b/src/java/org/apache/cassandra/schema/ColumnMetadata.java @@ -135,6 +135,8 @@ public boolean isPrimaryKeyKind() */ private final long comparisonOrder; + private final boolean isCounterColumn; + /** * Masking function used to dynamically mask the contents of this column. */ @@ -288,6 +290,7 @@ public ColumnMetadata(String ksName, this.cellComparator = cellPathComparator == null ? ColumnData.comparator : (a, b) -> cellPathComparator.compare(a.path(), b.path()); this.asymmetricCellPathComparator = cellPathComparator == null ? null : (a, b) -> cellPathComparator.compare(((Cell)a).path(), (CellPath) b); this.comparisonOrder = comparisonOrder(kind, isComplex(), Math.max(0, position), name); + this.isCounterColumn = isCounterColumn(type); this.mask = mask; this.columnConstraints = columnConstraints; this.columnConstraints.setColumnName(name); @@ -729,6 +732,11 @@ public AbstractType cellValueType() * Check if column is counter type. */ public boolean isCounterColumn() + { + return isCounterColumn; + } + + private static boolean isCounterColumn(AbstractType type) { if (type instanceof CollectionType) // Possible with, for example, supercolumns return ((CollectionType) type).valueComparator().isCounter(); From 4642a746f67646e167849b3b2c68f75447214790 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 18 Dec 2025 14:59:07 +0000 Subject: [PATCH 04/12] check if Guardrails enabled at the beginning of writing, avoid hidden auto-boxing for logging of primitive parameters --- .../io/sstable/format/SortedTableWriter.java | 22 +++++++++++++++---- .../io/sstable/format/big/BigTableWriter.java | 3 ++- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java index 919fe1634c85..44e49c4770d8 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java @@ -88,6 +88,11 @@ public abstract class SortedTableWriter

crcCheckChanceSupplier; + private final boolean isPartitionSizeGuardEnabled; + private final boolean isPartitionTombstonesGuardEnabled; + private final boolean areCollectionGuardsDisabled; + + public SortedTableWriter(Builder builder, ILifecycleTransaction txn, SSTable.Owner owner) { super(builder, txn, owner); @@ -118,6 +123,9 @@ public SortedTableWriter(Builder builder, ILifecycleTransaction txn, handleConstructionFailure(ex); throw ex; } + isPartitionSizeGuardEnabled = Guardrails.partitionSize.enabled(); + isPartitionTombstonesGuardEnabled = Guardrails.partitionTombstones.enabled(); + areCollectionGuardsDisabled = !Guardrails.collectionSize.enabled() && !Guardrails.itemsPerCollection.enabled(); } /** @@ -254,10 +262,12 @@ private AbstractRowIndexEntry endPartition(DecoratedKey key, DeletionTime partit long finishResult = partitionWriter.finish(); long endPosition = dataWriter.position(); - // inclusive of last byte long partitionSize = endPosition - partitionWriter.getPartitionStartPosition(); - guardPartitionThreshold(Guardrails.partitionSize, key, partitionSize); - guardPartitionThreshold(Guardrails.partitionTombstones, key, metadataCollector.totalTombstones); + // inclusive of last byte + if (isPartitionSizeGuardEnabled) + guardPartitionThreshold(Guardrails.partitionSize, key, partitionSize); + if (isPartitionTombstonesGuardEnabled) + guardPartitionThreshold(Guardrails.partitionTombstones, key, metadataCollector.totalTombstones); metadataCollector.addPartitionSizeInBytes(partitionSize); metadataCollector.addKey(key.getKey()); metadataCollector.addCellPerPartitionCount(); @@ -267,7 +277,8 @@ private AbstractRowIndexEntry endPartition(DecoratedKey key, DeletionTime partit if (first == null) first = lastWrittenKey; - logger.trace("wrote {} at {}", key, endPosition); + if (logger.isTraceEnabled()) + logger.trace("wrote {} at {}", key, endPosition); return createRowIndexEntry(key, partitionLevelDeletion, finishResult); } @@ -414,6 +425,9 @@ private void guardPartitionThreshold(Threshold guardrail, DecoratedKey key, long private void guardCollectionSize(DecoratedKey partitionKey, Row row) { + if (areCollectionGuardsDisabled) + return; + if (!Guardrails.collectionSize.enabled() && !Guardrails.itemsPerCollection.enabled()) return; diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 4f04eee10028..f343b1d8f95b 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -294,7 +294,8 @@ public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd, Byt } long indexEnd = writer.position(); - logger.trace("wrote index entry: {} at {}", indexEntry, indexStart); + if (logger.isTraceEnabled()) + logger.trace("wrote index entry: {} at {}", indexEntry, indexStart); summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd); } From b84633f042bd1141cc867e938b434436b3ffd1f7 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 18 Dec 2025 15:57:41 +0000 Subject: [PATCH 05/12] add fast return for BTreeRow.hasComplexDeletion, avoid column.name.bytes.hashCode if not needed, avoid capturing lambda allocation in UnfilteredSerializer.serializeRowBody --- .../apache/cassandra/db/rows/BTreeRow.java | 5 +- .../db/rows/SerializationHelper.java | 10 ++++ .../db/rows/UnfilteredSerializer.java | 46 ++++++++++--------- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index 8bccda8aa77d..842d6ef1f5d5 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -353,7 +353,8 @@ public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setA if (!inclusionTester.test(column)) return null; - DroppedColumn dropped = droppedColumns.get(column.name.bytes); + // we check isEmpty here to avoid bytes.hashCode calculation if it is not needed + DroppedColumn dropped = droppedColumns.isEmpty() ? null : droppedColumns.get(column.name.bytes); if (column.isComplex()) return ((ComplexColumnData) cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped, rowLiveness); @@ -401,6 +402,8 @@ public boolean hasComplex() public boolean hasComplexDeletion() { + if (minLocalDeletionTime == Cell.MAX_DELETION_TIME || !hasComplex()) + return false; long result = accumulate((cd, v) -> ((ComplexColumnData) cd).complexDeletion().isLive() ? 0 : STOP_SENTINEL_VALUE, COLUMN_COMPARATOR, isStatic() ? FIRST_COMPLEX_STATIC : FIRST_COMPLEX_REGULAR, 0L); return result == STOP_SENTINEL_VALUE; diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java index dca4240dd02f..710baa195596 100644 --- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java +++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java @@ -18,7 +18,9 @@ package org.apache.cassandra.db.rows; +import org.apache.cassandra.db.LivenessInfo; import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTreeSearchIterator; @@ -29,6 +31,14 @@ public class SerializationHelper private BTreeSearchIterator statics = null; private BTreeSearchIterator regulars = null; + // reusable fields to avoid extra allocation during cells processing + // within org.apache.cassandra.db.rows.UnfilteredSerializer.serializeRowBody + int flags; + LivenessInfo pkLiveness; + + DataOutputPlus out; + SearchIterator si; + public SerializationHelper(SerializationHeader header) { this.header = header; diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index ca1edfdbaf22..40434d2824cb 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -235,29 +235,13 @@ private void serializeRowBody(Row row, int flags, SerializationHelper helper, Da SearchIterator si = helper.iterator(isStatic); + helper.flags = flags; + helper.pkLiveness = pkLiveness; + helper.out = out; + helper.si = si; try { - row.apply(cd -> { - // We can obtain the column for data directly from data.column(). However, if the cell/complex data - // originates from a sstable, the column we'll get will have the type used when the sstable was serialized, - // and if that type have been recently altered, that may not be the type we want to serialize the column - // with. So we use the ColumnMetadata from the "header" which is "current". Also see #11810 for what - // happens if we don't do that. - ColumnMetadata column = si.next(cd.column()); - assert column != null : cd.column.toString(); - - try - { - if (cd.column.isSimple()) - Cell.serializer.serialize((Cell) cd, column, out, pkLiveness, header); - else - writeComplexColumn((ComplexColumnData) cd, column, hasComplexDeletion(flags), pkLiveness, header, out); - } - catch (IOException e) - { - throw new WrappedException(e); - } - }); + row.apply(UnfilteredSerializer::serializeColumnData, helper); } catch (WrappedException e) { @@ -268,7 +252,25 @@ private void serializeRowBody(Row row, int flags, SerializationHelper helper, Da } } - private void writeComplexColumn(ComplexColumnData data, ColumnMetadata column, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out) + private static void serializeColumnData(SerializationHelper helper, ColumnData cd) + { + ColumnMetadata column = helper.si.next(cd.column()); + assert column != null : cd.column.toString(); + + try + { + if (cd.column.isSimple()) + Cell.serializer.serialize((Cell) cd, column, helper.out, helper.pkLiveness, helper.header); + else + writeComplexColumn((ComplexColumnData) cd, column, hasComplexDeletion(helper.flags), helper.pkLiveness, helper.header, helper.out); + } + catch (IOException e) + { + throw new WrappedException(e); + } + } + + private static void writeComplexColumn(ComplexColumnData data, ColumnMetadata column, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out) throws IOException { if (hasComplexDeletion) From 1f934f613f74fda73b1a42032fa814d3990480a1 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 18 Dec 2025 18:13:38 +0000 Subject: [PATCH 06/12] reduce allocations during serialization of NativeClustering --- .../apache/cassandra/db/ClusteringPrefix.java | 29 ++++++---- .../apache/cassandra/db/NativeClustering.java | 54 ++++++++++++++++++- .../cassandra/db/marshal/AbstractType.java | 27 ++++++++++ .../db/marshal/IndexedValueHolder.java | 32 +++++++++++ .../cassandra/db/marshal/NativeAccessor.java | 7 +++ .../cassandra/db/marshal/ValueAccessor.java | 18 +++++++ 6 files changed, 156 insertions(+), 11 deletions(-) create mode 100644 src/java/org/apache/cassandra/db/marshal/IndexedValueHolder.java diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java index e6e3c76c4320..3898ff265780 100644 --- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java +++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java @@ -258,6 +258,23 @@ default boolean isEmpty() */ public V get(int i); + + default void writeValue(AbstractType type, int i, DataOutputPlus out) throws IOException + { + V v = get(i); + if (v != null && !isEmpty(i)) + type.writeValue(v, accessor(), out); + } + + default long writtenLength(AbstractType type, int i) + { + V v = get(i); + if (v == null || isEmpty(i)) + return 0; + + return type.writtenLength(v, accessor()); + } + /** * The method is introduced to allow to avoid a value object retrieval/allocation for simple checks */ @@ -476,7 +493,6 @@ void serializeValuesWithoutSize(ClusteringPrefix clustering, DataOutputPl { int offset = 0; int clusteringSize = clustering.size(); - ValueAccessor accessor = clustering.accessor(); // serialize in batches of 32, to avoid garbage when deserializing headers while (offset < clusteringSize) { @@ -488,9 +504,7 @@ void serializeValuesWithoutSize(ClusteringPrefix clustering, DataOutputPl out.writeUnsignedVInt(makeHeader(clustering, offset, limit)); while (offset < limit) { - V v = clustering.get(offset); - if (v != null && !accessor.isEmpty(v)) - types.get(offset).writeValue(v, accessor, out); + clustering.writeValue(types.get(offset), offset, out); offset++; } } @@ -507,14 +521,9 @@ long valuesWithoutSizeSerializedSize(ClusteringPrefix clustering, int ver result += TypeSizes.sizeofUnsignedVInt(makeHeader(clustering, offset, limit)); offset = limit; } - ValueAccessor accessor = clustering.accessor(); for (int i = 0; i < clusteringSize; i++) { - V v = clustering.get(i); - if (v == null || accessor.isEmpty(v)) - continue; // handled in the header - - result += types.get(i).writtenLength(v, accessor); + result += clustering.writtenLength(types.get(i), i); } return result; } diff --git a/src/java/org/apache/cassandra/db/NativeClustering.java b/src/java/org/apache/cassandra/db/NativeClustering.java index 45cfbb3b35d5..9b4b27ab63c1 100644 --- a/src/java/org/apache/cassandra/db/NativeClustering.java +++ b/src/java/org/apache/cassandra/db/NativeClustering.java @@ -18,14 +18,18 @@ */ package org.apache.cassandra.db; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AddressBasedNativeData; import org.apache.cassandra.db.marshal.ByteBufferAccessor; +import org.apache.cassandra.db.marshal.IndexedValueHolder; import org.apache.cassandra.db.marshal.NativeAccessor; import org.apache.cassandra.db.marshal.NativeData; import org.apache.cassandra.db.marshal.ValueAccessor; +import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -34,7 +38,7 @@ import org.apache.cassandra.utils.memory.MemoryUtil; import org.apache.cassandra.utils.memory.NativeEndianMemoryUtil; -public class NativeClustering implements Clustering +public class NativeClustering implements Clustering, IndexedValueHolder { private static final long EMPTY_SIZE = ObjectSizes.measure(new NativeClustering()); @@ -118,11 +122,59 @@ public NativeData get(int i) return buildDataObject(i, AddressBasedNativeData::new); } + public void writeValue(AbstractType type, int i, DataOutputPlus out) throws IOException + { + if (!isEmpty(i)) // is null is checked as a part of isEmpty + type.writeValue(this, i, NativeAccessor.instance, out); + } + + public long writtenLength(AbstractType type, int i) + { + if (isEmpty(i)) // is null is checked as a part of isEmpty + return 0; + + return type.writtenLength(this, i, NativeAccessor.instance); + } + + @Override + public int size(int i) + { + int size = size(); + if (isNull(peer, size, i)) + return 0; + + int startOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 2 + i * 2); + int endOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 4 + i * 2); + return (endOffset - startOffset); + } + public boolean isNull(int i) { return isNull(peer, size(), i); } + @Override + public void write(int i, DataOutputPlus out) throws IOException + { + int size = size(); + if (i >= size) + throw new IndexOutOfBoundsException(); + + int metadataSize = (size * 2) + 4; + int bitmapSize = ((size + 7) >>> 3); + long bitmapStart = peer + metadataSize; + int b = NativeEndianMemoryUtil.getByte(bitmapStart + (i >>> 3)); + if ((b & (1 << (i & 7))) != 0) + return; + + int startOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 2 + i * 2); + int endOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 4 + i * 2); + + long address = bitmapStart + bitmapSize + startOffset; + int length = endOffset - startOffset; + out.writeMemory(address, length); + } + private static boolean isNull(long peer, int size, int i) { if (i >= size) diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java index 42190e0c2e84..e411e4040cba 100644 --- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java +++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java @@ -585,6 +585,25 @@ public void writeValue(V value, ValueAccessor accessor, DataOutputPlus o } } + public void writeValue(IndexedValueHolder valueHolder, int i, ValueAccessor accessor, DataOutputPlus out) throws IOException + { + assert !valueHolder.isNull(i) : "bytes should not be null for type " + this; + int expectedValueLength = valueLengthIfFixed(); + if (expectedValueLength >= 0) + { + int actualValueLength = valueHolder.size(i); + if (actualValueLength == expectedValueLength) + accessor.write(valueHolder, i, out); + else + throw new IOException(String.format("Expected exactly %d bytes, but was %d", + expectedValueLength, actualValueLength)); + } + else + {; + accessor.writeWithVIntLength(valueHolder, i, out); + } + } + public long writtenLength(ByteBuffer value) { return writtenLength(value, ByteBufferAccessor.instance); @@ -598,6 +617,14 @@ public long writtenLength(V value, ValueAccessor accessor) : accessor.sizeWithVIntLength(value); } + public long writtenLength(IndexedValueHolder valueHolder, int i, ValueAccessor accessor) + { + assert !valueHolder.isNull(i) : "bytes should not be null for type " + this; + return valueLengthIfFixed() >= 0 + ? valueHolder.size(i) // if the size is wrong, this will be detected in writeValue + : accessor.sizeWithVIntLength(valueHolder, i); + } + public ByteBuffer readBuffer(DataInputPlus in) throws IOException { return readBuffer(in, Integer.MAX_VALUE); diff --git a/src/java/org/apache/cassandra/db/marshal/IndexedValueHolder.java b/src/java/org/apache/cassandra/db/marshal/IndexedValueHolder.java new file mode 100644 index 000000000000..0aefaf267a97 --- /dev/null +++ b/src/java/org/apache/cassandra/db/marshal/IndexedValueHolder.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.marshal; + +import java.io.IOException; + +import org.apache.cassandra.io.util.DataOutputPlus; + +public interface IndexedValueHolder +{ + V get(int i); + int size(int i); + boolean isEmpty(int i); + boolean isNull(int i); + void write(int i, DataOutputPlus out) throws IOException; +} diff --git a/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java b/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java index 70d73041de1b..fd5442f82d24 100644 --- a/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java +++ b/src/java/org/apache/cassandra/db/marshal/NativeAccessor.java @@ -70,6 +70,13 @@ public void write(NativeData sourceValue, DataOutputPlus out) throws IOException out.writeMemory(sourceValue.getAddress(), sourceValue.nativeDataSize()); } + public void writeWithVIntLength(IndexedValueHolder valueHolder, int i, DataOutputPlus out) throws IOException + { + int size = valueHolder.size(i); + out.writeUnsignedVInt32(size); + valueHolder.write(i, out); + } + @Override public ByteBuffer toBuffer(NativeData value) { diff --git a/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java b/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java index 4916c7326223..8b1613e429e1 100644 --- a/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java +++ b/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java @@ -120,6 +120,12 @@ default int sizeWithVIntLength(V value) return TypeSizes.sizeofUnsignedVInt(size) + size; } + default int sizeWithVIntLength(IndexedValueHolder valueHolder, int i) + { + int size = valueHolder.size(i); + return TypeSizes.sizeofUnsignedVInt(size) + size; + } + /** serialized size including a short length prefix */ default int sizeWithShortLength(V value) { @@ -168,12 +174,24 @@ default boolean isEmptyFromOffset(V value, int offset) */ void write(V value, DataOutputPlus out) throws IOException; + default void write(IndexedValueHolder valueHolder, int i, DataOutputPlus out) throws IOException + { + write(valueHolder.get(i), out); + } + + default void writeWithVIntLength(V value, DataOutputPlus out) throws IOException { out.writeUnsignedVInt32(size(value)); write(value, out); } + default void writeWithVIntLength(IndexedValueHolder valueHolder, int i, DataOutputPlus out) throws IOException + { + out.writeUnsignedVInt32(valueHolder.size(i)); + write(valueHolder.get(i), out); + } + /** * Write the contents of the given value into the ByteBuffer */ From f6c74c2559a93e80614fc68c43b5435bbfe80b79 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 18 Dec 2025 18:47:22 +0000 Subject: [PATCH 07/12] do not re-map colums in serializeRowBody if they haven't changed --- .../cassandra/db/SerializationHeader.java | 37 ++++++++++++++++++- .../memtable/AbstractAllocatorMemtable.java | 4 ++ .../cassandra/db/memtable/Flushing.java | 3 +- .../cassandra/db/memtable/Memtable.java | 10 +++++ .../cassandra/db/memtable/TrieMemtable.java | 7 ++++ .../db/rows/UnfilteredSerializer.java | 4 +- .../io/sstable/SimpleSSTableMultiWriter.java | 2 +- 7 files changed, 62 insertions(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index 61d87c847c55..b0e58373b943 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -56,12 +56,25 @@ public class SerializationHeader private final Map> typeMap; + private final boolean columnsMayChanged; + private SerializationHeader(boolean isForSSTable, AbstractType keyType, List> clusteringTypes, RegularAndStaticColumns columns, EncodingStats stats, Map> typeMap) + { + this(isForSSTable, keyType, clusteringTypes, columns, stats, typeMap, true); + } + + private SerializationHeader(boolean isForSSTable, + AbstractType keyType, + List> clusteringTypes, + RegularAndStaticColumns columns, + EncodingStats stats, + Map> typeMap, + boolean columnsMayChanged) { this.isForSSTable = isForSSTable; this.keyType = keyType; @@ -69,6 +82,7 @@ private SerializationHeader(boolean isForSSTable, this.columns = columns; this.stats = stats; this.typeMap = typeMap; + this.columnsMayChanged = columnsMayChanged; } public static SerializationHeader makeWithoutStats(TableMetadata metadata) @@ -111,6 +125,21 @@ private static Collection orderByDescendingGeneration(Collection< return readers; } + public SerializationHeader(boolean isForSSTable, + TableMetadata metadata, + RegularAndStaticColumns columns, + EncodingStats stats, + boolean columnsMayChanged) + { + this(isForSSTable, + metadata.partitionKeyType, + metadata.comparator.subtypes(), + columns, + stats, + null, + columnsMayChanged); + } + public SerializationHeader(boolean isForSSTable, TableMetadata metadata, RegularAndStaticColumns columns, @@ -121,7 +150,8 @@ public SerializationHeader(boolean isForSSTable, metadata.comparator.subtypes(), columns, stats, - null); + null, + true); } public RegularAndStaticColumns columns() @@ -139,6 +169,11 @@ public boolean isForSSTable() return isForSSTable; } + public boolean columnsMayChanged() + { + return columnsMayChanged; + } + public EncodingStats stats() { return stats; diff --git a/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java b/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java index 2dbe41374f09..9cd79d597725 100644 --- a/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/AbstractAllocatorMemtable.java @@ -31,6 +31,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ClusteringComparator; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; @@ -75,6 +76,8 @@ public abstract class AbstractAllocatorMemtable extends AbstractMemtableWithComm private final long creationNano = Clock.Global.nanoTime(); + protected final RegularAndStaticColumns columnsOnCreation; + @VisibleForTesting static MemtablePool createMemtableAllocatorPool() { @@ -120,6 +123,7 @@ public AbstractAllocatorMemtable(AtomicReference commitLogLow this.initialComparator = metadata.get().comparator; this.initialFactory = metadata().params.memtable.factory(); this.owner = owner; + this.columnsOnCreation = metadata().regularAndStaticColumns(); scheduleFlush(); } diff --git a/src/java/org/apache/cassandra/db/memtable/Flushing.java b/src/java/org/apache/cassandra/db/memtable/Flushing.java index 054f50a362a1..80dfcb87f510 100644 --- a/src/java/org/apache/cassandra/db/memtable/Flushing.java +++ b/src/java/org/apache/cassandra/db/memtable/Flushing.java @@ -245,7 +245,8 @@ public static SSTableMultiWriter createFlushWriter(ColumnFamilyStore cfs, new SerializationHeader(true, flushSet.metadata(), flushSet.columns(), - flushSet.encodingStats()), + flushSet.encodingStats(), + flushSet.columnsChangedAfterCreation()), txn); } } diff --git a/src/java/org/apache/cassandra/db/memtable/Memtable.java b/src/java/org/apache/cassandra/db/memtable/Memtable.java index 4d665b8bf946..a111ef7bc2df 100644 --- a/src/java/org/apache/cassandra/db/memtable/Memtable.java +++ b/src/java/org/apache/cassandra/db/memtable/Memtable.java @@ -226,6 +226,11 @@ default long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Grou */ TableMetadata metadata(); + default boolean columnsChangedAfterCreation() + { + return true; + } + // Memory usage tracking @@ -337,6 +342,11 @@ default TableMetadata metadata() return memtable().metadata(); } + default boolean columnsChangedAfterCreation() + { + return true; + } + default boolean isEmpty() { return partitionCount() > 0; diff --git a/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java b/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java index 66b1e8cb3aab..669c0f625886 100644 --- a/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/TrieMemtable.java @@ -488,6 +488,13 @@ public TableMetadata metadata() { return tableMetadata; } + + + @Override + public boolean columnsChangedAfterCreation() + { + return !TrieMemtable.this.columnsOnCreation.equals(tableMetadata.regularAndStaticColumns()); + } }; } diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index 40434d2824cb..857e227ec137 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -233,7 +233,7 @@ private void serializeRowBody(Row row, int flags, SerializationHelper helper, Da if ((flags & HAS_ALL_COLUMNS) == 0) Columns.serializer.serializeSubset(row.columns(), headerColumns, out); - SearchIterator si = helper.iterator(isStatic); + SearchIterator si = helper.header.columnsMayChanged() ? helper.iterator(isStatic) : null; helper.flags = flags; helper.pkLiveness = pkLiveness; @@ -254,7 +254,7 @@ private void serializeRowBody(Row row, int flags, SerializationHelper helper, Da private static void serializeColumnData(SerializationHelper helper, ColumnData cd) { - ColumnMetadata column = helper.si.next(cd.column()); + ColumnMetadata column = helper.header.columnsMayChanged() ? helper.si.next(cd.column()) : cd.column(); assert column != null : cd.column.toString(); try diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java index 4b7d046a84f4..482b4cf071a8 100644 --- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java @@ -83,7 +83,7 @@ public long getOnDiskBytesWritten() public long getTotalRows() { - return writer.getEstimatedOnDiskBytesWritten(); + return writer.getTotalRows(); } public TableId getTableId() From f6e09b7438a2483f54f63d56aaf2e32856f4e048 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 18 Dec 2025 19:19:39 +0000 Subject: [PATCH 08/12] add flushing iterator without columns filtering --- src/java/org/apache/cassandra/db/memtable/Flushing.java | 2 +- .../cassandra/db/partitions/AbstractBTreePartition.java | 6 ++++++ src/java/org/apache/cassandra/db/partitions/Partition.java | 3 +++ .../org/apache/cassandra/db/partitions/PartitionUpdate.java | 5 +++++ 4 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/db/memtable/Flushing.java b/src/java/org/apache/cassandra/db/memtable/Flushing.java index 80dfcb87f510..cf3032e76c1b 100644 --- a/src/java/org/apache/cassandra/db/memtable/Flushing.java +++ b/src/java/org/apache/cassandra/db/memtable/Flushing.java @@ -172,7 +172,7 @@ private void writeSortedContents() if (!partition.isEmpty()) { - try (UnfilteredRowIterator iter = partition.unfilteredIterator()) + try (UnfilteredRowIterator iter = partition.flushingIterator(toFlush.metadata())) { writer.append(iter); } diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java index e4a653bec935..268a2c25ea59 100644 --- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java @@ -171,6 +171,12 @@ public UnfilteredRowIterator unfilteredIterator() return unfilteredIterator(ColumnFilter.selection(columns()), Slices.ALL, false); } + public UnfilteredRowIterator flushingIterator(TableMetadata tableMetadata) + { + return unfilteredIterator(ColumnFilter.all(columns()), Slices.ALL, false); + } + + public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed) { return unfilteredIterator(holder(), selection, slices, reversed); diff --git a/src/java/org/apache/cassandra/db/partitions/Partition.java b/src/java/org/apache/cassandra/db/partitions/Partition.java index 601934a8e714..3e0733ec02ab 100644 --- a/src/java/org/apache/cassandra/db/partitions/Partition.java +++ b/src/java/org/apache/cassandra/db/partitions/Partition.java @@ -68,6 +68,9 @@ public interface Partition */ public UnfilteredRowIterator unfilteredIterator(); + public UnfilteredRowIterator flushingIterator(TableMetadata tableMetadata); + + /** * Returns an UnfilteredRowIterator over the rows/RT contained by this partition * selected by the provided slices. diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index 3da60b3eb5da..4be8ddb32b57 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -291,6 +291,11 @@ public UnfilteredRowIterator unfilteredIterator() return unfilteredIterator(ColumnFilter.SelectionColumnFilter.all(columns()), Slices.ALL, false); } + public UnfilteredRowIterator flushingIterator(TableMetadata tableMetadata) + { + throw new UnsupportedOperationException("Flushing iterator should be used only to flush memtable partitions"); + } + public PartitionUpdate withOnlyPresentColumns() { From acebda4bea855cb1d60e54a205b2606e6508dccd Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 18 Dec 2025 20:19:30 +0000 Subject: [PATCH 09/12] improve cell methods inlining and try to use monomorphic calls --- .../cassandra/db/rows/AbstractCell.java | 2 +- .../org/apache/cassandra/db/rows/Cell.java | 5 ++++ .../sstable/metadata/MetadataCollector.java | 26 ++++++++++++++++--- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java index 61c3d32be3e0..cdd3a86798ba 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java @@ -55,7 +55,7 @@ public boolean isCounterCell() public boolean isLive(long nowInSec) { - return localDeletionTime() == NO_DELETION_TIME || (ttl() != NO_TTL && nowInSec < localDeletionTime()); + return isLive(nowInSec, localDeletionTime(), ttl()); } public boolean isTombstone() diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java index b09f5485ed26..1d3a652953a6 100644 --- a/src/java/org/apache/cassandra/db/rows/Cell.java +++ b/src/java/org/apache/cassandra/db/rows/Cell.java @@ -177,6 +177,11 @@ public long localDeletionTime() */ public abstract boolean isLive(long nowInSec); + public final boolean isLive(long nowInSec, long localDeletionTime, int ttl) + { + return localDeletionTime == NO_DELETION_TIME || ttl != NO_TTL && nowInSec < localDeletionTime; + } + /** * For cells belonging to complex types (non-frozen collection and UDT), the * path to the cell. diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index ee9082c68919..ab01ff2505e4 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -39,6 +39,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; +import org.apache.cassandra.db.rows.ArrayCell; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.io.sstable.SSTable; @@ -245,10 +246,27 @@ public void update(LivenessInfo newInfo) public void update(Cell cell) { ++currentPartitionCells; - updateTimestamp(cell.timestamp()); - updateTTL(cell.ttl()); - updateLocalDeletionTime(cell.localDeletionTime()); - if (!cell.isLive(nowInSec)) + long timestamp; + int ttl; + long localDeletionTime; + if (cell.getClass() == ArrayCell.class) + { + timestamp = cell.timestamp(); + ttl = cell.ttl(); + localDeletionTime = cell.localDeletionTime(); + } + else + { + timestamp = cell.timestamp(); + ttl = cell.ttl(); + localDeletionTime = cell.localDeletionTime(); + } + updateTimestamp(timestamp); + updateTTL(ttl); + updateLocalDeletionTime(localDeletionTime); + + // isLive(nowInSec) is not used to avoid additional non-monomorphic calls of Cell methods + if (!cell.isLive(nowInSec, localDeletionTime, ttl)) updateTombstoneCount(); } From 716d6bee2599eea4a483e60bc23a2b8433b59fd7 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Thu, 18 Dec 2025 22:22:25 +0000 Subject: [PATCH 10/12] minor changes in completed flushing log message --- src/java/org/apache/cassandra/db/memtable/Flushing.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/db/memtable/Flushing.java b/src/java/org/apache/cassandra/db/memtable/Flushing.java index cf3032e76c1b..ebdd09b90bd2 100644 --- a/src/java/org/apache/cassandra/db/memtable/Flushing.java +++ b/src/java/org/apache/cassandra/db/memtable/Flushing.java @@ -203,7 +203,7 @@ private void writeSortedContents() FBUtilities.prettyPrintMemory(bytesFlushed), toFlush.memtable().getFinalCommitLogUpperBound(), durationMs, - bytesFlushed, byteFlushedPerSec, + bytesFlushed, FBUtilities.prettyPrintMemory(byteFlushedPerSec), toFlush.partitionCount(), partitionsPerSec, writer.getTotalRows(), rowsPerSec, startCpuTime < 0 ? "n/a": TimeUnit.NANOSECONDS.toMillis(endCpuTime - startCpuTime), From f8af3e050cc8ed556c122ed47ff38a58c023a621 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Sat, 20 Dec 2025 18:28:35 +0000 Subject: [PATCH 11/12] add columnsChangedAfterCreation implementation for SkipListMemtable and ShardedSkipListMemtable --- .../cassandra/db/memtable/ShardedSkipListMemtable.java | 6 ++++++ .../org/apache/cassandra/db/memtable/SkipListMemtable.java | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java b/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java index 8c0897e1d4a4..dc6ce2f0de8d 100644 --- a/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/ShardedSkipListMemtable.java @@ -335,6 +335,12 @@ public TableMetadata metadata() { return tableMetadata; } + + @Override + public boolean columnsChangedAfterCreation() + { + return !ShardedSkipListMemtable.this.columnsOnCreation.equals(tableMetadata.regularAndStaticColumns()); + } }; } diff --git a/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java b/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java index 9271000f05af..312f680ab940 100644 --- a/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java @@ -333,6 +333,12 @@ public TableMetadata metadata() { return tableMetadata; } + + @Override + public boolean columnsChangedAfterCreation() + { + return !SkipListMemtable.this.columnsOnCreation.equals(tableMetadata.regularAndStaticColumns()); + } }; } From dbdb5c8186fab63f1f6074a5c11f51da3b779e20 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Sun, 21 Dec 2025 10:58:09 +0000 Subject: [PATCH 12/12] address review comments --- src/java/org/apache/cassandra/db/marshal/AbstractType.java | 2 +- .../cassandra/io/sstable/metadata/MetadataCollector.java | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java index e411e4040cba..a32d676a36d7 100644 --- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java +++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java @@ -599,7 +599,7 @@ public void writeValue(IndexedValueHolder valueHolder, int i, ValueAcces expectedValueLength, actualValueLength)); } else - {; + { accessor.writeWithVIntLength(valueHolder, i, out); } } diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index ab01ff2505e4..4efb7d27ae70 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -249,6 +249,10 @@ public void update(Cell cell) long timestamp; int ttl; long localDeletionTime; + // This method may process several implementations of Cell. + // To improve inlining of Cell method calls, we split the call sites. + // This is a very hot path, invoked for every cell (potentially millions of times per second), + // so this micro-optimization is justified. if (cell.getClass() == ArrayCell.class) { timestamp = cell.timestamp();