3 Star 0 Fork 0

mirrors_datastax/pulsar-rabbitmq-gw

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
Apache-2.0

Starlight for RabbitMQ

Starlight for RabbitMQ acts as a proxy between your RabbitMQ application and Apache Pulsar. It implements the AMQP 0.9.1 protocol used by RabbitMQ clients and translates AMQP frames and concepts to Pulsar ones. The proxy can be run as a standalone jar, a Pulsar Pluggable Protocol Handler or a Pulsar Proxy extension.

Limitations

This is currently not implemented but on the roadmap:

  • Headers exchanges
  • Exclusive consumers
  • Non durable exchanges and queues
  • Transient messages (all messages are persisted)

RabbitMQ and Pulsar work in a pretty different way. Starlight for RabbitMQ was designed to make the most benefit from Pulsar's scalability. This results in some differences of behavior:

  • Canceling an AMQP consumer will requeue the messages that were received through it since it also closes the associated Pulsar consumers.

Get started

Download and build Starlight for RabbitMQ

To build from code, complete the following steps:

  1. Clone the project from GitHub.
git clone https://github.com/datastax/starlight-for-rabbitmq.git
cd starlight-for-rabbitmq
  1. Build the project.
mvn clean install -DskipTests

You can find the executable jar file in the following directory.

./starlight-rabbitmq/target/starlight-rabbitmq-${version}-jar-with-dependencies.jar

You can find the nar file in the following directory.

./starlight-rabbitmq/target/starlight-rabbitmq-${version}.nar

Running Starlight for RabbitMQ as a standalone executable jar

  1. Set the URLs of the Pulsar brokers and the ZooKeeper configuration store in a configuration file. Eg:
    brokerServiceURL=pulsar://localhost:6650
    brokerWebServiceURL=http://localhost:8080
    configurationStoreServers=localhost:2181
    
  2. Run as a Java application and provide the configuration file path in the -c/--config option:
    java -jar ./starlight-rabbitmq/target/starlight-rabbitmq-${version}-jar-with-dependencies.jar -c conf/starlight-rabbitmq.conf
    

Running Starlight for RabbitMQ as a protocol handler

Starlight for RabbitMQ can be embedded directly into the Pulsar brokers by loading it as a protocol handler.

  1. Set the configuration of the Starlight for RabbitMQ protocol handler in the broker configuration file (generally broker.conf or standalone.conf). Example where the NAR file was copied into the ./protocols directory:

    messagingProtocols=rabbitmq
    protocolHandlerDirectory=./protocols
    
  2. Set the AMQP service listeners. Note that the hostname value in listeners is the same as Pulsar broker's advertisedAddress. The following is an example.

    amqpListeners=amqp://127.0.0.1:5672
    advertisedAddress=127.0.0.1
    
  3. Start the Pulsar broker

Running Starlight for RabbitMQ as a Pulsar Proxy extension

Starlight for RabbitMQ can be embedded into the Pulsar Proxy by loading it as a proxy extension.

  1. Set the configuration of the Starlight for RabbitMQ proxy extension in the proxy configuration file (generally proxy.conf or standalone.conf). Example where the NAR file was copied into the ./proxyextensions directory:

    proxyExtensions=rabbitmq
    proxyExtensionsDirectory=./proxyextensions
    
  2. Set the AMQP service listeners. Note that the hostname value in listeners is the same as Pulsar proxy's advertisedAddress. The following is an example.

    amqpListeners=amqp://127.0.0.1:5672
    advertisedAddress=127.0.0.1
    
  3. Start the Pulsar Proxy

Checking that it works

You can use a RabbitMQ/AMQP-0.9.1 client or a tool such as RabbitMQ PerfTest to check that everything works correctly.

For instance the following Python script creates a queue, publishes a message that will be routed to this queue, reads the message from the queue and deletes the queue

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(port=5672))
channel = connection.channel()

