From dfc761172fb8699a1907eece105f8f86dc2d3be0 Mon Sep 17 00:00:00 2001
From: Himanshu Gwalani
Date: Thu, 8 Jan 2026 14:38:33 +0530
Subject: [PATCH] PHOENIX-7669 Enhance Header and Trailer validations to gracefully handle unclosed files

---
 .../log/InvalidLogHeaderException.java | 30 +++++++
 .../log/InvalidLogTrailerException.java | 30 +++++++
 .../phoenix/replication/log/LogFile.java | 27 ++++--
 .../replication/log/LogFileFormatReader.java | 19 ++--
 .../replication/log/LogFileFormatWriter.java | 32 ++-----
 .../replication/log/LogFileHeader.java | 53 ++++-------
 .../replication/log/LogFileReader.java | 3 +-
 .../replication/log/LogFileReaderContext.java | 22 ++++-
 .../replication/log/LogFileTrailer.java | 59 ++++---------
 .../reader/ReplicationLogProcessorTestIT.java | 5 +-
 .../replication/log/LogFileFormatTest.java | 87 +++++++++++++++++++
 .../replication/log/LogFileWriterTest.java | 10 +++
 12 files changed, 252 insertions(+), 125 deletions(-)
 create mode 100644 phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
 create mode 100644 phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
new file mode 100644
index 00000000000..500b8b003fa
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.log;
+
+import java.io.IOException;
+
+/** Exception thrown when a log file has an invalid header. */
+public class InvalidLogHeaderException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public InvalidLogHeaderException(String message) {
+    super(message);
+  }
+
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
new file mode 100644
index 00000000000..4b528d1be91
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.log; + +import java.io.IOException; + +/** Exception thrown when a log file has an invalid trailer. */ +public class InvalidLogTrailerException extends IOException { + private static final long serialVersionUID = 1L; + + public InvalidLogTrailerException(String message) { + super(message); + } + +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java index 631146c4aa3..b1f05f4af9c 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java @@ -22,6 +22,7 @@ import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -418,21 +419,29 @@ interface Decoder { /** * Utility for determining if a file is a valid replication log file. - * @param fs The FileSystem - * @param path Path to the potential replication log file + * @param conf The Configuration + * @param fs The FileSystem + * @param path Path to the potential replication log file + * @param validateTrailer Whether to validate the trailer * @return true if the file is a valid replication log file, false otherwise * @throws IOException if an I/O problem was encountered */ - static boolean isValidLogFile(final FileSystem fs, final Path path) throws IOException { + static boolean isValidLogFile(final Configuration conf, final FileSystem fs, final Path path, + final boolean validateTrailer) throws IOException { long length = fs.getFileStatus(path).getLen(); try (FSDataInputStream in = fs.open(path)) { - if (LogFileTrailer.isValidTrailer(in, length)) { - return true; - } else { - // Not a valid trailer, do we need to do something (set a flag)? - // Fall back to checking the header. - return LogFileHeader.isValidHeader(in); + // Check if the file is too short to be a valid log file. 
+ if (length < LogFileHeader.HEADERSIZE) { + return false; } + try (LogFileFormatReader reader = new LogFileFormatReader()) { + LogFileReaderContext context = new LogFileReaderContext(conf).setFilePath(path) + .setFileSize(length).setValidateTrailer(validateTrailer); + reader.init(context, (SeekableDataInput) in); + } catch (InvalidLogHeaderException | InvalidLogTrailerException e) { + return false; + } + return true; } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java index 2350ba7c7a5..6fbdf6d6455 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java @@ -47,7 +47,6 @@ public class LogFileFormatReader implements Closeable { private ByteBuffer currentBlockBuffer; private long currentBlockDataBytes; private long currentBlockConsumedBytes; - private boolean trailerValidated; private CRC64 crc = new CRC64(); public LogFileFormatReader() { @@ -61,13 +60,13 @@ public void init(LogFileReaderContext context, SeekableDataInput input) throws I this.currentBlockConsumedBytes = 0; try { readAndValidateTrailer(); - trailerValidated = true; } catch (IOException e) { + // If we are validating the trailer, we cannot proceed without it. + if (context.isValidateTrailer()) { + throw e; + } // Log warning, trailer might be missing or corrupt, proceed without it - LOG.warn( - "Failed to read or validate Log trailer for path: " - + (context != null ? context.getFilePath() : "unknown") + ". Proceeding without trailer.", - e); + LOG.warn("Failed to validate Log trailer for " + context.getFilePath() + ", proceeding", e); trailer = null; // Ensure trailer is null if reading/validation failed } this.decoder = null; @@ -78,8 +77,7 @@ public void init(LogFileReaderContext context, SeekableDataInput input) throws I private void readAndValidateTrailer() throws IOException { if (context.getFileSize() < LogFileTrailer.FIXED_TRAILER_SIZE) { - throw new IOException("File size " + context.getFileSize() - + " is smaller than the fixed trailer size " + LogFileTrailer.FIXED_TRAILER_SIZE); + throw new InvalidLogTrailerException("Short file"); } LogFileTrailer ourTrailer = new LogFileTrailer(); // Fixed trailer fields will be LogTrailer.FIXED_TRAILER_SIZE bytes back from end of file. 
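Illustration (not part of the patch): the reader-side change above gates trailer validation behind LogFileReaderContext.isValidateTrailer(). Below is a minimal caller-side sketch of reading a possibly unclosed file with that switch turned off, assuming the LogFileReaderContext setters and LogFileReader.init(context) usage shown elsewhere in this diff; the no-arg LogFileReader constructor and the record-iteration step are assumptions.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.phoenix.replication.log.LogFileReader;
import org.apache.phoenix.replication.log.LogFileReaderContext;

/** Illustrative sketch only; not part of this patch. */
public class UnclosedLogReadSketch {
  public static void read(Configuration conf, FileSystem fs, Path path) throws IOException {
    // Disable trailer validation so init() tolerates a missing or partial trailer:
    // it logs a warning and reads blocks up to the end of data instead of
    // throwing InvalidLogTrailerException.
    conf.setBoolean(LogFileReaderContext.LOGFILE_VALIDATE_TRAILER, false);
    LogFileReaderContext readerContext =
      new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(path);
    LogFileReader reader = new LogFileReader(); // no-arg constructor assumed, as in the tests
    try {
      reader.init(readerContext);
      // Iterate records here; the record-iteration API is unchanged by this patch.
    } finally {
      reader.close();
    }
  }
}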
@@ -337,7 +335,7 @@ private long getEndOfDataOffset() throws IOException { // Validates read counts against trailer counts if trailer was successfully read private void validateReadCounts() { - if (!trailerValidated || trailer == null) { + if (trailer == null) { return; } if (trailer.getBlockCount() != context.getBlocksRead()) { @@ -367,8 +365,7 @@ public String toString() { return "LogFileFormatReader [context=" + context + ", decoder=" + decoder + ", input=" + input + ", header=" + header + ", trailer=" + trailer + ", currentPosition=" + currentPosition + ", currentBlockBuffer=" + currentBlockBuffer + ", currentBlockUncompressedSize=" - + currentBlockDataBytes + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes - + ", trailerValidated=" + trailerValidated + "]"; + + currentBlockDataBytes + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes + "]"; } LogFile.Header getHeader() { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java index 30fab50bf4c..be94364a804 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java @@ -40,8 +40,6 @@ public class LogFileFormatWriter implements Closeable { private SyncableDataOutput output; private ByteArrayOutputStream currentBlockBytes; private DataOutputStream blockDataStream; - private boolean headerWritten = false; - private boolean trailerWritten = false; private long recordCount = 0; private long blockCount = 0; private long blocksStartOffset = -1; @@ -59,15 +57,14 @@ public void init(LogFileWriterContext context, SyncableDataOutput output) throws this.currentBlockBytes = new ByteArrayOutputStream(); this.blockDataStream = new DataOutputStream(currentBlockBytes); this.encoder = context.getCodec().getEncoder(blockDataStream); + // Write header immediately when file is created + writeFileHeader(); } private void writeFileHeader() throws IOException { - if (!headerWritten) { - LogFileHeader header = new LogFileHeader(); - header.write(output); - blocksStartOffset = output.getPos(); // First block starts after header - headerWritten = true; - } + LogFileHeader header = new LogFileHeader(); + header.write(output); + blocksStartOffset = output.getPos(); // First block starts after header } public long getBlocksStartOffset() { @@ -75,13 +72,6 @@ public long getBlocksStartOffset() { } public void append(LogFile.Record record) throws IOException { - if (!headerWritten) { - // Lazily write file header - writeFileHeader(); - } - if (trailerWritten) { - throw new IOException("Cannot append record after trailer has been written"); - } if (blockDataStream == null) { startBlock(); // Start the block if needed } @@ -185,15 +175,10 @@ public long getPosition() throws IOException { @Override public void close() throws IOException { - // We use the fact we have already written the trailer as the boolean "closed" condition. - if (trailerWritten) { + if (output == null) { return; } try { - // We might be closing an empty file, handle this case correctly. - if (!headerWritten) { - writeFileHeader(); - } // Close any outstanding block. closeBlock(); // After we write the trailer we consider the file closed. 
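Illustration (not part of the patch): the writer changes above drop the headerWritten/trailerWritten flags; the header is now written eagerly from init(), and the nulled-out output reference doubles as the "closed" flag so repeated close() calls become no-ops. A self-contained sketch of that idiom follows, using a hypothetical IdempotentCloseSketch class with stand-in header/trailer bytes rather than the real LogFile format.

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical class for illustration only; not part of this patch. */
public class IdempotentCloseSketch implements Closeable {
  private static final byte[] HEADER = { 'H', 'D', 'R' };  // stand-in for the real file header
  private static final byte[] TRAILER = { 'T', 'R', 'L' }; // stand-in for the real file trailer

  private OutputStream output;

  public IdempotentCloseSketch(OutputStream output) throws IOException {
    this.output = output;
    // The header is written eagerly at creation time, mirroring writeFileHeader()
    // now being invoked from init() instead of lazily from append().
    output.write(HEADER);
  }

  @Override
  public void close() throws IOException {
    if (output == null) {
      return; // already closed; repeated close() calls are safe no-ops
    }
    try {
      output.write(TRAILER); // a trailer marks a cleanly closed file
      output.flush();
      output.close();
    } finally {
      output = null; // the reference itself serves as the "closed" flag
    }
  }
}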
@@ -206,6 +191,7 @@ public void close() throws IOException { } catch (IOException e) { LOG.error("Exception while closing LogFormatWriter", e); } + output = null; } } } @@ -215,7 +201,6 @@ private void writeTrailer() throws IOException { new LogFileTrailer().setRecordCount(recordCount).setBlockCount(blockCount) .setBlocksStartOffset(blocksStartOffset).setTrailerStartOffset(output.getPos()); trailer.write(output); - trailerWritten = true; try { output.sync(); } catch (IOException e) { @@ -227,8 +212,7 @@ private void writeTrailer() throws IOException { @Override public String toString() { return "LogFileFormatWriter [writerContext=" + context + ", currentBlockUncompressedBytes=" - + currentBlockBytes + ", headerWritten=" + headerWritten + ", trailerWritten=" - + trailerWritten + ", recordCount=" + recordCount + ", blockCount=" + blockCount + + currentBlockBytes + ", recordCount=" + recordCount + ", blockCount=" + blockCount + ", blocksStartOffset=" + blocksStartOffset + "]"; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java index c15260b3a5e..fdaced70d31 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java @@ -19,11 +19,9 @@ import java.io.DataInput; import java.io.DataOutput; +import java.io.EOFException; import java.io.IOException; import java.util.Arrays; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.Bytes; public class LogFileHeader implements LogFile.Header { @@ -35,7 +33,7 @@ public class LogFileHeader implements LogFile.Header { /** Current minor version of the replication log format */ static final int VERSION_MINOR = 0; - static final int HEADERSIZE = MAGIC.length + 3 * Bytes.SIZEOF_BYTE; + static final int HEADERSIZE = MAGIC.length + 2 * Bytes.SIZEOF_BYTE; private int majorVersion = VERSION_MAJOR; private int minorVersion = VERSION_MINOR; @@ -69,18 +67,27 @@ public LogFile.Header setMinorVersion(int minorVersion) { @Override public void readFields(DataInput in) throws IOException { byte[] magic = new byte[MAGIC.length]; - in.readFully(magic); + try { + in.readFully(magic); + } catch (EOFException e) { + throw (IOException) new InvalidLogHeaderException("Short magic").initCause(e); + } if (!Arrays.equals(MAGIC, magic)) { - throw new IOException("Invalid LogFile magic. Got " + Bytes.toStringBinary(magic) + throw new InvalidLogHeaderException("Bad magic. Got " + Bytes.toStringBinary(magic) + ", expected " + Bytes.toStringBinary(MAGIC)); } - majorVersion = in.readByte(); - minorVersion = in.readByte(); + try { + majorVersion = in.readByte(); + minorVersion = in.readByte(); + } catch (EOFException e) { + throw (IOException) new InvalidLogHeaderException("Short version").initCause(e); + } // Basic version check for now. We assume semver conventions where only higher major // versions may be incompatible. if (majorVersion > VERSION_MAJOR) { - throw new IOException("Unsupported LogFile version. Got major=" + majorVersion + " minor=" - + minorVersion + ", expected major=" + VERSION_MAJOR + " minor=" + VERSION_MINOR); + throw new InvalidLogHeaderException( + "Unsupported version. 
Got major=" + majorVersion + " minor=" + minorVersion + + ", expected major=" + VERSION_MAJOR + " minor=" + VERSION_MINOR); } } @@ -96,32 +103,6 @@ public int getSerializedLength() { return HEADERSIZE; } - public static boolean isValidHeader(final FileSystem fs, final Path path) throws IOException { - if (fs.getFileStatus(path).getLen() < HEADERSIZE) { - return false; - } - try (FSDataInputStream in = fs.open(path)) { - return isValidHeader(in); - } - } - - public static boolean isValidHeader(FSDataInputStream in) throws IOException { - in.seek(0); - byte[] magic = new byte[MAGIC.length]; - in.readFully(magic); - if (!Arrays.equals(MAGIC, magic)) { - return false; - } - int majorVersion = in.readByte(); - in.readByte(); // minorVersion, for now we don't use it - // Basic version check for now. We assume semver conventions where only higher major - // versions may be incompatible. - if (majorVersion > VERSION_MAJOR) { - return false; - } - return true; - } - @Override public String toString() { return "LogFileHeader [majorVersion=" + majorVersion + ", minorVersion=" + minorVersion + "]"; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java index 4e4bd4b67ba..8b24f6c0c1c 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java @@ -134,7 +134,8 @@ public void close() throws IOException { throw e; } finally { closed = true; - LOG.debug("Closed LogFileReader for path {}", context.getFilePath()); + LOG.debug("Closed LogFileReader for path {}", + context != null ? context.getFilePath() : "null"); } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java index 5573df75999..8fbab2e8da0 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java @@ -35,12 +35,17 @@ public class LogFileReaderContext { /** Default for skipping corrupt blocks */ public static final boolean DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS = true; + public static final String LOGFILE_VALIDATE_TRAILER = + "phoenix.replication.logfile.validate.trailer"; + public static final boolean DEFAULT_LOGFILE_VALIDATE_TRAILER = true; + private final Configuration conf; private FileSystem fs; private Path path; private LogFileCodec codec; private long fileSize = -1; private boolean isSkipCorruptBlocks; + private boolean isValidateTrailer; private long blocksRead; private long recordsRead; private long corruptBlocksSkipped; @@ -49,6 +54,8 @@ public LogFileReaderContext(Configuration conf) { this.conf = conf; this.isSkipCorruptBlocks = conf.getBoolean(LOGFILE_SKIP_CORRUPT_BLOCKS, DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS); + this.isValidateTrailer = + conf.getBoolean(LOGFILE_VALIDATE_TRAILER, DEFAULT_LOGFILE_VALIDATE_TRAILER); // Note: When we have multiple codec types, instantiate the appropriate type based on // configuration; this.codec = new LogFileCodec(); @@ -145,12 +152,21 @@ public LogFileReaderContext setCorruptBlocksSkipped(long value) { return this; } + public boolean isValidateTrailer() { + return isValidateTrailer; + } + + public LogFileReaderContext setValidateTrailer(boolean 
validateTrailer) { + this.isValidateTrailer = validateTrailer; + return this; + } + @Override public String toString() { return "LogFileReaderContext [filePath=" + path + ", fileSize=" + fileSize - + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ", codec=" + codec + ", blocksRead=" - + blocksRead + ", recordsRead=" + recordsRead + ", corruptBlocksSkipped=" - + corruptBlocksSkipped + "]"; + + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ", isValidateTrailer=" + isValidateTrailer + + ", codec=" + codec + ", blocksRead=" + blocksRead + ", recordsRead=" + recordsRead + + ", corruptBlocksSkipped=" + corruptBlocksSkipped + "]"; } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java index 84433e0309c..2f77d498d89 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java @@ -19,11 +19,9 @@ import java.io.DataInput; import java.io.DataOutput; +import java.io.EOFException; import java.io.IOException; import java.util.Arrays; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.Bytes; public class LogFileTrailer implements LogFile.Trailer { @@ -112,23 +110,31 @@ public LogFile.Trailer setTrailerStartOffset(long offset) { } public void readFixedFields(DataInput in) throws IOException { - this.recordCount = in.readLong(); - this.blockCount = in.readLong(); - this.blocksStartOffset = in.readLong(); - this.trailerStartOffset = in.readLong(); - this.majorVersion = in.readByte(); - this.minorVersion = in.readByte(); + try { + this.recordCount = in.readLong(); + this.blockCount = in.readLong(); + this.blocksStartOffset = in.readLong(); + this.trailerStartOffset = in.readLong(); + this.majorVersion = in.readByte(); + this.minorVersion = in.readByte(); + } catch (EOFException e) { + throw (IOException) new InvalidLogTrailerException("Short fixed fields").initCause(e); + } // Basic version check for now. We assume semver conventions where only higher major // versions may be incompatible. if (majorVersion > LogFileHeader.VERSION_MAJOR) { - throw new IOException("Unsupported LogFile version. Got major=" + majorVersion + " minor=" - + minorVersion + ", expected major=" + LogFileHeader.VERSION_MAJOR + " minor=" + throw new InvalidLogTrailerException("Unsupported version. Got major=" + majorVersion + + " minor=" + minorVersion + ", expected major=" + LogFileHeader.VERSION_MAJOR + " minor=" + LogFileHeader.VERSION_MINOR); } byte[] magic = new byte[LogFileHeader.MAGIC.length]; - in.readFully(magic); + try { + in.readFully(magic); + } catch (EOFException e) { + throw (IOException) new InvalidLogTrailerException("Short magic").initCause(e); + } if (!Arrays.equals(LogFileHeader.MAGIC, magic)) { - throw new IOException("Invalid LogFile magic. Got " + Bytes.toStringBinary(magic) + throw new InvalidLogTrailerException("Bad magic. 
Got " + Bytes.toStringBinary(magic) + ", expected " + Bytes.toStringBinary(LogFileHeader.MAGIC)); } } @@ -171,33 +177,6 @@ public int getSerializedLength() { + FIXED_TRAILER_SIZE; } - public static boolean isValidTrailer(final FileSystem fs, final Path path) throws IOException { - try (FSDataInputStream in = fs.open(path)) { - return isValidTrailer(in, fs.getFileStatus(path).getLen()); - } - } - - public static boolean isValidTrailer(FSDataInputStream in, long length) throws IOException { - long offset = length - VERSION_AND_MAGIC_SIZE; - if (offset < 0) { - return false; - } - in.seek(offset); - byte[] magic = new byte[LogFileHeader.MAGIC.length]; - in.readFully(magic); - if (!Arrays.equals(LogFileHeader.MAGIC, magic)) { - return false; - } - int majorVersion = in.readByte(); - in.readByte(); // minorVersion, for now we don't use it - // Basic version check for now. We assume semver conventions where only higher major - // versions may be incompatible. - if (majorVersion > LogFileHeader.VERSION_MAJOR) { - return false; - } - return true; - } - @Override public String toString() { return "LogFileTrailer [majorVersion=" + majorVersion + ", minorVersion=" + minorVersion diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java index ca3c4cfe971..062bef68bdd 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java @@ -553,8 +553,11 @@ public void testProcessLogFileForUnClosedFile() throws Exception { writer.append(tableNameString, 1, put); writer.sync(); + // For processing of an unclosed file to work, we need to disable trailer validation + Configuration testConf = new Configuration(conf); + testConf.setBoolean(LogFileReaderContext.LOGFILE_VALIDATE_TRAILER, false); ReplicationLogProcessor spyProcessor = - Mockito.spy(new ReplicationLogProcessor(conf, testHAGroupName)); + Mockito.spy(new ReplicationLogProcessor(testConf, testHAGroupName)); // Create argument captor to capture the actual parameters passed to processReplicationLogBatch ArgumentCaptor>> mapCaptor = ArgumentCaptor.forClass(Map.class); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java index 885f1e35a82..0d473cfd658 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java @@ -294,6 +294,9 @@ public void testLogFileCorruptionTruncatedFinalBlock() throws IOException { byte[] data = writerBaos.toByteArray(); byte[] truncatedData = Arrays.copyOf(data, (int) truncationPoint); + // We truncated the final block, so the trailer is gone too. 
+ readerContext.setValidateTrailer(false); + initLogFileReader(truncatedData); List decoded = new ArrayList<>(); @@ -339,6 +342,7 @@ public void testLogFileCorruptionMissingTrailer() throws IOException { LogFileTestUtil.SeekableByteArrayInputStream input = new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData); readerContext.setFileSize(truncatedData.length); + readerContext.setValidateTrailer(false); // This init should log a warning but succeed reader.init(readerContext, input); @@ -373,6 +377,7 @@ public void testLogFileCorruptionPartialTrailer() throws IOException { LogFileTestUtil.SeekableByteArrayInputStream input = new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData); readerContext.setFileSize(truncatedData.length); + readerContext.setValidateTrailer(false); // Init should log a warning but succeed by ignoring the trailer reader.init(readerContext, input); @@ -401,6 +406,88 @@ public void testLogFileCorruptionPartialTrailer() throws IOException { assertEquals("Records read count mismatch", totalRecords, readerContext.getRecordsRead()); } + @Test + public void testFailIfMissingHeader() throws IOException { + // Zero length file + byte[] data = new byte[0]; + LogFileTestUtil.SeekableByteArrayInputStream input = + new LogFileTestUtil.SeekableByteArrayInputStream(data); + readerContext.setFileSize(data.length); + readerContext.setValidateTrailer(false); + try { + reader.init(readerContext, input); + fail("Expected InvalidLogHeaderException for zero length file"); + } catch (InvalidLogHeaderException e) { + assertTrue("Exception message should contain 'Short magic'", + e.getMessage().contains("Short magic")); + } + } + + @Test + public void testFailIfInvalidHeader() throws IOException { + initLogFileWriter(); + writer.close(); // Writes valid trailer + byte[] data = writerBaos.toByteArray(); + LogFileTestUtil.SeekableByteArrayInputStream input = + new LogFileTestUtil.SeekableByteArrayInputStream(data); + readerContext.setFileSize(data.length); + readerContext.setValidateTrailer(true); + data[0] = (byte) 'X'; // Corrupt the first magic byte + try { + reader.init(readerContext, input); + fail("Expected InvalidLogHeaderException for file with corrupted header magic"); + } catch (InvalidLogHeaderException e) { + assertTrue("Exception message should contain 'Bad magic'", + e.getMessage().contains("Bad magic")); + } + } + + @Test + public void testFailIfMissingTrailer() throws IOException { + initLogFileWriter(); + writeBlock(writer, "B1", 0, 5); + // Don't close the writer, simulate missing trailer + byte[] data = writerBaos.toByteArray(); + // Re-initialize reader with truncated data and trailer validation enabled + LogFileTestUtil.SeekableByteArrayInputStream input = + new LogFileTestUtil.SeekableByteArrayInputStream(data); + readerContext.setFileSize(data.length); + // Enable trailer validation + readerContext.setValidateTrailer(true); + try { + reader.init(readerContext, input); + fail("Expected InvalidLogTrailerException when trailer is missing"); + } catch (InvalidLogTrailerException e) { + assertTrue("Exception message should contain 'Unsupported version'", + e.getMessage().contains("Unsupported version")); + } + } + + @Test + public void testFailIfInvalidTrailer() throws IOException { + initLogFileWriter(); + writeBlock(writer, "B1", 0, 5); + writer.close(); // Writes valid trailer + byte[] data = writerBaos.toByteArray(); + // Corrupt the trailer by changing the magic bytes + int trailerStartOffset = data.length - LogFileTrailer.FIXED_TRAILER_SIZE; + int 
magicOffset = + trailerStartOffset + LogFileTrailer.FIXED_TRAILER_SIZE - LogFileHeader.MAGIC.length; + data[magicOffset] = (byte) 'X'; // Corrupt the first magic byte + // Re-initialize reader with corrupted trailer and trailer validation enabled + LogFileTestUtil.SeekableByteArrayInputStream input = + new LogFileTestUtil.SeekableByteArrayInputStream(data); + readerContext.setFileSize(data.length); + readerContext.setValidateTrailer(true); + try { + reader.init(readerContext, input); + fail("Expected InvalidLogTrailerException when trailer magic is corrupt"); + } catch (InvalidLogTrailerException e) { + assertTrue("Exception message should contain 'Bad magic'", + e.getMessage().contains("Bad magic")); + } + } + @Test public void testLogFileCorruptionFirstBlockChecksum() throws IOException { initLogFileWriter(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java index 36dde2fc834..95c6bb51335 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java @@ -137,6 +137,16 @@ public void testLogFileReaderEmptyFile() throws IOException { reader.close(); } + @Test + public void testHeaderWrittenImmediately() throws IOException { + // This should write header immediately + initLogFileWriter(); + // Verify file exists and has content (header should be written) + assertTrue("File should exist after init", localFs.exists(filePath)); + assertEquals("File should have header written", LogFileHeader.HEADERSIZE, writer.getLength()); + writer.close(); + } + private void initLogFileReader() throws IOException { readerContext = new LogFileReaderContext(conf).setFileSystem(localFs).setFilePath(filePath); reader.init(readerContext);
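Illustration (not part of the patch): a usage sketch of the reworked LogFile.isValidLogFile utility, whose signature now takes a Configuration and a validateTrailer flag and delegates to LogFileFormatReader.init() instead of the removed isValidHeader/isValidTrailer helpers. The wrapper class and method name below are hypothetical.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.phoenix.replication.log.LogFile;

/** Hypothetical wrapper for illustration only; not part of this patch. */
public class LogFileValidationSketch {
  public static boolean isUsable(Configuration conf, Path path, boolean expectCleanlyClosed)
      throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    // With validateTrailer=true, a readable header is not enough: a file whose
    // writer died before writing the trailer is reported as invalid.
    // With validateTrailer=false, only the header (magic + version) must check out,
    // which allows unclosed files to still be picked up.
    return LogFile.isValidLogFile(conf, fs, path, expectCleanlyClosed);
  }
}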