任务池的搭建与使用

任务池是后端应用程序发布任务和云计算服务软件领取任务的地方,对应的,需要在RabbitMQ中创建一个消息队列作为任务池。后端应用程序发布任务时,通过默认交换机和任务消息名称,把任务发送到指定的任务消息队列中。

多个云计算服务软件同时监听任务消息队列,同一任务只会被一个云计算服务软件获取。后端应用程序发布任务和云计算服务软件领取任务的流程如图5.25所示。需要注意的是,如果有多种任务和多种与之匹配的云计算服务软件,则需要创建多个任务消息队列。

细说云计算的三种服务模式(我们来浅谈云计算服务架构任务池的搭建和使用)(1)

图5.25 后端应用程序发布任务和云计算服务软件领取任务的流程

除了以上介绍的任务发布与任务领取流程以外,一般来说,任务消息队列还需要关注以下3个细节:

·任务消息队列需要被设置成持久化,避免由于RabbitMQ服务重启而导致任务消息丢失。

·云计算服务软件在监听任务消息队列前,需要设置为手动确认消息,这样即使云计算服务软件在执行任务过程中异常退出(未确认任务消息),任务消息仍会被其他云计算服务软件获取并执行。

·云计算服务软件在监听任务消息队列前,需要设置为在没确认消息之前不接收新消息,确保任务消息不会积压在少数几个云计算服务软件当中。

任务池搭建与使用的相关代码如代码5.15所示,其中,示例代码是使用C 编写的,而后端应用程序一般是使用Java等开发语言编写的,所以在实际编码中,后端应用程序发送任务消息部分的编码可能会有所区别。

代码5.15 任务池搭建与使用的相关代码

//创建任务消息队列,由云计算服务软件或后端应用程序创建都可以

const char* queueNameStr = "任务消息队列名称";

amqp_boolean_t durable = 1; //设置任务消息队列持久化(重启后消息仍不丢失)

amqp_boolean_t autodelete = 0; //设置任务消息队列不自动删除

amqp_queue_declare(connection, channel, amqp_cstring_bytes(queue

NameStr), 0,

durable, 0, autodelete, amqp_empty_table);

//后端应用程序相关

const char* exchange = ""; //默认交换机名称(名称为空字符)

const char* routingkey = "任务消息队列名称"; //路由键设置为任务消息队列名称

//设置消息的相关信息

amqp_basic_properties_t props;

props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |

AMQP_BASIC_DELIVERY_MODE_FLAG; //与下面的配置对应

props.content_type = amqp_cstring_bytes("text/plain"); //消息主体的类型

props.delivery_mode = 2; //持久化消息

//发送消息

const char* messagebody = "发送的任务消息";

amqp_basic_publish(connection, channel, amqp_cstring_bytes(exchange),

amqp_cstring_bytes(routingkey), 0, 0, &props,

amqp_cstring_bytes(messagebody);

//云计算服务软件相关

//设置在没确认消息之前不接收新消息,需要在订阅消息队列前设置

uint16_t prefetchCount = 1;

amqp_basic_qos(connection, channel, 0, prefetchCount, 0);

//订阅消息队列,只调用一次即可

amqp_boolean_t noack = 0; //设置为消息需要手动确认

const char* queueNameStr = "任务消息队列名称";

amqp_basic_consume(connection, channel, amqp_cstring_bytes

(queueNameStr),

amqp_empty_bytes, 0, noack, 0,amqp_empty_table);

//获取消息,可以多次调用,无消息时会自动阻塞

amqp_envelope_t envelope; //定义接收消息的变量

amqp_maybe_release_buffers(connection); //清理buffers

amqp_consume_message(connection, &envelope, NULL, 0); //获取消息

envelope.message.body.bytes; //消息主体的开始指针(char *)envelope.message.body.len; //消息主体的长度

//确认消息,确认消息后,任务消息队列才会向云计算服务软件发送新的任务消息

//envelope.delivery_tag为所获取信息的标识

amqp_basic_ack(connection, channel, envelope.delivery_tag, 0);

任务消息一般为JSON格式,这样能做到灵活配置。后端应用程序在发送任务时,需要把任务消息转换成字符串。而云计算服务在获取到任务消息后,需要把字符串转成JSON格式。任务消息一般包含任务标识、任务ID、任务参数、任务开始时回调后端应用程序的RESTful API地址及任务结束时回调后端应用程序的RESTful API地址等,如代码5.16所示。

代码5.16 任务消息示例

{

"mission" : "任务标识",

"missionID" : "任务ID",

"stateCallBack" : {

"startMissionUrl" : "任务开始时回调后端应用程序的RESTful API地址",

"endMissionUrl" : "任务结束时回调后端应用程序的RESTful API地址"

},

"stateParam" : {

"参数1" : "",

"参数2" : {

}

}

}

本文给大家讲解的内容是云计算服务架构任务池的搭建和使用
  1. 下篇文章给大家讲解的内容是大型网站架构的技术细节:云计算服务架构指令池的搭建和使用
  2. 感谢大家的支持
,