try:
    channel.queue_declare("test-queue")
    print("created test-queue queue")
    channel.basic_publish(exchange="", routing_key="test-queue", body="test".encode('utf-8'))
    print("published message test")
    _, _, res = channel.basic_get(queue="test-queue", auto_ack=True)
    assert res is not None, "should have received a message"
    print("received message: " + res.decode())
    channel.queue_delete("test-queue")
    print("deleted test-queue queue")
finally:
    connection.close()

Configuration

Generic configuration

Name Description Default
configurationStoreServers Zookeeper configuration store connection string (as a comma-separated list)
amqpListeners Used to specify multiple advertised listeners for the proxy. The value must format as amqp[s]://<host>:<port>, multiple listeners should be separated with commas. amqp://127.0.0.1:5672
amqpSessionCountLimit The maximum number of sessions which can exist concurrently on an AMQP connection. 256
amqpHeartbeatDelay The default period with which Broker and client will exchange heartbeat messages (in seconds) when using AMQP. Clients may negotiate a different heartbeat frequency or disable it altogether. 0
amqpHeartbeatTimeoutFactor Factor to determine the maximum length of that may elapse between heartbeats being received from the peer before an AMQP0.9 connection is deemed to have been broken. 2
amqpNetworkBufferSize AMQP Network buffer size. 2097152 (2MB)
amqpMaxMessageSize AMQP Max message size. 104857600 (100MB)
amqpDebugBinaryDataLength AMQP Length of binary data sent to debug log. 80
amqpConnectionCloseTimeout Timeout in ms after which the AMQP connection closes even if a ConnectionCloseOk frame is not received 2000
amqpBatchingEnabled Whether batching messages is enabled in AMQP true

Authentication configuration

Name Description Default
authenticationEnabled Whether authentication is enabled for the proxy false
amqpAuthenticationMechanisms Authentication mechanism name list for AMQP (a comma-separated list of mecanisms. Eg: PLAIN,EXTERNAL) PLAIN
tokenSecretKey Configure the secret key to be used to validate auth tokens. The key can be specified like: tokenSecretKey=data:;base64,xxxxxxxxx or tokenSecretKey=file:///my/secret.key. Note: key file must be DER-encoded.
tokenPublicKey Configure the public key to be used to validate auth tokens. The key can be specified like: tokenPublicKey=data:;base64,xxxxxxxxx or tokenPublicKey=file:///my/secret.key. Note: key file must be DER-encoded.
tokenAuthClaim Specify the token claim that will be used as the authentication "principal" or "role". The "subject" field will be used if this is left blank
tokenAudienceClaim The token audience "claim" name, e.g. "aud". It is used to get the audience from token. If it is not set, the audience is not verified.
tokenAudience The token audience stands for this broker. The field tokenAudienceClaim of a valid token need contains this parameter.

Broker client configuration

Name Description Default
brokerServiceURL The service URL pointing to the broker cluster.
brokerWebServiceURL The Web service URL pointing to the broker cluster
brokerClientAuthenticationPlugin The authentication plugin used by the Pulsar proxy to authenticate with Pulsar brokers
brokerClientAuthenticationParameters The authentication parameters used by the Pulsar proxy to authenticate with Pulsar brokers
amqpBrokerClientAuthenticationParameters If set, the RabbitMQ service will use these parameters to authenticate on Pulsar's brokers. If not set, the brokerClientAuthenticationParameters setting will be used. This setting allows to have different credentials for the Pulsar proxy and for the RabbitMQ service
tlsEnabledWithBroker Whether TLS is enabled when communicating with Pulsar brokers. false
brokerClientTrustCertsFilePath The path to trusted certificates used by the Pulsar proxy to authenticate with Pulsar brokers
brokerClientTlsEnabledWithKeyStore Whether the proxy use KeyStore type to authenticate with Pulsar brokers
brokerClientTlsTrustStoreType TLS TrustStore type configuration for proxy: JKS, PKCS12 used by the proxy to authenticate with Pulsar brokers
brokerClientTlsTrustStore TLS TrustStore path for proxy, used by the Pulsar proxy to authenticate with Pulsar brokers
brokerClientTlsTrustStorePassword TLS TrustStore password for proxy, used by the Pulsar proxy to authenticate with Pulsar brokers

