diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 5edd5b3e8c92..3407189fcf6a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -283,4 +283,11 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  /**
+   * Hook invoked before persisting replication offsets. E.g., buffered endpoints can flush/close
+   * WALs here.
+   */
+  default void beforePersistingReplicationOffset() throws IOException {
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 4e122ef5e8b9..59a22abe256c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -31,7 +31,9 @@
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -72,6 +74,7 @@
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Class that handles the source of a replication stream. Currently does not handle more than 1
@@ -148,6 +151,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
   public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
 
   private int waitOnEndpointSeconds = -1;
 
+  public static final String SHIPPER_MONITOR_INTERVAL =
+    "hbase.replication.source.shipper.monitor.interval.ms";
+  private static final long DEFAULT_SHIPPER_MONITOR_INTERVAL = 1 * 60 * 1000L; // 1 minute
+  private ScheduledExecutorService shipperMonitorExecutor;
+  private long monitorIntervalMs;
+
   private Thread initThread;
 
@@ -230,6 +239,16 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
 
     LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
       replicationPeer.getId(), this.currentBandwidth);
+
+    this.monitorIntervalMs =
+      conf.getLong(SHIPPER_MONITOR_INTERVAL, DEFAULT_SHIPPER_MONITOR_INTERVAL);
+
+    this.shipperMonitorExecutor =
+      Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+        .setNameFormat("ShipperMonitor-" + queueId).setDaemon(true).build());
+
+    this.shipperMonitorExecutor.scheduleAtFixedRate(this::restartDeadWorkersIfNeeded,
+      monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
   }
 
   private void decorateConf() {
@@ -758,6 +777,10 @@ private void terminate(String reason, Exception cause, boolean clearMetrics, boo
         this.metrics.terminate();
       }
     }
+
+    if (shipperMonitorExecutor != null) {
+      shipperMonitorExecutor.shutdownNow();
+    }
   }
 
   @Override
