Test_Consumer2.java
1.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.nutz.json.Json;
import java.util.List;
/**
 * Manual smoke test that starts a RocketMQ push consumer and prints every
 * message received on the expected topic/tag as JSON to standard output.
 *
 * <p>The consumer is intentionally never shut down: once started it keeps
 * receiving messages until the process is terminated.
 */
public class Test_Consumer2 {

    /** Consumer group the push consumer registers under. */
    private static final String CONSUMER_GROUP = "orders";

    /** Address of the RocketMQ name server. */
    private static final String NAMESRV_ADDR = "10.28.6.54:9876";

    /** Topic that is subscribed to and expected on every delivered message. */
    private static final String TOPIC = "losGateway";

    /** Tag expression used for the subscription and expected on every delivered message. */
    private static final String TAG = "losOrderItems";

    /**
     * Creates the push consumer, subscribes it to {@link #TOPIC}/{@link #TAG},
     * registers the printing listener and starts it.
     *
     * <p>Any exception raised while subscribing or starting is reported on
     * standard error and otherwise ignored, so the caller never sees a failure.
     */
    private static void startConsumer() {
        final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        try {
            consumer.subscribe(TOPIC, TAG);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                /**
                 * Prints every message of the batch that carries the expected
                 * topic and tag.
                 *
                 * @return {@code CONSUME_SUCCESS} if at least one message in the
                 *         batch matched, otherwise {@code RECONSUME_LATER}
                 */
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                        ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
                    boolean success = false;
                    for (MessageExt msg : paramList) {
                        // Constant-first equals: a message without a tag (null)
                        // must count as "not matching" instead of throwing a
                        // NullPointerException out of the listener.
                        if (TOPIC.equals(msg.getTopic()) && TAG.equals(msg.getTags())) {
                            success = true;
                            System.out.println(Json.toJson(msg));
                        }
                    }
                    return success
                            ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                            : ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
            consumer.start();
        } catch (Exception e) {
            // Test harness boundary: report the failure and let main() return.
            e.printStackTrace();
        }
    }

    /**
     * Entry point: starts the consumer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        startConsumer();
    }
}