Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions src/java/org/apache/cassandra/db/ClusteringPrefix.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,23 @@ default boolean isEmpty()
*/
public V get(int i);


/**
 * Serializes the {@code i}-th component of this prefix to {@code out} using {@code type}.
 * Nothing is written when the component is {@code null} or empty.
 *
 * @param type the type used to serialize the component
 * @param i    index of the component
 * @param out  destination of the serialized bytes
 * @throws IOException if writing to {@code out} fails
 */
default void writeValue(AbstractType<?> type, int i, DataOutputPlus out) throws IOException
{
    V component = get(i);
    if (component == null || isEmpty(i))
        return;

    type.writeValue(component, accessor(), out);
}

/**
 * Computes how many bytes {@link #writeValue} would emit for the {@code i}-th component.
 *
 * @param type the type used to serialize the component
 * @param i    index of the component
 * @return 0 when the component is {@code null} or empty, otherwise the length reported by {@code type}
 */
default long writtenLength(AbstractType<?> type, int i)
{
    V component = get(i);
    boolean absent = component == null || isEmpty(i);
    return absent ? 0 : type.writtenLength(component, accessor());
}

/**
* This method exists so that simple checks can avoid retrieving/allocating a value object
*/
Expand Down Expand Up @@ -476,7 +493,6 @@ <V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPl
{
int offset = 0;
int clusteringSize = clustering.size();
ValueAccessor<V> accessor = clustering.accessor();
// serialize in batches of 32, to avoid garbage when deserializing headers
while (offset < clusteringSize)
{
Expand All @@ -488,9 +504,7 @@ <V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPl
out.writeUnsignedVInt(makeHeader(clustering, offset, limit));
while (offset < limit)
{
V v = clustering.get(offset);
if (v != null && !accessor.isEmpty(v))
types.get(offset).writeValue(v, accessor, out);
clustering.writeValue(types.get(offset), offset, out);
offset++;
}
}
Expand All @@ -507,14 +521,9 @@ <V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int ver
result += TypeSizes.sizeofUnsignedVInt(makeHeader(clustering, offset, limit));
offset = limit;
}
ValueAccessor<V> accessor = clustering.accessor();
for (int i = 0; i < clusteringSize; i++)
{
V v = clustering.get(i);
if (v == null || accessor.isEmpty(v))
continue; // handled in the header

result += types.get(i).writtenLength(v, accessor);
result += clustering.writtenLength(types.get(i), i);
}
return result;
}
Expand Down
54 changes: 53 additions & 1 deletion src/java/org/apache/cassandra/db/NativeClustering.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.cassandra.db;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AddressBasedNativeData;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.IndexedValueHolder;
import org.apache.cassandra.db.marshal.NativeAccessor;
import org.apache.cassandra.db.marshal.NativeData;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.concurrent.OpOrder;
Expand All @@ -34,7 +38,7 @@
import org.apache.cassandra.utils.memory.MemoryUtil;
import org.apache.cassandra.utils.memory.NativeEndianMemoryUtil;

public class NativeClustering implements Clustering<NativeData>
public class NativeClustering implements Clustering<NativeData>, IndexedValueHolder<NativeData>
{
private static final long EMPTY_SIZE = ObjectSizes.measure(new NativeClustering());

Expand Down Expand Up @@ -118,11 +122,59 @@ public NativeData get(int i)
return buildDataObject(i, AddressBasedNativeData::new);
}

/**
 * Serializes the {@code i}-th value of this clustering to {@code out} by handing this object
 * and the index to {@code type}, rather than first fetching the value via {@code get(i)}.
 * Nothing is written when the value is null or empty.
 *
 * @param type the type used to serialize the value
 * @param i    index of the value
 * @param out  destination of the serialized bytes
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void writeValue(AbstractType<?> type, int i, DataOutputPlus out) throws IOException
{
    if (!isEmpty(i)) // the null case is checked as part of isEmpty
        type.writeValue(this, i, NativeAccessor.instance, out);
}

/**
 * Computes how many bytes {@code writeValue(type, i, out)} would emit for the {@code i}-th value,
 * passing this object and the index to {@code type} instead of fetching the value via {@code get(i)}.
 *
 * @param type the type used to serialize the value
 * @param i    index of the value
 * @return 0 when the value is null or empty, otherwise the length reported by {@code type}
 */
@Override
public long writtenLength(AbstractType<?> type, int i)
{
    if (isEmpty(i)) // the null case is checked as part of isEmpty
        return 0;

    return type.writtenLength(this, i, NativeAccessor.instance);
}

