1 Star 0 Fork 0

bruceyuan10/rabbitmqstudy

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
ReplyToConsumer.java 2.28 KB
一键复制 编辑 原始数据 按行查看 历史
bruceyuan10 提交于 4年前 . rabbitmq
package cn.enjoyedu.setmsg;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*类说明:消息的属性的控制
*/
public class ReplyToConsumer {
public static void main(String[] argv)
throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.01");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
/*创建交换器*/
channel.exchangeDeclare(ReplyToProducer.EXCHANGE_NAME,
"direct",false);
/*声明一个队列*/
String queueName = "replyto";
channel.queueDeclare(queueName,false,false,
false,null);
/*绑定,将队列和交换器通过路由键进行绑定*/
String routekey = "error";/*表示只关注error级别的日志消息*/
channel.queueBind(queueName,ReplyToProducer.EXCHANGE_NAME,routekey);
System.out.println("waiting for message........");
/*声明了一个消费者*/
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received["+envelope.getRoutingKey()
+"]"+message);
AMQP.BasicProperties respProp
= new AMQP.BasicProperties.Builder()
.replyTo(properties.getReplyTo())
//要回复的消息ID
.correlationId(properties.getMessageId())
.build();
channel.basicPublish("", respProp.getReplyTo() ,
respProp ,
("Hi,"+message).getBytes("UTF-8"));
}
};
/*消费者正式开始在指定队列上消费消息*/
channel.basicConsume(queueName,true,consumer);
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/bruceyuan10/rabbitmqstudy.git
git@gitee.com:bruceyuan10/rabbitmqstudy.git
bruceyuan10
rabbitmqstudy
rabbitmqstudy
master

搜索帮助