diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index e310f398f821..01e4bfd31722 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -3167,6 +3167,12 @@ components: type: array items: type: string + accessType: + type: string + description: Type of table access, omitted for direct table access. + fallbackFrom: + type: string + description: Source table for fallback access, omitted for direct table access. AuthTableQueryResponse: type: object properties: diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java index 7433a30388f8..4d20f3ef8004 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java @@ -809,7 +809,27 @@ public void replaceTable(Identifier identifier, Schema schema) { */ public AuthTableQueryResponse authTableQuery( Identifier identifier, @Nullable List select) { - AuthTableQueryRequest request = new AuthTableQueryRequest(select); + return authTableQuery(identifier, select, null, null); + } + + /** + * Auth table query with access context. + * + * @param identifier database name and table name. + * @param select select columns, null if select all + * @param accessType type of table access, null for direct table access + * @param fallbackFrom source table for fallback access, null for direct table access + * @return additional filter for row level access control and column masking rules + * @throws NoSuchResourceException Exception thrown on HTTP 404 means the table not exists + * @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for + * this table + */ + public AuthTableQueryResponse authTableQuery( + Identifier identifier, + @Nullable List select, + @Nullable String accessType, + @Nullable String fallbackFrom) { + AuthTableQueryRequest request = new AuthTableQueryRequest(select, accessType, fallbackFrom); return client.post( resourcePaths.authTable(identifier.getDatabaseName(), identifier.getObjectName()), request, diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/requests/AuthTableQueryRequest.java b/paimon-api/src/main/java/org/apache/paimon/rest/requests/AuthTableQueryRequest.java index 2e0582183523..6fee6a709b21 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/requests/AuthTableQueryRequest.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/requests/AuthTableQueryRequest.java @@ -23,6 +23,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import javax.annotation.Nullable; @@ -34,14 +35,36 @@ public class AuthTableQueryRequest implements RESTRequest { private static final String FIELD_SELECT = "select"; + private static final String FIELD_ACCESS_TYPE = "accessType"; + private static final String FIELD_FALLBACK_FROM = "fallbackFrom"; @JsonProperty(FIELD_SELECT) + @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable private final List select; + @JsonProperty(FIELD_ACCESS_TYPE) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + private final String accessType; + + @JsonProperty(FIELD_FALLBACK_FROM) + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + private final String fallbackFrom; + @JsonCreator - public AuthTableQueryRequest(@JsonProperty(FIELD_SELECT) @Nullable List select) { + public AuthTableQueryRequest( + @JsonProperty(FIELD_SELECT) @Nullable List select, + @JsonProperty(FIELD_ACCESS_TYPE) @Nullable String accessType, + @JsonProperty(FIELD_FALLBACK_FROM) @Nullable String fallbackFrom) { this.select = select; + this.accessType = accessType; + this.fallbackFrom = fallbackFrom; + } + + public AuthTableQueryRequest(@Nullable List select) { + this(select, null, null); } @JsonGetter(FIELD_SELECT) @@ -49,4 +72,16 @@ public AuthTableQueryRequest(@JsonProperty(FIELD_SELECT) @Nullable List public List select() { return select; } + + @JsonGetter(FIELD_ACCESS_TYPE) + @Nullable + public String accessType() { + return accessType; + } + + @JsonGetter(FIELD_FALLBACK_FROM) + @Nullable + public String fallbackFrom() { + return fallbackFrom; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 4a8a8b1b91b6..fc955740532c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -592,6 +592,12 @@ protected void truncateTable(FileStoreTable existingTable) { @Override public Table getTable(Identifier identifier) throws TableNotExistException { + return getTable(identifier, TableAccessContext.direct()); + } + + @Override + public Table getTable(Identifier identifier, TableAccessContext accessContext) + throws TableNotExistException { return CatalogUtils.loadTable( this, identifier, @@ -601,7 +607,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException { lockFactory().orElse(null), lockContext().orElse(null), context, - false); + false, + accessContext); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 57fa040a2acd..12eb45e90be5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -157,6 +157,19 @@ void alterDatabase(String name, List changes, boolean ignoreIfNo */ Table getTable(Identifier identifier) throws TableNotExistException; + /** + * Return a {@link Table} identified by the given {@link Identifier} with access context. + * + * @param identifier Path of the table + * @param accessContext context describing why the table is accessed + * @return The requested table + * @throws TableNotExistException if the target does not exist + */ + default Table getTable(Identifier identifier, TableAccessContext accessContext) + throws TableNotExistException { + return getTable(identifier); + } + /** * Return a {@link Table} identified by the given tableId. * @@ -1214,6 +1227,22 @@ void alterFunction( TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List select) throws TableNotExistException; + /** + * Auth table query select with access context and get the filter for row level access control + * and column masking rules. + * + * @param identifier path of the table to query + * @param select selected fields, null if select all + * @param accessContext context describing why the table is accessed + * @return additional filter for row level access control and column masking rules + * @throws TableNotExistException if the table does not exist + */ + default TableQueryAuthResult authTableQuery( + Identifier identifier, @Nullable List select, TableAccessContext accessContext) + throws TableNotExistException { + return authTableQuery(identifier, select); + } + // ==================== Catalog Information ========================== /** Catalog options for re-creating this catalog. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index a33dc2c0c106..d191866ece66 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -255,6 +255,31 @@ public static Table loadTable( @Nullable CatalogContext catalogContext, boolean isRestCatalog) throws Catalog.TableNotExistException { + return loadTable( + catalog, + identifier, + internalFileIO, + externalFileIO, + metadataLoader, + lockFactory, + lockContext, + catalogContext, + isRestCatalog, + TableAccessContext.direct()); + } + + public static Table loadTable( + Catalog catalog, + Identifier identifier, + Function internalFileIO, + Function externalFileIO, + TableMetadata.Loader metadataLoader, + @Nullable CatalogLockFactory lockFactory, + @Nullable CatalogLockContext lockContext, + @Nullable CatalogContext catalogContext, + boolean isRestCatalog, + TableAccessContext tableAccessContext) + throws Catalog.TableNotExistException { if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) { return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog); } @@ -299,7 +324,8 @@ public static Table loadTable( isRestCatalog ? null : lockContext, catalogContext, catalog.supportsVersionManagement(), - catalog.supportsPartitionModification()); + catalog.supportsPartitionModification(), + tableAccessContext); Path path = new Path(schema.options().get(PATH.key())); FileStoreTable table = FileStoreTableFactory.create(dataFileIO.apply(path), path, schema, catalogEnv); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 0f18f7d04540..9d6d4e9b9efa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -375,6 +375,12 @@ public Table getTable(Identifier identifier) throws TableNotExistException { return wrapped.getTable(identifier); } + @Override + public Table getTable(Identifier identifier, TableAccessContext accessContext) + throws TableNotExistException { + return wrapped.getTable(identifier, accessContext); + } + @Override public View getView(Identifier identifier) throws ViewNotExistException { return wrapped.getView(identifier); @@ -461,6 +467,13 @@ public TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List return wrapped.authTableQuery(identifier, select); } + @Override + public TableQueryAuthResult authTableQuery( + Identifier identifier, @Nullable List select, TableAccessContext accessContext) + throws TableNotExistException { + return wrapped.authTableQuery(identifier, select, accessContext); + } + @Override public void repairCatalog() { wrapped.repairCatalog(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/TableAccessContext.java b/paimon-core/src/main/java/org/apache/paimon/catalog/TableAccessContext.java new file mode 100644 index 000000000000..9e3bdf1e7270 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableAccessContext.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; + +/** Context describing why a table is accessed. */ +public class TableAccessContext implements Serializable { + + private static final long serialVersionUID = 1L; + + /** Type of table access. */ + public enum AccessType { + DIRECT, + BLOB_VIEW_FALLBACK + } + + private final AccessType accessType; + @Nullable private final Identifier fallbackFrom; + + private TableAccessContext(AccessType accessType, @Nullable Identifier fallbackFrom) { + this.accessType = Objects.requireNonNull(accessType, "accessType"); + this.fallbackFrom = fallbackFrom; + } + + public static TableAccessContext direct() { + return new TableAccessContext(AccessType.DIRECT, null); + } + + public static TableAccessContext blobViewFallback(Identifier fallbackFrom) { + return new TableAccessContext( + AccessType.BLOB_VIEW_FALLBACK, + Objects.requireNonNull(fallbackFrom, "fallbackFrom")); + } + + public AccessType accessType() { + return accessType; + } + + @Nullable + public Identifier fallbackFrom() { + return fallbackFrom; + } + + public boolean isBlobViewFallback() { + return accessType == AccessType.BLOB_VIEW_FALLBACK; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java index b408055e5112..18a80e5a7122 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java @@ -24,6 +24,7 @@ import org.apache.paimon.catalog.DelegateCatalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; +import org.apache.paimon.catalog.TableAccessContext; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; import org.apache.paimon.options.Options; @@ -149,7 +150,13 @@ public void alterTable( @Override public Table getTable(Identifier identifier) throws TableNotExistException { - Table table = wrapped.getTable(identifier); + return getTable(identifier, TableAccessContext.direct()); + } + + @Override + public Table getTable(Identifier identifier, TableAccessContext accessContext) + throws TableNotExistException { + Table table = wrapped.getTable(identifier, accessContext); if (table instanceof FileStoreTable) { return PrivilegedFileStoreTable.wrap( (FileStoreTable) table, privilegeManager.getPrivilegeChecker(), identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 7e2f6cfd2743..631876bfb14e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -29,6 +29,7 @@ import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; +import org.apache.paimon.catalog.TableAccessContext; import org.apache.paimon.catalog.TableMetadata; import org.apache.paimon.catalog.TableQueryAuthResult; import org.apache.paimon.consumer.ConsumerInfo; @@ -307,6 +308,12 @@ public Table getTableById(String tableId) throws TableIdNotExistException { @Override public Table getTable(Identifier identifier) throws TableNotExistException { + return getTable(identifier, TableAccessContext.direct()); + } + + @Override + public Table getTable(Identifier identifier, TableAccessContext accessContext) + throws TableNotExistException { return CatalogUtils.loadTable( this, identifier, @@ -316,7 +323,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException { null, null, context, - true); + true, + accessContext); } @Override @@ -614,9 +622,23 @@ public void replaceTable(Identifier identifier, Schema newSchema, boolean ignore @Override public TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List select) throws TableNotExistException { + return authTableQuery(identifier, select, TableAccessContext.direct()); + } + + @Override + public TableQueryAuthResult authTableQuery( + Identifier identifier, @Nullable List select, TableAccessContext accessContext) + throws TableNotExistException { checkNotSystemTable(identifier, "authTable"); try { - AuthTableQueryResponse response = api.authTableQuery(identifier, select); + AuthTableQueryResponse response = + api.authTableQuery( + identifier, + select, + accessContext.accessType().name(), + accessContext.fallbackFrom() == null + ? null + : accessContext.fallbackFrom().getFullName()); return new TableQueryAuthResult(response.filter(), response.columnMasking()); } catch (NoSuchResourceException e) { throw new TableNotExistException(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index d65c84fd5e65..c31403fabed7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -128,6 +128,7 @@ public InnerTableRead newRead() { ? new DataEvolutionTableRead( providerFactories, schema(), + catalogEnvironment.identifier(), catalogEnvironment.catalogContext(), () -> new AppendTableRead(providerFactories, schema())) : new AppendTableRead(providerFactories, schema()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java index cb40c4447e18..0e3aef4cae27 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java @@ -28,6 +28,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.RenamingSnapshotCommit; import org.apache.paimon.catalog.SnapshotCommit; +import org.apache.paimon.catalog.TableAccessContext; import org.apache.paimon.catalog.TableRollback; import org.apache.paimon.operation.Lock; import org.apache.paimon.table.source.TableQueryAuth; @@ -54,6 +55,7 @@ public class CatalogEnvironment implements Serializable { @Nullable private final CatalogContext catalogContext; private final boolean supportsVersionManagement; private final boolean supportsPartitionModification; + @Nullable private final TableAccessContext tableAccessContext; public CatalogEnvironment( @Nullable Identifier identifier, @@ -64,6 +66,28 @@ public CatalogEnvironment( @Nullable CatalogContext catalogContext, boolean supportsVersionManagement, boolean supportsPartitionModification) { + this( + identifier, + uuid, + catalogLoader, + lockFactory, + lockContext, + catalogContext, + supportsVersionManagement, + supportsPartitionModification, + TableAccessContext.direct()); + } + + public CatalogEnvironment( + @Nullable Identifier identifier, + @Nullable String uuid, + @Nullable CatalogLoader catalogLoader, + @Nullable CatalogLockFactory lockFactory, + @Nullable CatalogLockContext lockContext, + @Nullable CatalogContext catalogContext, + boolean supportsVersionManagement, + boolean supportsPartitionModification, + TableAccessContext tableAccessContext) { this.identifier = identifier; this.uuid = uuid; this.catalogLoader = catalogLoader; @@ -72,6 +96,7 @@ public CatalogEnvironment( this.catalogContext = catalogContext; this.supportsVersionManagement = supportsVersionManagement; this.supportsPartitionModification = supportsPartitionModification; + this.tableAccessContext = tableAccessContext; } public static CatalogEnvironment empty() { @@ -196,6 +221,10 @@ public CatalogContext catalogContext() { return catalogContext; } + public TableAccessContext tableAccessContext() { + return tableAccessContext == null ? TableAccessContext.direct() : tableAccessContext; + } + public CatalogEnvironment copy(Identifier identifier) { return new CatalogEnvironment( identifier, @@ -205,7 +234,21 @@ public CatalogEnvironment copy(Identifier identifier) { lockContext, catalogContext, supportsVersionManagement, - supportsPartitionModification); + supportsPartitionModification, + tableAccessContext()); + } + + public CatalogEnvironment copy(TableAccessContext tableAccessContext) { + return new CatalogEnvironment( + identifier, + uuid, + catalogLoader, + lockFactory, + lockContext, + catalogContext, + supportsVersionManagement, + supportsPartitionModification, + tableAccessContext); } public TableQueryAuth tableQueryAuth(CoreOptions options) { @@ -214,7 +257,7 @@ public TableQueryAuth tableQueryAuth(CoreOptions options) { } return select -> { try (Catalog catalog = catalogLoader.load()) { - return catalog.authTableQuery(identifier, select); + return catalog.authTableQuery(identifier, select, tableAccessContext()); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java index 35aabd38184e..0ead6d0ccfdc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionTableRead.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.TableQueryAuthResult; import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobView; @@ -49,15 +50,18 @@ /** A {@link TableRead} for data-evolution enabled append-only tables. */ public class DataEvolutionTableRead extends AppendTableRead { + @Nullable private final Identifier identifier; @Nullable private final CatalogContext catalogContext; @Nullable private final Supplier readFactory; public DataEvolutionTableRead( List> providerFactories, TableSchema schema, + @Nullable Identifier identifier, @Nullable CatalogContext catalogContext, @Nullable Supplier readFactory) { super(providerFactories, schema); + this.identifier = identifier; this.catalogContext = catalogContext; this.readFactory = readFactory; } @@ -129,7 +133,8 @@ private RecordReader createBlobViewReader( } BlobViewResolver resolver = - BlobViewLookup.createResolver(catalogContext, new ArrayList<>(viewStructs)); + BlobViewLookup.createResolver( + catalogContext, identifier, new ArrayList<>(viewStructs)); RecordReader reader = createDataReader(split, authResult); Set blobViewFieldSet = new HashSet<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java b/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java index f570a2bb85b6..ec2f527c4914 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BlobViewLookup.java @@ -24,6 +24,7 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.catalog.TableAccessContext; import org.apache.paimon.data.Blob; import org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.data.BlobViewResolver; @@ -36,6 +37,8 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -63,16 +66,26 @@ public class BlobViewLookup { public static BlobViewResolver createResolver( CatalogContext catalogContext, List viewStructs) { - return createResolver(catalogContext, viewStructs, CatalogFactory::createCatalog); + return createResolver(catalogContext, null, viewStructs); + } + + public static BlobViewResolver createResolver( + CatalogContext catalogContext, + @Nullable Identifier fallbackFrom, + List viewStructs) { + return createResolver( + catalogContext, fallbackFrom, viewStructs, CatalogFactory::createCatalog); } @VisibleForTesting static BlobViewResolver createResolver( CatalogContext catalogContext, + @Nullable Identifier fallbackFrom, List viewStructs, CatalogLoader catalogLoader) { + TableAccessContext accessContext = accessContext(fallbackFrom); Map cached = - preloadDescriptors(catalogContext, viewStructs, catalogLoader); + preloadDescriptors(catalogContext, viewStructs, accessContext, catalogLoader); Map cache = new HashMap<>(); return blobView -> { BlobViewStruct viewStruct = blobView.viewStruct(); @@ -90,7 +103,7 @@ static BlobViewResolver createResolver( identifier -> { try (Catalog catalog = catalogLoader.create(catalogContext)) { return UriReader.fromFile( - catalog.getTable(identifier).fileIO()); + catalog.getTable(identifier, accessContext).fileIO()); } catch (Exception e) { throw new RuntimeException(e); } @@ -99,9 +112,16 @@ static BlobViewResolver createResolver( }; } + private static TableAccessContext accessContext(@Nullable Identifier fallbackFrom) { + return fallbackFrom == null + ? TableAccessContext.direct() + : TableAccessContext.blobViewFallback(fallbackFrom); + } + private static Map preloadDescriptors( CatalogContext catalogContext, List viewStructs, + TableAccessContext accessContext, CatalogLoader catalogLoader) { if (viewStructs.isEmpty()) { return Collections.emptyMap(); @@ -110,7 +130,11 @@ private static Map preloadDescriptors( Map grouped = groupReferencesByTable(viewStructs); try { return loadReferencedDescriptors( - catalogContext, grouped.values(), PRELOAD_DESCRIPTOR_EXECUTOR, catalogLoader); + catalogContext, + grouped.values(), + accessContext, + PRELOAD_DESCRIPTOR_EXECUTOR, + catalogLoader); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Failed to preload blob descriptors.", e); @@ -151,12 +175,15 @@ private static Map groupReferencesByTable( private static Map loadReferencedDescriptors( CatalogContext catalogContext, Collection grouped, + TableAccessContext accessContext, ExecutorService executor, CatalogLoader catalogLoader) throws Exception { List plans = new ArrayList<>(grouped.size()); for (TableReferences tableReferences : grouped) { - plans.add(createTableReadPlan(catalogContext, tableReferences, catalogLoader)); + plans.add( + createTableReadPlan( + catalogContext, tableReferences, accessContext, catalogLoader)); } long targetRowsPerTask = targetRowsPerTask(plans); @@ -180,6 +207,7 @@ private static Map loadReferencedDescriptors( plan.fields, plan.readType, rangeChunk, + accessContext, catalogLoader); } finally { Thread.currentThread() @@ -206,11 +234,12 @@ private static Map loadReferencedDescriptors( private static TableReadPlan createTableReadPlan( CatalogContext catalogContext, TableReferences tableReferences, + TableAccessContext accessContext, CatalogLoader catalogLoader) throws Exception { try (Catalog catalog = catalogLoader.create(catalogContext)) { List fields = new ArrayList<>(tableReferences.referencesByField.size()); - Table table = catalog.getTable(tableReferences.identifier); + Table table = catalog.getTable(tableReferences.identifier, accessContext); for (Map.Entry> entry : tableReferences.referencesByField.entrySet()) { int fieldId = entry.getKey(); @@ -249,12 +278,13 @@ private static Map loadTableDescriptorChunk( List fields, RowType readType, List rowRanges, + TableAccessContext accessContext, CatalogLoader catalogLoader) throws Exception { try (Catalog catalog = catalogLoader.create(catalogContext)) { Map resolved = new HashMap<>(); Table table = - catalog.getTable(identifier) + catalog.getTable(identifier, accessContext) .copy( Collections.singletonMap( CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true")); diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogAccessContextTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogAccessContextTest.java new file mode 100644 index 000000000000..0db56958e68a --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogAccessContextTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.ResolvingFileIO; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for table access context. */ +public class CatalogAccessContextTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testTableAccessContextPassedToQueryAuth() throws Exception { + Options catalogOptions = new Options(); + catalogOptions.set(CatalogOptions.WAREHOUSE, tempDir.toUri().toString()); + CatalogContext catalogContext = CatalogContext.create(catalogOptions); + FileIO fileIO = new ResolvingFileIO(); + fileIO.configure(catalogContext); + + TrackingCatalog catalog = + new TrackingCatalog(fileIO, new Path(tempDir.toUri().toString()), catalogContext); + Identifier tableIdentifier = Identifier.create("default", "T"); + Identifier viewIdentifier = Identifier.create("default", "ViewT"); + TableAccessContext accessContext = TableAccessContext.blobViewFallback(viewIdentifier); + + catalog.createDatabase("default", true); + catalog.createTable( + tableIdentifier, + Schema.newBuilder() + .column("id", DataTypes.INT()) + .option(CoreOptions.QUERY_AUTH_ENABLED.key(), "true") + .build(), + false); + + Table table = catalog.getTable(tableIdentifier, accessContext); + table.newReadBuilder().newScan().plan(); + + assertThat(((FileStoreTable) table).catalogEnvironment().tableAccessContext()) + .isSameAs(accessContext); + assertThat(catalog.latestAuthContext.get()).isSameAs(accessContext); + assertThat(catalog.latestAuthIdentifier.get()).isEqualTo(tableIdentifier); + } + + private static class TrackingCatalog extends FileSystemCatalog { + + private final AtomicReference latestAuthContext = + new AtomicReference<>(); + private final AtomicReference latestAuthIdentifier = new AtomicReference<>(); + + private TrackingCatalog(FileIO fileIO, Path warehouse, CatalogContext context) { + super(fileIO, warehouse, context); + } + + @Override + public CatalogLoader catalogLoader() { + return () -> this; + } + + @Override + public TableQueryAuthResult authTableQuery( + Identifier identifier, + @Nullable List select, + TableAccessContext accessContext) { + latestAuthIdentifier.set(identifier); + latestAuthContext.set(accessContext); + return null; + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java index 1b8b1f71d0c0..6b6187212ac0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTApiJsonTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.rest.requests.AlterFunctionRequest; import org.apache.paimon.rest.requests.AlterTableRequest; import org.apache.paimon.rest.requests.AlterViewRequest; +import org.apache.paimon.rest.requests.AuthTableQueryRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; import org.apache.paimon.rest.requests.CreateFunctionRequest; import org.apache.paimon.rest.requests.CreateTableRequest; @@ -309,4 +310,24 @@ public void authTableQueryResponseParseTest() throws Exception { assertEquals(response.filter(), parseData.filter()); assertEquals(response.columnMasking(), parseData.columnMasking()); } + + @Test + public void authTableQueryRequestParseTest() throws Exception { + AuthTableQueryRequest request = + new AuthTableQueryRequest( + java.util.Arrays.asList("col1", "col2"), + "BLOB_VIEW_FALLBACK", + "db.blob_view"); + String requestStr = RESTApi.toJson(request); + AuthTableQueryRequest parseData = RESTApi.fromJson(requestStr, AuthTableQueryRequest.class); + assertEquals(request.select(), parseData.select()); + assertEquals(request.accessType(), parseData.accessType()); + assertEquals(request.fallbackFrom(), parseData.fallbackFrom()); + + AuthTableQueryRequest oldRequest = + RESTApi.fromJson("{\"select\":[\"col1\"]}", AuthTableQueryRequest.class); + assertEquals(java.util.Collections.singletonList("col1"), oldRequest.select()); + assertEquals(null, oldRequest.accessType()); + assertEquals(null, oldRequest.fallbackFrom()); + } }