/**
 * Returns the length of the {@code i}-th value, computed as the difference between its
 * end and start offsets read from the native memory at {@code peer}; 0 for a null value.
 */
@Override
public int size(int i)
{
    if (isNull(peer, size(), i))
        return 0;

    // the start offset of value i is stored at peer + 2 + 2*i; its end offset is the next 2-byte slot
    long startSlot = peer + 2 + i * 2;
    int start = NativeEndianMemoryUtil.getUnsignedShort(startSlot);
    int end = NativeEndianMemoryUtil.getUnsignedShort(startSlot + 2);
    return end - start;
}

/**
 * Returns whether the {@code i}-th value of this clustering is null, delegating to the
 * static check against the native memory at {@code peer}.
 */
@Override
public boolean isNull(int i)
{
    return isNull(peer, size(), i);
}

/**
 * Writes the raw bytes of the {@code i}-th value straight from native memory to {@code out}.
 * Nothing is written when the value's bit is set in the null bitmap.
 *
 * @param i   index of the value
 * @param out destination of the bytes
 * @throws IndexOutOfBoundsException if {@code i >= size()}
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void write(int i, DataOutputPlus out) throws IOException
{
    final int count = size();
    if (i >= count)
        throw new IndexOutOfBoundsException();

    // the null bitmap starts after (count * 2 + 4) bytes of metadata and has one bit per value
    final long bitmapStart = peer + (count * 2) + 4;
    final int bitmapByte = NativeEndianMemoryUtil.getByte(bitmapStart + (i >>> 3));
    final boolean valueIsNull = (bitmapByte & (1 << (i & 7))) != 0;
    if (valueIsNull)
        return;

    // start/end offsets of value i live in consecutive 2-byte slots beginning at peer + 2 + 2*i
    final long startSlot = peer + 2 + i * 2;
    final int start = NativeEndianMemoryUtil.getUnsignedShort(startSlot);
    final int end = NativeEndianMemoryUtil.getUnsignedShort(startSlot + 2);

    // value bytes follow the bitmap, which occupies ceil(count / 8) bytes
    final long valuesStart = bitmapStart + ((count + 7) >>> 3);
    out.writeMemory(valuesStart + start, end - start);
}

private static boolean isNull(long peer, int size, int i)
{
if (i >= size)
Expand Down
37 changes: 36 additions & 1 deletion src/java/org/apache/cassandra/db/SerializationHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,33 @@ public class SerializationHeader

private final Map<ByteBuffer, AbstractType<?>> typeMap;

private final boolean columnsMayChanged;

/**
 * Creates a header with {@code columnsMayChanged} set to {@code true};
 * all other arguments are forwarded unchanged to the full constructor.
 */
private SerializationHeader(boolean isForSSTable, AbstractType<?> keyType, List<AbstractType<?>> clusteringTypes,
                            RegularAndStaticColumns columns, EncodingStats stats,
                            Map<ByteBuffer, AbstractType<?>> typeMap)
{
    this(isForSSTable, keyType, clusteringTypes, columns, stats, typeMap, true);
}

/**
 * Full constructor: stores every argument in the field of the same name.
 */
private SerializationHeader(boolean isForSSTable, AbstractType<?> keyType, List<AbstractType<?>> clusteringTypes,
                            RegularAndStaticColumns columns, EncodingStats stats,
                            Map<ByteBuffer, AbstractType<?>> typeMap, boolean columnsMayChanged)
{
    // flags
    this.isForSSTable = isForSSTable;
    this.columnsMayChanged = columnsMayChanged;

    // types
    this.keyType = keyType;
    this.clusteringTypes = clusteringTypes;
    this.typeMap = typeMap;

    // columns and statistics
    this.columns = columns;
    this.stats = stats;
}

public static SerializationHeader makeWithoutStats(TableMetadata metadata)
Expand Down Expand Up @@ -111,6 +125,21 @@ private static Collection<SSTableReader> orderByDescendingGeneration(Collection<
return readers;
}

/**
 * Creates a header whose key type and clustering types are taken from {@code metadata}
 * ({@code partitionKeyType} and {@code comparator.subtypes()}), with a {@code null} type map
 * and an explicit {@code columnsMayChanged} flag.
 */
