资讯

精准传达 • 有效沟通

从品牌网站建设到网络营销策划,从策略到执行的一站式服务

RabbitMQ第五课使用技巧

1) C++ 获取消息数据
amqp_rpc_reply_t ret;
timeval tvTimeout;
tvTimeout.tv_sec = 1;
tvTimeout.tv_usec = 0;
ret = amqp_consume_message(conn, &envelope, &valTimeOut, 0);

成都创新互联公司是一家专注网站建设、网络营销策划、微信小程序开发、电子商务建设、网络推广、移动互联开发、研究、服务为一体的技术型公司。公司成立10年以来,已经为近千家生料搅拌车各业的企业公司提供互联网服务。现在,服务的近千家客户与我们一路同行,见证我们的成长;未来,我们一起分享成功的喜悦。

if (AMQP_RESPONSE_NORMAL == ret.reply_type)
{
   std::string strAMQPMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
}

误区: std::string strAMQPMsg = char*)envelope.message.body.bytes 存在多余的数据
误区: 没有设置接收超时,而是直接传递NULL,导致函数进入死循环


2)发送消息的时候,返回错误信息:AMQP_STATUS_SOCKET_ERROR
AMQP_STATUS_SOCKET_ERROR = -0x0009,               /**< A socket error occurred */
服务器在一定时间之内,收到客户端的消息,就会主动断开连接,因此客户端需要跟服务器Broker重新建立连接,如果不想断开连接,需要发送心跳


3)确认数据是否已经发送成功

      关于消费者就不用代码来获取消息了,直接在RabbitMQ Management点击某个队列的名字,然后Get Message(s) 即可获取消息内容

4)指定消息的超时时间
某些实际的应用场景中会产生许多过期的消息时间,可以通过设置amqp_basic_properties_t的超时时间参数expiration来解决队列中的超时数据过多的问题

amqp_basic_properties_t props;
props._flags = AMQP_BASIC_EXPIRATION_FLAG;
props.expiration = amqp_cstring_bytes("10000");//超时10秒
amqp_basic_publish(conn, channnel, exchange, queue, 0, 0, &props, message);

5)声明队列,返回错误信息:AMQP_RESPONSE_SERVER_EXCEPTION

原因:1.交换机是否创建成功 2.声明的队列是否已经创建过,并且已经存在的队列跟现在的队列的属性不一致,例如auto_delete自动删除属性,或者durable持久化属性

导致的问题:当返回该错误,说明跟broker的连接已经中断,必须重新建立连接,否则,继续调用其他函数接口会一直阻塞

解决: 通过web手动删除队列

6)只知道队列的情况下获取数据

