2 changes: 1 addition & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1380,7 +1380,7 @@ public static AcidDirectory getAcidState(FileSystem fileSystem, Path candidateDi
// Filter out all delta directories that are shadowed by others
findBestWorkingDeltas(writeIdList, directory);

if(directory.getOldestBase() != null && directory.getBase() == null &&
if (directory.getOldestBase() != null && directory.getBase() == null &&
isCompactedBase(directory.getOldestBase(), fs, dirSnapshots)) {
/*
* If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
[next file in the diff; path not shown]
@@ -163,7 +163,9 @@ fs, path, getConf(), validWriteIdList, Ref.from(false), false,

// Make sure there are no leftovers below the compacted watermark
boolean success = false;
getConf().set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
if (info.minOpenWriteId < 0) {
getConf().set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
}

dir = AcidUtils.getAcidState(
fs, path, getConf(),
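A minimal sketch of the guard introduced in the hunk above, not the PR's code: the transaction snapshot in the conf is reset only when the compaction carries no min open write id. The helper class and method names here are hypothetical; ValidTxnList.VALID_TXNS_KEY and ValidReadTxnList are the Hive classes actually used in the hunk.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

class TxnSnapshotGuard {
  // minOpenWriteId mirrors the field read from the compaction info in the hunk above.
  static void maybeResetTxnSnapshot(Configuration conf, long minOpenWriteId) {
    if (minOpenWriteId < 0) {
      // No min open write id recorded: install an "everything committed" snapshot
      // (a default ValidReadTxnList has no open or aborted transactions).
      conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
    }
    // Otherwise leave whatever snapshot the conf already carries untouched.
  }
}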
[next file in the diff; path not shown]
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor.service;

import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -196,9 +195,6 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
txnWriteIds.addTableValidWriteIdList(tblValidWriteIds);
conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, txnWriteIds.toString());

msc.addWriteIdsToMinHistory(compactionTxn.getTxnId(),
ImmutableMap.of(fullTableName, txnWriteIds.getMinOpenWriteId(fullTableName)));

ci.highestWriteId = tblValidWriteIds.getHighWatermark();
//this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
//it until after any data written by it are physically removed
115 changes: 66 additions & 49 deletions ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -100,7 +100,6 @@
import java.util.Properties;
import java.util.Set;
import java.util.Stack;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -256,10 +255,10 @@ protected long openTxn(TxnType txnType) throws MetaException {
rqst.setTxn_type(txnType);
if (txnType == TxnType.REPL_CREATED) {
rqst.setReplPolicy("default.*");
rqst.setReplSrcTxnIds(Arrays.asList(1L));
rqst.setReplSrcTxnIds(List.of(1L));
}
List<Long> txns = txnHandler.openTxns(rqst).getTxn_ids();
return txns.get(0);
return txns.getFirst();
}

protected long allocateWriteId(String dbName, String tblName, long txnid)
@@ -268,7 +267,7 @@ protected long allocateWriteId(String dbName, String tblName, long txnid)
= new AllocateTableWriteIdsRequest(dbName, tblName);
awiRqst.setTxnIds(Collections.singletonList(txnid));
AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
return awiResp.getTxnToWriteIds().get(0).getWriteId();
return awiResp.getTxnToWriteIds().getFirst().getWriteId();
}

protected void addDeltaFileWithTxnComponents(Table t, Partition p, int numRecords, boolean abort)
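Several hunks in this file replace get(0) and Arrays.asList with getFirst() and List.of. A small, self-contained illustration of the idiom follows (the class and variable names are made up); note that List.getFirst() comes from the SequencedCollection API added in Java 21, so the change assumes the tests build against that release or newer.

import java.util.List;

class FirstElementIdiom {
  public static void main(String[] args) {
    List<Long> txnIds = List.of(1L, 2L, 3L);      // immutable literal list (Java 9+)
    long viaGetFirst = txnIds.getFirst();         // SequencedCollection accessor (Java 21+)
    long viaIndex = txnIds.get(0);                // pre-Java-21 equivalent
    System.out.println(viaGetFirst == viaIndex);  // prints "true"
  }
}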
@@ -292,7 +291,7 @@ protected void acquireLock(Table t, Partition p, long txnId) throws Exception {
.setTableName(t.getTableName())
.setIsTransactional(true);
if (p != null) {
lockCompBuilder.setPartitionName(t.getPartitionKeys().get(0).getName() + "=" + p.getValues().get(0));
lockCompBuilder.setPartitionName(t.getPartitionKeys().getFirst().getName() + "=" + p.getValues().getFirst());
}
LockRequestBuilder requestBuilder = new LockRequestBuilder().setUser(null)
.setTransactionId(txnId).addLockComponent(lockCompBuilder.build());
@@ -338,14 +337,14 @@ protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords, in
}

protected List<Path> getDirectories(HiveConf conf, Table t, Partition p) throws Exception {
String partValue = (p == null) ? null : p.getValues().get(0);
String partValue = (p == null) ? null : p.getValues().getFirst();
String location = getLocation(t.getTableName(), partValue);
Path dir = new Path(location);
FileSystem fs = FileSystem.get(conf);
FileStatus[] stats = fs.listStatus(dir);
List<Path> paths = new ArrayList<Path>(stats.length);
for (int i = 0; i < stats.length; i++) {
paths.add(stats[i].getPath());
List<Path> paths = new ArrayList<>(stats.length);
for (FileStatus stat : stats) {
paths.add(stat.getPath());
}
return paths;
}
@@ -394,13 +393,9 @@ protected void burnThroughTransactions(String dbName, String tblName, int num, S
}
}

protected void stopThread() {
stop.set(true);
}

private StorageDescriptor newStorageDescriptor(String location, List<Order> sortCols) {
StorageDescriptor sd = new StorageDescriptor();
List<FieldSchema> cols = new ArrayList<FieldSchema>(2);
List<FieldSchema> cols = new ArrayList<>(2);
cols.add(new FieldSchema("a", "varchar(25)", "still no comment"));
cols.add(new FieldSchema("b", "int", "comment"));
sd.setCols(cols);
@@ -411,7 +406,7 @@ private StorageDescriptor newStorageDescriptor(String location, List<Order> sort
SerDeInfo serde = new SerDeInfo();
serde.setSerializationLib(LazySimpleSerDe.class.getName());
sd.setSerdeInfo(serde);
List<String> bucketCols = new ArrayList<String>(1);
List<String> bucketCols = new ArrayList<>(1);
bucketCols.add("a");
sd.setBucketCols(bucketCols);

@@ -424,13 +419,12 @@ private StorageDescriptor newStorageDescriptor(String location, List<Order> sort
// I can't do this with @Before because I want to be able to control when the thread starts
private void runOneLoopOfCompactorThread(CompactorThreadType type) throws Exception {
TestTxnDbUtil.setConfValues(conf);
CompactorThread t;
switch (type) {
case INITIATOR: t = new Initiator(); break;
case WORKER: t = new Worker(); break;
case CLEANER: t = new Cleaner(); break;
default: throw new RuntimeException("Huh? Unknown thread type.");
}

CompactorThread t = switch (type) {
case INITIATOR -> new Initiator();
case WORKER -> new Worker();
case CLEANER -> new Cleaner();
};
t.setConf(conf);
stop.set(true);
t.init(stop);
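The rewrite above swaps the statement-style switch for a switch expression (Java 14+). Because the expression must be exhaustive over the enum, the compiler rejects a missing constant, which is why the old default branch with its runtime error can be dropped. A standalone sketch of the pattern with hypothetical names:

class SwitchExpressionSketch {
  enum CompactorThreadType { INITIATOR, WORKER, CLEANER }

  static String implementationFor(CompactorThreadType type) {
    // Exhaustive over the enum: adding a new constant without a matching case
    // becomes a compile error instead of a "Huh? Unknown thread type." at runtime.
    return switch (type) {
      case INITIATOR -> "Initiator";
      case WORKER -> "Worker";
      case CLEANER -> "Cleaner";
    };
  }
}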
@@ -454,7 +448,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numReco

private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords, FileType type, int numBuckets,
boolean allBucketsPresent, long visibilityId) throws Exception {
String partValue = (p == null) ? null : p.getValues().get(0);
String partValue = (p == null) ? null : p.getValues().getFirst();
Path location = new Path(getLocation(t.getTableName(), partValue));
String filename = null;
switch (type) {
@@ -469,15 +463,15 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numReco
if (bucket == 0 && !allBucketsPresent) {
continue; // skip one
}
Path partFile = null;
Path partFile;
if (type == FileType.LEGACY) {
partFile = new Path(location, String.format(AcidUtils.LEGACY_FILE_BUCKET_DIGITS, bucket) + "_0");
} else {
Path dir = new Path(location, filename);
fs.mkdirs(dir);
partFile = AcidUtils.createBucketFile(dir, bucket);
if (type == FileType.LENGTH_FILE) {
partFile = new Path(partFile.toString() + AcidUtils.DELTA_SIDE_FILE_SUFFIX);
partFile = new Path(partFile + AcidUtils.DELTA_SIDE_FILE_SUFFIX);
}
}
FSDataOutputStream out = fs.create(partFile);
@@ -497,9 +491,7 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numReco
static class MockInputFormat implements AcidInputFormat<WritableComparable,Text> {

@Override
public AcidInputFormat.RowReader<Text> getReader(InputSplit split,
Options options) throws
IOException {
public AcidInputFormat.RowReader<Text> getReader(InputSplit split, Options options) {
return null;
}

@@ -508,7 +500,7 @@ public RawReader<Text> getRawReader(Configuration conf, boolean collapseEvents,
ValidWriteIdList validWriteIdList,
Path baseDirectory, Path[] deltaDirectory, Map<String, Integer> deltaToAttemptId) throws IOException {

List<Path> filesToRead = new ArrayList<Path>();
List<Path> filesToRead = new ArrayList<>();
if (baseDirectory != null) {
if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
Path p = AcidUtils.createBucketFile(baseDirectory, bucket);
@@ -521,8 +513,8 @@ public RawReader<Text> getRawReader(Configuration conf, boolean collapseEvents,

}
}
for (int i = 0; i < deltaDirectory.length; i++) {
Path p = AcidUtils.createBucketFile(deltaDirectory[i], bucket);
for (Path path : deltaDirectory) {
Path p = AcidUtils.createBucketFile(path, bucket);
FileSystem fs = p.getFileSystem(conf);
if (fs.exists(p)) {
filesToRead.add(p);
@@ -543,25 +535,22 @@ public RecordReader<WritableComparable, Text> getRecordReader(InputSplit inputSp
}

@Override
public boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) throws
IOException {
public boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) {
return false;
}
}

static class MockRawReader implements AcidInputFormat.RawReader<Text> {
private final Stack<Path> filesToRead;
private final Configuration conf;
private FSDataInputStream is = null;
private final FileSystem fs;
private boolean lastWasDelete = true;

MockRawReader(Configuration conf, List<Path> files) throws IOException {
filesToRead = new Stack<Path>();
filesToRead = new Stack<>();
for (Path file : files) {
filesToRead.push(file);
}
this.conf = conf;
fs = FileSystem.get(conf);
}

@@ -599,7 +588,7 @@ public boolean next(RecordIdentifier identifier, Text text) throws IOException {
try {
identifier.readFields(is);
line = is.readLine();
} catch (EOFException e) {
} catch (EOFException ignored) {
}
if (line == null) {
// Set our current entry to null (since it's done) and try again.
@@ -642,8 +631,7 @@ public float getProgress() throws IOException {
static class MockOutputFormat implements AcidOutputFormat<WritableComparable, Text> {

@Override
public RecordUpdater getRecordUpdater(Path path, Options options) throws
IOException {
public RecordUpdater getRecordUpdater(Path path, Options options) {
return null;
}

@@ -656,7 +644,7 @@ public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getRawRecord
public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass,
boolean isCompressed, Properties tableProperties,
Progressable progress) throws IOException {
Progressable progress) {
return null;
}

@@ -729,31 +717,60 @@ String makeDeleteDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
}

protected long compactInTxn(CompactionRequest rqst) throws Exception {
return compactInTxn(rqst, CommitAction.COMMIT);
}

long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exception {
txnHandler.compact(rqst);

FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest();
findNextCompactRequest.setWorkerId("fred");
findNextCompactRequest.setWorkerVersion(WORKER_VERSION);

CompactionInfo ci = txnHandler.findNextToCompact(findNextCompactRequest);
ci.runAs = rqst.getRunas() == null ? System.getProperty("user.name") : rqst.getRunas();

long compactorTxnId = openTxn(TxnType.COMPACTION);

// Need to create a valid writeIdList to set the highestWriteId in ci
ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId);
GetValidWriteIdsRequest writeIdsRequest = new GetValidWriteIdsRequest();
ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(
txnHandler.getOpenTxns(Collections.singletonList(TxnType.READ_ONLY)), compactorTxnId);

GetValidWriteIdsRequest writeIdsRequest = new GetValidWriteIdsRequest(
Collections.singletonList(ci.getFullTableName()));
writeIdsRequest.setValidTxnList(validTxnList.writeToString());
writeIdsRequest
.setFullTableNames(Collections.singletonList(TxnUtils.getFullTableName(rqst.getDbname(), rqst.getTablename())));

// with this ValidWriteIdList is capped at whatever HWM validTxnList has
ValidCompactorWriteIdList tblValidWriteIds = TxnUtils
.createValidCompactWriteIdList(txnHandler.getValidWriteIds(writeIdsRequest).getTblValidWriteIds().get(0));
ValidCompactorWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(
txnHandler.getValidWriteIds(writeIdsRequest).getTblValidWriteIds()
.getFirst());

ci.highestWriteId = tblValidWriteIds.getHighWatermark();
txnHandler.updateCompactorState(ci, compactorTxnId);
txnHandler.markCompacted(ci);
txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));
Thread.sleep(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));

switch (commitAction) {
case MARK_COMPACTED ->
txnHandler.markCompacted(ci);

case COMMIT -> {
txnHandler.markCompacted(ci);
txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));

Thread.sleep(MetastoreConf.getTimeVar(
conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
}
case ABORT ->
txnHandler.abortTxn(new AbortTxnRequest(compactorTxnId));
}
return compactorTxnId;
}

enum CommitAction {
COMMIT,
ABORT,
MARK_COMPACTED,
NONE
}
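A hedged usage sketch of the new compactInTxn overload and the CommitAction enum defined above; it assumes a test method inside a CompactorTest subclass, and the database and table names are invented. In practice each action would normally live in its own test rather than being chained like this.

// Inside a hypothetical test extending CompactorTest:
void exerciseCommitActions() throws Exception {
  CompactionRequest rqst = new CompactionRequest("default", "t_acid", CompactionType.MAJOR);

  long committed = compactInTxn(rqst);                            // delegates to CommitAction.COMMIT
  long marked = compactInTxn(rqst, CommitAction.MARK_COMPACTED);  // marked compacted, txn never committed
  long aborted = compactInTxn(rqst, CommitAction.ABORT);          // compactor txn aborted outright
  long opened = compactInTxn(rqst, CommitAction.NONE);            // txn opened, then left as-is
}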

protected static Map<String, Integer> gaugeToMap(String metric) throws Exception {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
[next file in the diff; path not shown]
@@ -18,13 +18,15 @@
package org.apache.hadoop.hive.ql.txn.compactor;

import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.junit.jupiter.api.BeforeEach;

public class TestAbortCleanupUsingCompactionCycle extends TestCleaner {
@Override

@BeforeEach
@Override
public void setup() throws Exception {
super.setup();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
}
}
[next file in the diff; path not shown]
@@ -18,14 +18,16 @@
package org.apache.hadoop.hive.ql.txn.compactor;

import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.junit.jupiter.api.BeforeEach;

public class TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId extends TestCleaner {
@Override

@BeforeEach
@Override
public void setup() throws Exception {
super.setup();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
}

@Override
Expand Down
Loading