Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageSize;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFollowerReadFailoverProxyProvider;
import org.apache.hadoop.ozone.om.helpers.BasicOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs.Builder;
Expand All @@ -54,8 +56,14 @@
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatusLight;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB.Hadoop3OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.kohsuke.MetaInfServices;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;
import picocli.CommandLine.Option;
Expand All @@ -73,6 +81,8 @@
public class OmMetadataGenerator extends BaseFreonGenerator
implements Callable<Void> {

private static final Logger LOG = LoggerFactory.getLogger(OmMetadataGenerator.class);

@Option(names = {"-v", "--volume"},
description = "Name of the volume which contains the test data. Will be"
+ " created if missing.",
Expand Down Expand Up @@ -134,15 +144,27 @@ public class OmMetadataGenerator extends BaseFreonGenerator
)
private String omServiceID;

@Option(
names = "--clients",
description = "The number of Ozone clients used.",
defaultValue = "1")
private int clientsNo;

@Option(
names = "--enable-follower-read-affinity",
description = "Only useful when OM follower reads is enabled. " +
"If this is enabled, the clients will be initialized to only point " +
"to the OM followers in a round-robin fashion. If disabled, " +
"each client points to a random OM node, including leader.",
defaultValue = "false"
)
private boolean enableFollowerAffinity;

@Mixin
private FreonReplicationOptions replication;

private OzoneManagerProtocol ozoneManagerClient;

private ThreadLocal<OmKeyArgs.Builder> omKeyArgsBuilder;

private OzoneBucket bucket;

private OzoneClient[] ozoneClients;
private ContentGenerator contentGenerator;
private final byte[] readBuffer = new byte[4096];
private ReplicationConfig replicationConfig;
Expand All @@ -165,18 +187,39 @@ public Void call() throws Exception {
OzoneConfiguration conf = createOzoneConfiguration();
replicationConfig = replication.fromParamsOrConfig(conf);

try (OzoneClient rpcClient = createOzoneClient(omServiceID, conf)) {
ensureVolumeAndBucketExist(rpcClient, volumeName, bucketName);
ozoneManagerClient = createOmClient(conf, omServiceID);
bucket = rpcClient.getObjectStore().getVolume(volumeName)
.getBucket(bucketName);
if (clientsNo <= 0) {
clientsNo = 1;
}

try (OzoneClient ozClient = createOzoneClient(omServiceID, conf)) {
ensureVolumeAndBucketExist(ozClient, volumeName, bucketName);

ozoneClients = new OzoneClient[clientsNo];
Comment on lines +190 to +197
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The description of "--clients" states that "If the number of clients exceeds number of threads N, only N clients will be created." However, this validation does not seem to be present in the code. Currently we only check if clientsNo <= 0.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated the description accordingly. Clients can exceed N number of threads for flexibility purpose.

List<String> followerOMNodeIds = null;
int currentFollowerAffinityIndex = 0;
if (enableFollowerAffinity) {
followerOMNodeIds = getOMFollowerNodeIds(ozClient);
}
for (int i = 0; i < clientsNo; i++) {
OzoneClient ozoneClient = createOzoneClient(omServiceID, conf);
if (enableFollowerAffinity) {
// Point the client proxy to the followers in a round-robin fashion
// This balances the read loads on the OM followers
String followerNodeId = followerOMNodeIds.get(currentFollowerAffinityIndex);
changeInitialProxyForFollowerRead(ozoneClient, followerNodeId);
currentFollowerAffinityIndex = (currentFollowerAffinityIndex + 1) % followerOMNodeIds.size();
LOG.info("Connected OzoneClient-{} to OM follower {}", i + 1, followerNodeId);
}
ozoneClients[i] = ozoneClient;
Comment thread
xichen01 marked this conversation as resolved.
}
runTests(this::applyOperation);
} finally {
if (ozoneManagerClient != null) {
ozoneManagerClient.close();
omKeyArgsBuilder.remove();
if (ozoneClients != null) {
IOUtils.closeQuietly(ozoneClients);
}
omKeyArgsBuilder.remove();
}

return null;
}

Expand All @@ -201,7 +244,7 @@ private void initMixedOperation() {
// if --ops is A,B,C --opsnum is 3,2,1
// so the operations will be [A, A, A, B, B, C]
// so the thread with seq id [0, 2] will execute A,
// the thread with seq id [3, 4] will execute A,
// the thread with seq id [3, 4] will execute B,
// the thread with seq id [5, 5] will execute C
operations = new Operation[getThreadNo()];
for (int i = 0; i < ops.size(); i++) {
Expand Down Expand Up @@ -311,14 +354,21 @@ private void applyOperation(long counter) throws Exception {
counter = ThreadLocalRandom.current().nextLong(getTestNo());
}
final String keyName = getPath(counter);
// Use counter instead of thread sequence ID so that a single operation can be
// executed by different clients in mixed operations scenario
final OzoneClient ozoneClient = getOzoneClient(counter);
final ClientProtocol clientProtocol = ozoneClient.getProxy();
final OzoneManagerProtocol ozoneManagerClient = clientProtocol.getOzoneManagerClient();
Comment thread
xichen01 marked this conversation as resolved.
switch (operation) {
case CREATE_KEY:
getMetrics().timer(operation.name()).time(() -> performWriteOperation(() ->
bucket.createKey(keyName, dataSize.toBytes(), replicationConfig, emptyMap()), contentGenerator));
clientProtocol.createKey(volumeName, bucketName, keyName, dataSize.toBytes(),
replicationConfig, emptyMap()), contentGenerator));
break;
case CREATE_STREAM_KEY:
getMetrics().timer(operation.name()).time(() -> performWriteOperation(() ->
bucket.createStreamKey(keyName, dataSize.toBytes(), replicationConfig, emptyMap()), contentGenerator));
clientProtocol.createStreamKey(volumeName, bucketName, keyName, dataSize.toBytes(),
replicationConfig, emptyMap()), contentGenerator));
break;
case LOOKUP_KEY:
keyArgs = omKeyArgsBuilder.get().setKeyName(keyName).build();
Expand All @@ -334,18 +384,22 @@ private void applyOperation(long counter) throws Exception {
getMetrics().timer(operation.name()).time(() -> ozoneManagerClient.getKeyInfo(keyArgs, false));
break;
case READ_KEY:
getMetrics().timer(operation.name()).time(() -> performReadOperation(() -> bucket.readKey(keyName), readBuffer));
getMetrics().timer(operation.name()).time(() -> performReadOperation(() ->
clientProtocol.getKey(volumeName, bucketName, keyName), readBuffer));
break;
case READ_FILE:
getMetrics().timer(operation.name()).time(() -> performReadOperation(() -> bucket.readFile(keyName), readBuffer));
getMetrics().timer(operation.name()).time(() -> performReadOperation(() ->
clientProtocol.readFile(volumeName, bucketName, keyName), readBuffer));
break;
case CREATE_FILE:
getMetrics().timer(operation.name()).time(() -> performWriteOperation(() ->
bucket.createFile(keyName, dataSize.toBytes(), replicationConfig, true, false), contentGenerator));
clientProtocol.createFile(volumeName, bucketName, keyName, dataSize.toBytes(),
replicationConfig, true, false), contentGenerator));
break;
case CREATE_STREAM_FILE:
getMetrics().timer(operation.name()).time(() -> performWriteOperation(() ->
bucket.createStreamFile(keyName, dataSize.toBytes(), replicationConfig, true, false), contentGenerator));
clientProtocol.createStreamFile(volumeName, bucketName, keyName, dataSize.toBytes(),
replicationConfig, true, false), contentGenerator));
break;
case LOOKUP_FILE:
keyArgs = omKeyArgsBuilder.get().setKeyName(keyName).build();
Expand Down Expand Up @@ -414,6 +468,11 @@ private void applyOperation(long counter) throws Exception {
}
}

