The current repo belongs to Closed status, and some functions are restricted. For details, please refer to the description of repo status
3 Star 12 Fork 6

寻根 / RabbitmqConnect
Closed

Create your Gitee Account
Explore and code with more than 8 million developers,Free private repositories !:)
Sign up
This repository doesn't specify license. Please pay attention to the specific project description and its upstream code dependency when using it.
Clone or Download
RabbitmqConnect.h 3.86 KB
Copy Edit Web IDE Raw Blame History
寻根 authored 2019-06-11 20:00 . 新文件: Example.cpp
#ifndef XG_RABITTMQCONNECT_H
#define XG_RABITTMQCONNECT_H
////////////////////////////////////////////////////////////////
#include <string>
#include <functional>
#include <amqp_tcp_socket.h>
#include <amqp_ssl_socket.h>
#define CHECK_FALSE_RETURN(FUNC) if (FUNC){} else return false;
using namespace std;
class RabbitmqConnect
{
protected:
amqp_socket_t* sock;
amqp_rpc_reply_t res;
amqp_connection_state_t conn;
public:
RabbitmqConnect()
{
res.reply_type = AMQP_RESPONSE_NORMAL;
sock = NULL;
}
~RabbitmqConnect()
{
close();
}
bool getResponse()
{
res = amqp_get_rpc_reply(conn);
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
return true;
}
int getErrorCode() const
{
return res.library_error;
}
string getErrorString() const
{
switch (res.reply_type)
{
case AMQP_RESPONSE_NORMAL: return "SUCCESS";
case AMQP_RESPONSE_NONE: return "RESPONSE_NONE";
case AMQP_RESPONSE_LIBRARY_EXCEPTION: return amqp_error_string2(res.library_error);
case AMQP_RESPONSE_SERVER_EXCEPTION:
if (res.reply.id == AMQP_CONNECTION_CLOSE_METHOD || res.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
{
auto msg = ((amqp_connection_close_t*)(res.reply.decoded))->reply_text;
return string((char*)(msg.bytes), (char*)(msg.bytes) + msg.len);
}
return "RESPONSE_SERVER_EXCEPTION";
default: return "UNKNOWN ERROR";
}
}
public:
void close()
{
if (sock)
{
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
sock = NULL;
}
}
bool connect(const string& host, int port, bool ssl = false)
{
close();
conn = amqp_new_connection();
sock = ssl ? amqp_ssl_socket_new(conn) : amqp_tcp_socket_new(conn);
if (sock == NULL)
{
amqp_destroy_connection(conn);
return false;
}
if (amqp_socket_open(sock, host.c_str(), port))
{
amqp_destroy_connection(conn);
sock = NULL;
return false;
}
return true;
}
int send(const string& exchange, const string& qname, const string& data)
{
amqp_basic_properties_t props;
props.delivery_mode = 2;
props.content_type = amqp_cstring_bytes("text/plain");
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
return amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(qname.c_str()), 0, 0, &props, amqp_cstring_bytes(data.c_str()));
}
bool login(const string& usr, const string& pwd, amqp_sasl_method_enum method = AMQP_SASL_METHOD_PLAIN)
{
res = amqp_login(conn, "/", 0, 128 * 1024, 0, method, usr.c_str(), pwd.c_str());
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
amqp_channel_open(conn, 1);
return getResponse();
}
bool recv(const string& exchange, const string& qname, std::function<void(const char*, int)> func, int timeout = 5, bool aotodelete = false)
{
timeval tv;
amqp_bytes_t mq = amqp_cstring_bytes(qname.c_str());
amqp_queue_declare(conn, 1, mq, 0, 1, 0, aotodelete ? 1 : 0, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
amqp_queue_bind(conn, 1, mq, amqp_cstring_bytes(exchange.c_str()), mq, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
amqp_basic_consume(conn, 1, mq, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
CHECK_FALSE_RETURN(getResponse());
tv.tv_sec = timeout;
tv.tv_usec = 0;
while (true)
{
amqp_envelope_t data;
amqp_maybe_release_buffers(conn);
res = amqp_consume_message(conn, &data, &tv, 0);
CHECK_FALSE_RETURN(res.reply_type == AMQP_RESPONSE_NORMAL);
func((char*)(data.message.body.bytes), data.message.body.len);
amqp_destroy_envelope(&data);
}
return false;
}
};
////////////////////////////////////////////////////////////////
#endif
C++
1
https://gitee.com/xungen/rabbitmqconnect.git
git@gitee.com:xungen/rabbitmqconnect.git
xungen
rabbitmqconnect
RabbitmqConnect
master

Search