TLS configuration

Name Description Default
tlsCertRefreshCheckDurationSec TLS certificate refresh duration in seconds. If the value is set 0, check TLS certificate every new connection. 300
tlsCertificateFilePath Path for the TLS certificate file
tlsKeyFilePath Path for the TLS private key file
tlsTrustCertsFilePath Path for the trusted TLS certificate pem file
tlsAllowInsecureConnection Accept untrusted TLS certificate from client. If true, a client with a cert which cannot be verified with the tlsTrustCertsFilePath cert will be allowed to connect to the server, though the cert will not be used for client authentication
tlsHostnameVerificationEnabled Whether the hostname is validated when the proxy creates a TLS connection with brokers false
tlsRequireTrustedClientCertOnConnect Whether client certificates are required for TLS. Connections are rejected if the client certificate isn’t trusted. false
tlsProtocols Specify the tls protocols the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- TLSv1.3, TLSv1.2
tlsCiphers Specify the tls cipher the broker will use to negotiate during TLS Handshake. Multiple values can be specified, separated by commas. Example:- TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
tlsRequireTrustedClientCertOnConnect Whether client certificates are required for TLS. Connections are rejected if the client certificate isn't trusted
tlsEnabledWithKeyStore Enable TLS with KeyStore type configuration for proxy
tlsProvider TLS Provider
tlsKeyStoreType TLS KeyStore type configuration for proxy: JKS, PKCS12
tlsKeyStore TLS KeyStore path for proxy
tlsKeyStorePassword TLS KeyStore password for proxy
tlsTrustStoreType TLS TrustStore type configuration for proxy: JKS, PKCS12
tlsTrustStore TLS TrustStore path for proxy
tlsTrustStorePassword TLS TrustStore password for proxy

Under the hood