@@ -866,4 +889,34 @@ public long getTotalReplicatedEdits() {
   long getSleepForRetries() {
     return sleepForRetries;
   }
+
+  /**
+   * Periodic task that restarts shipper threads which died unexpectedly, i.e. are no longer
+   * alive but never reached the FINISHED state. Runs on {@link #shipperMonitorExecutor}.
+   */
+  private void restartDeadWorkersIfNeeded() {
+    for (String walGroupId : workerThreads.keySet()) {
+      workerThreads.compute(walGroupId, (key, worker) -> {
+        if (worker == null) {
+          return null;
+        }
+
+        if (!worker.isAlive() && !worker.isFinished()) {
+          LOG.warn("Detected dead shipper for walGroupId={}. Restarting.", walGroupId);
+
+          try {
+            ReplicationSourceShipper newWorker = createNewShipper(walGroupId);
+            startShipper(newWorker);
+            return newWorker;
+          } catch (Exception e) {
+            LOG.error("Failed to restart shipper for walGroupId={}", walGroupId, e);
+            return worker; // keep old entry to retry later
+          }
+        }
+
+        return worker;
+      });
+    }
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index d05e4fed045b..a2560c4bb422 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -74,6 +75,14 @@ public enum WorkerState {
   private final int DEFAULT_TIMEOUT = 20000;
   private final int getEntriesTimeout;
   private final int shipEditsTimeout;
+  // Bytes shipped since the last offset persistence; reset by persistLogPosition().
+  private long accumulatedSizeSinceLastUpdate = 0L;
+  private long lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime();
+  private long offsetUpdateIntervalMs;
+  private long offsetUpdateSizeThresholdBytes;
+  private WALEntryBatch lastShippedBatch;
+  // Shipped entries whose hfile refs are cleaned up only once their offset is persisted.
+  private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();
 
   public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
       ReplicationSourceWALReader walReader) {
@@ -90,6 +99,10 @@ public ReplicationSourceShipper(Configuration conf, String walGroupId, Replicati
     this.getEntriesTimeout =
       this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT);
     this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
       HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+    this.offsetUpdateIntervalMs =
+      conf.getLong("hbase.replication.shipper.offset.update.interval.ms", Long.MAX_VALUE);
+    this.offsetUpdateSizeThresholdBytes =
+      conf.getLong("hbase.replication.shipper.offset.update.size.threshold", -1L);
   }
 
@@ -106,9 +119,27 @@ public final void run() {
         continue;
       }
       try {
-        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
+        // check time-based offset persistence
+        if (shouldPersistLogPosition()) {
+          // Trigger offset persistence via existing retry/backoff mechanism in shipEdits()
+          WALEntryBatch emptyBatch = createEmptyBatchForTimeBasedFlush();
+          if (emptyBatch != null) {
+            shipEdits(emptyBatch);
+          }
+        }
+
+        long pollTimeout = getEntriesTimeout;
+        if (offsetUpdateIntervalMs != Long.MAX_VALUE) {
+          long elapsed = EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime;
+          long remaining = offsetUpdateIntervalMs - elapsed;
+          if (remaining > 0) {
+            pollTimeout = Math.min(getEntriesTimeout, remaining);
+          }
+        }
+        WALEntryBatch entryBatch = entryReader.poll(pollTimeout);
         LOG.debug("Shipper from source {} got entry batch from reader: {}", source.getQueueId(),
           entryBatch);
+
         if (entryBatch == null) {
           continue;
         }
@@ -133,6 +164,16 @@ public final void run() {
     }
   }
 
+  private WALEntryBatch createEmptyBatchForTimeBasedFlush() {
+    // Reuse last shipped WAL position with 0 entries
+    if (lastShippedBatch == null) {
+      return null;
+    }
+    WALEntryBatch batch = new WALEntryBatch(0, lastShippedBatch.getLastWalPath());
+    batch.setLastWalPosition(lastShippedBatch.getLastWalPosition());
+    return batch;
+  }
+
   private void noMoreData() {
     if (source.isRecovered()) {
       LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,
@@ -154,15 +195,20 @@ protected void postFinish() {
 
   private void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
-    if (entries.isEmpty()) {
-      updateLogPosition(entryBatch);
-      return;
-    }
     int currentSize = (int) entryBatch.getHeapSize();
-    source.getSourceMetrics()
-      .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
+    if (!entries.isEmpty()) {
+      // An empty batch (e.g. the synthetic time-based flush batch) has no last entry to read
+      // a timestamp from; calling get(size - 1) on it would throw IndexOutOfBoundsException.
+      source.getSourceMetrics()
+        .setTimeStampNextToReplicate(entries.get(entries.size() - 1).getKey().getWriteTime());
+    }
     while (isActive()) {
       try {
+        if (entries.isEmpty()) {
+          lastShippedBatch = entryBatch;
+          persistLogPosition();
+          return;
+        }
         try {
           source.tryThrottle(currentSize);
@@ -190,13 +236,13 @@ private void shipEdits(WALEntryBatch entryBatch) {
         } else {
           sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
         }
-        // Clean up hfile references
-        for (Entry entry : entries) {
-          cleanUpHFileRefs(entry.getEdit());
-          LOG.trace("shipped entry {}: ", entry);
+
+        accumulatedSizeSinceLastUpdate += currentSize;
+        entriesForCleanUpHFileRefs.addAll(entries);
+        lastShippedBatch = entryBatch;
+        if (shouldPersistLogPosition()) {
+          persistLogPosition();
         }
-        // Log and clean up WAL logs
-        updateLogPosition(entryBatch);
 
         // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
         // this sizeExcludeBulkLoad has to use same calculation that when calling
@@ -215,6 +261,12 @@ private void shipEdits(WALEntryBatch entryBatch) {
             entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
         }
         break;
+      } catch (IOException ioe) {
+        // An offset-persist failure is treated as fatal to this worker since it might come
+        // from beforePersistingReplicationOffset. ReplicationSource will restart the shipper,
+        // and WAL reading will resume from the last successfully persisted offset.
+        throw new ReplicationRuntimeException(
+          "Failed to persist replication offset; restarting shipper for WAL replay", ioe);
       } catch (Exception ex) {
         source.getSourceMetrics().incrementFailedBatches();
         LOG.warn("{} threw unknown exception:",
@@ -229,6 +281,46 @@ private void shipEdits(WALEntryBatch entryBatch) {
       }
     }
 
+  private boolean shouldPersistLogPosition() {
+    if (accumulatedSizeSinceLastUpdate == 0 || lastShippedBatch == null) {
+      return false;
+    }
+
+    // Default behaviour to update offset immediately after replicate()
+    if (offsetUpdateSizeThresholdBytes == -1 && offsetUpdateIntervalMs == Long.MAX_VALUE) {
+      return true;
+    }
+
+    // A disabled (-1) size threshold must never trigger persistence on its own, otherwise
+    // any non-zero accumulation would satisfy "accumulated >= -1" and defeat the interval.
+    return (offsetUpdateSizeThresholdBytes != -1
+      && accumulatedSizeSinceLastUpdate >= offsetUpdateSizeThresholdBytes)
+      || (EnvironmentEdgeManager.currentTime() - lastOffsetUpdateTime >= offsetUpdateIntervalMs);
+  }
+
+  private void persistLogPosition() throws IOException {
+    if (lastShippedBatch == null) {
+      return;
+    }
+
+    // Give buffered endpoints a chance to flush before the offset becomes durable.
+    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+    endpoint.beforePersistingReplicationOffset();
+
+    // Clean up hfile references
+    for (Entry entry : entriesForCleanUpHFileRefs) {
+      cleanUpHFileRefs(entry.getEdit());
+    }
+    entriesForCleanUpHFileRefs.clear();
+
+    accumulatedSizeSinceLastUpdate = 0;
+    lastOffsetUpdateTime = EnvironmentEdgeManager.currentTime();
+
+    // Log and clean up WAL logs
+    updateLogPosition(lastShippedBatch);
+  }
+
   private void cleanUpHFileRefs(WALEdit edit) throws IOException {
     String peerId = source.getPeerId();
     if (peerId.contains("-")) {