This repository is in Closed status, so some functions are restricted. For details, please refer to the description of repository statuses.
2 Star 9 Fork 5

寻根 / RabbitmqConnect
Closed

Create your Gitee Account
Explore and code with more than 6 million developers. Free private repositories! :)
Sign up
This repository doesn't specify a license. Without the author's permission, this code may be used only for learning and not for any other purpose.
Clone or Download
RabbitmqConnect.h 3.86 KB
Copy Edit Web IDE Raw Blame History
寻根 authored 2019-06-11 20:00 . 新文件: Example.cpp
#ifndef XG_RABITTMQCONNECT_H
#define XG_RABITTMQCONNECT_H
////////////////////////////////////////////////////////////////
#include <string>
#include <functional>
#include <amqp_tcp_socket.h>
#include <amqp_ssl_socket.h>
// Evaluates FUNC; when it is false the *enclosing function* returns false.
// The expansion already ends with ';', so writing `CHECK_FALSE_RETURN(x);`
// leaves one extra empty statement behind.
#define CHECK_FALSE_RETURN(FUNC) if (FUNC){} else return false;
// NOTE(review): because this is a header, every file that includes it gets
// all of namespace std imported at file scope.
using namespace std;
class RabbitmqConnect
{
protected:
amqp_socket_t* sock;
amqp_rpc_reply_t res;
amqp_connection_state_t conn;
public:
RabbitmqConnect()
{
res.reply_type = AMQP_RESPONSE_NORMAL;
sock = NULL;
}
~RabbitmqConnect()
{
close();
}
bool getResponse()
{
res = amqp_get_rpc_reply(conn);
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
return true;
}
int getErrorCode() const
{
return res.library_error;
}
string getErrorString() const
{
switch (res.reply_type)
{
case AMQP_RESPONSE_NORMAL: return "SUCCESS";
case AMQP_RESPONSE_NONE: return "RESPONSE_NONE";
case AMQP_RESPONSE_LIBRARY_EXCEPTION: return amqp_error_string2(res.library_error);
case AMQP_RESPONSE_SERVER_EXCEPTION:
if (res.reply.id == AMQP_CONNECTION_CLOSE_METHOD || res.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
{
auto msg = ((amqp_connection_close_t*)(res.reply.decoded))->reply_text;
return string((char*)(msg.bytes), (char*)(msg.bytes) + msg.len);
}
return "RESPONSE_SERVER_EXCEPTION";
default: return "UNKNOWN ERROR";
}
}
public:
void close()
{
if (sock)
{
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
sock = NULL;
}
}
bool connect(const string& host, int port, bool ssl = false)
{
close();
conn = amqp_new_connection();
sock = ssl ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
if (sock == NULL)
{
amqp_destroy_connection(conn);
return false;
}
if (amqp_socket_open(sock, host.c_str(), port))
{
amqp_destroy_connection(conn);
sock = NULL;
return false;
}
return true;
}
int send(const string& exchange, const string& qname, const string& data)
{
amqp_basic_properties_t props;
props.delivery_mode = 2;
props.content_type = amqp_cstring_bytes("text/plain");
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
return amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(qname.c_str()), 0, 0, &props, amqp_cstring_bytes(data.c_str()));
}
bool login(const string& usr, const string& pwd, amqp_sasl_method_enum method = AMQP_SASL_METHOD_PLAIN)
{
res = amqp_login(conn, "/", 0, 128 * 1024, 0, method, usr.c_str(), pwd.c_str());
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
amqp_channel_open(conn, 1);
return getResponse();
}
bool recv(const string& exchange, const string& qname, std::function<void(const char*, int)> func, int timeout = 5, bool aotodelete = false)
{
timeval tv;
amqp_bytes_t mq = amqp_cstring_bytes(qname.c_str());
amqp_queue_declare(conn, 1, mq, 0, 1, 0, aotodelete ? 1 : 0, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
amqp_queue_bind(conn, 1, mq, amqp_cstring_bytes(exchange.c_str()), mq, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
amqp_basic_consume(conn, 1, mq, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
tv.tv_sec = timeout;
tv.tv_usec = 0;
while (true)
{
amqp_envelope_t data;
amqp_maybe_release_buffers(conn);
res = amqp_consume_message(conn, &data, &tv, 0);
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
func((char*)(data.message.body.bytes), data.message.body.len);
amqp_destroy_envelope(&data);
}
return false;
}
};
////////////////////////////////////////////////////////////////
#endif

Comment ( 0 )

Sign in to post a comment

C++
1
https://gitee.com/xungen/rabbitmqconnect.git
git@gitee.com:xungen/rabbitmqconnect.git
xungen
rabbitmqconnect
RabbitmqConnect
master

Search

161121 f78d6d6f 1850385 154831 86f8c370 1850385