Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
LOG.debug("currentLogFile: " + log.getPath().toString());
if (AbstractFSWALProvider.isMetaFile(log.getPath())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip hbase:meta log file: " + log.getPath().getName());
LOG.debug("Skip {} log file: {}", conn.getMetaTableName(), log.getPath().getName());
}
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
Expand Down Expand Up @@ -131,7 +130,7 @@ public static void updateMetaWithFavoredNodesInfo(
puts.add(put);
}
}
try (Table table = connection.getTable(TableName.META_TABLE_NAME)) {
try (Table table = connection.getTable(connection.getMetaTableName())) {
table.put(puts);
}
LOG.info("Added " + puts.size() + " region favored nodes in META");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ private void processMetaRecord(Result result) throws IOException {
* Initialize the region assignment snapshot by scanning the hbase:meta table
*/
public void initialize() throws IOException {
LOG.info("Start to scan the hbase:meta for the current region assignment " + "snappshot");
LOG.info("Start to scan {} for the current region assignment snapshot",
connection.getMetaTableName());
// Scan hbase:meta to pick up user regions
try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME);
try (Table metaTable = connection.getTable(connection.getMetaTableName());
ResultScanner scanner = metaTable.getScanner(HConstants.CATALOG_FAMILY)) {
for (;;) {
Result result = scanner.next();
Expand All @@ -187,7 +188,8 @@ public void initialize() throws IOException {
}
}
}
LOG.info("Finished to scan the hbase:meta for the current region assignment" + "snapshot");
LOG.info("Finished scanning {} for the current region assignment snapshot",
connection.getMetaTableName());
}

private void addRegion(RegionInfo regionInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableReg
final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName,
final boolean excludeOfflinedSplitParents) {
CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>();
if (TableName.META_TABLE_NAME.equals(tableName)) {
if (TableName.isMetaTableName(tableName)) {
future.completeExceptionally(new IOException(
"This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
Expand All @@ -55,6 +56,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaTableNameRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaTableNameResponse;

/**
* Base class for rpc based connection registry implementation.
Expand Down Expand Up @@ -250,6 +253,19 @@ public CompletableFuture<ServerName> getActiveMaster() {
getClass().getSimpleName() + ".getActiveMaster");
}

@Override
public CompletableFuture<TableName> getMetaTableName() {
return tracedFuture(() -> this.<GetMetaTableNameResponse> call(
(c, s, d) -> s.getMetaTableName(c, GetMetaTableNameRequest.getDefaultInstance(), d),
GetMetaTableNameResponse::hasTableName, "getMetaTableName()").thenApply(resp -> {
if (resp.hasTableName() && !resp.getTableName().isEmpty()) {
return TableName.valueOf(resp.getTableName());
} else {
return TableName.META_TABLE_NAME;
}
}), getClass().getSimpleName() + ".getMetaTableName");
}

@Override
public void close() {
trace(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ public interface AsyncConnection extends Closeable {
*/
Configuration getConfiguration();

/**
* Returns the meta table name for this cluster.
* <p>
* This value is fetched from the cluster during connection establishment and cached for the
* lifetime of this connection. For most clusters, this will be "hbase:meta". For read replica
* clusters or other specialized configurations, this may return a different table name.
* <p>
* @return The meta table name for this cluster
*/
TableName getMetaTableName();

/**
* Retrieve a AsyncRegionLocator implementation to inspect region information on a table. The
* returned AsyncRegionLocator is not thread-safe, so a new instance should be created for each
Expand Down Expand Up @@ -104,6 +115,15 @@ default AsyncTable<ScanResultConsumer> getTable(TableName tableName, ExecutorSer
return getTableBuilder(tableName, pool).build();
}

/**
* Retrieve an {@link AsyncTable} implementation for accessing the meta table. This method returns
* the correct meta table for this connection
* @return An AsyncTable to use for interactions with the meta table
*/
default AsyncTable<AdvancedScanResultConsumer> getMetaTable() {
return getTable(getMetaTableName());
}

/**
* Returns an {@link AsyncTableBuilder} for creating {@link AsyncTable}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class AsyncConnectionImpl implements AsyncConnection {

final ConnectionRegistry registry;

private final TableName metaTableName;

protected final int rpcTimeout;

protected final RpcClient rpcClient;
Expand Down Expand Up @@ -128,14 +130,16 @@ public class AsyncConnectionImpl implements AsyncConnection {
private volatile ConnectionOverAsyncConnection conn;

public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
SocketAddress localAddress, User user) {
this(conf, registry, clusterId, localAddress, user, Collections.emptyMap());
TableName metaTableName, SocketAddress localAddress, User user) {
this(conf, registry, clusterId, metaTableName, localAddress, user, Collections.emptyMap());
}

public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
SocketAddress localAddress, User user, Map<String, byte[]> connectionAttributes) {
TableName metaTableName, SocketAddress localAddress, User user,
Map<String, byte[]> connectionAttributes) {
this.conf = conf;
this.user = user;
this.metaTableName = metaTableName;
this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);

if (user.isLoginFromKeytab()) {
Expand Down Expand Up @@ -219,6 +223,10 @@ public Configuration getConfiguration() {
return conf;
}

public TableName getMetaTableName() {
return metaTableName;
}

@Override
public boolean isClosed() {
return closed.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.hadoop.hbase.HConstants.NINES;
import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
import static org.apache.hadoop.hbase.HConstants.ZEROES;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.createRegionLocations;
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_CACHE_INVALIDATE_INTERVAL;
Expand Down Expand Up @@ -238,14 +237,15 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations
CatalogReplicaLoadBalanceSimpleSelector.class.getName());

this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory
.createSelector(replicaSelectorClass, META_TABLE_NAME, conn, () -> {
.createSelector(replicaSelectorClass, conn.getMetaTableName(), conn, () -> {
int numOfReplicas = CatalogReplicaLoadBalanceSelector.UNINITIALIZED_NUM_OF_REPLICAS;
try {
RegionLocations metaLocations = conn.registry.getMetaRegionLocations()
.get(conn.connConf.getMetaReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
numOfReplicas = metaLocations.size();
} catch (Exception e) {
LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
LOG.error("Failed to get table {}'s region replication, ", conn.getMetaTableName(),
e);
}
return numOfReplicas;
});
Expand Down Expand Up @@ -427,7 +427,7 @@ private void locateInMeta(TableName tableName, LocateRequest req) {
// do nothing
}

conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
conn.getTable(conn.getMetaTableName()).scan(scan, new AdvancedScanResultConsumer() {

private boolean completeNormally = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
Expand Down Expand Up @@ -217,7 +216,7 @@ void clearCache(TableName tableName) {
new TableSpanBuilder(conn).setName("AsyncRegionLocator.clearCache").setTableName(tableName);
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", tableName);
if (tableName.equals(META_TABLE_NAME)) {
if (tableName.equals(conn.getMetaTableName())) {
metaRegionLocator.clearCache();
} else {
nonMetaRegionLocator.clearCache(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
.thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
}
CompletableFuture<List<HRegionLocation>> future = ClientMetaTableAccessor
.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME), tableName);
.getTableHRegionLocations(conn.getTable(conn.getMetaTableName()), tableName);
addListener(future, (locs, error) -> locs.forEach(loc -> {
// the cache assumes that all locations have a serverName. only add if that's true
if (loc.getServerName() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ public interface Connection extends Abortable, Closeable {
/** Returns Configuration instance being used by this Connection instance. */
Configuration getConfiguration();

/**
* Returns the meta table name for this cluster.
* <p>
* This value is fetched from the cluster during connection establishment and cached for the
* lifetime of this connection. For most clusters, this will be "hbase:meta". For read replica
* clusters or other specialized configurations, this may return a different table name.
* <p>
* @return The meta table name for this cluster
*/
TableName getMetaTableName();

/**
* Retrieve a Table implementation for accessing a table. The returned Table is not thread safe, a
* new instance should be created for each using thread. This is a lightweight operation, pooling
Expand Down Expand Up @@ -95,6 +106,15 @@ default Table getTable(TableName tableName, ExecutorService pool) throws IOExcep
return getTableBuilder(tableName, pool).build();
}

/**
* Retrieve a Table implementation for accessing the meta table. This method returns the correct
* meta table for this connection (hbase:meta or hbase:meta_suffix).
* @return A Table to use for interactions with the meta table
*/
default Table getMetaTable() throws IOException {
return getTable(getMetaTableName());
}

/**
* <p>
* Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,16 +595,29 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(URI conne
future.completeExceptionally(new IOException("clusterid came back null"));
return;
}
Class<? extends AsyncConnection> clazz = appliedConf.getClass(
HBASE_CLIENT_ASYNC_CONNECTION_IMPL, AsyncConnectionImpl.class, AsyncConnection.class);
try {
future.complete(user.runAs((PrivilegedExceptionAction<
? extends AsyncConnection>) () -> ReflectionUtils.newInstance(clazz, appliedConf,
registry, clusterId, null, user, connectionAttributes)));
} catch (Exception e) {
registry.close();
future.completeExceptionally(e);
}
// Fetch meta table name from registry
addListener(registry.getMetaTableName(), (metaTableName, metaError) -> {
if (metaError != null) {
registry.close();
future.completeExceptionally(metaError);
return;
}
if (metaTableName == null) {
registry.close();
future.completeExceptionally(new IOException("meta table name came back null"));
return;
}
Class<? extends AsyncConnection> clazz = appliedConf.getClass(
HBASE_CLIENT_ASYNC_CONNECTION_IMPL, AsyncConnectionImpl.class, AsyncConnection.class);
try {
future.complete(user.runAs((PrivilegedExceptionAction<
? extends AsyncConnection>) () -> ReflectionUtils.newInstance(clazz, appliedConf,
registry, clusterId, metaTableName, null, user, connectionAttributes)));
} catch (Exception e) {
registry.close();
future.completeExceptionally(e);
}
});
});
return future;
}, "ConnectionFactory.createAsyncConnection");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public Configuration getConfiguration() {
return conn.getConfiguration();
}

@Override
public TableName getMetaTableName() {
return conn.getMetaTableName();
}

@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -48,6 +49,15 @@ public interface ConnectionRegistry extends Closeable {
*/
CompletableFuture<ServerName> getActiveMaster();

/**
* Get the name of the meta table for this cluster.
* <p>
* Should only be called once, similar to {@link #getClusterId()}.
* <p>
* @return CompletableFuture containing the meta table name
*/
CompletableFuture<TableName> getMetaTableName();

/**
* Return the connection string associated with this registry instance. This value is
* informational, used for annotating traces. Values returned may not be valid for establishing a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private static int checkReplicaId(int regionId) {
this.replicaId = checkReplicaId(replicaId);
this.offLine = offLine;
this.regionName = RegionInfo.createRegionName(this.tableName, this.startKey, this.regionId,
this.replicaId, !this.tableName.equals(TableName.META_TABLE_NAME));
this.replicaId, !TableName.isMetaTableName(this.tableName));
this.encodedName = RegionInfo.encodeRegionName(this.regionName);
this.hashCode = generateHashCode(this.tableName, this.startKey, this.endKey, this.regionId,
this.replicaId, this.offLine, this.regionName);
Expand Down Expand Up @@ -232,7 +232,7 @@ public boolean containsRow(byte[] row) {
/** Returns true if this region is a meta region */
@Override
public boolean isMetaRegion() {
return tableName.equals(TableName.META_TABLE_NAME);
return TableName.isMetaTableName(tableName);
}

/** Returns True if has been split and has daughters. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;

Expand Down Expand Up @@ -405,7 +404,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
AsyncAdminBuilderBase builder) {
this.connection = connection;
this.retryTimer = retryTimer;
this.metaTable = connection.getTable(META_TABLE_NAME);
this.metaTable = connection.getTable(connection.getMetaTableName());
this.rpcTimeoutNs = builder.rpcTimeoutNs;
this.operationTimeoutNs = builder.operationTimeoutNs;
this.pauseNs = builder.pauseNs;
Expand Down Expand Up @@ -1012,7 +1011,7 @@ List<RegionInfo>> adminCall(controller, stub,

@Override
public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
if (tableName.equals(META_TABLE_NAME)) {
if (tableName.equals(connection.getMetaTableName())) {
return connection.registry.getMetaRegionLocations()
.thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
.collect(Collectors.toList()));
Expand Down Expand Up @@ -1303,7 +1302,7 @@ private CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFa
* List all region locations for the specific table.
*/
private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
if (TableName.META_TABLE_NAME.equals(tableName)) {
if (connection.getMetaTableName().equals(tableName)) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
addListener(connection.registry.getMetaRegionLocations(), (metaRegions, err) -> {
if (err != null) {
Expand Down
Loading
Loading