代码拉取完成,页面将自动刷新
package cn.enjoyedu.setmsg;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*类说明:消息的属性的控制
*/
public class ReplyToConsumer {
public static void main(String[] argv)
throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.01");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
/*创建交换器*/
channel.exchangeDeclare(ReplyToProducer.EXCHANGE_NAME,
"direct",false);
/*声明一个队列*/
String queueName = "replyto";
channel.queueDeclare(queueName,false,false,
false,null);
/*绑定,将队列和交换器通过路由键进行绑定*/
String routekey = "error";/*表示只关注error级别的日志消息*/
channel.queueBind(queueName,ReplyToProducer.EXCHANGE_NAME,routekey);
System.out.println("waiting for message........");
/*声明了一个消费者*/
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received["+envelope.getRoutingKey()
+"]"+message);
AMQP.BasicProperties respProp
= new AMQP.BasicProperties.Builder()
.replyTo(properties.getReplyTo())
//要回复的消息ID
.correlationId(properties.getMessageId())
.build();
channel.basicPublish("", respProp.getReplyTo() ,
respProp ,
("Hi,"+message).getBytes("UTF-8"));
}
};
/*消费者正式开始在指定队列上消费消息*/
channel.basicConsume(queueName,true,consumer);
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。