From dc3a429af4fb779a53d41988649a81fdb1c62c3e Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Tue, 30 Dec 2025 20:05:36 +0800 Subject: [PATCH 1/4] JindoFileIO support cache using JindoCache --- .../paimon/rest/RESTCatalogOptions.java | 7 + .../apache/paimon/rest/RESTTokenFileIO.java | 31 +++++ .../apache/paimon/rest/RESTCatalogTest.java | 62 +++++++++ .../paimon/jindo/HadoopCompliantFileIO.java | 126 +++++++++++++++--- .../org/apache/paimon/jindo/JindoFileIO.java | 20 ++- .../jindo/JindoMultiPartUploadCommitter.java | 2 +- 6 files changed, 222 insertions(+), 26 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index d1a88ef4f839..4e1211c1b2f0 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -103,4 +103,11 @@ public class RESTCatalogOptions { .stringType() .noDefaultValue() .withDescription("REST Catalog DLF OSS endpoint."); + + public static final ConfigOption DLF_FILE_IO_CACHE_ENABLED = + ConfigOptions.key("dlf.file-io.cache.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Enable cache for visiting files using file io (currently only JindoFileIO supports cache)."); } diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index d59ca6dd47c5..c8480ebcc577 100644 --- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -45,11 +45,14 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE; import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT; /** A {@link FileIO} to support getting token from REST Server. */ @@ -63,6 +66,13 @@ public class RESTTokenFileIO implements FileIO { .defaultValue(false) .withDescription("Whether to support data token provided by the REST server."); + public static final ConfigOption FILE_IO_CACHE_POLICY = + ConfigOptions.key("dlf.file-io.cache.policy") + .stringType() + .noDefaultValue() + .withDescription( + "The cache policy of a table provided by the REST server, combined with: meta,read,write"); + private static final Cache FILE_IO_CACHE = Caffeine.newBuilder() .maximumSize(1000) @@ -239,6 +249,27 @@ private Map mergeTokenWithCatalogOptions(Map tok if (dlfOssEndpoint != null && !dlfOssEndpoint.isEmpty()) { newToken.put("fs.oss.endpoint", dlfOssEndpoint); } + + // Process file io cache configuration + if (!catalogContext.options().get(DLF_FILE_IO_CACHE_ENABLED)) { + // Disable file io cache, remove the cache policy configs + newToken.remove(FILE_IO_CACHE_POLICY.key()); + } else { + // Enable file io cache, reorder cache policy in fixed order, + // and allow user to override policy provided by REST server. + String cachePolicy = catalogContext.options().get(FILE_IO_CACHE_POLICY); + if (cachePolicy == null) { + cachePolicy = token.get(FILE_IO_CACHE_POLICY.key()); + } + if (cachePolicy != null) { + Set cachePolicySet = new TreeSet<>(); + for (String policy : cachePolicy.split(",")) { + cachePolicySet.add(policy.trim().toLowerCase()); + } + newToken.put(FILE_IO_CACHE_POLICY.key(), String.join(",", cachePolicySet)); + } + } + return ImmutableMap.copyOf(newToken); } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 288c925ceb23..bf0143dfc2d8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -108,6 +108,7 @@ import static org.apache.paimon.TableType.OBJECT_TABLE; import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; import static org.apache.paimon.rest.RESTApi.PAGE_TOKEN; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT; import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER; import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis; @@ -2694,6 +2695,67 @@ void testReadPartitionsTable() throws Exception { } } + @Test + void testEnableFileIOCache() throws Exception { + // Enable cache at client-side + Map options = new HashMap<>(); + options.put( + DLF_FILE_IO_CACHE_ENABLED.key(), + "true"); // DLF_FILE_IO_CACHE_ENABLED MUST be configured to enable cache + this.catalog = newRestCatalogWithDataToken(options); + Identifier identifier = + Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache"); + String cachePolicy = "meta,read"; + RESTToken token = + new RESTToken( + ImmutableMap.of( + "akId", + "akId", + "akSecret", + UUID.randomUUID().toString(), + "dlf.file-io.cache.policy", + cachePolicy), + System.currentTimeMillis() + 3600_000L); + setDataTokenToRestServerForMock(identifier, token); + createTable( + identifier, + ImmutableMap.of("dlf.file-io.cache.policy", cachePolicy), + Lists.newArrayList("col1")); + FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); + RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); + RESTToken fileDataToken = fileIO.validToken(); + assertEquals(cachePolicy, fileDataToken.token().get("dlf.file-io.cache.policy")); + } + + @Test + void testDisableFileIOCache() throws Exception { + // Disable cache at client-side + Map options = new HashMap<>(); + this.catalog = newRestCatalogWithDataToken(options); + Identifier identifier = + Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache"); + String cachePolicy = "meta,read"; + RESTToken token = + new RESTToken( + ImmutableMap.of( + "akId", + "akId", + "akSecret", + UUID.randomUUID().toString(), + "dlf.file-io.cache.policy", + cachePolicy), + System.currentTimeMillis() + 3600_000L); + setDataTokenToRestServerForMock(identifier, token); + createTable( + identifier, + ImmutableMap.of("dlf.file-io.cache.policy", cachePolicy), + Lists.newArrayList("col1")); + FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); + RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); + RESTToken fileDataToken = fileIO.validToken(); + assertNull(fileDataToken.token().get("dlf.file-io.cache.policy")); + } + private TestPagedResponse generateTestPagedResponse( Map queryParams, List testData, diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java index 3ced092f1d68..219de5608dc1 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java @@ -18,6 +18,7 @@ package org.apache.paimon.jindo; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; @@ -27,31 +28,95 @@ import org.apache.paimon.fs.VectoredReadable; import org.apache.paimon.utils.Pair; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + import com.aliyun.jindodata.common.JindoHadoopSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; +import static org.apache.paimon.rest.RESTTokenFileIO.FILE_IO_CACHE_POLICY; + /** * Hadoop {@link FileIO}. * *

Important: copy this class from HadoopFileIO here to avoid class loader conflicts. */ public abstract class HadoopCompliantFileIO implements FileIO { + private static final Logger LOG = LoggerFactory.getLogger(HadoopCompliantFileIO.class); private static final long serialVersionUID = 1L; + /// Detailed cache strategies are retrieved from REST server. + private static final String META_CACHE_ENABLED_TAG = "meta"; + private static final String READ_CACHE_ENABLED_TAG = "read"; + private static final String WRITE_CACHE_ENABLED_TAG = "write"; + + private boolean metaCacheEnabled = false; + private boolean readCacheEnabled = false; + private boolean writeCacheEnabled = false; + protected transient volatile Map> fsMap; + protected transient volatile Map> jindoCacheFsMap; + + // Only enable cache for path which is generated with uuid + private static final List CACHE_WHITELIST_PATH_PATTERN = + Lists.newArrayList("bucket-", "manifest"); + + private boolean shouldCache(Path path) { + String pathStr = path.toUri().getPath(); + for (String pattern : CACHE_WHITELIST_PATH_PATTERN) { + if (pathStr.contains(pattern)) { + return true; + } + } + return false; + } + + @Override + public void configure(CatalogContext context) { + if (context.options().get(DLF_FILE_IO_CACHE_ENABLED) + && context.options().get(FILE_IO_CACHE_POLICY) != null) { + if (context.options().get("fs.jindocache.namespace.rpc.address") == null) { + LOG.info( + "FileIO cache is enabled but JindoCache RPC address is not set, fallback to no-cache"); + } else { + metaCacheEnabled = + context.options() + .get(FILE_IO_CACHE_POLICY) + .contains(META_CACHE_ENABLED_TAG); + readCacheEnabled = + context.options() + .get(FILE_IO_CACHE_POLICY) + .contains(READ_CACHE_ENABLED_TAG); + writeCacheEnabled = + context.options() + .get(FILE_IO_CACHE_POLICY) + .contains(WRITE_CACHE_ENABLED_TAG); + LOG.info( + "Cache enabled with cache policy: meta cache enabled {}, read cache enabled {}, write cache enabled {}", + metaCacheEnabled, + readCacheEnabled, + writeCacheEnabled); + } + } + } @Override public SeekableInputStream newInputStream(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); - Pair pair = getFileSystemPair(hadoopPath); + boolean shouldCache = readCacheEnabled && shouldCache(path); + LOG.debug("InputStream should cache {} for path {}", shouldCache, path); + Pair pair = getFileSystemPair(hadoopPath, shouldCache); JindoHadoopSystem fs = pair.getKey(); String sysType = pair.getValue(); FSDataInputStream fsInput = fs.open(hadoopPath); @@ -63,14 +128,19 @@ public SeekableInputStream newInputStream(Path path) throws IOException { @Override public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); + boolean shouldCache = writeCacheEnabled && shouldCache(path); + LOG.debug("OutputStream should cache {} for path {}", shouldCache, path); return new HadoopPositionOutputStream( - getFileSystem(hadoopPath).create(hadoopPath, overwrite)); + getFileSystem(hadoopPath, shouldCache).create(hadoopPath, overwrite)); } @Override public FileStatus getFileStatus(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); - return new HadoopFileStatus(getFileSystem(hadoopPath).getFileStatus(hadoopPath)); + boolean shouldCache = metaCacheEnabled && shouldCache(path); + LOG.debug("GetFileStatus should cache {} for path {}", shouldCache, path); + return new HadoopFileStatus( + getFileSystem(hadoopPath, shouldCache).getFileStatus(hadoopPath)); } @Override @@ -78,7 +148,7 @@ public FileStatus[] listStatus(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); FileStatus[] statuses = new FileStatus[0]; org.apache.hadoop.fs.FileStatus[] hadoopStatuses = - getFileSystem(hadoopPath).listStatus(hadoopPath); + getFileSystem(hadoopPath, false).listStatus(hadoopPath); if (hadoopStatuses != null) { statuses = new FileStatus[hadoopStatuses.length]; for (int i = 0; i < hadoopStatuses.length; i++) { @@ -93,7 +163,7 @@ public RemoteIterator listFilesIterative(Path path, boolean recursiv throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); org.apache.hadoop.fs.RemoteIterator hadoopIter = - getFileSystem(hadoopPath).listFiles(hadoopPath, recursive); + getFileSystem(hadoopPath, false).listFiles(hadoopPath, recursive); return new RemoteIterator() { @Override public boolean hasNext() throws IOException { @@ -111,26 +181,28 @@ public FileStatus next() throws IOException { @Override public boolean exists(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); - return getFileSystem(hadoopPath).exists(hadoopPath); + boolean shouldCache = metaCacheEnabled && shouldCache(path); + LOG.debug("Exists should cache {} for path {}", shouldCache, path); + return getFileSystem(hadoopPath, shouldCache).exists(hadoopPath); } @Override public boolean delete(Path path, boolean recursive) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); - return getFileSystem(hadoopPath).delete(hadoopPath, recursive); + return getFileSystem(hadoopPath, false).delete(hadoopPath, recursive); } @Override public boolean mkdirs(Path path) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); - return getFileSystem(hadoopPath).mkdirs(hadoopPath); + return getFileSystem(hadoopPath, false).mkdirs(hadoopPath); } @Override public boolean rename(Path src, Path dst) throws IOException { org.apache.hadoop.fs.Path hadoopSrc = path(src); org.apache.hadoop.fs.Path hadoopDst = path(dst); - return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst); + return getFileSystem(hadoopSrc, false).rename(hadoopSrc, hadoopDst); } protected org.apache.hadoop.fs.Path path(Path path) { @@ -141,22 +213,34 @@ protected org.apache.hadoop.fs.Path path(Path path) { return new org.apache.hadoop.fs.Path(path.toUri()); } - protected JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException { - return getFileSystemPair(path).getKey(); + protected JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path, boolean enableCache) + throws IOException { + return getFileSystemPair(path, enableCache).getKey(); } - protected Pair getFileSystemPair(org.apache.hadoop.fs.Path path) - throws IOException { - if (fsMap == null) { - synchronized (this) { - if (fsMap == null) { - fsMap = new ConcurrentHashMap<>(); + protected Pair getFileSystemPair( + org.apache.hadoop.fs.Path path, boolean enableCache) throws IOException { + Map> map; + if (enableCache) { + if (jindoCacheFsMap == null) { + synchronized (this) { + if (jindoCacheFsMap == null) { + jindoCacheFsMap = new ConcurrentHashMap<>(); + } + } + } + map = jindoCacheFsMap; + } else { + if (fsMap == null) { + synchronized (this) { + if (fsMap == null) { + fsMap = new ConcurrentHashMap<>(); + } } } + map = fsMap; } - Map> map = fsMap; - String authority = path.toUri().getAuthority(); if (authority == null) { authority = "DEFAULT"; @@ -166,7 +250,7 @@ protected Pair getFileSystemPair(org.apache.hadoop.fs authority, k -> { try { - return createFileSystem(path); + return createFileSystem(path, enableCache); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -177,7 +261,7 @@ protected Pair getFileSystemPair(org.apache.hadoop.fs } protected abstract Pair createFileSystem( - org.apache.hadoop.fs.Path path) throws IOException; + org.apache.hadoop.fs.Path path, boolean enableCache) throws IOException; private static class HadoopSeekableInputStream extends SeekableInputStream { diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java index 0aa0939d50f7..40d67d4ed9b3 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java @@ -82,6 +82,7 @@ public class JindoFileIO extends HadoopCompliantFileIO { new ConcurrentHashMap<>(); private Options hadoopOptions; + private Options hadoopOptionsWithCache; private boolean allowCache = true; @Override @@ -91,6 +92,7 @@ public boolean isObjectStore() { @Override public void configure(CatalogContext context) { + super.configure(context); allowCache = context.options().get(FILE_IO_ALLOW_CACHE); hadoopOptions = new Options(); // read all configuration with prefix 'CONFIG_PREFIXES' @@ -127,6 +129,14 @@ public void configure(CatalogContext context) { .iterator() .forEachRemaining(entry -> hadoopOptions.set(entry.getKey(), entry.getValue())); } + + // another config when enable cache + hadoopOptionsWithCache = new Options(hadoopOptions.toMap()); + hadoopOptionsWithCache.set("fs.xengine", "jindocache"); + // Workaround: following configurations to avoid bug in some JindoSDK versions + hadoopOptionsWithCache.set("fs.oss.read.profile.columnar.use-pread", "false"); + hadoopOptionsWithCache.set( + "fs.jindocache.read.profile.columnar.readahead.pread.enable", "false"); } public Options hadoopOptions() { @@ -140,20 +150,22 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite throw new IOException("File " + path + " already exists."); } org.apache.hadoop.fs.Path hadoopPath = path(path); - Pair pair = getFileSystemPair(hadoopPath); + Pair pair = getFileSystemPair(hadoopPath, false); JindoHadoopSystem fs = pair.getKey(); return new JindoTwoPhaseOutputStream( new JindoMultiPartUpload(fs, hadoopPath), hadoopPath, path); } @Override - protected Pair createFileSystem(org.apache.hadoop.fs.Path path) { + protected Pair createFileSystem( + org.apache.hadoop.fs.Path path, boolean enableCache) { final String scheme = path.toUri().getScheme(); final String authority = path.toUri().getAuthority(); + Options options = enableCache ? hadoopOptionsWithCache : hadoopOptions; Supplier> supplier = () -> { Configuration hadoopConf = new Configuration(false); - hadoopOptions.toMap().forEach(hadoopConf::set); + options.toMap().forEach(hadoopConf::set); URI fsUri = path.toUri(); if (scheme == null && authority == null) { fsUri = FileSystem.getDefaultUri(hadoopConf); @@ -185,7 +197,7 @@ protected Pair createFileSystem(org.apache.hadoop.fs. if (allowCache) { return CACHE.computeIfAbsent( - new CacheKey(hadoopOptions, scheme, authority), key -> supplier.get()); + new CacheKey(options, scheme, authority), key -> supplier.get()); } else { return supplier.get(); } diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java index 435a0bc49a8f..0f8ba549abba 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java @@ -47,7 +47,7 @@ protected MultiPartUploadStore multiPartUploa FileIO fileIO, Path targetPath) throws IOException { JindoFileIO jindoFileIO = (JindoFileIO) fileIO; org.apache.hadoop.fs.Path hadoopPath = jindoFileIO.path(targetPath); - Pair pair = jindoFileIO.getFileSystemPair(hadoopPath); + Pair pair = jindoFileIO.getFileSystemPair(hadoopPath, false); JindoHadoopSystem fs = pair.getKey(); return new JindoMultiPartUpload(fs, hadoopPath); } From d4e35e86ac0149472b35c15871ed77bb2af65e4a Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Mon, 5 Jan 2026 20:36:12 +0800 Subject: [PATCH 2/4] Refactor config --- .../org/apache/paimon/rest/RESTCatalogOptions.java | 2 +- .../org/apache/paimon/rest/RESTTokenFileIO.java | 2 +- .../org/apache/paimon/rest/RESTCatalogTest.java | 13 +++++++------ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 4e1211c1b2f0..8799ca3b4716 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -105,7 +105,7 @@ public class RESTCatalogOptions { .withDescription("REST Catalog DLF OSS endpoint."); public static final ConfigOption DLF_FILE_IO_CACHE_ENABLED = - ConfigOptions.key("dlf.file-io.cache.enabled") + ConfigOptions.key("dlf.io-cache-enabled") .booleanType() .defaultValue(false) .withDescription( diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index c8480ebcc577..6d1464b12082 100644 --- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -67,7 +67,7 @@ public class RESTTokenFileIO implements FileIO { .withDescription("Whether to support data token provided by the REST server."); public static final ConfigOption FILE_IO_CACHE_POLICY = - ConfigOptions.key("dlf.file-io.cache.policy") + ConfigOptions.key("dlf.io-cache.policy") .stringType() .noDefaultValue() .withDescription( diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index bf0143dfc2d8..55c9392da9a2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -2713,18 +2713,19 @@ void testEnableFileIOCache() throws Exception { "akId", "akSecret", UUID.randomUUID().toString(), - "dlf.file-io.cache.policy", + RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy), System.currentTimeMillis() + 3600_000L); setDataTokenToRestServerForMock(identifier, token); createTable( identifier, - ImmutableMap.of("dlf.file-io.cache.policy", cachePolicy), + ImmutableMap.of(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy), Lists.newArrayList("col1")); FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); RESTToken fileDataToken = fileIO.validToken(); - assertEquals(cachePolicy, fileDataToken.token().get("dlf.file-io.cache.policy")); + assertEquals( + cachePolicy, fileDataToken.token().get(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key())); } @Test @@ -2742,18 +2743,18 @@ void testDisableFileIOCache() throws Exception { "akId", "akSecret", UUID.randomUUID().toString(), - "dlf.file-io.cache.policy", + RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy), System.currentTimeMillis() + 3600_000L); setDataTokenToRestServerForMock(identifier, token); createTable( identifier, - ImmutableMap.of("dlf.file-io.cache.policy", cachePolicy), + ImmutableMap.of(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key(), cachePolicy), Lists.newArrayList("col1")); FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO(); RESTToken fileDataToken = fileIO.validToken(); - assertNull(fileDataToken.token().get("dlf.file-io.cache.policy")); + assertNull(fileDataToken.token().get(RESTTokenFileIO.FILE_IO_CACHE_POLICY.key())); } private TestPagedResponse generateTestPagedResponse( From 8c12687298c205a3d30437f2c69c42819467541c Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Mon, 12 Jan 2026 16:46:13 +0800 Subject: [PATCH 3/4] Support cache for lance format --- .../paimon/rest/RESTCatalogOptions.java | 6 +++++ .../apache/paimon/rest/RESTTokenFileIO.java | 5 +++- .../paimon/jindo/HadoopCompliantFileIO.java | 27 ++++++++++++------- .../org/apache/paimon/jindo/JindoFileIO.java | 23 ++++++++++++++-- .../format/lance/LanceReaderFactory.java | 4 +-- .../paimon/format/lance/LanceUtils.java | 15 +++++++++-- .../format/lance/LanceWriterFactory.java | 4 +-- 7 files changed, 66 insertions(+), 18 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 8799ca3b4716..238cf4c72c3f 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -110,4 +110,10 @@ public class RESTCatalogOptions { .defaultValue(false) .withDescription( "Enable cache for visiting files using file io (currently only JindoFileIO supports cache)."); + public static final ConfigOption DLF_FILE_IO_CACHE_WHITELIST_PATH = + ConfigOptions.key("dlf.io-cache.whitelist-path") + .stringType() + .defaultValue("bucket-,manifest") + .withDescription( + "Cache is only applied to paths which contain the specified pattern, and * means all paths."); } diff --git a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index 6d1464b12082..acc2489c86f1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -71,7 +71,10 @@ public class RESTTokenFileIO implements FileIO { .stringType() .noDefaultValue() .withDescription( - "The cache policy of a table provided by the REST server, combined with: meta,read,write"); + "The cache policy of a table provided by the REST server, combined with: meta,read,write." + + "`meta`: meta cache is enabled for visiting files; " + + "`read`: cache is enabled when reading files; " + + "`write`: data is also cached when writing files."); private static final Cache FILE_IO_CACHE = Caffeine.newBuilder() diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java index 219de5608dc1..2be789d02d93 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java @@ -39,11 +39,13 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED; +import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_WHITELIST_PATH; import static org.apache.paimon.rest.RESTTokenFileIO.FILE_IO_CACHE_POLICY; /** @@ -61,20 +63,22 @@ public abstract class HadoopCompliantFileIO implements FileIO { private static final String READ_CACHE_ENABLED_TAG = "read"; private static final String WRITE_CACHE_ENABLED_TAG = "write"; - private boolean metaCacheEnabled = false; - private boolean readCacheEnabled = false; - private boolean writeCacheEnabled = false; + protected boolean metaCacheEnabled = false; + protected boolean readCacheEnabled = false; + protected boolean writeCacheEnabled = false; protected transient volatile Map> fsMap; protected transient volatile Map> jindoCacheFsMap; // Only enable cache for path which is generated with uuid - private static final List CACHE_WHITELIST_PATH_PATTERN = - Lists.newArrayList("bucket-", "manifest"); + private List cacheWhitelistPaths = new ArrayList<>(); - private boolean shouldCache(Path path) { + protected boolean shouldCache(Path path) { + if (cacheWhitelistPaths.isEmpty()) { + return true; + } String pathStr = path.toUri().getPath(); - for (String pattern : CACHE_WHITELIST_PATH_PATTERN) { + for (String pattern : cacheWhitelistPaths) { if (pathStr.contains(pattern)) { return true; } @@ -102,11 +106,16 @@ public void configure(CatalogContext context) { context.options() .get(FILE_IO_CACHE_POLICY) .contains(WRITE_CACHE_ENABLED_TAG); + String whitelist = context.options().get(DLF_FILE_IO_CACHE_WHITELIST_PATH); + if (!whitelist.equals("*")) { + cacheWhitelistPaths = Lists.newArrayList(whitelist.split(",")); + } LOG.info( - "Cache enabled with cache policy: meta cache enabled {}, read cache enabled {}, write cache enabled {}", + "Cache enabled with cache policy: meta cache enabled {}, read cache enabled {}, write cache enabled {}, whitelist path: {}", metaCacheEnabled, readCacheEnabled, - writeCacheEnabled); + writeCacheEnabled, + whitelist); } } } diff --git a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java index 40d67d4ed9b3..2960dae23bbf 100644 --- a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java +++ b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java @@ -139,8 +139,27 @@ public void configure(CatalogContext context) { "fs.jindocache.read.profile.columnar.readahead.pread.enable", "false"); } - public Options hadoopOptions() { - return hadoopOptions; + /** + * This method is used to initialize some thirdparty connector, such as Lance reader/writer. + * + * @param path file path + * @param opType read/write/meta + * @return + */ + public Options hadoopOptions(Path path, String opType) { + boolean shouldCache = false; + if (opType.equalsIgnoreCase("read")) { + shouldCache = readCacheEnabled && shouldCache(path); + } else if (opType.equalsIgnoreCase("write")) { + shouldCache = writeCacheEnabled && shouldCache(path); + } else if (opType.equalsIgnoreCase("meta")) { + shouldCache = metaCacheEnabled && shouldCache(path); + } + if (shouldCache) { + return hadoopOptionsWithCache; + } else { + return hadoopOptions; + } } @Override diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceReaderFactory.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceReaderFactory.java index 36de644b5b37..d1a27e10afd4 100644 --- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceReaderFactory.java +++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceReaderFactory.java @@ -33,7 +33,7 @@ import java.util.List; import java.util.Map; -import static org.apache.paimon.format.lance.LanceUtils.toLanceSpecified; +import static org.apache.paimon.format.lance.LanceUtils.toLanceSpecifiedForReader; /** A factory to create Lance Reader. */ public class LanceReaderFactory implements FormatReaderFactory { @@ -55,7 +55,7 @@ public FileRecordReader createReader(Context context) throws IOExce } Pair> lanceSpecified = - toLanceSpecified(context.fileIO(), context.filePath()); + toLanceSpecifiedForReader(context.fileIO(), context.filePath()); return new LanceRecordsReader( lanceSpecified.getLeft(), selectionRangesArray, diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java index 407dc4238583..3748d0403cb4 100644 --- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java +++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java @@ -90,7 +90,18 @@ public class LanceUtils { hadoopFileIOKlass = klass; } - public static Pair> toLanceSpecified(FileIO fileIO, Path path) { + public static Pair> toLanceSpecifiedForReader( + FileIO fileIO, Path path) { + return toLanceSpecified(fileIO, path, true); + } + + public static Pair> toLanceSpecifiedForWriter( + FileIO fileIO, Path path) { + return toLanceSpecified(fileIO, path, false); + } + + private static Pair> toLanceSpecified( + FileIO fileIO, Path path, boolean isRead) { URI uri = path.toUri(); String schema = uri.getScheme(); @@ -107,7 +118,7 @@ public static Pair> toLanceSpecified(FileIO fileIO, Pa if (ossFileIOKlass != null && ossFileIOKlass.isInstance(fileIO)) { originOptions = ((OSSFileIO) fileIO).hadoopOptions(); } else if (jindoFileIOKlass != null && jindoFileIOKlass.isInstance(fileIO)) { - originOptions = ((JindoFileIO) fileIO).hadoopOptions(); + originOptions = ((JindoFileIO) fileIO).hadoopOptions(path, isRead ? "read" : "write"); } else if (pluginFileIOKlass != null && pluginFileIOKlass.isInstance(fileIO)) { originOptions = ((PluginFileIO) fileIO).options(); } else if (hadoopFileIOKlass != null && hadoopFileIOKlass.isInstance(fileIO)) { diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceWriterFactory.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceWriterFactory.java index 70a531c4d6bd..afa21edd86a0 100644 --- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceWriterFactory.java +++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceWriterFactory.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.function.Supplier; -import static org.apache.paimon.format.lance.LanceUtils.toLanceSpecified; +import static org.apache.paimon.format.lance.LanceUtils.toLanceSpecifiedForWriter; /** A factory to create Lance {@link FormatWriter}. */ public class LanceWriterFactory implements FormatWriterFactory, SupportsDirectWrite { @@ -52,7 +52,7 @@ public FormatWriter create(PositionOutputStream positionOutputStream, String com @Override public FormatWriter create(FileIO fileIO, Path path, String compression) throws IOException { - Pair> lanceSpecified = toLanceSpecified(fileIO, path); + Pair> lanceSpecified = toLanceSpecifiedForWriter(fileIO, path); LanceWriter lanceWriter = new LanceWriter(lanceSpecified.getLeft().toString(), lanceSpecified.getRight()); return new LanceRecordsWriter( From d0673cfd2d660527b89672371b589a499a3ae9c2 Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Mon, 12 Jan 2026 17:36:01 +0800 Subject: [PATCH 4/4] Rebase and fix build --- .../java/org/apache/paimon/format/lance/LanceUtilsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceUtilsTest.java b/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceUtilsTest.java index d00f9a3dd635..0a0d28c3bfc5 100644 --- a/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceUtilsTest.java +++ b/paimon-lance/src/test/java/org/apache/paimon/format/lance/LanceUtilsTest.java @@ -67,7 +67,7 @@ void testOssUrlConversion() { TestFileIO fileIO = new TestFileIO(); fileIO.setOptions(options); - Pair> result = LanceUtils.toLanceSpecified(fileIO, path); + Pair> result = LanceUtils.toLanceSpecifiedForReader(fileIO, path); assertTrue(result.getKey().toString().startsWith("oss://test-bucket/")); @@ -97,7 +97,7 @@ void testOssUrlWithSecurityToken() { TestFileIO fileIO = new TestFileIO(); fileIO.setOptions(options); - Pair> result = LanceUtils.toLanceSpecified(fileIO, path); + Pair> result = LanceUtils.toLanceSpecifiedForReader(fileIO, path); Map storageOptions = result.getValue(); assertEquals("test-token", storageOptions.get(LanceUtils.STORAGE_OPTION_SESSION_TOKEN));