From e6ec5bf4c43778907d8c0a1aebedb201d6e850b2 Mon Sep 17 00:00:00 2001 From: "Kokoori, Shylaja" Date: Wed, 8 Oct 2025 12:01:25 -0700 Subject: [PATCH] Adding capability to load compression service providers using decorator pattern --- conf/cassandra_latest.yaml | 3 + .../org/apache/cassandra/config/Config.java | 12 +++ .../cassandra/config/DatabaseDescriptor.java | 8 ++ .../compress/AbstractCompressorDecorator.java | 72 ++++++++++++++++++ .../io/compress/CompressorDecorator.java | 72 ++++++++++++++++++ .../io/compress/ICompressorFactory.java | 26 +++++++ .../cassandra/schema/CompressionParams.java | 74 ++++++++++++++++++- 7 files changed, 266 insertions(+), 1 deletion(-) create mode 100644 src/java/org/apache/cassandra/io/compress/AbstractCompressorDecorator.java create mode 100644 src/java/org/apache/cassandra/io/compress/CompressorDecorator.java create mode 100644 src/java/org/apache/cassandra/io/compress/ICompressorFactory.java diff --git a/conf/cassandra_latest.yaml b/conf/cassandra_latest.yaml index f42604843146..8f4ac47279ac 100644 --- a/conf/cassandra_latest.yaml +++ b/conf/cassandra_latest.yaml @@ -2529,3 +2529,6 @@ storage_compatibility_mode: NONE # # especially in keyspaces with many tables. The splitter avoids batching tables together if they # # exceed other configuration parameters like bytes_per_assignment or partitions_per_assignment. # max_tables_per_assignment: 64 +#compression_service_options: +# configurations: +# DeflateCompressor: QatDeflateCompressorFactory diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 9783e086ad13..abc9f2c95a79 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -1502,4 +1502,16 @@ public enum CQLStartTime public boolean enforce_native_deadline_for_hints = false; public boolean paxos_repair_race_wait = true; + + //Retrieves the compression service configurations. + public static class CompressionServiceOptions + { + public Map configurations = new HashMap<>(); + + public CompressionServiceOptions() + { + } + } + public final CompressionServiceOptions compression_service_options = new CompressionServiceOptions(); + } diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index ec76193e1046..c302323ad01f 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -5979,4 +5979,12 @@ public static void setPartitioner(String name) { partitioner = FBUtilities.newPartitioner(name); } + + public static Map getCompressionServiceOptions() + { + if (conf == null || conf.compression_service_options == null) + return null; + return conf.compression_service_options.configurations; + } + } diff --git a/src/java/org/apache/cassandra/io/compress/AbstractCompressorDecorator.java b/src/java/org/apache/cassandra/io/compress/AbstractCompressorDecorator.java new file mode 100644 index 000000000000..a417d9471375 --- /dev/null +++ b/src/java/org/apache/cassandra/io/compress/AbstractCompressorDecorator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Set; + +public abstract class AbstractCompressorDecorator implements ICompressor +{ + protected ICompressor baseCompressor; + public AbstractCompressorDecorator(ICompressor compressor) + { + this.baseCompressor = compressor; + } + + @Override + public int initialCompressedBufferLength(int chunkLength) + { + return baseCompressor.initialCompressedBufferLength(chunkLength); + } + + @Override + public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException + { + return baseCompressor.uncompress(input, inputOffset, inputLength, output, outputOffset); + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) throws IOException + { + baseCompressor.compress(input, output); + } + + @Override + public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException + { + baseCompressor.uncompress(input, output); + } + + @Override + public BufferType preferredBufferType() { + return baseCompressor.preferredBufferType(); + } + + @Override + public boolean supports(BufferType bufferType) { + return baseCompressor.supports(bufferType); + } + + @Override + public Set supportedOptions() { + return baseCompressor.supportedOptions(); + } + +} diff --git a/src/java/org/apache/cassandra/io/compress/CompressorDecorator.java b/src/java/org/apache/cassandra/io/compress/CompressorDecorator.java new file mode 100644 index 000000000000..9fa2ab05e31f --- /dev/null +++ b/src/java/org/apache/cassandra/io/compress/CompressorDecorator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class CompressorDecorator extends AbstractCompressorDecorator +{ + private ICompressor pluginCompressor; + public CompressorDecorator(ICompressor baseCompressor, ICompressor pluginCompressor) + { + super(baseCompressor); + this.pluginCompressor = pluginCompressor; + } + + @Override + public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException + { + try + { + return pluginCompressor.uncompress(input, inputOffset, inputLength, output, outputOffset); + } + catch(IOException e) + { + return super.uncompress(input, inputOffset, inputLength, output, outputOffset); + } + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) throws IOException + { + try + { + pluginCompressor.compress(input, output); + } + catch(IOException e) + { + super.compress(input, output); + } + } + + @Override + public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException + { + try + { + pluginCompressor.uncompress(input, output); + } + catch(IOException e) + { + super.uncompress(input, output); + } + } + +} diff --git a/src/java/org/apache/cassandra/io/compress/ICompressorFactory.java b/src/java/org/apache/cassandra/io/compress/ICompressorFactory.java new file mode 100644 index 000000000000..b49dae4686b8 --- /dev/null +++ b/src/java/org/apache/cassandra/io/compress/ICompressorFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.compress; + +import java.util.Map; + +public interface ICompressorFactory +{ + public ICompressor createCompressionProvider(ICompressor compressor, Map options); + // String getSupportedCompressorName(); //get supported compressor information from plugin or use yaml file? +} diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java index 0e7c3da13ab0..cc827393a805 100644 --- a/src/java/org/apache/cassandra/schema/CompressionParams.java +++ b/src/java/org/apache/cassandra/schema/CompressionParams.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.ServiceLoader; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; @@ -31,6 +32,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ConfigurationException; @@ -40,10 +42,14 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static java.lang.String.format; public final class CompressionParams { + private static final Logger logger = LoggerFactory.getLogger(CompressionParams.class); public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0; // Since pre-4.0 versions do not understand the // new compression parameter we can't use a @@ -75,6 +81,7 @@ public final class CompressionParams private final int maxCompressedLength; // In content we store max length to avoid rounding errors causing compress/decompress mismatch. private final double minCompressRatio; // In configuration we store min ratio, the input parameter. private final ImmutableMap otherOptions; // Unrecognized options, can be used by the compressor + private static ImmutableMap compressionServices; public static CompressionParams fromMap(Map opts) { @@ -280,7 +287,11 @@ private static ICompressor createCompressor(Class compressorClass, Map copyOptions(Map compressionServiceBuilder = ImmutableMap.builder(); + Map compressionServiceOptions = DatabaseDescriptor.getCompressionServiceOptions(); + compressionServiceOptions.forEach((algorithmName, configuredFactoryName) -> { + try + { + ICompressorFactory compressorFactory = getServiceProviderFactory(configuredFactoryName); + if (compressorFactory != null) + { + compressionServiceBuilder.put(algorithmName, compressorFactory); + logger.info("Adding '{}' for '{}'", configuredFactoryName, algorithmName); + } + } + catch (Exception e) + { + logger.warn("Failed to load service '{}' for '{}'", configuredFactoryName, algorithmName); + } + }); + compressionServices = compressionServiceBuilder.build(); + } + + private static ICompressorFactory getServiceProviderFactory(String configuredFactoryName) + { + ServiceLoader loader = ServiceLoader.load(ICompressorFactory.class); + return loader.stream() + .filter(factory -> factory.type().getSimpleName().equals(configuredFactoryName)) + .map(ServiceLoader.Provider::get) + .findFirst() + .get(); + } + + /** + * Creates a decorated compressor, if service providers available or + * returns the base compressor provided as input + */ + private static ICompressor decorateCompressor(ICompressor baseCompressor, Map options) + { + if(compressionServices != null) { + try { + ICompressorFactory selectedFactory = compressionServices.get(baseCompressor.getClass().getSimpleName()); + if (selectedFactory != null) { + ICompressor pluginCompressor = selectedFactory.createCompressionProvider(baseCompressor, options); + return new CompressorDecorator(baseCompressor, pluginCompressor); + } + } + catch(Exception e) { + logger.warn("Failed to access service provider. Will fallback to default!!"); + } + } + return baseCompressor; + } + /** * Parse the chunk length (in KiB) and returns it as bytes. *