public SerializationHeader(boolean isForSSTable, TableMetadata metadata, RegularAndStaticColumns columns,
                           EncodingStats stats, boolean columnsMayChanged)
{
    this(isForSSTable, metadata.partitionKeyType, metadata.comparator.subtypes(),
         columns, stats, null, columnsMayChanged);
}

public SerializationHeader(boolean isForSSTable,
TableMetadata metadata,
RegularAndStaticColumns columns,
Expand All @@ -121,7 +150,8 @@ public SerializationHeader(boolean isForSSTable,
metadata.comparator.subtypes(),
columns,
stats,
null);
null,
true);
}

public RegularAndStaticColumns columns()
Expand All @@ -139,6 +169,11 @@ public boolean isForSSTable()
return isForSSTable;
}

/**
 * Returns the {@code columnsMayChanged} flag this header was constructed with.
 */
public boolean columnsMayChanged()
{
    return this.columnsMayChanged;
}

public EncodingStats stats()
{
return stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ public long getOnDiskBytesWritten()
return bytesWritten;
}

/**
 * Returns the sum of {@code getTotalRows()} over {@code writers[0]} through
 * {@code writers[currentWriter]}, inclusive.
 */
public long getTotalRows()
{
    long sum = 0;
    int idx = 0;
    while (idx <= currentWriter)
    {
        sum += writers[idx].getTotalRows();
        idx++;
    }
    return sum;
}

@Override
public TableId getTableId()
{
Expand Down
27 changes: 27 additions & 0 deletions src/java/org/apache/cassandra/db/marshal/AbstractType.java
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,25 @@ public <V> void writeValue(V value, ValueAccessor<V> accessor, DataOutputPlus o
}
}

/**
 * Writes the {@code i}-th value of {@code valueHolder} to {@code out}.
 * For a fixed-length type the raw value is written after checking its size; otherwise the value
 * is written with a vint length prefix.
 *
 * @param valueHolder holder of the value; the value at {@code i} must not be null (asserted)
 * @param i           index of the value in the holder
 * @param accessor    accessor used to perform the write
 * @param out         destination of the serialized bytes
 * @throws IOException if the type is fixed-length and the value's size differs from it,
 *                     or if writing to {@code out} fails
 */
public <V> void writeValue(IndexedValueHolder<V> valueHolder, int i, ValueAccessor<V> accessor, DataOutputPlus out) throws IOException
{
    assert !valueHolder.isNull(i) : "bytes should not be null for type " + this;

    int fixedLength = valueLengthIfFixed();
    if (fixedLength < 0)
    {
        // variable-length type: the length travels with the value
        accessor.writeWithVIntLength(valueHolder, i, out);
        return;
    }

    int actualLength = valueHolder.size(i);
    if (actualLength != fixedLength)
        throw new IOException(String.format("Expected exactly %d bytes, but was %d",
                                            fixedLength, actualLength));

    accessor.write(valueHolder, i, out);
}

public long writtenLength(ByteBuffer value)
{
return writtenLength(value, ByteBufferAccessor.instance);
Expand All @@ -598,6 +617,14 @@ public <V> long writtenLength(V value, ValueAccessor<V> accessor)
: accessor.sizeWithVIntLength(value);
}

/**
 * Returns the number of bytes needed to write the {@code i}-th value of {@code valueHolder}:
 * the value's own size for a fixed-length type, otherwise its size including a vint length prefix.
 *
 * @param valueHolder holder of the value; the value at {@code i} must not be null (asserted)
 * @param i           index of the value in the holder
 * @param accessor    accessor used to size variable-length values
 */
public <V> long writtenLength(IndexedValueHolder<V> valueHolder, int i, ValueAccessor<V> accessor)
{
    assert !valueHolder.isNull(i) : "bytes should not be null for type " + this;

    if (valueLengthIfFixed() >= 0)
        return valueHolder.size(i); // if the size is wrong, this will be detected in writeValue

    return accessor.sizeWithVIntLength(valueHolder, i);
}

