Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/static/rest-catalog-open-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3167,6 +3167,12 @@ components:
type: array
items:
type: string
accessType:
type: string
description: Type of table access, omitted for direct table access.
fallbackFrom:
type: string
description: Source table for fallback access, omitted for direct table access.
AuthTableQueryResponse:
type: object
properties:
Expand Down
22 changes: 21 additions & 1 deletion paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,27 @@ public void replaceTable(Identifier identifier, Schema schema) {
*/
public AuthTableQueryResponse authTableQuery(
Identifier identifier, @Nullable List<String> select) {
AuthTableQueryRequest request = new AuthTableQueryRequest(select);
return authTableQuery(identifier, select, null, null);
}

/**
* Auth table query with access context.
*
* @param identifier database name and table name.
* @param select select columns, null if select all
* @param accessType type of table access, null for direct table access
* @param fallbackFrom source table for fallback access, null for direct table access
* @return additional filter for row level access control and column masking rules
* @throws NoSuchResourceException Exception thrown on HTTP 404 means the table not exists
* @throws ForbiddenException Exception thrown on HTTP 403 means don't have the permission for
* this table
*/
public AuthTableQueryResponse authTableQuery(
Identifier identifier,
@Nullable List<String> select,
@Nullable String accessType,
@Nullable String fallbackFrom) {
AuthTableQueryRequest request = new AuthTableQueryRequest(select, accessType, fallbackFrom);
return client.post(
resourcePaths.authTable(identifier.getDatabaseName(), identifier.getObjectName()),
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;
Expand All @@ -34,19 +35,53 @@
public class AuthTableQueryRequest implements RESTRequest {

private static final String FIELD_SELECT = "select";
private static final String FIELD_ACCESS_TYPE = "accessType";
private static final String FIELD_FALLBACK_FROM = "fallbackFrom";

@JsonProperty(FIELD_SELECT)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final List<String> select;

@JsonProperty(FIELD_ACCESS_TYPE)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final String accessType;

@JsonProperty(FIELD_FALLBACK_FROM)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final String fallbackFrom;

@JsonCreator
public AuthTableQueryRequest(@JsonProperty(FIELD_SELECT) @Nullable List<String> select) {
public AuthTableQueryRequest(
@JsonProperty(FIELD_SELECT) @Nullable List<String> select,
@JsonProperty(FIELD_ACCESS_TYPE) @Nullable String accessType,
@JsonProperty(FIELD_FALLBACK_FROM) @Nullable String fallbackFrom) {
this.select = select;
this.accessType = accessType;
this.fallbackFrom = fallbackFrom;
}

public AuthTableQueryRequest(@Nullable List<String> select) {
this(select, null, null);
}

@JsonGetter(FIELD_SELECT)
@Nullable
public List<String> select() {
return select;
}

@JsonGetter(FIELD_ACCESS_TYPE)
@Nullable
public String accessType() {
return accessType;
}

@JsonGetter(FIELD_FALLBACK_FROM)
@Nullable
public String fallbackFrom() {
return fallbackFrom;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,12 @@ protected void truncateTable(FileStoreTable existingTable) {

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
return getTable(identifier, TableAccessContext.direct());
}

@Override
public Table getTable(Identifier identifier, TableAccessContext accessContext)
throws TableNotExistException {
return CatalogUtils.loadTable(
this,
identifier,
Expand All @@ -601,7 +607,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
lockFactory().orElse(null),
lockContext().orElse(null),
context,
false);
false,
accessContext);
}

@Override
Expand Down
29 changes: 29 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,19 @@ void alterDatabase(String name, List<PropertyChange> changes, boolean ignoreIfNo
*/
Table getTable(Identifier identifier) throws TableNotExistException;

/**
* Return a {@link Table} identified by the given {@link Identifier} with access context.
*
* @param identifier Path of the table
* @param accessContext context describing why the table is accessed
* @return The requested table
* @throws TableNotExistException if the target does not exist
*/
default Table getTable(Identifier identifier, TableAccessContext accessContext)
throws TableNotExistException {
return getTable(identifier);
}

/**
* Return a {@link Table} identified by the given tableId.
*
Expand Down Expand Up @@ -1214,6 +1227,22 @@ void alterFunction(
TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List<String> select)
throws TableNotExistException;

/**
* Auth table query select with access context and get the filter for row level access control
* and column masking rules.
*
* @param identifier path of the table to query
* @param select selected fields, null if select all
* @param accessContext context describing why the table is accessed
* @return additional filter for row level access control and column masking rules
* @throws TableNotExistException if the table does not exist
*/
default TableQueryAuthResult authTableQuery(
Identifier identifier, @Nullable List<String> select, TableAccessContext accessContext)
throws TableNotExistException {
return authTableQuery(identifier, select);
}

// ==================== Catalog Information ==========================

/** Catalog options for re-creating this catalog. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,31 @@ public static Table loadTable(
@Nullable CatalogContext catalogContext,
boolean isRestCatalog)
throws Catalog.TableNotExistException {
return loadTable(
catalog,
identifier,
internalFileIO,
externalFileIO,
metadataLoader,
lockFactory,
lockContext,
catalogContext,
isRestCatalog,
TableAccessContext.direct());
}

public static Table loadTable(
Catalog catalog,
Identifier identifier,
Function<Path, FileIO> internalFileIO,
Function<Path, FileIO> externalFileIO,
TableMetadata.Loader metadataLoader,
@Nullable CatalogLockFactory lockFactory,
@Nullable CatalogLockContext lockContext,
@Nullable CatalogContext catalogContext,
boolean isRestCatalog,
TableAccessContext tableAccessContext)
throws Catalog.TableNotExistException {
if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog);
}
Expand Down Expand Up @@ -299,7 +324,8 @@ public static Table loadTable(
isRestCatalog ? null : lockContext,
catalogContext,
catalog.supportsVersionManagement(),
catalog.supportsPartitionModification());
catalog.supportsPartitionModification(),
tableAccessContext);
Path path = new Path(schema.options().get(PATH.key()));
FileStoreTable table =
FileStoreTableFactory.create(dataFileIO.apply(path), path, schema, catalogEnv);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,12 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
return wrapped.getTable(identifier);
}

@Override
public Table getTable(Identifier identifier, TableAccessContext accessContext)
throws TableNotExistException {
return wrapped.getTable(identifier, accessContext);
}

@Override
public View getView(Identifier identifier) throws ViewNotExistException {
return wrapped.getView(identifier);
Expand Down Expand Up @@ -461,6 +467,13 @@ public TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List
return wrapped.authTableQuery(identifier, select);
}

@Override
public TableQueryAuthResult authTableQuery(
Identifier identifier, @Nullable List<String> select, TableAccessContext accessContext)
throws TableNotExistException {
return wrapped.authTableQuery(identifier, select, accessContext);
}

@Override
public void repairCatalog() {
wrapped.repairCatalog();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Objects;

/** Context describing why a table is accessed. */
public class TableAccessContext implements Serializable {

private static final long serialVersionUID = 1L;

/** Type of table access. */
public enum AccessType {
DIRECT,
BLOB_VIEW_FALLBACK
}

private final AccessType accessType;
@Nullable private final Identifier fallbackFrom;

private TableAccessContext(AccessType accessType, @Nullable Identifier fallbackFrom) {
this.accessType = Objects.requireNonNull(accessType, "accessType");
this.fallbackFrom = fallbackFrom;
}

public static TableAccessContext direct() {
return new TableAccessContext(AccessType.DIRECT, null);
}

public static TableAccessContext blobViewFallback(Identifier fallbackFrom) {
return new TableAccessContext(
AccessType.BLOB_VIEW_FALLBACK,
Objects.requireNonNull(fallbackFrom, "fallbackFrom"));
}

public AccessType accessType() {
return accessType;
}

@Nullable
public Identifier fallbackFrom() {
return fallbackFrom;
}

public boolean isBlobViewFallback() {
return accessType == AccessType.BLOB_VIEW_FALLBACK;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.catalog.DelegateCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.catalog.TableAccessContext;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
Expand Down Expand Up @@ -149,7 +150,13 @@ public void alterTable(

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
Table table = wrapped.getTable(identifier);
return getTable(identifier, TableAccessContext.direct());
}

@Override
public Table getTable(Identifier identifier, TableAccessContext accessContext)
throws TableNotExistException {
Table table = wrapped.getTable(identifier, accessContext);
if (table instanceof FileStoreTable) {
return PrivilegedFileStoreTable.wrap(
(FileStoreTable) table, privilegeManager.getPrivilegeChecker(), identifier);
Expand Down
26 changes: 24 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.catalog.TableAccessContext;
import org.apache.paimon.catalog.TableMetadata;
import org.apache.paimon.catalog.TableQueryAuthResult;
import org.apache.paimon.consumer.ConsumerInfo;
Expand Down Expand Up @@ -307,6 +308,12 @@ public Table getTableById(String tableId) throws TableIdNotExistException {

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
return getTable(identifier, TableAccessContext.direct());
}

@Override
public Table getTable(Identifier identifier, TableAccessContext accessContext)
throws TableNotExistException {
return CatalogUtils.loadTable(
this,
identifier,
Expand All @@ -316,7 +323,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
null,
null,
context,
true);
true,
accessContext);
}

@Override
Expand Down Expand Up @@ -614,9 +622,23 @@ public void replaceTable(Identifier identifier, Schema newSchema, boolean ignore
@Override
public TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List<String> select)
throws TableNotExistException {
return authTableQuery(identifier, select, TableAccessContext.direct());
}

@Override
public TableQueryAuthResult authTableQuery(
Identifier identifier, @Nullable List<String> select, TableAccessContext accessContext)
throws TableNotExistException {
checkNotSystemTable(identifier, "authTable");
try {
AuthTableQueryResponse response = api.authTableQuery(identifier, select);
AuthTableQueryResponse response =
api.authTableQuery(
identifier,
select,
accessContext.accessType().name(),
accessContext.fallbackFrom() == null
? null
: accessContext.fallbackFrom().getFullName());
return new TableQueryAuthResult(response.filter(), response.columnMasking());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public InnerTableRead newRead() {
? new DataEvolutionTableRead(
providerFactories,
schema(),
catalogEnvironment.identifier(),
catalogEnvironment.catalogContext(),
() -> new AppendTableRead(providerFactories, schema()))
: new AppendTableRead(providerFactories, schema());
Expand Down
Loading
Loading