Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions src/java/org/apache/cassandra/db/compaction/CursorCompactor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import java.util.function.LongPredicate;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.Config.DiskAccessMode;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.AbstractCompactionController;
import org.apache.cassandra.db.ClusteringComparator;
Expand Down Expand Up @@ -70,6 +70,7 @@
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;

import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
Expand Down Expand Up @@ -294,15 +295,8 @@ private CursorCompactor(OperationType type,
* {@link CompactionIterator#CompactionIterator(OperationType, List, AbstractCompactionController, long, TimeUUID, ActiveCompactionsTracker)}
*/

// Convert Readers to Cursors
this.sstableCursors = new StatefulCursor[sstables.size()];
this.sstableCursors = convertScannersToCursors(scanners, sstables, DatabaseDescriptor.getCompactionReadDiskAccessMode());
this.sstableCursorsEqualsNext = new boolean[sstables.size()];
UnmodifiableIterator<SSTableReader> iterator = sstables.iterator();
for (int i = 0; i < this.sstableCursors.length; i++)
{
SSTableReader ssTableReader = iterator.next();
this.sstableCursors[i] = new StatefulCursor(ssTableReader);
}
this.enforceStrictLiveness = controller.cfs.metadata.get().enforceStrictLiveness();

purger = new Purger(type, controller, nowInSec);
Expand Down Expand Up @@ -1553,6 +1547,33 @@ private static String mergeHistogramToString(long[] histogram)
return sb.toString();
}

/**
* Closes scanner-opened readers before opening cursor-specific readers with the configured disk access mode.
* In cursor compaction, scanners are only used for metadata; closing them avoids holding redundant file
* descriptors and prevents conflicts when scan and non-scan readers for the same file share thread-local
* buffer state on the same thread.
*
* @param scanners scanners to release; every one is closed before the first cursor is created
* @param sstables readers to wrap; one {@link StatefulCursor} is created per reader, in the set's iteration order
* @param diskAccessMode disk access mode handed unchanged to each {@link StatefulCursor} constructor
* @return a new array of {@code sstables.size()} cursors, one per reader
* @throws RuntimeException or {@link Error} raised while constructing a cursor; it is rethrown after the
* partially filled array has been passed to {@code Throwables.closeNonNullAndAddSuppressed}
* (presumably closes the non-null entries and attaches close failures as suppressed - confirm)
*/
private static StatefulCursor[] convertScannersToCursors(List<ISSTableScanner> scanners, ImmutableSet<SSTableReader> sstables,
Comment thread
aweisberg marked this conversation as resolved.
DiskAccessMode diskAccessMode)
{
// NOTE(review): this loop has no try/catch, so if one close() throws, the remaining scanners
// are not closed here and no cursor is created.
for (ISSTableScanner scanner : scanners)
scanner.close();

StatefulCursor[] cursors = new StatefulCursor[sstables.size()];
int i = 0;
try
{
for (SSTableReader reader : sstables)
cursors[i++] = new StatefulCursor(reader, diskAccessMode);
return cursors;
}
catch (RuntimeException | Error e)
{
// Entries that were never assigned are still null at this point, hence the non-null-aware cleanup.
Throwables.closeNonNullAndAddSuppressed(e, cursors);
throw e;
}
}

public void close()
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.cassandra.db.compaction;

import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.Config.DiskAccessMode;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReusableLivenessInfo;
Expand Down Expand Up @@ -55,9 +56,9 @@ class StatefulCursor extends SSTableCursorReader

private boolean isOpenRangeTombstonePresent = false;

public StatefulCursor(SSTableReader reader)
public StatefulCursor(SSTableReader reader, DiskAccessMode diskAccessMode)
{
super(reader);
super(reader, diskAccessMode);
currPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
prevPartition = new PartitionDescriptor(reader.getPartitioner().createReusableKey(0));
unfiltered = new UnfilteredDescriptor(reader.header.clusteringTypes().toArray(AbstractType[]::new));
Expand Down
14 changes: 10 additions & 4 deletions src/java/org/apache/cassandra/io/sstable/SSTableCursorReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.collect.ImmutableList;

import org.apache.cassandra.config.Config.DiskAccessMode;
import org.apache.cassandra.db.ClusteringPrefix;
import org.apache.cassandra.db.Columns;
import org.apache.cassandra.db.DeletionTime;
Expand Down Expand Up @@ -197,15 +198,20 @@ public static SSTableCursorReader fromDescriptor(Descriptor desc) throws IOExcep
{
TableMetadata metadata = Util.metadataFromSSTable(desc);
SSTableReader reader = SSTableReader.openNoValidation(null, desc, TableMetadataRef.forOfflineTools(metadata));
return new SSTableCursorReader(reader, metadata, reader.ref());
return new SSTableCursorReader(reader, metadata, reader.ref(), null);
}

public SSTableCursorReader(SSTableReader reader)
{
this(reader, reader.metadata(), null);
this(reader, reader.metadata(), null, null);
}

private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SSTableReader> readerRef)
public SSTableCursorReader(SSTableReader reader, DiskAccessMode diskAccessMode)
{
this(reader, reader.metadata(), null, diskAccessMode);
}

private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SSTableReader> readerRef, DiskAccessMode diskAccessMode)
{
ssTableReader = reader;
ssTableReaderRef = readerRef;
Expand All @@ -221,7 +227,7 @@ private SSTableCursorReader(SSTableReader reader, TableMetadata metadata, Ref<SS
deserializationHelper = new DeserializationHelper(metadata, version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL, null);
serializationHeader = reader.header;

dataReader = reader.openDataReader();
dataReader = reader.openDataReaderForScan(diskAccessMode);
Comment thread
samueldlightfoot marked this conversation as resolved.
hasStaticColumns = metadata.hasStaticColumns();
}

Expand Down
44 changes: 31 additions & 13 deletions src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.annotation.Nullable;

import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -1417,44 +1419,60 @@ public StatsMetadata getSSTableMetadata()
return sstableMetadata;
}

