Test_Consumer2.java 1.76 KB
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.nutz.json.Json;

import java.util.List;
public class Test_Consumer2 {
private static void send(){
	final DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("orders");
//	 final DefaultMQProducer producer = new DefaultMQProducer("messageTransition");  
	consumer.setNamesrvAddr("10.28.6.54:9876");  
//	  producer.setInstanceName("Producer");  //ORDER_MQ_REFUNDNOTPASS
	  try {
		
		  consumer.subscribe("losGateway", "losOrderItems");
		  consumer.registerMessageListener(new MessageListenerConcurrently() {

			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
					ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
				boolean success=Boolean.FALSE;
				for(MessageExt msg:paramList){
					String topic=msg.getTopic();
					String tag=msg.getTags();
					if(topic.equals("losGateway")&&tag.equals("losOrderItems")){
						success=true;
						System.out.println(Json.toJson(msg));
					}
				}
				
				
				    if (success) {
					  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
					   }else{
						   return ConsumeConcurrentlyStatus.RECONSUME_LATER;   
					   }
					    
				
			}
			  
		});
		  consumer.start();
	} catch (Exception e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}/*finally{
		consumer.shutdown();
	}*/
	
}
	public static void main(String[] args)  {
		send();
//		DiliMQConsumerImpl consumer=new DiliMQConsumerImpl();
	}

}