每天分享最新软件开发,Devops,敏捷,测试以及项目管理最新,最热门的文章,每天花3分钟学习何乐而不为,希望大家点赞,评论,加关注,你的支持是我最大的动力。
在本文中,我们将演示如何实现Mule应用程序发布—订阅设计模式。 在这里我们将使用消息队列代理RabbitMQ。
- 发布/订阅体系结构: 的 发布-订阅模式 ,也被称为Pub / Sub,是一种建筑设计模式,提供了一个框架之间交换消息的发布者和订阅者。 这种模式包括发布者和订阅者依靠继电器信息发布者的message broker用户。
- RabbitMQ : 它是一个开源和独立于平台的消息代理软件,最初的实现 高级消息队列协议 。 它已经被扩展的插件架构支持流媒体面向文本的消息传递协议和MQ遥测传输。 在MuleSoft,我们将使用一个AMQP连接器的集成 。
我们将使用下面的工具来实现这一集成场景。 这是局限于Windows 64位操作系统版本。 RabbitMQ之前安装Erlang是依赖软件。
- Erlang 24.3.4
- RabbitMQ 3.10.5
- Mule Anypoint Studio 7.11或最新的
- Mule Anypoint平台账户
我们将执行安装和配置RabbitMQ的从一开始,随着依赖。
Erlang安装一旦Erlang软件下载,按照以下步骤安装它。
- 双击otp_win64_24.3.4。 exe并单击“下一步”。
- 选择安装路径,点击“安装”。
- 点击“下一个&安装。” 那么你应该看到如下所示的窗口。 一旦它完成100%我们就可以点击关闭按钮。
- 一旦安装成功,然后设置环境变量Erlang安装目录如下。 这就完成了安装Erlang。
RabbitMQ安装
一旦RabbitMQ软件下载,通过以下步骤安装它。
- 双击rabbitmq-server-3.10.5。 exe并单击“下一步”。
- 选择安装路径,点击“安装”。
- 点击“下一步”,完成。
- 一旦安装完成后,打开命令提示符,去“C: \ Program Files \ RabbitMQ服务器\ rabbitmq_server-3.10.5 \ sbin。” 执行以下命令。
rabbitmq-plugins.bat enable rabbitmq_management
- 以上后,执行命令 rabbitmq-plugins启用rabbitmq_shovel rabbitmq_shovel_management 。 然后你应该看到:
我们完成安装和设置。 这可以通过点击验证http://localhost: 15672(默认凭证:客人/客人)。
登录成功后,集成的目的,创建一个新用户(admin / admin)与专用虚拟主机“MuleIntDev。”
我们完成了对RabbitMQ的安装和配置。
Mule应用程序实现在这个应用程序中,我们将使用一个AMQP连接器从RabbitMQ发布和订阅消息。 流将发布一个消息交换和另一个流将消耗从队列的消息。
AMQP操作不同的操作,下面是一个简短的描述。
- 发布: 这是用于发布消息。
- 消费: 这是用于订阅消息,但它不会自动创建任何活跃的消费者应用程序部署。 这不能被用作源在一个流。 流只能使用流参考执行,等等。
- 听众: 这是用于订阅消息。 它会自动创建一个活跃的消费者应用程序部署。
- 发布消费: 发送消息到一个AMQP交换和等待响应所提供的replyTo目的地或动态创建的临时目的地。
- 消 :这是用于ACK AMQP消息交付。
- 拒绝: 这是用来拒绝一个AMQP消息交付。
创建一个新的Mule应用程序后,拖拽 HTTP侦听器 “AMQP”,然后添加模块如下。
前面的列表的AMQP连接器业务,对于本文,我们将使用 P 出版 和 l istener 操作。
现在拖拽 发布 操作到消息流表面。 然后配置AMQP如下。
创建一个交易所RabbitMQ门户如下。
在这之后,创建一个队列的名称”员工。 MuleESB”和绑定此队列“骡子:员工。” 一次 出版商 发布消息的交换,那么同样的信息将被路由到此队列订阅的用户。
现在输入交易名称为“骡子:员工” 发布 操作连接器配置如下。
发布一条消息为RabbitMQ的最终实现使用AQMP协议如下。
现在部署应用程序和测试下面的负载作为一个员工样本有效载荷。
要求:
{
"Employees": [
{
"userId": "Test rirani",
"jobTitleName": "Developer",
"firstName": "Romin",
"lastName": "Irani",
"preferredFullName": "Test Romin Irani",
"employeeCode": "E1",
"region": "CA",
"phoneNumber": "408-1299967",
"emailAddress": "romin.k.test@gmail.com"
}
]
}
现在,我们可以监视RabbitMQ交换,消息发布到交换,消息的队列状态到达自动进入队列。
现在,拖拽 侦听器 操作到消息流面积来实现 消费 操作。 的 侦听器 会听源配置。 在这种情况下,活跃的消费者将自动创建RabbitMQ一旦应用程序部署。 下面是实现的高级概述。
一旦应用程序部署,消费者会自动创建,如果任何消息到达队列,那么同样的将认购。
正如前面有一个信息,即存在的,同样是选择和加工。 因此,队列变空。
这是全面实施的Mule配置文件。
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:amqp="http://www.mulesoft.org/schema/mule/amqp"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/amqp http://www.mulesoft.org/schema/mule/amqp/current/mule-amqp.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd">
<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="5f227be9-f9f0-4b70-9177-c065634a9a5d" >
<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<amqp:config name="AMQP_Config" doc:name="AMQP Config" doc:id="a65f4585-2300-42a3-9aac-0e8cdebdc8a6" >
<amqp:connection host="localhost" virtualHost="MuleIntDev" username="admin" password="admin" />
</amqp:config>
<flow name="employee-rabbitmq-sys-apiFlow" doc:id="6e50b2b8-69a2-47db-9cd9-965076a8a927" >
<http:listener doc:name="Listener" doc:id="309df0b7-e8c1-4fe0-a2f4-764f1ba87eef" config-ref="HTTP_Listener_config" path="/api/v1/rabbitmq-demo"/>
<logger level="INFO" doc:name="BeforePublish" doc:id="b2316c6f-a0e6-473b-8939-bc3af693e73c" message="**Before Publish Into RabbitMQ**" category="**RabbitMQ**"/>
<amqp:publish doc:name="Publish" doc:id="00f1b4c1-1177-4dc9-a843-bbc37a0ab85f" config-ref="AMQP_Config" exchangeName="Mule:Employee"/>
<logger level="INFO" doc:name="AfterPublish" doc:id="706ce978-557c-4ba3-ad47-745c24537392" message="**After Publish Into RabbitMQ**" category="**RabbitMQ**"/>
<ee:transform doc:name="final_transform" doc:id="52ae715d-ab9c-4cd2-9bde-eb17175e8620" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
{
"status": "Message published successfully"
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<error-handler >
<on-error-propagate enableNotifications="true" logException="true" doc:name="On Error Propagate" doc:id="567ef1e9-37ec-4215-8640-19b6a51d809f" >
<ee:transform doc:name="Transform Message" doc:id="e65e9317-e315-46dd-b4ab-36367a300be8" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
error]]></ee:set-payload>
</ee:message>
</ee:transform>
</on-error-propagate>
</error-handler>
</flow>
<flow name="employee-rabbitmq-sys-apiFlow1" doc:id="52121737-50f5-4def-8a04-3611c6a5f3f1" initialState="stopped">
<amqp:listener doc:name="Listener" doc:id="29358f30-ac13-4c2a-8998-cb0c10ac7c49" config-ref="AMQP_Config" queueName="Employee.MuleESB"/>
<logger level="INFO" doc:name="Consumed_Message" doc:id="1c123130-b666-4519-a68a-1c53bfd40ac2" message="#[payload]" category="**RabbitMQ**"/>
</flow>
<flow name="employee-rabbitmq-sys-apiFlow2" doc:id="c350d2aa-cb56-4252-b1db-8597a846874a" >
<amqp:consume doc:name="Consume" doc:id="a8745777-d895-4b7f-98d3-574737d8fb44" config-ref="AMQP_Config" queueName="Employee:MuleESB"/>
</flow>
</mule>
我们完成了 Pub / Sub设计模式 实现。 学习快乐!
,