public RandomAccessReader openDataReader()
Comment thread
samueldlightfoot marked this conversation as resolved.
{
return openDataReaderInternal(null, null, false);
}

public RandomAccessReader openDataReader(RateLimiter limiter)
{
assert limiter != null;
return dfile.createReader(limiter);
return openDataReaderInternal(null, limiter, false);
}

public RandomAccessReader openDataReader()
public RandomAccessReader openDataReader(DiskAccessMode diskAccessMode)
{
return dfile.createReader();
return openDataReaderInternal(diskAccessMode, null, false);
}

public RandomAccessReader openDataReaderForScan()
{
return openDataReaderForScan(dfile.diskAccessMode());
return openDataReaderInternal(null, null, true);
}

public RandomAccessReader openDataReaderForScan(DiskAccessMode diskAccessMode)
{
boolean isSameDiskAccessMode = diskAccessMode == dfile.diskAccessMode();
boolean isDirectIONotSupported = diskAccessMode == DiskAccessMode.direct && !dfile.supportsDirectIO();
return openDataReaderInternal(diskAccessMode, null, true);
}

if (isSameDiskAccessMode || isDirectIONotSupported)
return dfile.createReaderForScan(OnReaderClose.RETAIN_FILE_OPEN);
private RandomAccessReader openDataReaderInternal(@Nullable DiskAccessMode diskAccessMode,
Comment thread
samueldlightfoot marked this conversation as resolved.
@Nullable RateLimiter limiter,
boolean forScan)
{
if (canReuseDfile(diskAccessMode))
return dfile.createReader(limiter, forScan, OnReaderClose.RETAIN_FILE_OPEN);

FileHandle dataFile = dfile.toBuilder()
.withDiskAccessMode(diskAccessMode)
.complete();
FileHandle handle = dfile.toBuilder()
.withDiskAccessMode(diskAccessMode)
.complete();
try
{
return dataFile.createReaderForScan(OnReaderClose.CLOSE_FILE);
return handle.createReader(limiter, forScan, OnReaderClose.CLOSE_FILE);
}
catch (Throwable t)
{
dataFile.close();
handle.close();
throw t;
}
}

