From 5a14f5fd95fabc2db628d125123d07e57c605c9e Mon Sep 17 00:00:00 2001 From: "Kokoori, Shylaja" Date: Thu, 4 Dec 2025 09:41:09 -0800 Subject: [PATCH] Adding support for plugin compressors --- .../compress/AbstractCompressorDecorator.java | 72 ++++++++++++++++++ .../compress/CompressedSequentialWriter.java | 2 +- .../io/compress/CompressionMetadata.java | 2 +- .../io/compress/CompressorDecorator.java | 75 +++++++++++++++++++ .../io/compress/ICompressorFactory.java | 42 +++++++++++ .../cassandra/schema/CompressionParams.java | 59 +++++++++++++++ 6 files changed, 250 insertions(+), 2 deletions(-) create mode 100644 src/java/org/apache/cassandra/io/compress/AbstractCompressorDecorator.java create mode 100644 src/java/org/apache/cassandra/io/compress/CompressorDecorator.java create mode 100644 src/java/org/apache/cassandra/io/compress/ICompressorFactory.java diff --git a/src/java/org/apache/cassandra/io/compress/AbstractCompressorDecorator.java b/src/java/org/apache/cassandra/io/compress/AbstractCompressorDecorator.java new file mode 100644 index 000000000000..a417d9471375 --- /dev/null +++ b/src/java/org/apache/cassandra/io/compress/AbstractCompressorDecorator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Set; + +public abstract class AbstractCompressorDecorator implements ICompressor +{ + protected ICompressor baseCompressor; + public AbstractCompressorDecorator(ICompressor compressor) + { + this.baseCompressor = compressor; + } + + @Override + public int initialCompressedBufferLength(int chunkLength) + { + return baseCompressor.initialCompressedBufferLength(chunkLength); + } + + @Override + public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException + { + return baseCompressor.uncompress(input, inputOffset, inputLength, output, outputOffset); + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) throws IOException + { + baseCompressor.compress(input, output); + } + + @Override + public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException + { + baseCompressor.uncompress(input, output); + } + + @Override + public BufferType preferredBufferType() { + return baseCompressor.preferredBufferType(); + } + + @Override + public boolean supports(BufferType bufferType) { + return baseCompressor.supports(bufferType); + } + + @Override + public Set supportedOptions() { + return baseCompressor.supportedOptions(); + } + +} diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 1f4cd517b021..3b339a4e0d9c 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -109,7 +109,7 @@ public CompressedSequentialWriter(File file, .bufferType(parameters.getSstableCompressor().preferredBufferType()) .finishOnClose(option.finishOnClose()) .build()); - ICompressor compressor = parameters.getSstableCompressor(); + ICompressor compressor = parameters.getDecoratedSstableCompressor(); this.digestFile = Optional.ofNullable(digestFile); // buffer for compression should be the same size as buffer itself diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index f49734a6597e..a06057a5ab66 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -177,7 +177,7 @@ public ICompressor compressor() result = resolvedCompressor; if (result == null) { - result = resolveCompressor(parameters.getSstableCompressor(), compressionDictionary); + result = resolveCompressor(parameters.getDecoratedSstableCompressor(), compressionDictionary); resolvedCompressor = result; } return result; diff --git a/src/java/org/apache/cassandra/io/compress/CompressorDecorator.java b/src/java/org/apache/cassandra/io/compress/CompressorDecorator.java new file mode 100644 index 000000000000..4ebf258a4dd0 --- /dev/null +++ b/src/java/org/apache/cassandra/io/compress/CompressorDecorator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * This class uses a plugin compressor to perform compress/decompress, if available, otherwise reverts to default SstableCompressor. + */ +public class CompressorDecorator extends AbstractCompressorDecorator +{ + private ICompressor pluginCompressor; + public CompressorDecorator(ICompressor baseCompressor, ICompressor pluginCompressor) + { + super(baseCompressor); + this.pluginCompressor = pluginCompressor; + } + + @Override + public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException + { + try + { + return pluginCompressor.uncompress(input, inputOffset, inputLength, output, outputOffset); + } + catch(IOException e) + { + return super.uncompress(input, inputOffset, inputLength, output, outputOffset); + } + } + + @Override + public void compress(ByteBuffer input, ByteBuffer output) throws IOException + { + try + { + pluginCompressor.compress(input, output); + } + catch(IOException e) + { + super.compress(input, output); + } + } + + @Override + public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException + { + try + { + pluginCompressor.uncompress(input, output); + } + catch(IOException e) + { + super.uncompress(input, output); + } + } + +} diff --git a/src/java/org/apache/cassandra/io/compress/ICompressorFactory.java b/src/java/org/apache/cassandra/io/compress/ICompressorFactory.java new file mode 100644 index 000000000000..10ceaf11364c --- /dev/null +++ b/src/java/org/apache/cassandra/io/compress/ICompressorFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.io.compress; + +import java.util.Map; +import com.google.common.collect.ImmutableMap; + +public interface ICompressorFactory +{ + public static final ImmutableMap COMPRESSOR_NAME_MAP = ImmutableMap.of( + "deflate", "DeflateCompressor", + "lz4", "LZ4Compressor", + "snappy", "SnappyCompressor", + "zstd", "ZstdCompressor" + ); + + /** + * Used to create a plugin compressor + */ + public ICompressor createCompressor(Map options) throws IllegalStateException; + + /** + * Services can use COMPRESSOR_NAME_MAP to associate with an existing compressor + */ + public String getSupportedCompressorName(); + +} diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java index fdf184e94d96..dd11dba86f0c 100644 --- a/src/java/org/apache/cassandra/schema/CompressionParams.java +++ b/src/java/org/apache/cassandra/schema/CompressionParams.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.ServiceLoader; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; @@ -40,10 +42,14 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static java.lang.String.format; public final class CompressionParams { + private static final Logger logger = LoggerFactory.getLogger(CompressionParams.class); public static final int DEFAULT_CHUNK_LENGTH = 1024 * 16; public static final double DEFAULT_MIN_COMPRESS_RATIO = 0.0; // Since pre-4.0 versions do not understand the // new compression parameter we can't use a @@ -71,6 +77,7 @@ public final class CompressionParams Collections.emptyMap()); private final ICompressor sstableCompressor; + private final ICompressor decoratedSstableCompressor; private final int chunkLength; private final int maxCompressedLength; // In content we store max length to avoid rounding errors causing compress/decompress mismatch. private final double minCompressRatio; // In configuration we store min ratio, the input parameter. @@ -223,6 +230,7 @@ private CompressionParams(ICompressor sstableCompressor, int chunkLength, int ma this.otherOptions = ImmutableMap.copyOf(otherOptions); this.minCompressRatio = minCompressRatio; this.maxCompressedLength = maxCompressedLength; + this.decoratedSstableCompressor = decorateCompressor(sstableCompressor, otherOptions); } public CompressionParams copy() @@ -260,6 +268,15 @@ public ICompressor getSstableCompressor() return sstableCompressor; } + /** + * Provides a decorated SSTable compressor, if a compression service is loaded. + * @return a decorated SSTable compressor or {@code getSstableCompressor()}. + */ + public ICompressor getDecoratedSstableCompressor() + { + return decoratedSstableCompressor; + } + public ImmutableMap getOtherOptions() { return otherOptions; @@ -348,6 +365,48 @@ public static ICompressor createCompressor(ParameterizedClass compression) throw return createCompressor(parseCompressorClass(compression.class_name), copyOptions(compression.parameters)); } + /** + * Creates a decorated compressor, if compression service providers available in the classpath or + * returns the base compressor + * @param baseCompressor compressor being decorated + * @param options compression options of baseCompressor + * @return returns a decorated compressor, if service available, otherwise baseCompressor + */ + private static ICompressor decorateCompressor(ICompressor baseCompressor, Map options) + { + if(baseCompressor != null) + { + try + { + Optional selectedFactory = getServiceProviderFactory(baseCompressor.getClass().getSimpleName()); + if (selectedFactory.isPresent()) + { + ICompressor pluginCompressor = selectedFactory.get().createCompressor(options); + return new CompressorDecorator(baseCompressor, pluginCompressor); + } + } + catch(IllegalStateException e) + { + logger.trace("Failed to access service provider. Will fallback to default!!"); + } + } + return baseCompressor; + } + + /** + * Provides access to a factory to create plugin compressors, if available + * @param compressorName simple name of the compressor class + * @return an optional containing ICompressorFactory, if present, an empty optional otherwise + */ + private static Optional getServiceProviderFactory(String compressorName) + { + ServiceLoader loader = ServiceLoader.load(ICompressorFactory.class); + return loader.stream() + .filter(factory -> factory.get().getSupportedCompressorName().equals(compressorName)) + .map(ServiceLoader.Provider::get) + .findFirst(); + } + private static Map copyOptions(Map co) { if (co == null || co.isEmpty())