Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,53 @@
*/
package org.apache.rocketmq.example.quickstart;

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
* This example shows how to subscribe and consume messages using providing {@link DefaultMQPushConsumer}.
* This example shows how to subscribe and consume messages using providing
* {@link DefaultMQPushConsumer}.
*/
public class Consumer {

public static final String CONSUMER_GROUP = "please_rename_unique_group_name_4";
/**
* Consumer group name.
*/
public static final String CONSUMER_GROUP = "please_rename_unique_group_name";

/**
* Default NameServer address for testing.
*/
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";

/**
* Topic to consume from.
*/
public static final String TOPIC = "TopicTest";

public static void main(String[] args) throws MQClientException {
/**
* Message tag filter expression.
*/
public static final String TAG_FILTER = "*";

private Consumer() {
}

/**
* Main method to demonstrate message consumption.
*
* @param args Command line arguments
* @throws InterruptedException if interrupted while consuming
* @throws MQClientException if consumer initialization fails
*/
public static void main(final String[] args)
throws InterruptedException, MQClientException {

/*
* Instantiate with specified consumer group name.
Expand All @@ -42,36 +73,45 @@ public static void main(String[] args) throws MQClientException {
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* Alternatively, you may specify name server addresses via exporting
* environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
// Uncomment the following line while debugging
// consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

/*
* Specify where to start in case the specific consumer group is a brand-new one.
* Specify where to start in case the specific consumer group is a brand-new
* one.
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

/*
* Subscribe one more topic to consume.
*/
consumer.subscribe(TOPIC, "*");
consumer.subscribe(TOPIC, TAG_FILTER);

/*
* Register callback to execute on arrival of messages fetched from brokers.
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(
final List<MessageExt> msgs,
final ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

/*
* Launch the consumer instance.
* Launch the consumer instance.
*/
consumer.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,97 +23,92 @@
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
* This class demonstrates how to send messages to brokers using provided {@link DefaultMQProducer}.
* This class demonstrates how to send messages to brokers using provided
* {@link DefaultMQProducer}.
*/
public class Producer {

/**
* The number of produced messages.
*/
public static final int MESSAGE_COUNT = 1000;

/**
* Send timeout in milliseconds.
*/
public static final int SEND_TIMEOUT_MILLIS = 20000;

/**
* Sleep time on error in milliseconds.
*/
public static final int ERROR_SLEEP_MILLIS = 1000;

public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TopicTest";
public static final String TAG = "TagA";

private Producer() {
}

public static void main(String[] args) throws MQClientException, InterruptedException {

/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

/*
* Specify name server addresses.
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
// Uncomment the following line while debugging
// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

/*
* Launch the instance.
*/
producer.start();

for (int i = 0; i < MESSAGE_COUNT; i++) {
try {
String messageBody = "Hello RocketMQ " + i;
Message msg = new Message(TOPIC, TAG,
messageBody.getBytes(RemotingHelper.DEFAULT_CHARSET));

SendResult sendResult = producer.send(msg, SEND_TIMEOUT_MILLIS);
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message(TOPIC /* Topic */,
TAG /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);

/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg, 20 * 1000);
/*
* There are different ways to send message, if you don't care about the send result,you can use this way
* There are different ways to send message, if you don't care about the send
* result,you can use this way
* {@code
* producer.sendOneway(msg);
* }
*/

/*
* if you want to get the send result in a synchronize way, you can use this send method
* if you want to get the send result in a synchronize way, you can use this
* send method
* {@code
* SendResult sendResult = producer.send(msg);
* System.out.printf("%s%n", sendResult);
* }
*/

/*
* if you want to get the send result in a asynchronize way, you can use this send method
* if you want to get the send result in a asynchronize way, you can use this
* send method
* {@code
*
* producer.send(msg, new SendCallback() {
* @Override
* public void onSuccess(SendResult sendResult) {
* // do something
* }
* producer.send(msg, new SendCallback() {
*
* @Override
* public void onSuccess(SendResult sendResult) {
* // do something
* }
*
* @Override
* public void onException(Throwable e) {
* // do something
* }
*});
* @Override
* public void onException(Throwable e) {
* // do something
* }
* });
*
*}
* }
*/

System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
Thread.sleep(ERROR_SLEEP_MILLIS);
}
}

Expand Down