在业务逻辑的异步处理,系统解耦,分布式通信以及控制高并发的场景下,消息队列有着广泛的应用。本项目基于Spring这一平台,整合流行的开源消息队列中间件ActiveMQ,实现一个向ActiveMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。 包含的特性如下:
1.开启activeMQ,访问http://localhost:8080/demo
2 在项目中,我们为消息的生产者和发布者分别注册了两个消费者和订阅者,当有消息到达activeMQ时,消费者和订阅者会自动获取对应的消息,其中两个消费者会轮流消费消息,而两个订阅者会同时订阅所有消息;
3.填入要发送的消息,点击生产消息可以向消息队列添加一条消息,我们可以试着添加了四条消息,并观察控制台结果,可以发现每个消息只被某一个消费者接收;
4.重复以上操作发布四条消息,可以看到订阅者的输出结果,表明每个发布的消息可以被两个订阅者全部接收;
5.以上结果表明,向队列生产的每条消息,只能被某一个消费者读取,而发布的消息,可以被每个订阅者重复读取,这是两种模式最大的区别,实际应用中要根据情况来选择。
找一台服务器,把activeMQ的安装文件夹拷贝两次,分别进行配置:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8361"/>
</bean>
这里配置web控制台端口为8361,另外两台分别改为8362、8363。
<persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ -->
<replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:63631" zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183" hostname="edu-mq-01" zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
<persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ -->
<replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:63632"
zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183" hostname="edu-mq-01" zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
<persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ -->
<replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:63633"
zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183" hostname="edu-mq-01" zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
zkAddress填zookeeper地址,ZkPath指集群选举信息在zookeeper的存放路径。
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:53531?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5662?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:53532?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5663?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1884?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:53533?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5664?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1885?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
注意端口号不要冲突。 依次启动节点,观察zookeeper内数据的值,可以判断当前主节点是哪一台服务器。可以关闭部分服务器来测试集群的高可用性。
failover:(tcp://192.168.252.128:53531,tcp://192.168.252.128:53532,tcp://192.168.252.128:53533)?randomize=false
由于一个activeMQ集群只有一个主服务器在工作,对消息的吞吐量是有限的。要想进行扩展,则需要把多个集群桥接起来,这样连接到任意一个集群,即可与整个集群的队列,生产者,消费者之间进行通信。在activemq.xml新增:
<networkConnectors>
<networkConnector
uri="static:(tcp://192.168.1.101:53531,tcp://192.168.1.101:53532,tcp://192.168.1.101:53533)"
duplex="false"/>
</networkConnectors>
以上配置即可完成到tcp://192.168.1.101:53531,tcp://192.168.1.101:53532,tcp://192.168.1.101:53533集群的链接。注意要相互配。
购买地址 |
---|
标题 | 技术领域 | |
---|---|---|
1 | 一种基于微服务架构的车联网大数据分析系统 | 微服务 |
2 | 一种流式数据场景下Elasticsearch索引的自动化扩容方法 | 搜索引擎 |
3 | 大数据钻取分析方法、装置、设备及存储介质 | 大数据分析 |
4 | 一种基于工作流引擎的自动化办公方法和系统 | 工作流引擎 |
5 | 一种低延迟高性能实时数据仓库搭建的方法和系统 | 实时数仓 |
6 | 一种基于数据治理的大数据中台架构系统 | 数据中台 |
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
1. 开源生态
2. 协作、人、软件
3. 评估模型