AMQP 0.9.1 (the protocol used by RabbitMQ) employs the concepts of Exchanges, Queues and Bindings to provide basic routing capabilities inside the message broker. These concepts are mapped to Pulsar topics and features. One important architectural decision is that Starlight for RabbitMQ doesn’t interact directly with the managed ledger as in other RabbitMQ integrations for Pulsar. Interacting with the ledger has the advantage of being performant, but the disadvantage is that the broker which interacts with the ledger must have ownership of the topic. Since in AMQP 0.9.1 there is a many-to-many relationship between Exchanges and Queues for a given Virtual host, all Exchanges and Queues and related topics would have to be owned by the same broker. There are techniques to do this using Topic bundles, but the result is that a full AMQP Virtual host `can be handled by only one broker at a time. This is an issue for scalability. So instead, Starlight for RabbitMQ acts as a proxy and uses the Pulsar binary protocol to communicate with the brokers. This way it can leverage Pulsar features such as load balancing of the topics on the brokers, batching of messages, partitioning of topics, and load balancing of the data on the consumers.

On the publishing side, an AMQP exchange is mapped to a topic. Depending on the type of exchange, the publishing routing key may also be included in the topic name.

On the consumer side, Pulsar shared subscriptions are used to represent the AMQP Bindings from an Exchange to a Queue. When creating an AMQP Queue consumer, the proxy creates Pulsar consumers for all the Bindings of the Queue.

When you unbind the Queue, the Pulsar subscription isn’t deleted right away since the consumer may be lagging. Messages are still received from the subscription and filtered if their position is past the end of the binding. When all messages from the binding have been acknowledged, then the corresponding subscription can finally be removed by the SubscriptionCleaner task.

Consistent metadata store

Starlight for RabbitMQ uses Apache Zookeeper to store the AMQP entities metadata consistently. The existing ZooKeeper configuration store can be reused for this, and Starlight for RabbitMQ will employ the /starlight-rabbitmq prefix to write its entries into ZooKeeper.

Security and authentication

Starlight for RabbitMQ supports connections using TLS/mTLS to ensure privacy and security of the communication. It also supports the PLAIN and EXTERNAL mechanisms used by RabbitMQ. Internally, it uses the same AuthenticationService as Apache Pulsar and maps the AMQP mechanisms to existing Pulsar authentication modes. At the moment there is no support for authorization so an authenticated user has full access to all Virtual hosts. Starlight for RabbitMQ can connect to brokers that have TLS and/or authentication, and/or authorization enabled. To perform its operations, Starlight for RabbitMQ currently needs to use an “admin role”. Future versions will relay the principal authenticated to the proxy and use a “proxy role” so operations on the broker will have permissions from the originating application.

PLAIN authentication mechanism

The PLAIN mechanism is mapped to the AuthenticationProviderToken mode of authentication. The username is ignored and the password is used as the JSON Web Token (JWT).

EXTERNAL authentication mechanism

The EXTERNAL mechanism is mapped to the AuthenticationProviderTls mode of authentication. This is the equivalent of the rabbitmq-auth-mechanism-ssl plugin with ssl_cert_login_from parameter set to common_name.

Clustering

Multiple Starlight for RabbitMQ proxies can be launched at the same time for scalability and high availability needs. The proxies are stateless and can be started and stopped at will. They share their configuration in Zookeeper so you can create/delete/bind/unbind exchanges and queues on any proxy, and the configuration will be synchronized on the other proxies. Publishing messages can be done on any proxy. On the receiving side, messages will be dispatched evenly to all connected AMQP consumers since the Pulsar subscriptions are shared ones.

Multi-tenancy

Starlight for RabbitMQ offers support for multi-tenancy by mapping an AMQP Virtual host to a Pulsar tenant and namespace. The mapping depends on the following configuration parameters

Name Description Default
amqpDefaultTenant Default Pulsar tenant used to map short or empty VHosts public
amqpDefaultNamespace Default Pulsar namespace used to map short or empty VHosts default
amqpMapShortVhostToTenant By default short VHosts (not containing a slash character) will be mapped to a namespace on the default tenant. Set this parameter to true to map short VHosts to tenants with the default namespace instead. false

The mapping is done as follows:

  • AMQP vhost / is mapped to Pulsar namespace <amqpDefaultTenant>/<amqpDefaultNamespace>
  • AMQP vhost /<vhost> or <vhost> is mapped to Pulsar namespace <amqpDefaultTenant>/<vhost> if amqpMapShortVhostToTenant is false (default) or to <vhost>/<amqpDefaultNamespace> if amqpMapShortVhostToTenant is true
  • AMQP vhost /<tenant>/<namespace> or <tenant>/<namespace> is mapped to Pulsar namespace <tenant>/<namespace>

This means that AMQP vhosts must only contain characters that are accepted in Pulsar tenant and namespace names (ie. a-zA-Z0-9_-=:.)

Contribute

Release

mvn release:prepare -DautoVersionSubmodules -Prelease -Darguments='-DskipTests'

The GitHub release is handled by a GitHub action whenever a tag is being pushed

System tests

System tests is a test suite which runs a RabbitMQ client against a real Pulsar Cluster and tests the protocol handler is working correctly.

mvn -f rabbitmq-tests integration-test failsafe:verify -Dgroups=com.datastax.oss.starlight.rabbitmqtests.SystemTest \
   -Dtests.systemtests.enabled=true \
   -Dtests.systemtests.pulsar.host=your-pulsar-broker-or-proxy-hostname \
   -Dtests.systemtests.ampqlistener.port=5672
Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

简介

暂无描述 展开 收起
README
Apache-2.0
取消

发行版

暂无发行版

贡献者

全部

语言

近期动态

不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors_datastax/pulsar-rabbitmq-gw.git
git@gitee.com:mirrors_datastax/pulsar-rabbitmq-gw.git
mirrors_datastax
pulsar-rabbitmq-gw
pulsar-rabbitmq-gw
master

搜索帮助