From 3ff495770a13e4de45ec20b0300174166edb019a Mon Sep 17 00:00:00 2001
From: Denys Kuzmenko
Date: Tue, 27 Jan 2026 21:39:40 +0200
Subject: [PATCH 1/3] Cleaner should handle retries of killed compaction attempts

---
 .../apache/hadoop/hive/ql/io/AcidUtils.java   |   2 +-
 .../ql/txn/compactor/handler/TaskHandler.java |   4 +-
 .../hive/ql/txn/compactor/CompactorTest.java  | 102 ++++++++++--------
 .../hive/ql/txn/compactor/TestCleaner.java    |   2 +-
 .../TestCleanerWithMinHistoryWriteId.java     |  86 +++++++++++++++
 .../hadoop/hive/metastore/txn/TxnUtils.java   |   3 +-
 .../txn/jdbc/queries/ReadyToCleanHandler.java |   2 +-
 7 files changed, 149 insertions(+), 52 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index c0ee570a64ea..6ddb8ab624c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1380,7 +1380,7 @@ public static AcidDirectory getAcidState(FileSystem fileSystem, Path candidateDi
     // Filter out all delta directories that are shadowed by others
     findBestWorkingDeltas(writeIdList, directory);
 
-    if(directory.getOldestBase() != null && directory.getBase() == null &&
+    if (directory.getOldestBase() != null && directory.getBase() == null &&
         isCompactedBase(directory.getOldestBase(), fs, dirSnapshots)) {
       /*
        * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
index 0e7a11c573b4..16603f72615f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
@@ -163,7 +163,9 @@ fs, path, getConf(), validWriteIdList, Ref.from(false), false,
 
     // Make sure there are no leftovers below the compacted watermark
     boolean success = false;
-    getConf().set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+    if (info.minOpenWriteId < 0) {
+      getConf().set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+    }
 
     dir = AcidUtils.getAcidState(
         fs, path, getConf(),
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index ec2f5dacd75a..f49daf56be1f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -100,7 +100,6 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.Stack;
-import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -256,10 +255,10 @@ protected long openTxn(TxnType txnType) throws MetaException {
     rqst.setTxn_type(txnType);
     if (txnType == TxnType.REPL_CREATED) {
       rqst.setReplPolicy("default.*");
-      rqst.setReplSrcTxnIds(Arrays.asList(1L));
+      rqst.setReplSrcTxnIds(List.of(1L));
     }
     List<Long> txns = txnHandler.openTxns(rqst).getTxn_ids();
-    return txns.get(0);
+    return txns.getFirst();
   }
 
   protected long allocateWriteId(String dbName, String tblName, long txnid)
@@ -268,7 +267,7 @@ protected long allocateWriteId(String dbName, String tblName, long txnid)
       = new AllocateTableWriteIdsRequest(dbName, tblName);
     awiRqst.setTxnIds(Collections.singletonList(txnid));
     AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
-    return awiResp.getTxnToWriteIds().get(0).getWriteId();
+    return awiResp.getTxnToWriteIds().getFirst().getWriteId();
   }
 
   protected void addDeltaFileWithTxnComponents(Table t, Partition p, int numRecords, boolean abort)
@@ -292,7 +291,7 @@ protected void acquireLock(Table t, Partition p, long txnId) throws Exception {
         .setTableName(t.getTableName())
         .setIsTransactional(true);
     if (p != null) {
-      lockCompBuilder.setPartitionName(t.getPartitionKeys().get(0).getName() + "=" + p.getValues().get(0));
+      lockCompBuilder.setPartitionName(t.getPartitionKeys().getFirst().getName() + "=" + p.getValues().getFirst());
     }
     LockRequestBuilder requestBuilder = new LockRequestBuilder().setUser(null)
         .setTransactionId(txnId).addLockComponent(lockCompBuilder.build());
@@ -338,14 +337,14 @@ protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords, in
   }
 
   protected List<Path> getDirectories(HiveConf conf, Table t, Partition p) throws Exception {
-    String partValue = (p == null) ? null : p.getValues().get(0);
+    String partValue = (p == null) ? null : p.getValues().getFirst();
     String location = getLocation(t.getTableName(), partValue);
     Path dir = new Path(location);
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stats = fs.listStatus(dir);
-    List<Path> paths = new ArrayList<Path>(stats.length);
-    for (int i = 0; i < stats.length; i++) {
-      paths.add(stats[i].getPath());
+    List<Path> paths = new ArrayList<>(stats.length);
+    for (FileStatus stat : stats) {
+      paths.add(stat.getPath());
     }
     return paths;
   }
@@ -394,13 +393,9 @@ protected void burnThroughTransactions(String dbName, String tblName, int num, S
     }
   }
 
-  protected void stopThread() {
-    stop.set(true);
-  }
-
   private StorageDescriptor newStorageDescriptor(String location, List sortCols) {
     StorageDescriptor sd = new StorageDescriptor();
-    List<FieldSchema> cols = new ArrayList<FieldSchema>(2);
+    List<FieldSchema> cols = new ArrayList<>(2);
     cols.add(new FieldSchema("a", "varchar(25)", "still no comment"));
     cols.add(new FieldSchema("b", "int", "comment"));
     sd.setCols(cols);
@@ -411,7 +406,7 @@ private StorageDescriptor newStorageDescriptor(String location, List sort
     SerDeInfo serde = new SerDeInfo();
     serde.setSerializationLib(LazySimpleSerDe.class.getName());
     sd.setSerdeInfo(serde);
-    List<String> bucketCols = new ArrayList<String>(1);
+    List<String> bucketCols = new ArrayList<>(1);
     bucketCols.add("a");
     sd.setBucketCols(bucketCols);
 
@@ -424,13 +419,14 @@ private StorageDescriptor newStorageDescriptor(String location, List sort
   // I can't do this with @Before because I want to be able to control when the thread starts
   private void runOneLoopOfCompactorThread(CompactorThreadType type) throws Exception {
     TestTxnDbUtil.setConfValues(conf);
-    CompactorThread t;
-    switch (type) {
-      case INITIATOR: t = new Initiator(); break;
-      case WORKER: t = new Worker(); break;
-      case CLEANER: t = new Cleaner(); break;
-      default: throw new RuntimeException("Huh? Unknown thread type.");
-    }
+    CompactorThread t = switch (type) {
+      case INITIATOR ->
+        new Initiator();
+      case WORKER ->
+        new Worker();
+      case CLEANER ->
+        new Cleaner();
+    };
     t.setConf(conf);
     stop.set(true);
     t.init(stop);
@@ -454,7 +450,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numReco
 
   private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords, FileType type,
       int numBuckets, boolean allBucketsPresent, long visibilityId) throws Exception {
-    String partValue = (p == null) ? null : p.getValues().get(0);
+    String partValue = (p == null) ? null : p.getValues().getFirst();
     Path location = new Path(getLocation(t.getTableName(), partValue));
     String filename = null;
     switch (type) {
@@ -469,7 +465,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numReco
       if (bucket == 0 && !allBucketsPresent) {
        continue; // skip one
       }
-      Path partFile = null;
+      Path partFile;
       if (type == FileType.LEGACY) {
         partFile = new Path(location, String.format(AcidUtils.LEGACY_FILE_BUCKET_DIGITS, bucket) + "_0");
       } else {
@@ -477,7 +473,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numReco
         fs.mkdirs(dir);
         partFile = AcidUtils.createBucketFile(dir, bucket);
         if (type == FileType.LENGTH_FILE) {
-          partFile = new Path(partFile.toString() + AcidUtils.DELTA_SIDE_FILE_SUFFIX);
+          partFile = new Path(partFile + AcidUtils.DELTA_SIDE_FILE_SUFFIX);
         }
       }
       FSDataOutputStream out = fs.create(partFile);
@@ -497,9 +493,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numReco
   static class MockInputFormat implements AcidInputFormat {
 
     @Override
-    public AcidInputFormat.RowReader getReader(InputSplit split,
-                                               Options options) throws
-        IOException {
+    public AcidInputFormat.RowReader getReader(InputSplit split, Options options) {
       return null;
     }
 
@@ -508,7 +502,7 @@ public RawReader getRawReader(Configuration conf, boolean collapseEvents,
         ValidWriteIdList validWriteIdList, Path baseDirectory, Path[] deltaDirectory,
         Map deltaToAttemptId) throws IOException {
 
-      List<Path> filesToRead = new ArrayList<Path>();
+      List<Path> filesToRead = new ArrayList<>();
       if (baseDirectory != null) {
         if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
           Path p = AcidUtils.createBucketFile(baseDirectory, bucket);
@@ -521,8 +515,8 @@ public RawReader getRawReader(Configuration conf, boolean collapseEvents,
         }
       }
 
-      for (int i = 0; i < deltaDirectory.length; i++) {
-        Path p = AcidUtils.createBucketFile(deltaDirectory[i], bucket);
+      for (Path path : deltaDirectory) {
+        Path p = AcidUtils.createBucketFile(path, bucket);
         FileSystem fs = p.getFileSystem(conf);
         if (fs.exists(p)) {
           filesToRead.add(p);
@@ -543,25 +537,22 @@ public RecordReader getRecordReader(InputSplit inputSp
     }
 
     @Override
-    public boolean validateInput(FileSystem fs, HiveConf conf, List files) throws
-        IOException {
+    public boolean validateInput(FileSystem fs, HiveConf conf, List files) {
       return false;
     }
   }
 
   static class MockRawReader implements AcidInputFormat.RawReader {
     private final Stack<Path> filesToRead;
-    private final Configuration conf;
     private FSDataInputStream is = null;
     private final FileSystem fs;
     private boolean lastWasDelete = true;
 
     MockRawReader(Configuration conf, List<Path> files) throws IOException {
-      filesToRead = new Stack<Path>();
+      filesToRead = new Stack<>();
       for (Path file : files) {
         filesToRead.push(file);
       }
-      this.conf = conf;
       fs = FileSystem.get(conf);
     }
 
@@ -599,7 +590,7 @@ public boolean next(RecordIdentifier identifier, Text text) throws IOException {
       try {
         identifier.readFields(is);
         line = is.readLine();
-      } catch (EOFException e) {
+      } catch (EOFException ignored) {
       }
       if (line == null) {
         // Set our current entry to null (since it's done) and try again.
@@ -642,8 +633,7 @@ public float getProgress() throws IOException {
   static class MockOutputFormat implements AcidOutputFormat {
 
     @Override
-    public RecordUpdater getRecordUpdater(Path path, Options options) throws
-        IOException {
+    public RecordUpdater getRecordUpdater(Path path, Options options) {
       return null;
     }
 
@@ -656,7 +646,7 @@ public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getRawRecord
     public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc,
         Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties,
-        Progressable progress) throws IOException {
+        Progressable progress) {
       return null;
     }
 
@@ -729,6 +719,10 @@ String makeDeleteDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
   }
 
   protected long compactInTxn(CompactionRequest rqst) throws Exception {
+    return compactInTxn(rqst, CommitAction.COMMIT);
+  }
+
+  long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exception {
     txnHandler.compact(rqst);
     FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest();
     findNextCompactRequest.setWorkerId("fred");
@@ -740,20 +734,34 @@ protected long compactInTxn(CompactionRequest rqst) throws Exception {
     ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId);
     GetValidWriteIdsRequest writeIdsRequest = new GetValidWriteIdsRequest();
     writeIdsRequest.setValidTxnList(validTxnList.writeToString());
-    writeIdsRequest
-        .setFullTableNames(Collections.singletonList(TxnUtils.getFullTableName(rqst.getDbname(), rqst.getTablename())));
+    writeIdsRequest.setFullTableNames(
+        Collections.singletonList(TxnUtils.getFullTableName(rqst.getDbname(), rqst.getTablename())));
     // with this ValidWriteIdList is capped at whatever HWM validTxnList has
-    ValidCompactorWriteIdList tblValidWriteIds = TxnUtils
-        .createValidCompactWriteIdList(txnHandler.getValidWriteIds(writeIdsRequest).getTblValidWriteIds().get(0));
+    ValidCompactorWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(
+        txnHandler.getValidWriteIds(writeIdsRequest).getTblValidWriteIds().getFirst());
     ci.highestWriteId = tblValidWriteIds.getHighWatermark();
     txnHandler.updateCompactorState(ci, compactorTxnId);
-    txnHandler.markCompacted(ci);
-    txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));
-    Thread.sleep(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+
+    switch (commitAction) {
+      case COMMIT -> {
+        txnHandler.markCompacted(ci);
+        txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));
+
+        Thread.sleep(MetastoreConf.getTimeVar(
+            conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+      }
+      case ABORT ->
+        txnHandler.abortTxn(new AbortTxnRequest(compactorTxnId));
+    }
 
     return compactorTxnId;
   }
 
+  enum CommitAction {
+    COMMIT,
+    ABORT,
+    NONE
+  }
+
   protected static Map gaugeToMap(String metric) throws Exception {
     MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index ef8c928e6e8b..dd5234e279d8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -1215,7 +1215,7 @@ public void testCleanupOnConcurrentMinorCompactions() throws Exception {
     Assert.assertTrue(sawDelta);
   }
 
-  private void allocateTableWriteId(String dbName, String tblName, long txnId) throws Exception {
+  void allocateTableWriteId(String dbName, String tblName, long txnId) throws Exception {
     AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(dbName, tblName);
     awiRqst.setTxnIds(Collections.singletonList(txnId));
     AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
index 279f953d9be3..a7ede6b251fd 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
@@ -18,9 +18,95 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 public class TestCleanerWithMinHistoryWriteId extends TestCleaner {
+
+  @BeforeEach
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    MetastoreConf.setLongVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 0);
+  }
+
   @Override
   protected boolean useMinHistoryWriteId() {
     return true;
   }
+
+  @Test
+  public void cleanupAfterAbortedAndRetriedMajorCompaction() throws Exception {
+    Table t = prepareTestTable();
+    CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+    long compactTxn = compactInTxn(rqst, CommitAction.ABORT);
+    addBaseFile(t, null, 25L, 25, compactTxn);
+
+    txnHandler.revokeTimedoutWorkers(1L);
+    compactTxn = compactInTxn(rqst);
+    addBaseFile(t, null, 25L, 25, compactTxn);
+
+    startCleaner();
+
+    // Check there are no compactions requests left.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
+
+    // Check that the files are removed
+    List<Path> paths = getDirectories(conf, t, null);
+    assertEquals(1, paths.size());
+    assertEquals(addVisibilitySuffix("base_25", 27), paths.getFirst().getName());
+  }
+
+  @Test
+  public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception {
+    Table t = prepareTestTable();
+    CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+    long compactTxn = compactInTxn(rqst, CommitAction.NONE);
+    addBaseFile(t, null, 25L, 25, compactTxn);
+
+    txnHandler.revokeTimedoutWorkers(1L);
+    compactTxn = compactInTxn(rqst);
+    addBaseFile(t, null, 25L, 25, compactTxn);
+
+    startCleaner();
+
+    // Check there are no compactions requests left.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
+    assertEquals("txnid:26 is open and <= hwm: 27", rsp.getCompacts().getFirst().getErrorMessage());
+
+    // Check that the files are not removed
+    List<Path> paths = getDirectories(conf, t, null);
+    assertEquals(6, paths.size());
+  }
+
+  private Table prepareTestTable() throws Exception {
+    Table t = newTable("default", "camtc", false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    addDeltaFile(t, null, 23L, 24L, 2);
+    addDeltaFile(t, null, 25L, 25, 2);
+
+    burnThroughTransactions("default", "camtc", 25);
+    return t;
+  }
+
 }
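Note: the two tests above lean on simple transaction arithmetic, so a minimal self-contained sketch of that bookkeeping may help; all names below are illustrative, not code from the patch.

    // Sketch only: txn ids as the killed-retry tests see them.
    public class KilledRetryTxnMath {
      public static void main(String[] args) {
        long burned = 25;                       // burnThroughTransactions("default", "camtc", 25)
        long killedCompactorTxn = burned + 1;   // CommitAction.NONE leaves txn 26 open
        long retriedCompactorTxn = burned + 2;  // retry opens txn 27, so the new base is base_25_v27
        // The cleaner's snapshot still sees txn 26 open below its high watermark 27,
        // which is exactly the "txnid:26 is open and <= hwm: 27" failure asserted above.
        System.out.println("txnid:" + killedCompactorTxn
            + " is open and <= hwm: " + retriedCompactorTxn);
      }
    }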
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index d98afb7376ac..cd56311abc05 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -88,7 +88,8 @@ public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns
         if (isAbortCleanup) {
           exceptions[i] = txnId;
         } else {
-          assert false : JavaUtils.txnIdToString(txnId) + " is open and <= hwm:" + highWatermark;
+          throw new IllegalStateException(
+              JavaUtils.txnIdToString(txnId) + " is open and <= hwm: " + highWatermark);
         }
       }
       ++i;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
index 26bb2bf6d115..1e1ea51420c9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
@@ -88,7 +88,7 @@ public String getParameterizedQueryString(DatabaseProduct databaseProduct) throw
           "ON \"cq1\".\"CQ_DATABASE\" = \"hwm\".\"MH_DATABASE\"" +
           " AND \"cq1\".\"CQ_TABLE\" = \"hwm\".\"MH_TABLE\"";
 
-      whereClause += " AND (\"CQ_HIGHEST_WRITE_ID\" < \"MIN_OPEN_WRITE_ID\" OR \"MIN_OPEN_WRITE_ID\" IS NULL)";
+      whereClause += " AND (\"CQ_HIGHEST_WRITE_ID\" < \"MIN_OPEN_WRITE_ID\"-1 OR \"MIN_OPEN_WRITE_ID\" IS NULL)";
 
     } else if (minOpenTxnWaterMark > 0) {
       whereClause += " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)";
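Note: the behavioral core of PATCH 1/3 is the tightened cleanup gate above. A minimal Java restatement of the SQL condition in ReadyToCleanHandler, under assumed names (readyToClean and its parameters are not real code in the patch):

    // Sketch: when may the Cleaner process a compaction under min-history write ids?
    static boolean readyToClean(long cqHighestWriteId, Long minOpenWriteId) {
      // MIN_OPEN_WRITE_ID IS NULL  -> no tracked open writers, cleanup may proceed.
      // Otherwise the compacted watermark must sit strictly below minOpenWriteId - 1,
      // so a retried compaction is not cleaned while its killed predecessor's
      // transaction is still open.
      return minOpenWriteId == null || cqHighestWriteId < minOpenWriteId - 1;
    }

This pairs with the TaskHandler change: the empty ValidReadTxnList is now swapped in only when no min open write id is tracked (info.minOpenWriteId < 0), so the leftover check keeps honoring the real open-transaction snapshot otherwise.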
From 2313e423e5ca139f7328ab074a5ccbab2cdafa0e Mon Sep 17 00:00:00 2001
From: Denys Kuzmenko
Date: Wed, 28 Jan 2026 15:21:55 +0200
Subject: [PATCH 2/3] redundant setup

---
 .../hive/ql/txn/compactor/service/AcidCompactionService.java | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
index cf3f0ccee024..755bd1f75c94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
@@ -196,9 +196,6 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
     txnWriteIds.addTableValidWriteIdList(tblValidWriteIds);
     conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, txnWriteIds.toString());
 
-    msc.addWriteIdsToMinHistory(compactionTxn.getTxnId(),
-        ImmutableMap.of(fullTableName, txnWriteIds.getMinOpenWriteId(fullTableName)));
-
     ci.highestWriteId = tblValidWriteIds.getHighWatermark();
     //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
     //it until after any data written by it are physically removed
From adeac3764cdd39cac928d25922d53a768b0e71e5 Mon Sep 17 00:00:00 2001
From: Denys Kuzmenko
Date: Wed, 28 Jan 2026 18:12:17 +0200
Subject: [PATCH 3/3] test refactor + use junit5

---
 .../service/AcidCompactionService.java        |   1 -
 .../hadoop/hive/ql/TestTxnCommands2.java      |  26 +-
 .../hive/ql/testutil/TxnStoreHelper.java      |  73 ++++
 .../hive/ql/txn/compactor/CompactorTest.java  |  40 +-
 .../TestAbortCleanupUsingCompactionCycle.java |   6 +-
 ...gCompactionCycleWithMinHistoryWriteId.java |   6 +-
 .../hive/ql/txn/compactor/TestCleaner.java    | 372 ++++++++----------
 .../hive/ql/txn/compactor/TestCleaner2.java   |   1 +
 .../handler/TestAbortedTxnCleaner.java        | 169 ++++----
 .../ql/txn/compactor/handler/TestHandler.java |  11 +-
 .../hadoop/hive/metastore/HMSHandler.java     |   2 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java |   2 +-
 .../hadoop/hive/metastore/txn/TxnStore.java   |   2 +-
 13 files changed, 394 insertions(+), 317 deletions(-)
 create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/testutil/TxnStoreHelper.java

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
index 755bd1f75c94..ab1581c836e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor.service;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index c992c5970e13..b06edd18dd0e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -80,6 +80,7 @@
 import org.apache.hadoop.hive.ql.scheduled.ScheduledQueryExecutionService;
 import org.apache.hadoop.hive.ql.schq.MockScheduledQueryService;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.testutil.TxnStoreHelper;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorFactory;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorPipeline;
 import org.apache.hadoop.hive.ql.txn.compactor.MRCompactor;
@@ -99,6 +100,7 @@
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.rules.ExpectedException;
 
+import static org.apache.hadoop.hive.metastore.txn.TxnHandler.ConfVars;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.mockito.ArgumentMatchers.any;
@@ -2396,9 +2398,13 @@ public void testCleanerForTxnToWriteId() throws Exception {
     // See MinOpenTxnIdWaterMarkFunction, OpenTxnTimeoutLowBoundaryTxnIdHandler
     // TODO: revisit wait logic
     waitUntilAllTxnFinished();
-    txnMgr.openTxn(ctx, "u1");
+
+    long txnId = txnMgr.openTxn(ctx, "u1");
     txnMgr.getValidTxns();
+
+    TxnStoreHelper.wrap(txnHandler)
+        .registerMinOpenWriteId("default", Table.ACIDTBL.name(), txnId);
     // Start an INSERT statement transaction and roll back this transaction.
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_ROLLBACK_TXN, true);
     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData5));
@@ -2437,10 +2443,24 @@ public void testCleanerForTxnToWriteId() throws Exception {
 
     // Commit the open txn, which lets the cleanup on TXN_TO_WRITE_ID.
     txnMgr.commitTxn();
-    txnMgr.openTxn(ctx, "u1");
+
+    // When useMinHistoryWriteId is enabled, compaction write HWM must be < min open writeId -1 to allow cleanup.
+    if (ConfVars.useMinHistoryWriteId()) {
+      txnId = txnMgr.openTxn(ctx, "u1");
+      TxnStoreHelper.wrap(txnHandler)
+          .allocateTableWriteId("default", Table.ACIDTBL.name(), txnId);
+      txnMgr.commitTxn();
+    }
+
+    txnId = txnMgr.openTxn(ctx, "u1");
     txnMgr.getValidTxns();
-    // The txn opened after the compaction commit should not effect the Cleaner
+
+    TxnStoreHelper.wrap(txnHandler)
+        .registerMinOpenWriteId("default", Table.ACIDTBL.name(), txnId);
+
+    // Transactions initiated after a compaction commit must not affect the Cleaner.
     runCleaner(hiveConf);
+
     txnHandler.cleanEmptyAbortedAndCommittedTxns();
     txnHandler.performWriteSetGC();
     txnHandler.cleanTxnToWriteIdTable();
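Note: the extra committed write added above exists to satisfy the tightened gate from PATCH 1/3. A worked example with made-up numbers:

    // Made-up numbers for the patch-1 gate (see the readyToClean sketch after PATCH 1/3):
    long cqHighestWriteId = 5;
    assert !(cqHighestWriteId < 6 - 1);  // min open write id 6 -> cleanup still blocked
    assert  (cqHighestWriteId < 7 - 1);  // one more committed write -> min open id 7 -> unblocked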
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/testutil/TxnStoreHelper.java b/ql/src/test/org/apache/hadoop/hive/ql/testutil/TxnStoreHelper.java
new file mode 100644
index 000000000000..5e362d81d82f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/testutil/TxnStoreHelper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.testutil;
+
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdRequest;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+
+import java.util.Collections;
+
+import static org.apache.hadoop.hive.metastore.txn.TxnHandler.ConfVars;
+
+public class TxnStoreHelper {
+
+  private final TxnStore txnHandler;
+
+  private TxnStoreHelper(TxnStore txnHandler) {
+    this.txnHandler = txnHandler;
+  }
+
+  public static TxnStoreHelper wrap(TxnStore txnHandler) {
+    return new TxnStoreHelper(txnHandler);
+  }
+
+  /**
+   * Allocates a new write ID for the table in the given transaction.
+   */
+  public long allocateTableWriteId(String dbName, String tblName, long txnId)
+      throws TxnAbortedException, NoSuchTxnException, MetaException {
+    AllocateTableWriteIdsRequest request = new AllocateTableWriteIdsRequest(dbName, tblName.toLowerCase());
+    request.setTxnIds(Collections.singletonList(txnId));
+
+    AllocateTableWriteIdsResponse response = txnHandler.allocateTableWriteIds(request);
+    return response.getTxnToWriteIds().getFirst().getWriteId();
+  }
+
+  /**
+   * Registers the min open write ID for the table in the given transaction.
+   */
+  public void registerMinOpenWriteId(String dbName, String tblName, long txnId) throws MetaException {
+    if (!ConfVars.useMinHistoryWriteId()) {
+      return;
+    }
+    long maxWriteId = txnHandler.getMaxAllocatedTableWriteId(
+        new MaxAllocatedTableWriteIdRequest(dbName, tblName.toLowerCase()))
+        .getMaxWriteId();
+
+    txnHandler.addWriteIdsToMinHistory(txnId,
+        Collections.singletonMap(
+            TxnUtils.getFullTableName(dbName, tblName), maxWriteId + 1));
+  }
+}
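Note: TxnStoreHelper is a thin test-side facade over TxnStore. A typical call site might look like this sketch, assuming txnHandler is any TxnStore and txnId an open transaction:

    long writeId = TxnStoreHelper.wrap(txnHandler)
        .allocateTableWriteId("default", "camtc", txnId);
    TxnStoreHelper.wrap(txnHandler)
        .registerMinOpenWriteId("default", "camtc", txnId);

registerMinOpenWriteId is a deliberate no-op unless TxnHandler.ConfVars.useMinHistoryWriteId() is on, which keeps callers branch-free across the two cleaner configurations.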
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index f49daf56be1f..3b685aa2125b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -71,6 +71,7 @@
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.testutil.TxnStoreHelper;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Text;
@@ -263,11 +264,8 @@ protected long openTxn(TxnType txnType) throws MetaException {
 
   protected long allocateWriteId(String dbName, String tblName, long txnid)
       throws MetaException, TxnAbortedException, NoSuchTxnException {
-    AllocateTableWriteIdsRequest awiRqst
-        = new AllocateTableWriteIdsRequest(dbName, tblName);
-    awiRqst.setTxnIds(Collections.singletonList(txnid));
-    AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
-    return awiResp.getTxnToWriteIds().getFirst().getWriteId();
+    return TxnStoreHelper.wrap(txnHandler)
+        .allocateTableWriteId(dbName, tblName, txnid);
   }
 
   protected void addDeltaFileWithTxnComponents(Table t, Partition p, int numRecords, boolean abort)
@@ -419,13 +417,11 @@ private StorageDescriptor newStorageDescriptor(String location, List sort
   // I can't do this with @Before because I want to be able to control when the thread starts
   private void runOneLoopOfCompactorThread(CompactorThreadType type) throws Exception {
     TestTxnDbUtil.setConfValues(conf);
+
     CompactorThread t = switch (type) {
-      case INITIATOR ->
-        new Initiator();
-      case WORKER ->
-        new Worker();
-      case CLEANER ->
-        new Cleaner();
+      case INITIATOR -> new Initiator();
+      case WORKER -> new Worker();
+      case CLEANER -> new Cleaner();
     };
     t.setConf(conf);
     stop.set(true);
@@ -724,26 +720,37 @@ protected long compactInTxn(CompactionRequest rqst) throws Exception {
 
   long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exception {
     txnHandler.compact(rqst);
+
     FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest();
     findNextCompactRequest.setWorkerId("fred");
     findNextCompactRequest.setWorkerVersion(WORKER_VERSION);
+
     CompactionInfo ci = txnHandler.findNextToCompact(findNextCompactRequest);
     ci.runAs = rqst.getRunas() == null ? System.getProperty("user.name") : rqst.getRunas();
+
     long compactorTxnId = openTxn(TxnType.COMPACTION);
+
     // Need to create a valid writeIdList to set the highestWriteId in ci
-    ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId);
-    GetValidWriteIdsRequest writeIdsRequest = new GetValidWriteIdsRequest();
+    ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(
+        txnHandler.getOpenTxns(Collections.singletonList(TxnType.READ_ONLY)), compactorTxnId);
+
+    GetValidWriteIdsRequest writeIdsRequest = new GetValidWriteIdsRequest(
+        Collections.singletonList(
+            ci.getFullTableName().toLowerCase()));
     writeIdsRequest.setValidTxnList(validTxnList.writeToString());
-    writeIdsRequest.setFullTableNames(
-        Collections.singletonList(TxnUtils.getFullTableName(rqst.getDbname(), rqst.getTablename())));
+
     // with this ValidWriteIdList is capped at whatever HWM validTxnList has
     ValidCompactorWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(
         txnHandler.getValidWriteIds(writeIdsRequest).getTblValidWriteIds().getFirst());
     ci.highestWriteId = tblValidWriteIds.getHighWatermark();
     txnHandler.updateCompactorState(ci, compactorTxnId);
 
     switch (commitAction) {
+      case MARK_COMPACTED ->
+        txnHandler.markCompacted(ci);
+
       case COMMIT -> {
         txnHandler.markCompacted(ci);
         txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));
@@ -760,6 +767,7 @@ long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exce
   enum CommitAction {
     COMMIT,
     ABORT,
+    MARK_COMPACTED,
     NONE
   }
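Note: a rough mapping of the CommitAction values onto the worker outcomes they simulate (a summary, not the author's wording):

    // COMMIT         -> normal path: markCompacted + commitTxn
    // MARK_COMPACTED -> compaction marked done, commit still pending (long-running-query tests)
    // ABORT          -> failed worker: the compactor txn is aborted
    // NONE           -> killed worker: txn left open; tests later time it out via
    //                   txnHandler.revokeTimedoutWorkers(...)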
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java
index 01625d431129..cef2e93b62ef 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java
@@ -18,13 +18,15 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.junit.jupiter.api.BeforeEach;
 
 public class TestAbortCleanupUsingCompactionCycle extends TestCleaner {
 
-  @Override
+  @BeforeEach
+  @Override
   public void setup() throws Exception {
     super.setup();
-    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+    MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java
index 77782534ec42..38511d692f80 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java
@@ -18,14 +18,16 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.junit.jupiter.api.BeforeEach;
 
 public class TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId extends TestCleaner {
 
-  @Override
+  @BeforeEach
+  @Override
   public void setup() throws Exception {
     super.setup();
-    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+    MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
   }
 
   @Override
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index dd5234e279d8..25e300338a21 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -18,35 +18,26 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
-import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.GetTableRequest;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.testutil.TxnStoreHelper;
 import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
 import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory;
 import org.apache.hive.common.util.ReflectionUtil;
-import org.junit.Assert;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
@@ -68,6 +59,10 @@
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
 import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doAnswer;
@@ -144,17 +139,19 @@ public void testRetryAfterFailedCleanup(boolean delayEnabled) throws Exception {
 
       // Check retry attempts updated
       Optional<CompactionInfo> compactionByTxnId = txnHandler.getCompactionByTxnId(compactTxn);
-      Assert.assertTrue("Expected compactionInfo, but got nothing returned", compactionByTxnId.isPresent());
+      assertTrue(compactionByTxnId.isPresent(), "Expected compactionInfo, but got nothing returned");
       CompactionInfo ci = compactionByTxnId.get();
 
       // Check if state is still 'ready for cleaning'
-      Assert.assertEquals(String.format("Expected 'r' (ready for cleaning) state, but got: '%c'", ci.state), 'r', ci.state);
+      assertEquals('r', ci.state,
+          "Expected 'r' (ready for cleaning) state, but got: '%c'".formatted(ci.state));
 
       // Check if error message was set correctly
-      Assert.assertEquals(String.format("Expected error message: '%s', but got '%s'", errorMessage, ci.errorMessage),
-          errorMessage, ci.errorMessage);
+      assertEquals(errorMessage, ci.errorMessage,
+          "Expected error message: '%s', but got '%s'".formatted(errorMessage, ci.errorMessage));
 
       // Check if retentionTime was set correctly
       int cleanAttempts = (int)(Math.log(ci.retryRetention / retryRetentionTime) / Math.log(2)) + 1;
-      Assert.assertEquals(String.format("Expected %d clean attempts, but got %d", i, cleanAttempts), i, cleanAttempts);
+      assertEquals(i, cleanAttempts,
+          "Expected %d clean attempts, but got %d".formatted(i, cleanAttempts));
     }
 
     //Do a final run to reach the maximum retry attempts, so the state finally should be set to failed
@@ -167,14 +164,14 @@ public void testRetryAfterFailedCleanup(boolean delayEnabled) throws Exception {
     cleaner.run();
 
     ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(String.format("Expected %d CompactionInfo, but got %d", 1, scr.getCompactsSize()),
-        1, scr.getCompactsSize());
-    ShowCompactResponseElement scre = scr.getCompacts().get(0);
+    assertEquals(1, scr.getCompactsSize(),
+        "Expected %d CompactionInfo, but got %d".formatted(1, scr.getCompactsSize()));
+    ShowCompactResponseElement scre = scr.getCompacts().getFirst();
     //The state finally should be set to failed.
-    Assert.assertEquals(String.format("Expected '%s' state, but got '%s'", "failed", scre.getState()),
-        "failed", scre.getState());
-    Assert.assertEquals(String.format("Expected error message: '%s', but got '%s'", errorMessage, scre.getErrorMessage()),
-        errorMessage, scre.getErrorMessage());
+    assertEquals("failed", scre.getState(),
+        "Expected '%s' state, but got '%s'".formatted("failed", scre.getState()));
+    assertEquals(errorMessage, scre.getErrorMessage(),
+        "Expected error message: '%s', but got '%s'".formatted(errorMessage, scre.getErrorMessage()));
   }
 
   @Test
@@ -225,7 +222,7 @@ public void testRetentionAfterFailedCleanup() throws Exception {
 
     cleaner.run();
 
-    Assert.assertEquals(0, reference.get().size());
+    assertEquals(0, reference.get().size());
   }
 
   @Test
@@ -236,7 +233,6 @@ public void cleanupAfterMajorTableCompaction() throws Exception {
     addDeltaFile(t, null, 21L, 22L, 2);
     addDeltaFile(t, null, 23L, 24L, 2);
-
     burnThroughTransactions("default", "camtc", 25);
 
     CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
@@ -247,13 +243,13 @@ public void cleanupAfterMajorTableCompaction() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
-    Assert.assertEquals(1, paths.size());
-    Assert.assertEquals(addVisibilitySuffix("base_25", 26), paths.get(0).getName());
+    assertEquals(1, paths.size());
+    assertEquals(addVisibilitySuffix("base_25", 26), paths.getFirst().getName());
   }
 
   @Test
@@ -275,13 +271,13 @@ public void cleanupAfterIOWAndMajorTableCompaction() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
-    Assert.assertEquals(1, paths.size());
-    Assert.assertEquals(addVisibilitySuffix("base_25", 26), paths.get(0).getName());
+    assertEquals(1, paths.size());
+    assertEquals(addVisibilitySuffix("base_25", 26), paths.getFirst().getName());
   }
 
   @Test
@@ -296,43 +292,24 @@ public void cleanupAfterMajorTableCompactionWithLongRunningQuery() throws Except
     burnThroughTransactions("default", "camtc", 25);
 
     CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
-    txnHandler.compact(rqst);
-
-    FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest();
-    findNextCompactRequest.setWorkerId("fred");
-    findNextCompactRequest.setWorkerVersion(WORKER_VERSION);
-    CompactionInfo ci = txnHandler.findNextToCompact(findNextCompactRequest);
-    ci.runAs = System.getProperty("user.name");
-    long compactTxn = openTxn(TxnType.COMPACTION);
-
-    ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(
-        txnHandler.getOpenTxns(Collections.singletonList(TxnType.READ_ONLY)), compactTxn);
-    GetValidWriteIdsRequest validWriteIdsRqst = new GetValidWriteIdsRequest(Collections.singletonList(ci.getFullTableName()));
-    validWriteIdsRqst.setValidTxnList(validTxnList.writeToString());
-
-    ValidCompactorWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(
-        txnHandler.getValidWriteIds(validWriteIdsRqst).getTblValidWriteIds().get(0));
-    ci.highestWriteId = tblValidWriteIds.getHighWatermark();
-    txnHandler.updateCompactorState(ci, compactTxn);
-
-    txnHandler.markCompacted(ci);
+    long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED);
 
     // Open a query during compaction
     long longQuery = openTxn();
-    if (useMinHistoryWriteId()) {
-      allocateTableWriteId("default", "camtc", longQuery);
-    }
+    TxnStoreHelper.wrap(txnHandler)
+        .registerMinOpenWriteId("default", "camtc", longQuery);
+
     txnHandler.commitTxn(new CommitTxnRequest(compactTxn));
 
     startCleaner();
 
-    // The long running query should prevent the cleanup
+    // The long-running query should prevent the cleanup
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are not removed
     List<Path> paths = getDirectories(conf, t, null);
-    Assert.assertEquals(4, paths.size());
+    assertEquals(4, paths.size());
 
     // After the commit cleaning can proceed
     txnHandler.commitTxn(new CommitTxnRequest(longQuery));
@@ -340,13 +317,13 @@ public void cleanupAfterMajorTableCompactionWithLongRunningQuery() throws Except
     startCleaner();
 
     rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are removed
     paths = getDirectories(conf, t, null);
-    Assert.assertEquals(1, paths.size());
-    Assert.assertEquals(addVisibilitySuffix("base_25", 26), paths.get(0).getName());
+    assertEquals(1, paths.size());
+    assertEquals(addVisibilitySuffix("base_25", 26), paths.getFirst().getName());
   }
 
   @Test
@@ -369,13 +346,13 @@ public void cleanupAfterMajorPartitionCompaction() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
-    Assert.assertEquals(1, paths.size());
-    Assert.assertEquals("base_25", paths.get(0).getName());
+    assertEquals(1, paths.size());
+    assertEquals("base_25", paths.getFirst().getName());
   }
 
   @Test
@@ -396,12 +373,12 @@ public void cleanupAfterMinorTableCompaction() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
-    Assert.assertEquals(2, paths.size());
+    assertEquals(2, paths.size());
     boolean sawBase = false, sawDelta = false;
     for (Path p : paths) {
       if (p.getName().equals("base_20")) {
         sawBase = true;
       } else if (p.getName().equals(makeDeltaDirName(21, 24))) {
         sawDelta = true;
       } else {
-        Assert.fail("Unexpected file " + p.getName());
+        fail("Unexpected file " + p.getName());
       }
     }
-    Assert.assertTrue(sawBase);
-    Assert.assertTrue(sawDelta);
+    assertTrue(sawBase);
+    assertTrue(sawDelta);
   }
 
   @Test
@@ -436,12 +413,12 @@ public void cleanupAfterMinorPartitionCompaction() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
-    Assert.assertEquals(2, paths.size());
+    assertEquals(2, paths.size());
     boolean sawBase = false, sawDelta = false;
     for (Path path : paths) {
       if (path.getName().equals("base_20")) {
         sawBase = true;
       } else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) {
         sawDelta = true;
       } else {
-        Assert.fail("Unexpected file " + path.getName());
+        fail("Unexpected file " + path.getName());
       }
     }
-    Assert.assertTrue(sawBase);
-    Assert.assertTrue(sawDelta);
+    assertTrue(sawBase);
+    assertTrue(sawDelta);
   }
 
   @Test
@@ -475,13 +452,13 @@ public void cleanupAfterMajorPartitionCompactionNoBase() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
-    Assert.assertEquals(1, paths.size());
-    Assert.assertEquals("base_25", paths.get(0).getName());
+    assertEquals(1, paths.size());
+    assertEquals("base_25", paths.getFirst().getName());
   }
 
   @Test
@@ -504,7 +481,7 @@ public void droppedTable() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    assertEquals(0, rsp.getCompactsSize());
   }
 
   @Test
@@ -529,7 +506,7 @@ public void droppedPartition() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    assertEquals(0, rsp.getCompactsSize());
   }
 
   @Test
@@ -559,13 +536,13 @@ public void processCompactionCandidatesInParallel() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(10, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(10, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are removed
     for (Partition pa : partitions) {
       List<Path> paths = getDirectories(conf, t, pa);
-      Assert.assertEquals(2, paths.size());
+      assertEquals(2, paths.size());
       boolean sawBase = false, sawDelta = false;
       for (Path path : paths) {
         if (path.getName().equals("base_20")) {
           sawBase = true;
         } else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) {
           sawDelta = true;
         } else {
-          Assert.fail("Unexpected file " + path.getName());
+          fail("Unexpected file " + path.getName());
         }
       }
-      Assert.assertTrue(sawBase);
-      Assert.assertTrue(sawDelta);
+      assertTrue(sawBase);
+      assertTrue(sawDelta);
     }
   }
 
   @Test
@@ -602,8 +579,8 @@ public void delayedCleanupAfterMajorCompaction() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // putting current thread to sleep to get pass the retention time
     Thread.sleep(conf.getTimeVar(HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS));
 
     startCleaner();
     // Check there are no compactions requests left.
     rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
-    Assert.assertEquals(1, paths.size());
-    Assert.assertEquals("base_25", paths.get(0).getName());
+    assertEquals(1, paths.size());
+    assertEquals("base_25", paths.getFirst().getName());
   }
 
   @Test
@@ -643,8 +620,8 @@ public void delayedCleanupAfterMinorCompactionOnPartition() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // putting current thread to sleep to get pass the retention time
     Thread.sleep(conf.getTimeVar(HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS));
 
     startCleaner();
 
     // Check there are no compactions requests left.
     rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
-    Assert.assertEquals(2, paths.size());
+    assertEquals(2, paths.size());
     boolean sawBase = false, sawDelta = false;
     for (Path path : paths) {
       if (path.getName().equals("base_20")) {
         sawBase = true;
       } else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) {
         sawDelta = true;
       } else {
-        Assert.fail("Unexpected file " + path.getName());
+        fail("Unexpected file " + path.getName());
       }
     }
-    Assert.assertTrue(sawBase);
-    Assert.assertTrue(sawDelta);
+    assertTrue(sawBase);
+    assertTrue(sawDelta);
   }
 
   @Test
@@ -709,19 +686,19 @@ public void delayedCleanupAfterMinorAndMajorCompaction() throws Exception {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(2, rsp.getCompactsSize());
+    assertEquals(2, rsp.getCompactsSize());
     for (ShowCompactResponseElement c : rsp.getCompacts()) {
       if (c.getType() == CompactionType.MAJOR) {
-        Assert.assertEquals(TxnStore.CLEANING_RESPONSE, c.getState());
+        assertEquals(TxnStore.CLEANING_RESPONSE, c.getState());
       } else {
-        Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, c.getState());
+        assertEquals(TxnStore.SUCCEEDED_RESPONSE, c.getState());
       }
     }
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
     // base_20, minor delta, delta_23 and base_23
-    Assert.assertEquals(4, paths.size());
+    assertEquals(4, paths.size());
 
     // putting current thread to sleep to get pass the retention time
     Thread.sleep(conf.getTimeVar(HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS));
 
     startCleaner();
 
     // Check there are no compactions requests left.
     rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(2, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState());
+    assertEquals(2, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState());
 
     // Check that the files are removed
     paths = getDirectories(conf, t, p);
-    Assert.assertEquals(1, paths.size());
-    Assert.assertEquals(addVisibilitySuffix("base_23", 25), paths.get(0).getName());
+    assertEquals(1, paths.size());
+    assertEquals(addVisibilitySuffix("base_23", 25), paths.getFirst().getName());
   }
 
   @Test
@@ -766,12 +743,13 @@ public void testReadyForCleaningPileup() throws Exception {
 
     // make sure cleaner didn't remove anything, and cleaning is still queued
     List<Path> paths = getDirectories(conf, t, p);
-    Assert.assertEquals("Expected 4 files after minor compaction, instead these files were present " + paths,
-        4, paths.size());
+    assertEquals(4, paths.size(),
+        "Expected 4 files after minor compaction, instead these files were present " + paths);
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals("Expected 1 compaction in queue, got: " + rsp.getCompacts(), 1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
-    Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+    assertEquals(1, rsp.getCompactsSize(),
+        "Expected 1 compaction in queue, got: " + rsp.getCompacts());
+    assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState());
+    assertEquals(CompactionType.MINOR, rsp.getCompacts().getFirst().getType());
 
     // major compaction
     addDeltaFile(t, p, 23L, 23L, 1);
@@ -785,12 +763,13 @@ public void testReadyForCleaningPileup() throws Exception {
 
     // make sure cleaner didn't remove anything, and 2 cleaning are still queued
     paths = getDirectories(conf, t, p);
-    Assert.assertEquals("Expected 7 files after minor compaction, instead these files were present " + paths,
-        7, paths.size());
+    assertEquals(7, paths.size(),
+        "Expected 7 files after minor compaction, instead these files were present " + paths);
     rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals("Expected 2 compactions in queue, got: " + rsp.getCompacts(), 2, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
-    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(1).getState());
+    assertEquals(2, rsp.getCompactsSize(),
+        "Expected 2 compactions in queue, got: " + rsp.getCompacts());
+    assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(1).getState());
 
     // unblock the cleaner and run again
     txnHandler.commitTxn(new CommitTxnRequest(blockingTxn));
@@ -799,14 +778,14 @@ public void testReadyForCleaningPileup() throws Exception {
 
     // make sure cleaner removed everything below base_24, and both compactions are successful
     paths = getDirectories(conf, t, p);
-    Assert.assertEquals(1, paths.size());
+    assertEquals(1, paths.size());
     rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals("Expected 2 compactions in queue, got: " + rsp.getCompacts(), 2, rsp.getCompactsSize());
Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); - Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState()); + assertEquals(2, rsp.getCompactsSize(), + "Expected 2 compactions in queue, got: " + rsp.getCompacts()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState()); } - @Override boolean useHive130DeltaDirName() { return false; @@ -838,12 +817,12 @@ public void noCleanupAfterMajorCompaction() throws Exception { startCleaner(); // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertEquals(TxnStore.REFUSED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(TxnStore.REFUSED_RESPONSE, rsp.getCompacts().getFirst().getState()); // Check that the files are not removed List paths = getDirectories(conf, t, null); - Assert.assertEquals(4, paths.size()); + assertEquals(4, paths.size()); //With no clean up false t = ms.getTable(new GetTableRequest("default", "dcamc")); @@ -855,13 +834,13 @@ public void noCleanupAfterMajorCompaction() throws Exception { startCleaner(); // Check there are no compactions requests left. rsp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(2, rsp.getCompactsSize()); - Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(2, rsp.getCompactsSize()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState()); // Check that the files are not removed paths = getDirectories(conf, t, null); - Assert.assertEquals(1, paths.size()); - Assert.assertEquals("base_25", paths.get(0).getName()); + assertEquals(1, paths.size()); + assertEquals("base_25", paths.getFirst().getName()); } @Test @@ -886,16 +865,14 @@ public void noCleanupAfterMinorCompactionOnPartition() throws Exception { // Check there are no compactions requests left. ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertEquals(TxnStore.REFUSED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(TxnStore.REFUSED_RESPONSE, rsp.getCompacts().getFirst().getState()); // Check that the files are not removed List paths = getDirectories(conf, t, p); - Assert.assertEquals(4, paths.size()); + assertEquals(4, paths.size()); // compaction with no cleanup false - ArrayList list = new ArrayList<>(); - list.add("ds=today"); p = ms.getPartition("default", "dcamicop", "ds=today"); p.getParameters().put("NO_CLEANUP", "false"); ms.alter_partition("default", "dcamicop", p); @@ -907,12 +884,12 @@ public void noCleanupAfterMinorCompactionOnPartition() throws Exception { // Check there are no compactions requests left. 
rsp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(2, rsp.getCompactsSize()); - Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(2, rsp.getCompactsSize()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState()); // Check that the files are removed paths = getDirectories(conf, t, p); - Assert.assertEquals(2, paths.size()); + assertEquals(2, paths.size()); boolean sawBase = false, sawDelta = false; for (Path path : paths) { if (path.getName().equals("base_20")) { @@ -920,11 +897,11 @@ public void noCleanupAfterMinorCompactionOnPartition() throws Exception { } else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) { sawDelta = true; } else { - Assert.fail("Unexpected file " + path.getName()); + fail("Unexpected file " + path.getName()); } } - Assert.assertTrue(sawBase); - Assert.assertTrue(sawDelta); + assertTrue(sawBase); + assertTrue(sawDelta); } @Test @@ -943,8 +920,8 @@ public void withSingleBaseCleanerSucceeds() throws Exception { startCleaner(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState()); } @Test @@ -966,12 +943,12 @@ public void withNewerBaseCleanerSucceeds() throws Exception { startCleaner(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState()); List paths = getDirectories(conf, t, null); // we should retain both 25 and 26 - Assert.assertEquals(2, paths.size()); + assertEquals(2, paths.size()); } @Test @@ -991,8 +968,8 @@ public void withNotYetVisibleBase() throws Exception { startCleaner(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState()); } @Test @@ -1011,18 +988,19 @@ public void cleanMultipleTimesWithSameWatermark() throws Exception { CompactionResponse response = txnHandler.compact(rqst); - Assert.assertFalse(response.isAccepted()); - Assert.assertEquals("Compaction is already scheduled with state='ready for cleaning' and id=1", response.getErrormessage()); + assertFalse(response.isAccepted()); + assertEquals("Compaction is already scheduled with state='ready for cleaning' and id=1", + response.getErrormessage()); startCleaner(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState()); List paths = getDirectories(conf, t, null); - Assert.assertEquals(1, paths.size()); - Assert.assertEquals("base_22", paths.get(0).getName()); + assertEquals(1, paths.size()); + assertEquals("base_22", paths.getFirst().getName()); } @Test 
@@ -1034,25 +1012,26 @@ public void nothingToCleanAfterAbortsBase() throws Exception {
     addBaseFile(t, null, 20L, 1);
     addDeltaFile(t, null, 21L, 21L, 2);
     addDeltaFile(t, null, 22L, 22L, 2);
-    burnThroughTransactions(dbName, tableName, 22, null, new HashSet(Arrays.asList(21L, 22L)));
+    burnThroughTransactions(dbName, tableName, 22, null, new HashSet<>(Arrays.asList(21L, 22L)));
 
     CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
     compactInTxn(rqst);
     CompactionResponse response = txnHandler.compact(rqst);
-    Assert.assertFalse(response.isAccepted());
-    Assert.assertEquals("Compaction is already scheduled with state='ready for cleaning' and id=1", response.getErrormessage());
+    assertFalse(response.isAccepted());
+    assertEquals("Compaction is already scheduled with state='ready for cleaning' and id=1",
+        response.getErrormessage());
 
     startCleaner();
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     List paths = getDirectories(conf, t, null);
-    Assert.assertEquals(1, paths.size());
-    Assert.assertEquals("base_20", paths.get(0).getName());
+    assertEquals(1, paths.size());
+    assertEquals("base_20", paths.getFirst().getName());
   }
 
   @Test
@@ -1064,25 +1043,26 @@ public void nothingToCleanAfterAbortsDelta() throws Exception {
     addDeltaFile(t, null, 20L, 20L, 1);
     addDeltaFile(t, null, 21L, 21L, 2);
     addDeltaFile(t, null, 22L, 22L, 2);
-    burnThroughTransactions(dbName, tableName, 22, null, new HashSet(Arrays.asList(21L, 22L)));
+    burnThroughTransactions(dbName, tableName, 22, null, new HashSet<>(Arrays.asList(21L, 22L)));
 
     CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
     compactInTxn(rqst);
     CompactionResponse response = txnHandler.compact(rqst);
-    Assert.assertFalse(response.isAccepted());
-    Assert.assertEquals("Compaction is already scheduled with state='ready for cleaning' and id=1", response.getErrormessage());
+    assertFalse(response.isAccepted());
+    assertEquals("Compaction is already scheduled with state='ready for cleaning' and id=1",
+        response.getErrormessage());
 
     startCleaner();
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     List paths = getDirectories(conf, t, null);
-    Assert.assertEquals(1, paths.size());
-    Assert.assertEquals(makeDeltaDirName(20,20), paths.get(0).getName());
+    assertEquals(1, paths.size());
+    assertEquals(makeDeltaDirName(20, 20), paths.getFirst().getName());
   }
 
   @Test
@@ -1102,9 +1082,12 @@ public void testReady() throws Exception {
 
     // block cleaner with an open txn
     long txnId = openTxn();
-    if (useMinHistoryWriteId()) {
-      allocateTableWriteId(dbName, tblName, txnId);
-    }
+    AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(dbName, tblName);
+    awiRqst.setTxnIds(Collections.singletonList(txnId));
+    AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
+    txnHandler.addWriteIdsToMinHistory(txnId, Collections.singletonMap(dbName + "." + tblName,
+        awiResp.getTxnToWriteIds().get(0).getWriteId()));
+
     CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
     rqst.setPartitionname(partName);
     long ctxnid = compactInTxn(rqst);
@@ -1113,11 +1096,12 @@
     // make sure cleaner didn't remove anything, and cleaning is
still queued List paths = getDirectories(conf, t, p); - Assert.assertEquals("Expected 5 files after minor compaction, instead these files were present " + paths, 5, - paths.size()); + assertEquals(5, paths.size(), + "Expected 5 files after minor compaction, instead these files were present " + paths); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals("Expected 1 compaction in queue, got: " + rsp.getCompacts(), 1, rsp.getCompactsSize()); - Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(1, rsp.getCompactsSize(), + "Expected 1 compaction in queue, got: " + rsp.getCompacts()); + assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState()); } @Test @@ -1157,9 +1138,9 @@ public void testCompactionHighWatermarkIsHonored() throws Exception { txnHandler.abortTxn(new AbortTxnRequest(openTxnId)); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(2, rsp.getCompactsSize()); - Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); - Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(1).getState()); + assertEquals(2, rsp.getCompactsSize()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(1).getState()); List actualDirs = getDirectories(conf, t, p).stream() .map(Path::getName).sorted() @@ -1172,7 +1153,7 @@ public void testCompactionHighWatermarkIsHonored() throws Exception { makeDeltaDirName(23, 23), makeDeltaDirName(24, 24) ); - Assert.assertEquals("Directories do not match", expectedDirs, actualDirs); + assertEquals(expectedDirs, actualDirs, "Directories do not match"); } @Test @@ -1196,11 +1177,11 @@ public void testCleanupOnConcurrentMinorCompactions() throws Exception { startCleaner(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, rsp.getCompactsSize()); - Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState()); + assertEquals(1, rsp.getCompactsSize()); + assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState()); List paths = getDirectories(conf, t, null); - Assert.assertEquals(2, paths.size()); + assertEquals(2, paths.size()); boolean sawBase = false, sawDelta = false; for (Path path : paths) { if (path.getName().equals("base_20_v0000021")) { @@ -1208,19 +1189,10 @@ public void testCleanupOnConcurrentMinorCompactions() throws Exception { } else if (path.getName().equals(addVisibilitySuffix(makeDeltaDirNameCompacted(22, 23), 25))) { sawDelta = true; } else { - Assert.fail("Unexpected file " + path.getName()); + fail("Unexpected file " + path.getName()); } } - Assert.assertTrue(sawBase); - Assert.assertTrue(sawDelta); - } - - void allocateTableWriteId(String dbName, String tblName, long txnId) throws Exception { - AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(dbName, tblName); - awiRqst.setTxnIds(Collections.singletonList(txnId)); - AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst); - - txnHandler.addWriteIdsToMinHistory(txnId, Collections.singletonMap(dbName + "." 
+ tblName, - awiResp.getTxnToWriteIds().get(0).getWriteId())); + assertTrue(sawBase); + assertTrue(sawDelta); } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java index fa663e35b6c2..78344a50eb86 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java @@ -21,6 +21,7 @@ * Same as TestCleaner but tests delta file names in Hive 1.3.0 format */ public class TestCleaner2 extends TestCleaner { + @Override boolean useHive130DeltaDirName() { return true; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java index 7e73d06c9ff1..d6f79214a450 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; @@ -37,20 +38,20 @@ import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest; import org.apache.hadoop.hive.ql.txn.compactor.FSRemover; import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runInitiator; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; public class TestAbortedTxnCleaner extends TestHandler { @@ -74,7 +75,7 @@ public void testCleaningOfAbortedDirectoriesForUnpartitionedTables() throws Exce Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); + cleaner.setCleanupHandlers(List.of(mockedTaskHandler)); cleaner.run(); Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); @@ -82,7 +83,7 @@ public void testCleaningOfAbortedDirectoriesForUnpartitionedTables() throws Exce List directories = getDirectories(conf, t, null); // All aborted directories removed, hence 1 committed delta directory must be present - Assert.assertEquals(1, directories.size()); + assertEquals(1, directories.size()); } @Test @@ -105,7 +106,7 @@ public void testCleaningOfAbortedDirectoriesForSinglePartition() throws Exceptio Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); + cleaner.setCleanupHandlers(List.of(mockedTaskHandler)); cleaner.run(); 
Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); @@ -113,7 +114,7 @@ public void testCleaningOfAbortedDirectoriesForSinglePartition() throws Exceptio List directories = getDirectories(conf, t, p); // All aborted directories removed, hence 1 committed delta directory must be present - Assert.assertEquals(1, directories.size()); + assertEquals(1, directories.size()); } @Test @@ -143,7 +144,7 @@ public void testCleaningOfAbortedDirectoriesForMultiplePartitions() throws Excep Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); + cleaner.setCleanupHandlers(List.of(mockedTaskHandler)); cleaner.run(); Mockito.verify(mockedFSRemover, Mockito.times(2)).clean(any(CleanupRequest.class)); @@ -151,11 +152,11 @@ public void testCleaningOfAbortedDirectoriesForMultiplePartitions() throws Excep List directories = getDirectories(conf, t, p1); // All aborted directories removed, hence 1 committed delta directory must be present - Assert.assertEquals(1, directories.size()); + assertEquals(1, directories.size()); directories = getDirectories(conf, t, p2); // All aborted directories removed, hence 1 committed delta directory must be present - Assert.assertEquals(1, directories.size()); + assertEquals(1, directories.size()); } @Test @@ -186,7 +187,7 @@ public void testCleaningOfAbortedDirectoriesWithLongRunningOpenWriteTxn() throws Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); + cleaner.setCleanupHandlers(List.of(mockedTaskHandler)); cleaner.run(); Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); @@ -195,7 +196,7 @@ public void testCleaningOfAbortedDirectoriesWithLongRunningOpenWriteTxn() throws List directories = getDirectories(conf, t, null); // All aborted directories below min open write ID are removed, // hence 1 open, 1 committed, 1 aborted delta directory must be present - Assert.assertEquals(3, directories.size()); + assertEquals(3, directories.size()); // Commit the long open txn txnHandler.commitTxn(new CommitTxnRequest(openTxnId)); @@ -221,8 +222,8 @@ public void testCleaningOfAbortedDirectoriesOnTopOfBase() throws Exception { // Check if there is a one base file List directories = getDirectories(conf, t, null); // Both base and delta files are present since we haven't cleaned yet. 
- Assert.assertEquals(5, directories.size()); - Assert.assertEquals(1, directories.stream().filter(dir -> dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count()); + assertEquals(5, directories.size()); + assertEquals(1, directories.stream().filter(dir -> dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count()); // 3 aborted deltas addDeltaFileWithTxnComponents(t, null, 2, true); @@ -237,15 +238,15 @@ public void testCleaningOfAbortedDirectoriesOnTopOfBase() throws Exception { Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); + cleaner.setCleanupHandlers(List.of(mockedTaskHandler)); cleaner.run(); Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class)); directories = getDirectories(conf, t, null); - Assert.assertEquals(1, directories.size()); - Assert.assertTrue(directories.get(0).getName().startsWith(AcidUtils.BASE_PREFIX)); + assertEquals(1, directories.size()); + assertTrue(directories.getFirst().getName().startsWith(AcidUtils.BASE_PREFIX)); } @Test @@ -268,8 +269,8 @@ public void testCleaningOfAbortedDirectoriesBelowBase() throws Exception { // Check if there is a one base file List directories = getDirectories(conf, t, null); // Both base and delta files are present since we haven't cleaned yet. - Assert.assertEquals(5, directories.size()); - Assert.assertEquals(1, directories.stream().filter(dir -> dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count()); + assertEquals(5, directories.size()); + assertEquals(1, directories.stream().filter(dir -> dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count()); HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); MetadataCache metadataCache = new MetadataCache(true); @@ -279,7 +280,7 @@ public void testCleaningOfAbortedDirectoriesBelowBase() throws Exception { Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); + cleaner.setCleanupHandlers(List.of(mockedTaskHandler)); cleaner.run(); Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); @@ -287,7 +288,7 @@ public void testCleaningOfAbortedDirectoriesBelowBase() throws Exception { directories = getDirectories(conf, t, null); // The table is already compacted, so we must see 1 base delta - Assert.assertEquals(1, directories.size()); + assertEquals(1, directories.size()); } @Test @@ -326,11 +327,11 @@ public void testAbortedCleaningWithThreeTxnsWithDiffWriteIds() throws Exception Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); + cleaner.setCleanupHandlers(List.of(mockedTaskHandler)); cleaner.run(); List directories = getDirectories(conf, t, null); - Assert.assertEquals(5, directories.size()); + assertEquals(5, directories.size()); } @ParameterizedTest @@ -346,7 +347,7 @@ public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean isPartit addDeltaFileWithTxnComponents(t, p, 2, false); addDeltaFileWithTxnComponents(t, p, 2, true); - MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true); + MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true); HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 
0); MetadataCache metadataCache = new MetadataCache(true); FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache)); @@ -358,28 +359,28 @@ public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean isPartit String compactionQueuePresence = "SELECT COUNT(*) FROM \"COMPACTION_QUEUE\" " + " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" + (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL"); - Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence)); + assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence)); Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); + cleaner.setCleanupHandlers(List.of(mockedTaskHandler)); cleaner.run(); Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class)); - Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence)); - Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPLETED_COMPACTIONS\" " + + assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence)); + assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPLETED_COMPACTIONS\" " + " WHERE \"CC_DATABASE\" = '" + dbName+ "' AND \"CC_TABLE\" = '" + tableName + "' AND \"CC_PARTITION\"" + (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL"))); - Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPLETED_TXN_COMPONENTS\" " + + assertEquals(1, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPLETED_TXN_COMPONENTS\" " + " WHERE \"CTC_DATABASE\" = '" + dbName+ "' AND \"CTC_TABLE\" = '" + tableName + "' AND \"CTC_PARTITION\"" + (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL"))); List directories = getDirectories(conf, t, null); // All aborted directories removed, hence 1 committed delta directory must be present - Assert.assertEquals(1, directories.size()); + assertEquals(1, directories.size()); } @ParameterizedTest @@ -409,21 +410,21 @@ public void testRetryEntryOnFailures(boolean isPartitioned) throws Exception { Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); + cleaner.setCleanupHandlers(List.of(mockedTaskHandler)); cleaner.run(); Mockito.verify(mockedTxnHandler, Mockito.times(1)).setCleanerRetryRetentionTimeOnError(any(CompactionInfo.class)); ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, scr.getCompactsSize()); - ShowCompactResponseElement scre = scr.getCompacts().get(0); - Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) + assertEquals(1, scr.getCompactsSize()); + ShowCompactResponseElement scre = scr.getCompacts().getFirst(); + assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) && (isPartitioned ? 
scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) && "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP && scre.getErrorMessage().equalsIgnoreCase("Testing retry")); String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" + (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'"; String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause; - Assert.assertEquals(Long.toString(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS)), + assertEquals(Long.toString(MetastoreConf.getTimeVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS)), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) .replace("\n", "").trim()); } @@ -445,7 +446,7 @@ public void testRetryInfoBeingUsed(boolean isPartitioned) throws Exception { long retryRetentionTime = 10000; HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); - MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, retryRetentionTime, TimeUnit.MILLISECONDS); + MetastoreConf.setTimeVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, retryRetentionTime, TimeUnit.MILLISECONDS); MetadataCache metadataCache = new MetadataCache(true); FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache)); TaskHandler taskHandler = new AbortedTxnCleaner(conf, txnHandler, metadataCache, @@ -458,20 +459,20 @@ public void testRetryInfoBeingUsed(boolean isPartitioned) throws Exception { Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(taskHandler)); + cleaner.setCleanupHandlers(List.of(taskHandler)); cleaner.run(); ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, scr.getCompactsSize()); - ShowCompactResponseElement scre = scr.getCompacts().get(0); - Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) + assertEquals(1, scr.getCompactsSize()); + ShowCompactResponseElement scre = scr.getCompacts().getFirst(); + assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) && "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP && scre.getErrorMessage().equalsIgnoreCase("Testing retry")); String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" + (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'"; String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause; - Assert.assertEquals(Long.toString(retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) + assertEquals(Long.toString(retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) .replace("\n", "").trim()); // Delay for time specified in retry retention. 
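The retry tests in this file all rely on the same gate: when a cleanup attempt fails, setCleanerRetryRetentionTimeOnError records a CQ_RETRY_RETENTION interval for the queue entry, and the Cleaner skips that entry until the interval has elapsed since the failed attempt (which is why each test delays for the retention time before running the Cleaner again). A minimal sketch of that check, with hypothetical names — lastFailedAttemptMs and retryRetentionMs stand in for the COMPACTION_QUEUE bookkeeping; the real gating happens in the metastore's ready-to-clean query, not in a class like this:

    // Sketch only: the retry-retention gate these tests exercise.
    final class RetryRetentionGate {
      /** True when a previously failed entry may be offered to the Cleaner again. */
      static boolean eligibleForRetry(long lastFailedAttemptMs, long retryRetentionMs, long nowMs) {
        return nowMs - lastFailedAttemptMs >= retryRetentionMs;
      }

      public static void main(String[] args) {
        long retention = 10_000L; // mirrors retryRetentionTime in the test above
        System.out.println(eligibleForRetry(0L, retention, 5_000L));  // false: still within retention
        System.out.println(eligibleForRetry(0L, retention, 10_000L)); // true: retention elapsed
      }
    }
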
@@ -482,11 +483,11 @@ public void testRetryInfoBeingUsed(boolean isPartitioned) throws Exception {
     cleaner = new Cleaner();
     cleaner.setConf(conf);
     cleaner.init(new AtomicBoolean(true));
-    cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+    cleaner.setCleanupHandlers(List.of(taskHandler));
     cleaner.run();
 
-    // The retry record must be not present since it will deleted due to successful abort cleanup.
-    Assert.assertEquals(0, txnHandler.showCompact(new ShowCompactRequest()).getCompactsSize());
+    // The retry record must not be present since it is deleted after a successful abort cleanup.
+    assertEquals(0, txnHandler.showCompact(new ShowCompactRequest()).getCompactsSize());
   }
 
   @ParameterizedTest
@@ -515,20 +516,20 @@ public void testRetryWithinRetentionTime(boolean isPartitioned) throws Exception
     Cleaner cleaner = new Cleaner();
     cleaner.setConf(conf);
     cleaner.init(new AtomicBoolean(true));
-    cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+    cleaner.setCleanupHandlers(List.of(taskHandler));
     cleaner.run();
 
     ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, scr.getCompactsSize());
-    ShowCompactResponseElement scre = scr.getCompacts().get(0);
-    Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
+    assertEquals(1, scr.getCompactsSize());
+    ShowCompactResponseElement scre = scr.getCompacts().getFirst();
+    assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
         && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null)
         && "ready for cleaning".equalsIgnoreCase(scre.getState())
         && scre.getType() == CompactionType.ABORT_TXN_CLEANUP
         && scre.getErrorMessage().equalsIgnoreCase("Testing retry"));
 
     String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" +
         (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'";
     String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause;
-    Assert.assertEquals(Long.toString(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS)),
+    assertEquals(Long.toString(MetastoreConf.getTimeVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS)),
         TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false)
             .replace("\n", "").trim());
 
@@ -537,18 +538,18 @@ public void testRetryWithinRetentionTime(boolean isPartitioned) throws Exception
     cleaner = new Cleaner();
     cleaner.setConf(conf);
     cleaner.init(new AtomicBoolean(true));
-    cleaner.setCleanupHandlers(Arrays.asList(taskHandler));
+    cleaner.setCleanupHandlers(List.of(taskHandler));
     cleaner.run();
 
     // The retry entry is not removed since retry conditions are not achieved hence its not picked for cleanup.
     scr = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, scr.getCompactsSize());
-    scre = scr.getCompacts().get(0);
-    Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
+    assertEquals(1, scr.getCompactsSize());
+    scre = scr.getCompacts().getFirst();
+    assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName)
        && (isPartitioned ?
scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) && "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP && scre.getErrorMessage().equalsIgnoreCase("Testing retry")); - Assert.assertEquals(Long.toString(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS)), + assertEquals(Long.toString(MetastoreConf.getTimeVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS)), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) .replace("\n", "").trim()); } @@ -570,7 +571,7 @@ public void testRetryUpdateRetentionTimeWhenFailedTwice(boolean isPartitioned) t long retryRetentionTime = 10000; HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); - MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, retryRetentionTime, TimeUnit.MILLISECONDS); + MetastoreConf.setTimeVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, retryRetentionTime, TimeUnit.MILLISECONDS); MetadataCache metadataCache = new MetadataCache(true); FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache)); TaskHandler taskHandler = new AbortedTxnCleaner(conf, txnHandler, metadataCache, @@ -583,20 +584,20 @@ public void testRetryUpdateRetentionTimeWhenFailedTwice(boolean isPartitioned) t Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(taskHandler)); + cleaner.setCleanupHandlers(List.of(taskHandler)); cleaner.run(); ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, scr.getCompactsSize()); - ShowCompactResponseElement scre = scr.getCompacts().get(0); - Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) + assertEquals(1, scr.getCompactsSize()); + ShowCompactResponseElement scre = scr.getCompacts().getFirst(); + assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) && "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP && scre.getErrorMessage().equalsIgnoreCase("Testing retry")); String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" + (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'"; String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause; - Assert.assertEquals(Long.toString(retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) + assertEquals(Long.toString(retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) .replace("\n", "").trim()); // Delay for time specified in retry retention. 
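The hunk that follows checks the back-off escalation: after a second consecutive failure the stored retention must read 2 * retryRetentionTime. Assuming the escalation is a plain doubling, as that assertion implies, the update performed by setCleanerRetryRetentionTimeOnError (the method the earlier hunk verifies via Mockito) would amount to this sketch; illustrative only, the real method may differ in detail:

    // Hedged sketch of the doubling implied by the "2 * retryRetentionTime" assertion below.
    final class RetryBackoff {
      static long nextRetryRetentionMs(long currentRetentionMs) {
        // Each consecutive cleanup failure doubles the wait before the next attempt,
        // a simple exponential back-off for repeatedly failing entries.
        return 2 * currentRetentionMs;
      }

      public static void main(String[] args) {
        System.out.println(nextRetryRetentionMs(10_000L)); // 20000, matching the test's expectation
      }
    }
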
@@ -605,18 +606,18 @@ public void testRetryUpdateRetentionTimeWhenFailedTwice(boolean isPartitioned) t cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(taskHandler)); + cleaner.setCleanupHandlers(List.of(taskHandler)); cleaner.run(); scr = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, scr.getCompactsSize()); - scre = scr.getCompacts().get(0); - Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) + assertEquals(1, scr.getCompactsSize()); + scre = scr.getCompacts().getFirst(); + assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) && "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP && scre.getErrorMessage().equalsIgnoreCase("Testing retry")); // The retry entry must reflect double the retention time now. - Assert.assertEquals(Long.toString(2 * retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) + assertEquals(Long.toString(2 * retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) .replace("\n", "").trim()); } @@ -637,7 +638,7 @@ public void testRetryUpdateErrorMessageWhenFailedTwice(boolean isPartitioned) th long retryRetentionTime = 10000; HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); - MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, retryRetentionTime, TimeUnit.MILLISECONDS); + MetastoreConf.setTimeVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, retryRetentionTime, TimeUnit.MILLISECONDS); MetadataCache metadataCache = new MetadataCache(true); FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache)); TaskHandler taskHandler = new AbortedTxnCleaner(conf, txnHandler, metadataCache, @@ -650,20 +651,20 @@ public void testRetryUpdateErrorMessageWhenFailedTwice(boolean isPartitioned) th Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(taskHandler)); + cleaner.setCleanupHandlers(List.of(taskHandler)); cleaner.run(); ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, scr.getCompactsSize()); - ShowCompactResponseElement scre = scr.getCompacts().get(0); - Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) + assertEquals(1, scr.getCompactsSize()); + ShowCompactResponseElement scre = scr.getCompacts().getFirst(); + assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) && "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP && scre.getErrorMessage().equalsIgnoreCase("Testing first retry")); String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" + (isPartitioned ? 
" = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'"; String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause; - Assert.assertEquals(Long.toString(retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) + assertEquals(Long.toString(retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) .replace("\n", "").trim()); // Delay for time specified in retry retention. @@ -676,18 +677,18 @@ public void testRetryUpdateErrorMessageWhenFailedTwice(boolean isPartitioned) th cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(taskHandler)); + cleaner.setCleanupHandlers(List.of(taskHandler)); cleaner.run(); scr = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, scr.getCompactsSize()); - scre = scr.getCompacts().get(0); - Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) + assertEquals(1, scr.getCompactsSize()); + scre = scr.getCompacts().getFirst(); + assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) && (isPartitioned ? scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) && "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP && scre.getErrorMessage().equalsIgnoreCase("Testing second retry")); // The retry entry must reflect double the retention time now. - Assert.assertEquals(Long.toString(2 * retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) + assertEquals(Long.toString(2 * retryRetentionTime), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) .replace("\n", "").trim()); } @@ -705,7 +706,7 @@ public void testZeroRetryRetentionTimeForAbortCleanup(boolean isPartitioned) thr addDeltaFileWithTxnComponents(t, p, 2, false); HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); - MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, 0, TimeUnit.MILLISECONDS); + MetastoreConf.setTimeVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, 0, TimeUnit.MILLISECONDS); MetadataCache metadataCache = new MetadataCache(true); FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache)); TaskHandler taskHandler = new AbortedTxnCleaner(conf, txnHandler, metadataCache, @@ -718,20 +719,20 @@ public void testZeroRetryRetentionTimeForAbortCleanup(boolean isPartitioned) thr Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(taskHandler)); + cleaner.setCleanupHandlers(List.of(taskHandler)); cleaner.run(); ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest()); - Assert.assertEquals(1, scr.getCompactsSize()); - ShowCompactResponseElement scre = scr.getCompacts().get(0); - Assert.assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) + assertEquals(1, scr.getCompactsSize()); + ShowCompactResponseElement scre = scr.getCompacts().getFirst(); + assertTrue(scre.getDbname().equals(dbName) && scre.getTablename().equals(tableName) && (isPartitioned ? 
scre.getPartitionname().equals("ds=" + partName) : scre.getPartitionname() == null) && "ready for cleaning".equalsIgnoreCase(scre.getState()) && scre.getType() == CompactionType.ABORT_TXN_CLEANUP && scre.getErrorMessage().equalsIgnoreCase("Testing retry")); String whereClause = " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" + tableName + "' AND \"CQ_PARTITION\"" + (isPartitioned ? " = 'ds=" + partName + "'" : " IS NULL") + " AND \"CQ_TYPE\" = 'c' AND \"CQ_STATE\" = 'r'"; String retryRetentionQuery = "SELECT \"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" " + whereClause; - Assert.assertEquals(Integer.toString(0), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) + assertEquals(Integer.toString(0), TestTxnDbUtil.queryToString(conf, retryRetentionQuery, false) .replace("\n", "").trim()); Mockito.doAnswer(InvocationOnMock::callRealMethod).when(mockedFSRemover).clean(any()); @@ -739,10 +740,10 @@ public void testZeroRetryRetentionTimeForAbortCleanup(boolean isPartitioned) thr cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(taskHandler)); + cleaner.setCleanupHandlers(List.of(taskHandler)); cleaner.run(); // The retry entry should be removed since retry conditions are achieved because retry retention time is 0. - Assert.assertEquals(0, txnHandler.showCompact(new ShowCompactRequest()).getCompactsSize()); + assertEquals(0, txnHandler.showCompact(new ShowCompactRequest()).getCompactsSize()); } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java index 1279cd6cc180..4c482914a76f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java @@ -32,15 +32,14 @@ import org.apache.hadoop.hive.ql.txn.compactor.FSRemover; import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache; import org.apache.hadoop.hive.ql.txn.compactor.TestCleaner; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -69,7 +68,7 @@ public void testCompactionHandlerAndFsRemover() throws Exception { Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(stop); - cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); + cleaner.setCleanupHandlers(List.of(mockedTaskHandler)); cleaner.run(); Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); @@ -100,12 +99,12 @@ public void testMetaCache() throws Exception { Cleaner cleaner = new Cleaner(); cleaner.setConf(conf); cleaner.init(new AtomicBoolean(true)); - cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); + cleaner.setCleanupHandlers(List.of(mockedTaskHandler)); cleaner.run(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); + assertEquals(1, compacts.size()); Mockito.verify(mockedMetadataCache, times(4)).computeIfAbsent(any(), any()); 
Mockito.verify(mockedTaskHandler, times(1)).resolveTable(any(), any()); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java index 31af4c9efcd5..b81ee8739ad8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java @@ -7689,7 +7689,7 @@ public AllocateTableWriteIdsResponse allocate_table_write_ids( @Override public MaxAllocatedTableWriteIdResponse get_max_allocated_table_write_id(MaxAllocatedTableWriteIdRequest rqst) throws MetaException { - return getTxnHandler().getMaxAllocatedTableWrited(rqst); + return getTxnHandler().getMaxAllocatedTableWriteId(rqst); } @Override diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 5c58be0d6e3a..34b29c8e1c56 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -690,7 +690,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds } @Override - public MaxAllocatedTableWriteIdResponse getMaxAllocatedTableWrited(MaxAllocatedTableWriteIdRequest rqst) throws MetaException { + public MaxAllocatedTableWriteIdResponse getMaxAllocatedTableWriteId(MaxAllocatedTableWriteIdRequest rqst) throws MetaException { return jdbcResource.execute(new GetMaxAllocatedTableWriteIdHandler(rqst)); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 8092768b91d8..58eafb35e45f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -345,7 +345,7 @@ AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest */ @SqlRetry @Transactional(POOL_TX) - MaxAllocatedTableWriteIdResponse getMaxAllocatedTableWrited(MaxAllocatedTableWriteIdRequest rqst) + MaxAllocatedTableWriteIdResponse getMaxAllocatedTableWriteId(MaxAllocatedTableWriteIdRequest rqst) throws MetaException; /**