private OzoneClient getOzoneClient(long counter) {
int index = (int) (counter % ozoneClients.length);
return ozoneClients[index];
}

@FunctionalInterface
interface WriteOperation {
OutputStream createStream() throws IOException;
Expand Down Expand Up @@ -464,4 +523,46 @@ enum Operation {
INFO_VOLUME,
MIXED,
}

private List<String> getOMFollowerNodeIds(OzoneClient ozoneClient) throws IOException {
List<OMRoleInfo> omRoleInfos = ozoneClient.getProxy().getOmRoleInfos();
String leaderOMNodeId = null;
List<String> followerOMNodeIds = new ArrayList<>();
for (OMRoleInfo omRoleInfo : omRoleInfos) {
if (omRoleInfo.getServerRole().equals("LEADER")) {
if (leaderOMNodeId != null) {
// This situation should be rare
throw new IllegalStateException("There are more than one leader detected, please retry again");
}
leaderOMNodeId = omRoleInfo.getNodeId();
} else if (omRoleInfo.getServerRole().equals("FOLLOWER")) {
followerOMNodeIds.add(omRoleInfo.getNodeId());
}
}
if (followerOMNodeIds.isEmpty()) {
throw new IllegalArgumentException("There is no follower in the OM service, please retry again");
}
return followerOMNodeIds;
}

private void changeInitialProxyForFollowerRead(OzoneClient ozoneClient, String omNodeId) {
OzoneManagerProtocolClientSideTranslatorPB ozoneManagerClient =
(OzoneManagerProtocolClientSideTranslatorPB)
ozoneClient.getProxy().getOzoneManagerClient();

OmTransport transport = ozoneManagerClient.getTransport();

if (transport instanceof Hadoop3OmTransport) {
Comment thread
xichen01 marked this conversation as resolved.
Hadoop3OmTransport hadoop3OmTransport = (Hadoop3OmTransport) transport;
HadoopRpcOMFollowerReadFailoverProxyProvider followerReadFailoverProxyProvider =
hadoop3OmTransport.getOmFollowerReadFailoverProxyProvider();

if (followerReadFailoverProxyProvider != null) {
followerReadFailoverProxyProvider.changeInitialProxyForTest(omNodeId);
}
} else {
throw new IllegalStateException("The current transport " + transport.getClass().getName() +
" does not support follower read, only Hadoop3OmTransport is supported");
}
}
}