Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/cassandra_latest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2529,3 +2529,6 @@ storage_compatibility_mode: NONE
# # especially in keyspaces with many tables. The splitter avoids batching tables together if they
# # exceed other configuration parameters like bytes_per_assignment or partitions_per_assignment.
# max_tables_per_assignment: 64
#compression_service_options:
# configurations:
# DeflateCompressor: QatDeflateCompressorFactory
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1502,4 +1502,16 @@ public enum CQLStartTime
public boolean enforce_native_deadline_for_hints = false;

public boolean paxos_repair_race_wait = true;

//Retrieves the compression service configurations.
public static class CompressionServiceOptions
{
public Map<String, String> configurations = new HashMap<>();

public CompressionServiceOptions()
{
}
}
public final CompressionServiceOptions compression_service_options = new CompressionServiceOptions();

}
8 changes: 8 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5979,4 +5979,12 @@ public static void setPartitioner(String name)
{
partitioner = FBUtilities.newPartitioner(name);
}

public static Map<String, String> getCompressionServiceOptions()
{
if (conf == null || conf.compression_service_options == null)
return null;
return conf.compression_service_options.configurations;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.io.compress;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;

public abstract class AbstractCompressorDecorator implements ICompressor
{
protected ICompressor baseCompressor;
public AbstractCompressorDecorator(ICompressor compressor)
{
this.baseCompressor = compressor;
}

@Override
public int initialCompressedBufferLength(int chunkLength)
{
return baseCompressor.initialCompressedBufferLength(chunkLength);
}

@Override
public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
{
return baseCompressor.uncompress(input, inputOffset, inputLength, output, outputOffset);
}

@Override
public void compress(ByteBuffer input, ByteBuffer output) throws IOException
{
baseCompressor.compress(input, output);
}

@Override
public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
{
baseCompressor.uncompress(input, output);
}

@Override
public BufferType preferredBufferType() {
return baseCompressor.preferredBufferType();
}

@Override
public boolean supports(BufferType bufferType) {
return baseCompressor.supports(bufferType);
}

@Override
public Set<String> supportedOptions() {
return baseCompressor.supportedOptions();
}

}
72 changes: 72 additions & 0 deletions src/java/org/apache/cassandra/io/compress/CompressorDecorator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.io.compress;

import java.io.IOException;
import java.nio.ByteBuffer;

public class CompressorDecorator extends AbstractCompressorDecorator
{
private ICompressor pluginCompressor;
public CompressorDecorator(ICompressor baseCompressor, ICompressor pluginCompressor)
{
super(baseCompressor);
this.pluginCompressor = pluginCompressor;
}

@Override
public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
{
try
{
return pluginCompressor.uncompress(input, inputOffset, inputLength, output, outputOffset);
}
catch(IOException e)
{
return super.uncompress(input, inputOffset, inputLength, output, outputOffset);
}
}

@Override
public void compress(ByteBuffer input, ByteBuffer output) throws IOException
{
try
{
pluginCompressor.compress(input, output);
}
catch(IOException e)
{
super.compress(input, output);
}
}

@Override
public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
{
try
{
pluginCompressor.uncompress(input, output);
}
catch(IOException e)
{
super.uncompress(input, output);
}
}

}
26 changes: 26 additions & 0 deletions src/java/org/apache/cassandra/io/compress/ICompressorFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.compress;

import java.util.Map;

public interface ICompressorFactory
{
public ICompressor createCompressionProvider(ICompressor compressor, Map<String, String> options);
// String getSupportedCompressorName(); //get supported compressor information from plugin or use yaml file?
}
74 changes: 73 additions & 1 deletion src/java/org/apache/cassandra/schema/CompressionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
Expand All @@ -31,6 +32,7 @@
import org.apache.commons.lang3.builder.HashCodeBuilder;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.exceptions.ConfigurationException;
Expand All @@ -40,10 +42,14 @@
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.lang.String.format;

public final class CompressionParams
{
private static final Logger logger = LoggerFactory.getLogger(CompressionParams.class);
public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16;
public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0; // Since pre-4.0 versions do not understand the
// new compression parameter we can't use a
Expand Down Expand Up @@ -75,6 +81,7 @@ public final class CompressionParams
private final int maxCompressedLength; // In content we store max length to avoid rounding errors causing compress/decompress mismatch.
private final double minCompressRatio; // In configuration we store min ratio, the input parameter.
private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be used by the compressor
private static ImmutableMap<String, ICompressorFactory> compressionServices;

public static CompressionParams fromMap(Map<String, String> opts)
{
Expand Down Expand Up @@ -280,7 +287,11 @@ private static ICompressor createCompressor(Class<?> compressorClass, Map<String
for (String provided : compressionOptions.keySet())
if (!compressor.supportedOptions().contains(provided))
throw new ConfigurationException("Unknown compression options " + provided);
return compressor;

// Load all the compressor services in the classpath and
// create a decorated compressor, if service providers available
loadServices();
return decorateCompressor(compressor, compressionOptions);
}
catch (NoSuchMethodException e)
{
Expand Down Expand Up @@ -331,6 +342,67 @@ private static Map<String, String> copyOptions(Map<? extends CharSequence, ? ext
return compressionOptions;
}

/**
* Load the services associated with ICompressorFactory and create
* a hashmap with key as in-built compressor which will be decorated
* and value being the serviceprovider factory
*/
private static void loadServices()
{
if(compressionServices != null)
return;
// Read options from configuration file and populate compressionServices map
ImmutableMap.Builder<String, ICompressorFactory> compressionServiceBuilder = ImmutableMap.builder();
Map<String, String> compressionServiceOptions = DatabaseDescriptor.getCompressionServiceOptions();
compressionServiceOptions.forEach((algorithmName, configuredFactoryName) -> {
try
{
ICompressorFactory compressorFactory = getServiceProviderFactory(configuredFactoryName);
if (compressorFactory != null)
{
compressionServiceBuilder.put(algorithmName, compressorFactory);
logger.info("Adding '{}' for '{}'", configuredFactoryName, algorithmName);
}
}
catch (Exception e)
{
logger.warn("Failed to load service '{}' for '{}'", configuredFactoryName, algorithmName);
}
});
compressionServices = compressionServiceBuilder.build();
}

private static ICompressorFactory getServiceProviderFactory(String configuredFactoryName)
{
ServiceLoader<ICompressorFactory> loader = ServiceLoader.load(ICompressorFactory.class);
return loader.stream()
.filter(factory -> factory.type().getSimpleName().equals(configuredFactoryName))
.map(ServiceLoader.Provider::get)
.findFirst()
.get();
}

/**
* Creates a decorated compressor, if service providers available or
* returns the base compressor provided as input
*/
private static ICompressor decorateCompressor(ICompressor baseCompressor, Map<String, String> options)
{
if(compressionServices != null) {
try {
ICompressorFactory selectedFactory = compressionServices.get(baseCompressor.getClass().getSimpleName());
if (selectedFactory != null) {
ICompressor pluginCompressor = selectedFactory.createCompressionProvider(baseCompressor, options);
return new CompressorDecorator(baseCompressor, pluginCompressor);
}
}
catch(Exception e) {
logger.warn("Failed to access service provider. Will fallback to default!!");
}
}
return baseCompressor;
}

/**
* Parse the chunk length (in KiB) and returns it as bytes.
*
Expand Down