Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,17 @@ public class RESTCatalogOptions {
.stringType()
.noDefaultValue()
.withDescription("REST Catalog DLF OSS endpoint.");

/**
 * Client-side switch for file io caching, keyed {@code dlf.io-cache-enabled}.
 *
 * <p>Boolean option that defaults to {@code false}, so caching stays off unless it is set
 * explicitly.
 */
public static final ConfigOption<Boolean> DLF_FILE_IO_CACHE_ENABLED =
ConfigOptions.key("dlf.io-cache-enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Enable cache for visiting files using file io (currently only JindoFileIO supports cache).");
/**
 * Path filter for file io caching, keyed {@code dlf.io-cache.whitelist-path}.
 *
 * <p>String option that defaults to {@code "bucket-,manifest"}; per the description, {@code *}
 * stands for all paths.
 *
 * <p>NOTE(review): the default looks like a comma-separated list of patterns, while the
 * description speaks of a single pattern - confirm how the consumer parses this value.
 */
public static final ConfigOption<String> DLF_FILE_IO_CACHE_WHITELIST_PATH =
ConfigOptions.key("dlf.io-cache.whitelist-path")
.stringType()
.defaultValue("bucket-,manifest")
.withDescription(
"Cache is only applied to paths which contain the specified pattern, and * means all paths.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,14 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT;

/** A {@link FileIO} to support getting token from REST Server. */
Expand All @@ -63,6 +66,16 @@ public class RESTTokenFileIO implements FileIO {
.defaultValue(false)
.withDescription("Whether to support data token provided by the REST server.");

/**
 * Cache policy of a table, keyed {@code dlf.io-cache.policy}.
 *
 * <p>String option without a default value. The value is a comma-separated combination of
 * {@code meta}, {@code read} and {@code write}, as listed in the description below.
 */
public static final ConfigOption<String> FILE_IO_CACHE_POLICY =
        ConfigOptions.key("dlf.io-cache.policy")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        // Each fragment ends with a separator so the concatenated text
                        // does not run sentences together.
                        "The cache policy of a table provided by the REST server, combined with: meta,read,write. "
                                + "`meta`: meta cache is enabled for visiting files; "
                                + "`read`: cache is enabled when reading files; "
                                + "`write`: data is also cached when writing files.");

private static final Cache<RESTToken, FileIO> FILE_IO_CACHE =
Caffeine.newBuilder()
.maximumSize(1000)
Expand Down Expand Up @@ -239,6 +252,27 @@ private Map<String, String> mergeTokenWithCatalogOptions(Map<String, String> tok
if (dlfOssEndpoint != null && !dlfOssEndpoint.isEmpty()) {
newToken.put("fs.oss.endpoint", dlfOssEndpoint);
}

// Process file io cache configuration
if (!catalogContext.options().get(DLF_FILE_IO_CACHE_ENABLED)) {
// Disable file io cache, remove the cache policy configs
newToken.remove(FILE_IO_CACHE_POLICY.key());
} else {
// Enable file io cache, reorder cache policy in fixed order,
// and allow user to override policy provided by REST server.
String cachePolicy = catalogContext.options().get(FILE_IO_CACHE_POLICY);
if (cachePolicy == null) {
cachePolicy = token.get(FILE_IO_CACHE_POLICY.key());
}
if (cachePolicy != null) {
Set<String> cachePolicySet = new TreeSet<>();
for (String policy : cachePolicy.split(",")) {
cachePolicySet.add(policy.trim().toLowerCase());
}
newToken.put(FILE_IO_CACHE_POLICY.key(), String.join(",", cachePolicySet));
}
}

return ImmutableMap.copyOf(newToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import static org.apache.paimon.TableType.OBJECT_TABLE;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.rest.RESTApi.PAGE_TOKEN;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_FILE_IO_CACHE_ENABLED;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT;
import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
import static org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
Expand Down Expand Up @@ -2694,6 +2695,68 @@ void testReadPartitionsTable() throws Exception {
}
}

@Test
void testEnableFileIOCache() throws Exception {
    // DLF_FILE_IO_CACHE_ENABLED MUST be configured on the client side to enable cache.
    Map<String, String> catalogOptions = new HashMap<>();
    catalogOptions.put(DLF_FILE_IO_CACHE_ENABLED.key(), "true");
    this.catalog = newRestCatalogWithDataToken(catalogOptions);

    String policyKey = RESTTokenFileIO.FILE_IO_CACHE_POLICY.key();
    String expectedPolicy = "meta,read";
    Identifier tableId =
            Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache");

    // Hand the mock REST server a data token that carries the cache policy.
    ImmutableMap<String, String> tokenContent =
            ImmutableMap.of(
                    "akId",
                    "akId",
                    "akSecret",
                    UUID.randomUUID().toString(),
                    policyKey,
                    expectedPolicy);
    long expireAtMillis = System.currentTimeMillis() + 3600_000L;
    setDataTokenToRestServerForMock(tableId, new RESTToken(tokenContent, expireAtMillis));

    createTable(
            tableId, ImmutableMap.of(policyKey, expectedPolicy), Lists.newArrayList("col1"));

    // With cache enabled, the token seen through the table's file io keeps the policy.
    FileStoreTable table = (FileStoreTable) catalog.getTable(tableId);
    RESTTokenFileIO tokenFileIO = (RESTTokenFileIO) table.fileIO();
    String actualPolicy = tokenFileIO.validToken().token().get(policyKey);
    assertEquals(expectedPolicy, actualPolicy);
}

@Test
void testDisableFileIOCache() throws Exception {
    // No cache option is set, so cache stays disabled at client-side.
    Map<String, String> catalogOptions = new HashMap<>();
    this.catalog = newRestCatalogWithDataToken(catalogOptions);

    String policyKey = RESTTokenFileIO.FILE_IO_CACHE_POLICY.key();
    String serverPolicy = "meta,read";
    Identifier tableId =
            Identifier.create("test_file_io_cache", "table_for_testing_file_io_cache");

    // The mock REST server still hands out a data token that carries a cache policy.
    ImmutableMap<String, String> tokenContent =
            ImmutableMap.of(
                    "akId",
                    "akId",
                    "akSecret",
                    UUID.randomUUID().toString(),
                    policyKey,
                    serverPolicy);
    long expireAtMillis = System.currentTimeMillis() + 3600_000L;
    setDataTokenToRestServerForMock(tableId, new RESTToken(tokenContent, expireAtMillis));

    createTable(
            tableId, ImmutableMap.of(policyKey, serverPolicy), Lists.newArrayList("col1"));

    // With cache disabled, the token seen through the table's file io has no policy entry.
    FileStoreTable table = (FileStoreTable) catalog.getTable(tableId);
    RESTTokenFileIO tokenFileIO = (RESTTokenFileIO) table.fileIO();
    String actualPolicy = tokenFileIO.validToken().token().get(policyKey);
    assertNull(actualPolicy);
}

private TestPagedResponse generateTestPagedResponse(
Map<String, String> queryParams,
List<Integer> testData,
Expand Down
Loading