2.6.7 RabbitMQ -- MassTransit in Detail

Consumer

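In MassTransit, a consumer can consume one or more message types.

Consumer types include: regular consumers, sagas, saga state machines, routing slip activities (steps in a distributed workflow), handlers, and job consumers.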

Consumer

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("order-service", e =>
                {
                    // Register the consumer type on the "order-service" queue.
                    e.Consumer<SubmitOrderConsumer>();
                });
            });
        }
    }

The consumer class implements IConsumer<T> and its Consume method:

    class SubmitOrderConsumer : IConsumer<SubmitOrder>
    {
        public async Task Consume(ConsumeContext<SubmitOrder> context)
        {
            // Publish an event once the command has been received.
            await context.Publish<OrderSubmitted>(new { context.Message.OrderId });
        }
    }

Three principles:

Instance

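    public class Program
    {
        public static async Task Main()
        {
            // One instance is created up front and shared by all messages.
            var submitOrderConsumer = new SubmitOrderConsumer();

            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("order-service", e =>
                {
                    e.Instance(submitOrderConsumer);
                });
            });
        }
    }

With Instance, every received message is handled by the same consumer instance, so make sure the consumer class is thread-safe.

With Consumer, by contrast, a new instance is created for each message received.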
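The thread-safety requirement for Instance can be illustrated with a minimal sketch. The class name CountingSubmitOrderConsumer and the SubmitOrder contract defined here are hypothetical and not part of the original notes. The sketch assumes the only shared state is a counter, which is updated atomically with Interlocked.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, mirroring the SubmitOrder used in this section.
public class SubmitOrder
{
    public string OrderId { get; set; }
}

// A consumer meant to be registered with e.Instance(...): the same object
// receives every message, possibly from several threads at once.
public class CountingSubmitOrderConsumer : IConsumer<SubmitOrder>
{
    private long _received;

    // Atomic read of the shared counter.
    public long Received => Interlocked.Read(ref _received);

    public Task Consume(ConsumeContext<SubmitOrder> context)
    {
        // Atomic increment; a plain "_received++" could lose updates
        // when messages are consumed concurrently.
        Interlocked.Increment(ref _received);
        return Task.CompletedTask;
    }
}
```

It would be registered with e.Instance(new CountingSubmitOrderConsumer()). A consumer registered with e.Consumer<T>() does not need this care for its own fields, since each message gets a fresh instance.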

Handler

    public class Program
    {
        public static async Task Main()
        {
            var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.ReceiveEndpoint("order-service", e =>
                {
                    e.Handler<SubmitOrder>(async context =>
                    {
                        await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
                    });
                });
            });
        }
    }

A handler consumes messages through a delegate (lambda) method.
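The configuration snippets in this section only build the bus; nothing is received until the bus is started. The following is a minimal end-to-end sketch of the handler style. It assumes MassTransit's in-memory transport instead of RabbitMQ so that it runs without a broker. The HandlerDemo class, the RunAsync method, and the SubmitOrder contract are hypothetical names introduced for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, mirroring the SubmitOrder used in this section.
public class SubmitOrder
{
    public string OrderId { get; set; }
}

public static class HandlerDemo
{
    // Starts an in-memory bus, publishes one message, and returns the
    // OrderId that the handler observed.
    public static async Task<string> RunAsync()
    {
        var received = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Assumption: in-memory transport, chosen only to avoid a broker.
        var busControl = Bus.Factory.CreateUsingInMemory(cfg =>
        {
            cfg.ReceiveEndpoint("order-service", e =>
            {
                e.Handler<SubmitOrder>(context =>
                {
                    received.TrySetResult(context.Message.OrderId);
                    return Task.CompletedTask;
                });
            });
        });

        // Receive endpoints only consume messages while the bus is running.
        await busControl.StartAsync();
        try
        {
            await busControl.Publish(new SubmitOrder { OrderId = "123" });
            return await received.Task;
        }
        finally
        {
            await busControl.StopAsync();
        }
    }
}
```

With RabbitMQ the same start/stop pattern applies; only the factory call (CreateUsingRabbitMq) and the host settings differ.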

Others

Producer

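Messages can be produced in two ways: send and publish.

Sending requires a specific destination address (DestinationAddress). Publishing broadcasts the message to every consumer subscribed to that message type.

Based on these two rules, messages fall into two kinds: commands, which are sent, and events, which are published.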

send

A command can be sent by calling the Send method on the following objects:

ConsumeContext

    public class SubmitOrderConsumer : IConsumer<SubmitOrder>
    {
        private readonly IOrderSubmitter _orderSubmitter;

        public SubmitOrderConsumer(IOrderSubmitter submitter)
            => _orderSubmitter = submitter;

        public async Task Consume(ConsumeContext<SubmitOrder> context)
        {
            await _orderSubmitter.Process(context.Message);

            // Send a follow-up command from within the consumer.
            await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
        }
    }

ISendEndpointProvider

    public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
    {
        // Resolve the endpoint for the destination address, then send.
        var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);

        await endpoint.Send(new SubmitOrder { OrderId = "123" });
    }

Send addresses and short addresses
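A send address can be written either as a full transport URI or in a short form. The sketch below assumes the short-address forms that MassTransit documents for the RabbitMQ transport (queue:name and exchange:name). The queue name order-service, the ShortAddressDemo class, and the SubmitOrder contract are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, mirroring the SubmitOrder used in this section.
public class SubmitOrder
{
    public string OrderId { get; set; }
}

public static class ShortAddressDemo
{
    // Full address: transport scheme, host, and queue name.
    public static readonly Uri FullAddress = new Uri("rabbitmq://localhost/order-service");

    // Short addresses: resolved against the host the bus is already connected to.
    public static readonly Uri QueueAddress = new Uri("queue:order-service");
    public static readonly Uri ExchangeAddress = new Uri("exchange:order-service");

    public static async Task SendOrder(ISendEndpointProvider provider)
    {
        // The short form is passed to GetSendEndpoint like any other address.
        var endpoint = await provider.GetSendEndpoint(QueueAddress);
        await endpoint.Send(new SubmitOrder { OrderId = "123" });
    }
}
```

Short addresses keep the host name out of application code, which can help when the same code runs against different brokers.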


Convention Map

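Map the message type to a destination address, with the address itself read from the configuration file:

    EndpointConvention.Map<StartDelivery>(
        new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));

Send directly

Once the convention is mapped, the message can be sent directly without resolving an endpoint first:

    public class SubmitOrderConsumer : IConsumer<SubmitOrder>
    {
        private readonly IOrderSubmitter _orderSubmitter;

        public SubmitOrderConsumer(IOrderSubmitter submitter)
            => _orderSubmitter = submitter;

        public async Task Consume(ConsumeContext<SubmitOrder> context)
        {
            await _orderSubmitter.Process(context.Message);

            // The destination is resolved from the mapped convention for StartDelivery.
            await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
        }
    }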

publish

An event can be published by calling the Publish method on the following object:

IPublishEndpoint

    public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
    {
        await publishEndpoint.Publish<OrderSubmitted>(new
        {
            OrderId = "27",
            OrderDate = DateTime.UtcNow,
        });
    }

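Request-Response

The request-response pattern keeps applications decoupled while still letting them interact in a synchronous style.

Consumer

    public async Task Consume(ConsumeContext<CheckOrderStatus> context)
    {
        var order = await _orderRepository.Get(context.Message.OrderId);
        if (order == null)
            throw new InvalidOperationException("Order not found");

        // Reply to the requester with the response message.
        await context.RespondAsync<OrderStatusResult>(new
        {
            OrderId = order.Id,
            order.Timestamp,
            order.StatusCode,
            order.StatusText
        });
    }

The requester must handle the response type OrderStatusResult. The synchronous style is emulated asynchronously: under the hood the request still goes through a message queue and is processed by a consumer.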

IClientFactory

    public interface IClientFactory
    {
        IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);

        IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);

        RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);

        RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
    }

A ClientFactory can be obtained from the CreateClientFactory method of IBusControl.

IRequestClient

    public interface IRequestClient<TRequest>
        where TRequest : class
    {
        RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);

        Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
    }

A request client can either create a request or get the response directly.

Send a request

    var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
    var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);

    var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id });
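Because the request still travels through a queue, the caller may want to handle a missing or failed response as well as a successful one. The following is a minimal sketch, assuming the contracts are declared as interfaces and that a 10-second timeout is acceptable. The OrderStatusClient class, the GetStatusText method, and the returned strings are hypothetical. RequestTimeoutException and RequestFaultException are the exception types MassTransit raises when no response arrives in time or when the consumer throws.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical contracts matching the property names used in this section.
public interface CheckOrderStatus
{
    string OrderId { get; }
}

public interface OrderStatusResult
{
    string OrderId { get; }
    DateTime Timestamp { get; }
    short StatusCode { get; }
    string StatusText { get; }
}

public static class OrderStatusClient
{
    public static async Task<string> GetStatusText(
        IRequestClient<CheckOrderStatus> client,
        string orderId,
        CancellationToken cancellationToken = default)
    {
        try
        {
            // Wait up to 10 seconds for the consumer to respond.
            Response<OrderStatusResult> response =
                await client.GetResponse<OrderStatusResult>(
                    new { OrderId = orderId },
                    cancellationToken,
                    RequestTimeout.After(s: 10));

            return response.Message.StatusText;
        }
        catch (RequestTimeoutException)
        {
            // No response arrived within the timeout.
            return "timed out";
        }
        catch (RequestFaultException ex)
        {
            // The consumer threw, e.g. the "Order not found" case.
            return $"faulted: {ex.Message}";
        }
    }
}
```

The client argument can be the one returned by bus.CreateRequestClient<CheckOrderStatus>(serviceAddress).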
