activemqcpp开发基础手册专业资料.doc
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- activemqcpp 开发 基础 手册 专业 资料
- 资源描述:
-
Activemq-cpp开发手册 丁靖 -05-06 1 引言 1.1 编写目 迅速学习CMS,提高CMS开发效率,提供一种CMS开发参照手册 详细API手册请参照 1.2 功能简介 Activemq-cpp是一种与ActiveMQ交互通讯C++ API开发库,为C++开发者提供了一种访问ActiveMQ接口。 Winkeemq-cpp是一种在Activemq-cpp基本上封装API库,对某些重复机械初始化及销毁清除及某些不关怀细节进行了封装,从而简化了编程。 1.3 术语解析 ActiveMQ :开源消息队列服务器 Broker :消息中介,每个消息队列服务器中至少有一种broker,是消息队列载体 Destination :消息在broker上目地 Queue :消息队列 Topic :主题 Message :消息 Producer :消息产生者 Consumer :消息消费者 Client :客户端,生产者和消费者都在客户端上 Server :Activemq服务器 BrokerUri :客户端访问服务器上broker时Uri 其他资料请参照 2 开发前准备 在开发前必要先安装activemq-cpp及winkeemq-cpp库,详细环节参照《activemq-cpp安装及使用文档.doc》 3 CMS 3.1 概述 CMS(stands for C++ Messaging Service)是一组C++应用程序接口(C++ API),它提供创立、发送、接受、读取消息服务。定义了一组和Sun公司和它合伙伙伴设计CMS API相似公共应用程序接口和相应语法,使得C++程序可以和其她消息组件进行通信。 CMS是一种与厂商无关 API,用来访问消息收发系统。它类似于 JDBC (Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库 API,而 CMS 则提供同样与厂商无关访问办法,以访问消息收发服务。CMS 使您可以通过消息收发服务(有时称为消息中介程序或路由器)从一种 CMS 客户机向另一种客户机发送消息。消息是 CMS 中一种类型对象,由两某些构成:报头和消息主体。报头由路由信息以及关于该消息元数据构成。消息主体则携带着应用程序数据或有效负载。依照有效负载类型来划分,可以将消息分为几种类型,它们分别携带:简朴文本 (TextMessage)、可序列化对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),尚有无有效负载消息 (Message)。 消息收发系统是异步,也就是说,CMS 客户机可以发送消息而不必等待回应。比较可知,这完全不同于基于 RPC (基于远程过程)系统,如 EJB 1.1、CORBA 和 Java RMI 引用实现。在 RPC 中,客户机调用服务器上某个分布式对象一种办法。在办法调用返回之前,该客户机被阻塞;该客户机在可以执行下一条指令之前,必要等待办法调用结束。在 CMS 中,客户机将消息发送给一种虚拟通道(主题或队列),而其他 CMS 客户机则预订或监听这个虚拟通道。当 CMS 客户机发送消息时,它并不等待回应。它执行发送操作,然后继续执行下一条指令。消息也许最后转发到一种或许各种客户机,这些客户机都不需要作出回应。 CMS通用接口集合以异步方式发送或接受消息。异步方式接受消息显然是使用间断网络连接客户 机,诸如移动电话和PDA最佳选取。此外, CMS采用一种宽松结合方式整合公司系统办法,其重要目就是创立可以使用跨平台数据信息、可移植公司级应用程序,而把开发人力解放出来。 CMS消息服务支持两种消息模型:Point-to-Point消息(P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub)。CMS规范并不规定供应商同步支持这两种消息模型,但开发者应当熟悉这两种消息模型优势与缺陷。 P2P消息模型是在点对点之间传递消息时使用。如果应用程序开发者但愿每一条消息都可以被解决,那么应当使用P2P消息模型。与Pub/Sub消息模型不同,P2P消息总是可以被传送到指定位置。 Pub/Sub模型在一到多消息广播时使用。如果一定限度消息传递不可靠性可以被接受话,那么应用程序开发者也可以使用Pub/Sub消息模型。换句话说,它合用于所有消息消费程序并不规定可以收到所有信息或者消息消费程序并不想接受到任何消息状况。 CMS通过容许创立持久订阅来简化时间有关性,虽然消息预订者未激活也可以接受到消息。此外,使用 持久订阅还可通过队列提供灵活性和可靠性,而依然容许消息被发给许多接受者。 Topic Subscriber topic Subscriber = topicSession.createDurableSubscriber(topic,subscriptionName); Connection对象表达了到两种消息模型中任一种消息系统连接。服务器端和客户机端对象规定管理创立CMS连接状态。连接是由 Connection Factory创立并且通过JNDI查寻定位。 //获得用于 P2P QueueConnectionFactory QueueConnectionFactory = queueConnectionFactory(); Context messaging = new InitialContext(); QueueConnectionFactory = (QueueConnectionFactory) Messaging.lookup(“QueueConnectionFactory”); //获得用于 pub/sub TopicConnectionFactory TopicConnectonFactory topicConnectionFactory; Context messaging = new InitialContext(); topicConnectionFactory = (TopicConnectionFactory)messaging.lookup(“TopicConnectionFactory”); 注意:用于P2P代码和用于PublishSubscribe代码非常相似。 如果session被标记为transactional话,确认消息就通过确认和校正来自动地解决。如果session没有标记为 transactional,你有三个用于消息确认选项。 · AUTO_ACKNOWLEDGE session将自动地确认收到一则消息。 · CLIENT_ACKNOWLEDGE 客户端程 序将确认收到一则消息,调用这则消息确认办法。 · DUPS_OK_ACKNOWLEDGE 这个选项命令session“懒散”确认消息传递,可以想到,这将导致消息提供者传递某些复制消息也许会出错。这种确认方式只应当用于消息消费程序 可以容忍潜在副本消息存在状况。 queueSession =queueConnection.createQueueSession(false,session.AUTO_ACKNOWLEDGE);//P2P topicSession = topicConnection.createTopicSession(false,session.AUTO_ACKNOWLEDGE);//Pub-Sub 注意:在本例中,一种session目从连结中创立,非值指出session是non-transactional,并且 session将自动地确认收到一则消息。 CMS当前有两种传递消息方式。标记为NON_PERSISTENT消息最多投递一次,而标记 为PERSISTENT消息将使用暂存后再转送机理投递。如果一种CMS服务离线,那么持久性消息不会丢失但是得等到这个服务恢复联机时才会被传递。 因此默认消息传递方式是非持久性。虽然使用非持久性消息也许减少内务和需要存储器,并且这种传递方式只有当你不需要接受所有消息时才使用。 虽然 CMS规范并不需要CMS供应商实现消息优先级路线,但是它需要递送加快消息优先于普通级别消息。CMS定义了从0到9优先级路线级别,0是最低 优先级而9则是最高。更特殊是0到4是正常优先级变化幅度,而5到9是加快优先级变化幅度。举例来说: topicPublisher.publish (message,DeliveryMode.PERSISTENT,8,10000);//Pub-Sub 或 queueSender.send(message,DeliveryMode.PERSISTENT,8,10000);//P2P 这个代码片断,有两种消息模型,映射递送方式是持久,优先级为加快型,生存周期是10000 (以毫秒度量 )。如果生存周期设立为零,这则消息将永远不会过期。当消息需要时间限制否则将使其无效时,设立生存周期是有用。 CMS定义了五种不同消息正文格式,以及调用消息类型,容许你发送并接受以某些不同形式数据,提供既有消息格式某些级别兼容性。 · StreamMessage -- Java原始值数据流 · MapMessage--一套名称-值对 · TextMessage--一种字符串对象 · ObjectMessage--一种序列化 Java对象 · BytesMessage--一种未解释字节数据流 CMS应用程序接口提供用于创立每种类型消息和设立荷载办法例如,为了在一种队列创立并发送一种 TextMessage实例,你可以使用下列语句: TextMessage message = queueSession.createTextMessage(); message.setText(textMsg); 以异步方式接受消息,需要创立一种消息监听器然后注册一种或各种使用MessageConsumerCMS MessageListener接口。会话(主题或队列)负责产生某些消息,这些消息被传送到使用onMessage办法监听者那里。 Using namespace cms; class ExampleListener :public MessageListener { //把消息强制转化为TextMessage格式 public void onMessage(Message message) { TextMessage textMsg = null; // 打开并解决这段消息 } } 当咱们创立QueueReceiver和TopicSubscriber时,咱们传递消息选取器字符串: //P2P QueueReceiver QueueReceiver receiver; receiver = session.createReceiver(queue,selector); //Pub-Sub TopicSubscriber TopicSubscriber subscriber; subscriber = session.createSubscriber(topic,selector); 为了启动消息交付,无论是Pub/Sub还是P2P,都需要调用start办法。 TopicConnection.start();//pub-sub QueueConnection.start();//P2P 当一条消息被捕获时,这条消息做为一条必要被强制转化为恰当消息类型普通Message对象到达。如TextMessage void onMessage(const Message* message){ TextMessage txtMsg=dynamic_cast<TextMessage* >(message); … //对 txtMsg做某些解决 } 停止消息传递,无论是Pub/Sub还是P2P,都调用stop办法。 TopicConnection. stop ();//pub-sub QueueConnection. stop ();//P2P 3.2 接口描述 CMS 支持两种消息类型P2P 和Pub/Sub,分别称作:P2P Domain 和Pub/Sub Domain,这两种接口都继承统一CMS Parent 接口,CMS 重要接口如下所示: CMS Parent ConnectionFactory Connection Destination Session MessageProducer MessageConsumer 如下是对这些接口简朴描述: ConnectionFactory :连接工厂,CMS 用它创立连接 Connection :CMS 客户端到CMS Provider 连接 Destination :消息目地 Session: 一种发送或接受消息线程 MessageProducer: 由Session 对象创立用来发送消息对象 MessageConsumer: 由Session 对象创立用来接受消息对象 3.3 CMS消息模型 CMS 消息由如下几某些构成:消息头,属性,消息体。 消息头(Header) - 消息头包括消息辨认信息和路由信息,消息头包括某些原则属性如:CMSDestination,CMSMessageID 等。 消息头 由谁设立 CMSDestination send 或 publish 办法 CMSDeliveryMode send 或 publish 办法 CMSExpiration send 或 publish 办法 CMSPriority send 或 publish 办法 CMSMessageID send 或 publish 办法 CMSTimestamp send 或 publish 办法 CMSCorrelationID 客户 CMSReplyTo 客户 CMSType 客户 CMSRedelivered CMS Provider 属性(Properties) - 除了消息头中定义好原则属性外,CMS 提供一种机制增长新属性到消息头中,这种新属性包括如下几种: 1. 应用需要用到属性; 2. 消息头中原有某些可选属性; 3. CMS Provider 需要用到属性。 原则CMS 消息头包括如下属性: CMSDestination --消息发送目地 CMSDeliveryMode --传递模式, 有两种模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT 表达该消息一定要被送到目地,否则会导致应用错误。NON_PERSISTENT 表达偶尔丢失该消息是被容许,这两种模式使开发者可以在消息传递可靠性和吞吐量之间找到平衡点。 CMSMessageID 唯一辨认每个消息标记,由CMS Provider 产生。 CMSTimestamp 一种消息被提交给CMS Provider 到消息被发出时间。 CMSCorrelationID 用来连接到此外一种消息,典型应用是在回答消息中连接到原消息。 CMSReplyTo 提供本消息回答消息目地址。 CMSRedelivered 如果一种客户端收到一种设立了CMSRedelivered 属性消息,则表达也许该客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。 CMSType 消息类型辨认符。 CMSExpiration 消息过期时间,等于QueueSender send 办法中timeToLive 值或TopicPublisher publish 办法中timeToLive 值加上发送时刻GMT 时间值。如果timeToLive值等于零,则CMSExpiration 被设为零,表达该消息永但是期。如果发送后,在消息过期时间之后消息还没有被发送到目地,则该消息被清除。 CMSPriority 消息优先级,从0-9 十个级别,0-4 是普通消息,5-9 是加急消息。CMS 不规定CMS Provider 严格按照这十个优先级发送消息,但必要保证加急消息要先于普通消息到达。 消息体(Body) - CMS API 定义了4种消息体格式,也叫消息类型,你可以使用不同形式发送接受数据并可以兼容既有消息格式,下面描述这4种类型: 消息类型 消息体 TextMessage string对象,如xml文献内容 MapMessage 名/值对集合,名是string对象,值类型可以是c++任何基本类型 BytesMessage 字节流 ObjectMessage 对象类型 Message 没有消息体,只有消息头和属性。 下例演示创立并发送一种TextMessage到一种队列: TextMessage message = queueSession.createTextMessage(); message.setText(msg_text);// msg_text is a String message.setCMSType(“text”); queueSender.send(message); 下例演示接受消息并转换为适当消息类型: Message* m = queueReceiver.receive(); If (m->getCMSType() == “text”){ TextMessage txt=dynamic_cast<TextMessage*>(m); // do something }else{ } 4 消息生产者客户端 消息生产者产生消息并将消息发送到broker上队列或主题中。 要使消息生产者生产消息被消息消费者消费,必要满足两个条件: 生产者和消费者必要连接到同一种Broker,即BrokerUri中主机名和端口相似 生产者和消费者必要具备相似destination,即同一种队列名或主题名 4.1 使用activemq-cpp来创立消息生产者 4.1.1 头文献及名字空间 #include <activemq/core/ActiveMQConnectionFactory.h> #include <activemq/util/Config.h> #include <cms/Connection.h> #include <cms/Session.h> #include <cms/TextMessage.h> #include <cms/ExceptionListener.h> #include <stdlib.h> #include <iostream> using namespace activemq; using namespace activemq::core; using namespace cms; using namespace std; 4.1.2 创立一种生产者类 class SimpleProducer { private: Connection* connection; //连接对象 Session* session; //会话 Destination* destination; //消息目地 MessageProducer* producer;//消息生产者 bool useTopic; //与否采用采用主题模式 bool clientAck; //与否自动确认消息接受 unsigned int numMessages; //生产消息数 std::string brokerURI; //连接borker uri std::string destURI; //队列或主题名 public: /./构造函数 SimpleProducer( const std::string& brokerURI, unsigned int numMessages, const std::string& destURI, bool useTopic = false, bool clientAck = false ){ connection = NULL; session = NULL; destination = NULL; producer = NULL; this->numMessages = numMessages; this->useTopic = useTopic; this->brokerURI = brokerURI; this->destURI = destURI; this->clientAck = clientAck; initialize(); } virtual ~SimpleProducer(){ cleanup(); } 4.1.3 初始化及销毁 // 初始化 private: Virtual void initialize(){ try { // 创立连接工厂 ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( brokerURI ); // 创立一种到broker连接 connection = connectionFactory->createConnection(); connection->start(); // 关闭连接工厂 delete connectionFactory; // 创立一种会话 if( clientAck ) {//消息接受后由消费者客户端确认 session= connection->createSession( Session::CLIENT_ACKNOWLEDGE ); } else //消息接受自动确认 session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); } // 创立一种队列或主题 (Topic or Queue) if( useTopic ) { destination = session->createTopic( destURI ); } else { destination = session->createQueue( destURI ); } // 创立生产者并设定消息传送模式 producer = session->createProducer( destination ); producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); }catch ( CMSException& e ) { e.printStackTrace(); } } // 销毁 void cleanup(){ // Destroy resources. try{ if( destination != NULL ) delete destination; }catch ( CMSException& e ) { e.printStackTrace();} destination = NULL; try{ if( producer != NULL ) delete producer; }catch ( CMSException& e ) { e.printStackTrace();} producer = NULL; // Close open resources. try{ if( session != NULL ) session->close(); if( connection != NULL ) connection->close(); }catch ( CMSException& e ) { e.printStackTrace();} try{ if( session != NULL ) delete session; }catch ( CMSException& e ) { e.printStackTrace();} session = NULL; try{ if( connection != NULL ) delete connection; }catch ( CMSException& e ) { e.printStackTrace();} connection = NULL; } 4.1.4 生产一种消息并发送到队列中 public : void send(){ // 消息内容 string text = (string)"Hello world!thread "; for( std::size_t ix=0;ix<numMessages;++ix ){ // 创立一种文本类型消息 TextMessage* message = session->createTextMessage( text ); // 发送消息 printf( "Sent message #%d \n",ix+1 ); producer->send( message ); // 释放消息 delete message; } } }; 4.1.5 发送消息主程序 Int main(void){ // broker uri std::string brokerURI = "tcp://127.0.0.1:61616" "?wireFormat=openwire" "&transport.useAsyncSend=true" // 发送消息数 unsigned int numMessages = ; // 消息队列名 std::string destURI = "TEST.FOO"; // 使用队列模式 bool useTopics = false; //初始化一种消息生产者对象并发送消息 SimpleProducer producer( brokerURI,numMessages,destURI,useTopics ); producer.send(); return 0; } 4.1.6 总结 综上例子可知,每次发送一种消息到消息队列中都需要定义一种生产者类,并完毕一种机械初始化及销毁过程。为了提高软件开发效率,可以模仿生产者类定义一种消息发送者类,封装有关细节,并编译成共享库以供使用,简化编程过程。 4.2 使用winkeemq-cpp来创立消息生产者 4.2.1 头文献及名字空间 #include <MessageSender.h> using namespace winkeemq; using namespace std; 4.2.2 发送消息主程序 int main(int argc,char* argv[]){ // broker uri std::string brokerURI = "tcp://192.168.1.179:61616" "?wireFormat=openwire" "&wireFormat.maxInactivityDuration=0" "&soKeepAlive=true" "&transport.useAsyncSend=true"; // 队列名 string mqName="mm.mq"; // 创立一种消息发送对象(采用队列模式,每次只发一种消息) MessageSender ms(brokerURI,1,false,mqName); string body=”hello world\n”; // 创立一种文本消息 TextMessage* msg=dynamic_cast<TextMessage* > (ms.createMessag(MessageSender::TEXT_MESSAGE)); // 设定消息体内容 msg->setText(body); // 发送消息 ms.sendMessage(); // 销毁消息 ms.deleteMessage(); } 4.2.3 总结 由上述例子可看出,采用winkeemq-cpp后裔码量精简了诸多,开发员不需要关怀那些机械初始化细节。要创立一种消息生产者,只需要给定Broker uri,队列名,消息目模式,然后调用MessageSendercreateMessage()创立一种详细类型消息,createMessage()参数是一种在MessageSender中定义一种无名enum, 指明消息类型。调用MessageSendersendMessage()发送消息到broker中,最后销毁消息 5 消息消费者客户端 消息消费者从Broker上队列或主题中取出消息并做相应解决。 5.1 使用activemq-cpp来创立消息消息者 5.1.1 头文献及名字空间 #include <activemq/concurrent/Thread.h> #include <activemq/concurrent/Runnable.h> #include <activemq/concurrent/CountDownLatch.h> #include <activemq/core/ActiveMQConnectionFactory.h> #include <cms/Connection.h> #include <cms/Session.h> #include <cms/TextMessage.h> #include <cms/ExceptionListener.h> #include <cms/MessageListener.h> #include <stdlib.h> #include <iostream> using namespace activemq; using namespace activemq::core; using namespace cms; using namespace std; 5.1.2 创立一种生产者类 class SimpleAsyncConsumer :public ExceptionListener, public MessageListener { private: Connection* connection; //连接对象 Session* session; //会话 Destination* destination; //消息目地 MessageConsumer* consumer;;//消息消费者 bool useTopic; //与否采用采用主题模式 bool clientAck; //与否自动确认消息接受 std::string brokerURI; //连接borker uri std::string destURI; //队列或主题名 public: /./构造函数 SimpleAsyncConsumer( const std::string& brokerURI, const std::string& destURI, bool useTopic = false, bool clientAck = false ) { connection = NULL; session = NULL; destination = NULL; consumer = NULL; this->useTopic = useTopic; t展开阅读全文
咨信网温馨提示:1、咨信平台为文档C2C交易模式,即用户上传的文档直接被用户下载,收益归上传人(含作者)所有;本站仅是提供信息存储空间和展示预览,仅对用户上传内容的表现方式做保护处理,对上载内容不做任何修改或编辑。所展示的作品文档包括内容和图片全部来源于网络用户和作者上传投稿,我们不确定上传用户享有完全著作权,根据《信息网络传播权保护条例》,如果侵犯了您的版权、权益或隐私,请联系我们,核实后会尽快下架及时删除,并可随时和客服了解处理情况,尊重保护知识产权我们共同努力。
2、文档的总页数、文档格式和文档大小以系统显示为准(内容中显示的页数不一定正确),网站客服只以系统显示的页数、文件格式、文档大小作为仲裁依据,个别因单元格分列造成显示页码不一将协商解决,平台无法对文档的真实性、完整性、权威性、准确性、专业性及其观点立场做任何保证或承诺,下载前须认真查看,确认无误后再购买,务必慎重购买;若有违法违纪将进行移交司法处理,若涉侵权平台将进行基本处罚并下架。
3、本站所有内容均由用户上传,付费前请自行鉴别,如您付费,意味着您已接受本站规则且自行承担风险,本站不进行额外附加服务,虚拟产品一经售出概不退款(未进行购买下载可退充值款),文档一经付费(服务费)、不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。
4、如你看到网页展示的文档有www.zixin.com.cn水印,是因预览和防盗链等技术需要对页面进行转换压缩成图而已,我们并不对上传的文档进行任何编辑或修改,文档下载后都不会有水印标识(原文档上传前个别存留的除外),下载后原文更清晰;试题试卷类文档,如果标题没有明确说明有答案则都视为没有答案,请知晓;PPT和DOC文档可被视为“模板”,允许上传人保留章节、目录结构的情况下删减部份的内容;PDF文档不管是原文档转换或图片扫描而得,本站不作要求视为允许,下载前可先查看【教您几个在下载文档中可以更好的避免被坑】。
5、本文档所展示的图片、画像、字体、音乐的版权可能需版权方额外授权,请谨慎使用;网站提供的党政主题相关内容(国旗、国徽、党徽--等)目的在于配合国家政策宣传,仅限个人学习分享使用,禁止用于任何广告和商用目的。
6、文档遇到问题,请及时联系平台进行协调解决,联系【微信客服】、【QQ客服】,若有其他问题请点击或扫码反馈【服务填表】;文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“【版权申诉】”,意见反馈和侵权处理邮箱:1219186828@qq.com;也可以拔打客服电话:0574-28810668;投诉电话:18658249818。




activemqcpp开发基础手册专业资料.doc



实名认证













自信AI助手
















微信客服
客服QQ
发送邮件
意见反馈



链接地址:https://www.zixin.com.cn/doc/3032803.html