/**
* Tells whether a reader for the requested disk access mode can be created from the existing
* {@code dfile} handle instead of a handle rebuilt with a different mode.
* <p>
* Reuse is possible when any of the following holds:
* <ul>
* <li>no mode was requested ({@code diskAccessMode} is {@code null});</li>
* <li>the requested mode is the one {@code dfile} already reports;</li>
* <li>{@link DiskAccessMode#direct} was requested but {@code dfile} reports that it does not
* support direct I/O.</li>
* </ul>
*
* @param diskAccessMode requested mode, or {@code null} for "no preference"
* @return {@code true} if {@code dfile} can serve the request as it is
*/
private boolean canReuseDfile(@Nullable DiskAccessMode diskAccessMode)
{
return diskAccessMode == null
|| diskAccessMode == dfile.diskAccessMode()
|| (diskAccessMode == DiskAccessMode.direct && !dfile.supportsDirectIO());
Comment thread
samueldlightfoot marked this conversation as resolved.
}

public void trySkipFileCacheBefore(DecoratedKey key)
{
long position = getPosition(key, SSTableReader.Operator.GE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,8 @@ public ByteBuffer getBuffer(int size)

private static void cleanBuffer(ByteBuffer buffer)
{
// Aligned buffers are slices; clean the backing buffer (attachment)
DirectBuffer db = (DirectBuffer) buffer;
ByteBuffer attachment = (ByteBuffer) db.attachment();
MemoryUtil.clean(attachment != null ? attachment : buffer);
// Aligned buffers from BufferUtil.allocateDirectAligned are slices; clean the backing buffer (attachment)
MemoryUtil.clean((ByteBuffer) ((DirectBuffer) buffer).attachment());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.agrona.BufferUtil;

import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.utils.memory.MemoryUtil;

import sun.nio.ch.DirectBuffer;

public final class DirectThreadLocalReadAheadBuffer extends ThreadLocalReadAheadBuffer
{
Expand All @@ -46,4 +49,12 @@ protected void loadBlock(ByteBuffer blockBuffer, long blockPosition, int sizeToR
if (channel.read(blockBuffer, blockPosition) < sizeToRead)
throw new CorruptSSTableException(null, channel.filePath());
}
}

/**
* Releases a block buffer by cleaning its attachment rather than the buffer object itself.
*
* @param buffer block buffer to release; it is cast to {@link DirectBuffer}, so it must be a direct buffer
*/
// NOTE(review): attachment() is passed on without a null check. This assumes every buffer reaching this
// method is an aligned slice whose attachment is the backing allocation - confirm against the allocation site.
@Override
protected void cleanBuffer(ByteBuffer buffer)
Comment thread
samueldlightfoot marked this conversation as resolved.
{
// Aligned buffers from BufferUtil.allocateDirectAligned are slices; clean the backing buffer (attachment)
MemoryUtil.clean((ByteBuffer) ((DirectBuffer) buffer).attachment());
}

Comment thread
samueldlightfoot marked this conversation as resolved.
}
5 changes: 0 additions & 5 deletions src/java/org/apache/cassandra/io/util/FileHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,6 @@ public RandomAccessReader createReader()
return createReader(null);
}

public RandomAccessReader createReaderForScan(OnReaderClose onReaderClose)
{
return createReader(null, true, onReaderClose);
}

/**
* Create {@link RandomAccessReader} with configured method of reading content of the file.
* Reading from file will be rate limited by given {@link RateLimiter}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,16 @@ public void clear(boolean deallocate)
blockBuffer.clear();
if (deallocate)
{
MemoryUtil.clean(blockBuffer);
cleanBuffer(blockBuffer);
block.buffer = null;
}
}

/**
* Releases the memory of a block buffer that is being deallocated.
* <p>
* This implementation hands the buffer directly to {@link MemoryUtil#clean}. The method is
* {@code protected} and non-final, so a subclass can substitute its own release strategy.
*
* @param buffer the block buffer to release
*/
protected void cleanBuffer(ByteBuffer buffer)
{
MemoryUtil.clean(buffer);
}

@Override
public void close()
{
Expand Down
6 changes: 5 additions & 1 deletion src/java/org/apache/cassandra/utils/memory/BufferPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -1566,7 +1566,11 @@ void unsafeFree()
if (parent != null)
parent.free(slab);
else
MemoryUtil.clean(slab);
{
// slab may be an aligned slice from allocateDirectAligned(); clean the root allocation
ByteBuffer attachment = (ByteBuffer) ((DirectBuffer) slab).attachment();
MemoryUtil.clean(attachment != null ? attachment : slab);
}
}

static void unsafeRecycle(Chunk chunk)
Expand Down
47 changes: 44 additions & 3 deletions src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

import com.sun.jna.Native;

import org.apache.cassandra.utils.concurrent.Ref;

import jdk.internal.ref.Cleaner;
import sun.misc.Unsafe;
import sun.nio.ch.DirectBuffer;

Expand Down Expand Up @@ -330,9 +333,47 @@ public static void clean(ByteBuffer buffer)
return;

DirectBuffer db = (DirectBuffer) buffer;
if (db.attachment() != null)
return; // duplicate or slice
Cleaner cleaner = db.cleaner();

if (cleaner == null)
{
// No cleaner means this buffer does not own its memory (e.g. slice, duplicate, pool sub-allocation,
// hollow buffer). A non-null attachment confirms it's a view; null attachment means it's a hollow/synthetic
// buffer or was already cleaned.
if (db.attachment() != null)
throw new IllegalArgumentException(
"Cannot clean a buffer with no cleaner and attachment type "
+ db.attachment().getClass().getName() + "; this buffer does not own its memory. "
+ "For slices/duplicates, resolve to the root allocation before calling clean()");
return;
}

Object attachment = db.attachment();
if (!isSafeAttachment(attachment))
throw new IllegalArgumentException(
"Buffer has a cleaner but an unexpected attachment type "
+ attachment.getClass().getName()
+ "; this may indicate corrupted buffer state");

unsafe.invokeCleaner(buffer);
cleaner.clean();
}

/**
 * Decides whether the attachment found on a cleaner-owning buffer is one of the expected kinds.
 * The accepted set is deliberately closed: whenever {@link #setAttachment} starts placing a new kind of
 * object on a root allocation, that type has to be added here together with the reason it is safe.
 * <ul>
 *   <li>{@code null} – the usual state of root allocations ({@code ByteBuffer.allocateDirect}) and of
 *       mmap buffers</li>
 *   <li>{@link Runnable} – the mmap force callback installed by {@code ListenableFileSystem} and run by
 *       {@code SyncUtil.force()} to simulate mmap flush behaviour in tests</li>
 *   <li>{@link Ref.DirectBufferRef} – reference-tracking metadata that {@code MerkleTree} puts on root
 *       allocations when {@code Ref.TRACE_ENABLED} is true</li>
 * </ul>
 *
 * @param attachment the buffer's attachment; may be {@code null}
 * @return {@code true} if the attachment is one of the kinds listed above, {@code false} otherwise
 */
private static boolean isSafeAttachment(Object attachment)
{
    // Most common case first: a root allocation carries no attachment at all.
    if (attachment == null)
        return true;

    if (attachment instanceof Runnable)
        return true;

    return attachment instanceof Ref.DirectBufferRef;
}

}
Loading