实际上说明声明的交换机和队列都必须唯一

 amqp_connection_state_t connState = amqp_new_connection();
 amqp_socket_t *pSocket = amqp_tcp_socket_new(connState);
 if (!pSocket) {
  amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
  amqp_destroy_connection(connState);
  std::cout << "跟消息服务器创建连接失败" << std::endl;
  return;
 }
 int nConnStatus = amqp_socket_open(pSocket, strIP.c_str(), nPort);
 if (AMQP_STATUS_OK != nConnStatus) {
  amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
  amqp_destroy_connection(connState);
  return;
 }

 amqp_rpc_reply_t  rpcReply = amqp_login(connState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());
 if (AMQP_RESPONSE_NORMAL != rpcReply.reply_type)
 {
  std::cout << "登陆消息服务器失败" << std::endl;
  return;
 }

 amqp_channel_open(connState, 1);
 amqp_basic_consume(connState, 1, amqp_cstring_bytes("passerby-000001"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

 amqp_frame_t frame;
 std::cout << "登陆消息服务器成功,开始接收数据" << std::endl;

 while (1)
 {
  amqp_envelope_t envelope;

  amqp_maybe_release_buffers(connState);
  timeval tvTimeout;
  tvTimeout.tv_sec = 10;
  tvTimeout.tv_usec = 0;
  amqp_rpc_reply_t ret = amqp_consume_message(connState, &envelope, &tvTimeout, 0);

  if (AMQP_RESPONSE_NORMAL != ret.reply_type)
  {
   if (AMQP_STATUS_SOCKET_ERROR == ret.library_error)
   {
    std::cout << "跟消息服务器连接中断,清理资源,重连连接" << std::endl;
    break;
   }
   if (AMQP_STATUS_TIMEOUT == ret.library_error)
   {
    std::cout << "等待消息服务器消息超时,继续等待" << std::endl;
    continue;
   }
   std::cout << "跟消息服务器连接出现异常,清理资源,重连连接" << std::endl;
   break;
  }
  else
  {
   std::string strRecvMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
   std::cout << "接收到的抓拍信息:" << strRecvMsg<< std::endl;
   amqp_destroy_envelope(&envelope);
   continue;
  }
 }
 amqp_channel_close(connState, 1, AMQP_REPLY_SUCCESS);
 amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
 amqp_destroy_connection(connState);

7)生产者不产生队列,消费者通过指定的交换机和routing-key,创建队列,然后将该队列绑定到交换机上

 char const* pszExchange = "passerByExchange";
 char const* pszRoutingKey = "passerby-000001";

 amqp_connection_state_t connState = amqp_new_connection();
 amqp_socket_t* pSocket = amqp_tcp_socket_new(connState);
 if (!pSocket) {
  amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
  amqp_destroy_connection(connState);
  return;
 }

 int nStatus = amqp_socket_open(pSocket, strIP.c_str(), nPort);
 if (nStatus) {
  amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
  amqp_destroy_connection(connState);
  return;
 }

 amqp_rpc_reply_t  replyLogin = amqp_login(connState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());
 if (AMQP_RESPONSE_NORMAL != replyLogin.reply_type)
 {
  std::cout << "登陆消息服务器失败" << std::endl;
  return;
 }

 amqp_channel_open(connState, 1);
 amqp_queue_declare_ok_t *r = amqp_queue_declare(
  connState, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
 amqp_bytes_t queueName = amqp_bytes_malloc_dup(r->queue);
 if (queueName.bytes == NULL)
 {
  amqp_bytes_free(queueName);
  amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
  amqp_destroy_connection(connState);
  return;
 }

 amqp_queue_bind(connState, 1, queueName, amqp_cstring_bytes(pszExchange),
  amqp_cstring_bytes(pszRoutingKey), amqp_empty_table);
 amqp_basic_consume(connState, 1, queueName, amqp_empty_bytes, 0, 1, 0,
  amqp_empty_table);
 amqp_frame_t frame;

 while (1)
 {
  amqp_rpc_reply_t ret;
  amqp_envelope_t envelope;

  amqp_maybe_release_buffers(connState);
  timeval tvTimeout;
  tvTimeout.tv_sec = 10;
  tvTimeout.tv_usec = 0;
  ret = amqp_consume_message(connState, &envelope, &tvTimeout, 0);

  if (AMQP_RESPONSE_NORMAL != ret.reply_type)
  {
   if (AMQP_STATUS_TIMEOUT == ret.library_error)
   {
    std::cout << "接收消息超时" << std::endl;
    continue;
   }
   std::cout << "连接消息服务器异常,清理资源退出" << std::endl;
   break;
  }
  else
  {
   std::string strRecvMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
   std::string strGBK = UTF8ToGBK(strRecvMsg.c_str());
   amqp_destroy_envelope(&envelope);
  }
 }
 amqp_channel_close(connState, 1, AMQP_REPLY_SUCCESS);
 amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
 amqp_destroy_connection(connState);

8)amqp_basic_consume函数不能连续调用多次同时消费多个队列

代码如下:

amqp_basic_consume(connState, 1, amqp_cstring_bytes("alarm"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

amqp_basic_consume(connState, 1, amqp_cstring_bytes("capture"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

只能执行第一句代码,第二句代码会一直阻塞

9)登陆MQ服务器,进行心跳交互代码

 amqp_rpc_reply_t  replyLogin = amqp_login(conn, "/", 0, 131072, 120, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());

第五个参数,指定了跟服务器多少秒发送一次心跳,如果不发心跳,跟服务器在连接一段时间之后,断开,当然,也要考虑到长连接也可能在网络异常情况下断开


名称栏目:RabbitMQ第五课使用技巧
分享网址:http://www.cdkjz.cn/article/gpehoo.html
多年建站经验

多一份参考,总有益处

联系快上网,免费获得专属《策划方案》及报价

咨询相关问题或预约面谈,可以通过以下方式与我们联系

大客户专线   成都:13518219792   座机:028-86922220