public ByteBuffer readBuffer(DataInputPlus in) throws IOException
{
return readBuffer(in, Integer.MAX_VALUE);
Expand Down
32 changes: 32 additions & 0 deletions src/java/org/apache/cassandra/db/marshal/IndexedValueHolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db.marshal;

import java.io.IOException;

import org.apache.cassandra.io.util.DataOutputPlus;

/**
 * A holder of values addressed by index. Besides returning a value object via {@link #get(int)},
 * it exposes per-index queries and a direct write, so callers can work with a value by index
 * without calling {@code get}.
 *
 * @param <V> the type of the value objects returned by {@link #get(int)}
 */
public interface IndexedValueHolder<V>
{
    /** Returns the value stored at index {@code i}. */
    V get(int i);

    /** Returns the size of the value stored at index {@code i}. */
    int size(int i);

    /** Returns whether the value stored at index {@code i} is null. */
    boolean isNull(int i);

    /** Returns whether the value stored at index {@code i} is empty. */
    boolean isEmpty(int i);

    /**
     * Writes the value stored at index {@code i} to {@code out}.
     *
     * @throws IOException if writing to {@code out} fails
     */
    void write(int i, DataOutputPlus out) throws IOException;
}
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/db/marshal/NativeAccessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ public void write(NativeData sourceValue, DataOutputPlus out) throws IOException
out.writeMemory(sourceValue.getAddress(), sourceValue.nativeDataSize());
}

/**
 * Writes the {@code i}-th value of {@code valueHolder} prefixed by its size as an unsigned vint.
 * The bytes are emitted by the holder's own indexed {@code write}, so {@code get(i)} is never called.
 *
 * @throws IOException if writing to {@code out} fails
 */
@Override
public void writeWithVIntLength(IndexedValueHolder<NativeData> valueHolder, int i, DataOutputPlus out) throws IOException
{
    int size = valueHolder.size(i);
    out.writeUnsignedVInt32(size);
    valueHolder.write(i, out);
}

@Override
public ByteBuffer toBuffer(NativeData value)
{
Expand Down
18 changes: 18 additions & 0 deletions src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ default int sizeWithVIntLength(V value)
return TypeSizes.sizeofUnsignedVInt(size) + size;
}

/** serialized size of the i-th value of the holder, including an unsigned vint length prefix */
default int sizeWithVIntLength(IndexedValueHolder<V> valueHolder, int i)
{
    int valueSize = valueHolder.size(i);
    int prefixSize = TypeSizes.sizeofUnsignedVInt(valueSize);
    return prefixSize + valueSize;
}

/** serialized size including a short length prefix */
default int sizeWithShortLength(V value)
{
Expand Down Expand Up @@ -168,12 +174,24 @@ default boolean isEmptyFromOffset(V value, int offset)
*/
void write(V value, DataOutputPlus out) throws IOException;

/**
 * Writes the i-th value of the holder to {@code out}. This default fetches the value
 * with {@code valueHolder.get(i)} and passes it to {@link #write(Object, DataOutputPlus)}.
 */
default void write(IndexedValueHolder<V> valueHolder, int i, DataOutputPlus out) throws IOException
{
    V value = valueHolder.get(i);
    write(value, out);
}


/**
 * Writes the size of {@code value} as an unsigned vint, followed by the value itself.
 */
default void writeWithVIntLength(V value, DataOutputPlus out) throws IOException
{
    int length = size(value);
    out.writeUnsignedVInt32(length);
    write(value, out);
}

/**
 * Writes the size of the i-th value of the holder as an unsigned vint, followed by the value itself.
 * <p>
 * The value is emitted through the indexed {@code write(valueHolder, i, out)} rather than
 * {@code write(valueHolder.get(i), out)}, so an accessor that overrides the indexed write
 * is honoured here as well. With the default indexed write the output is unchanged.
 *
 * @param valueHolder holder of the value
 * @param i           index of the value in the holder
 * @param out         destination of the serialized bytes
 * @throws IOException if writing to {@code out} fails
 */
default void writeWithVIntLength(IndexedValueHolder<V> valueHolder, int i, DataOutputPlus out) throws IOException
{
    out.writeUnsignedVInt32(valueHolder.size(i));
    write(valueHolder, i, out);
}

/**
* Write the contents of the given value into the ByteBuffer
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
Expand Down Expand Up @@ -75,6 +76,8 @@ public abstract class AbstractAllocatorMemtable extends AbstractMemtableWithComm

private final long creationNano = Clock.Global.nanoTime();

protected final RegularAndStaticColumns columnsOnCreation;

@VisibleForTesting
static MemtablePool createMemtableAllocatorPool()
{
Expand Down Expand Up @@ -120,6 +123,7 @@ public AbstractAllocatorMemtable(AtomicReference<CommitLogPosition> commitLogLow
this.initialComparator = metadata.get().comparator;
this.initialFactory = metadata().params.memtable.factory();
this.owner = owner;
this.columnsOnCreation = metadata().regularAndStaticColumns();
scheduleFlush();
}

Expand Down
Loading