Competing-Consumers模式支持多个并发消费者来处理消费消息队列上接收到的消息。该模式令系统能够并发的处理消息,并且优化吞吐,增加扩展性和可用性,同时平衡工作负载。

问题

在云环境中运行的程序可能需要处理大量的请求。相对于同步的处理每个请求,通常更常用的技术是由应用将请求通过消息系统来传给其他的服务(消费者服务),然后由其他的服务来异步处理。该策略能够确保应用中的业务逻辑不会因为请求的处理而产生阻塞。

请求的数量可能会因为各种各样的原因,在不同的时刻在数量上的差别非常大。突然的大规模的客户行为或者多个代理发送聚合的大量请求都可能引起难以预期的工作负载。在峰值期间,系统可能每秒都需要处理几百个请求。而其他的时间可能需要处理的请求数则大大减少。另外,需要处理这些请求的工作所需要的时间可能相差通常很大。如果只使用一个消费服务,可能会引起消费服务被请求淹没,或者消息系统可能会因为应用涌入的大量的请求而过载等问题。为了处理这种波动的工作负载,系统必须要运行多个消费服务实例。然而,多个消费服务实例也需要谨慎的控制,以确保每个消息仅仅发送给一个消费者。工作负载需要很好的进行均衡,才能确保单个消费服务实例到达性能瓶颈。

解决方案

可以考虑使用消息队列来实现应用和消费者服务之间的通信。应用将请求以一定的格式的消息提交到消息队列,然后消费者服务实例接收消息队列中的消息,对消息进行处理。使用该种方式,可以将一个消费者服务调用来订阅一个消息队列。图1展示了该种方式的架构。

图1 使用消息队列来讲工作分发到不同的服务实例上

该种解决方案会带来以下优势:

实现Competing-Consumers模式需要考虑的问题

在决定需要实现Competing-Consumers模式的时候,需要考虑如下问题:

何时使用该模式

当碰到以下场景时,适合使用Competing-Consumers模式:

以下场景不适合使用Competing-Consumers模式:

一些消息系统支持会话机制,允许生产者将消息聚合在一起,由同一个消费者来处理。这种机制可以用来优化消息(如果支持该功能)来实现对消息进行排序,让某个消费者以一定的顺序来执行生产者发送的消息。

使用举例

Windows Azure系统存储队列和服务总线队列,可以用来实现Competing-Consumers模式。应用将消息发送给消息队列,然后由消费者任务从队列中获取消息并进行处理。为了拥有更好的弹性,服务总线队列可以令消费者在从队列中使用PeekLock模式中获取信息。使用PeekLock模式并不会直接移除消息队列中的信息,只是将其隐藏起来,令其对消费者不再可见。而最初消费消息的消费者可以在其完成消息处理之后,将消息从消息队列移除。如果消费者不可用,PeekLock会之后因为处理的超时,将消息重置为消费者可见,允许其他消费者消费。

关于更多使用Windows Azure Service Bus队列的详细信息,可以参考MSDN上面的Service Bus Queues, Topics, and Subscriptions。关于更多使用Windows Azure存储队列的信息,可以参考How to use the Queue Storage Service.

下面的代码来自CompetingConsumers解决方案中的QueueManager中的代码,其中介绍了如何令开发者在web或者worker的启动事件中通过使用QueueCient实例来创建队列。

private string queueName = ...;
private string connectionString = ...;
...
public async Task Start()
{
    // Check if the queue already exists.
    var manager = NamespaceManager.CreateFromConnectionString(this.connectionString);
    if (!manager.QueueExists(this.queueName))
    {
        var queueDescription = new QueueDescription(this.queueName);
        // Set the maximum delivery count for messages in the queue. A message
        // is automatically dead-lettered after this number of deliveries. The
        // default value for dead letter count is 10.
        queueDescription.MaxDeliveryCount = 3;
        await manager.CreateQueueAsync(queueDescription);
    }
    ...
    // Create the queue client. By default the PeekLock method is used.
    this.client = QueueClient.CreateFromConnectionString(
    this.connectionString, this.queueName);
}

下面的代码片段展示了应用如何创建和将消息发送到消息队列。

public async Task SendMessagesAsync()
{
    // Simulate sending a batch of messages to the queue.
    var messages = new List<BrokeredMessage>();
    for (int i = 0; i < 10; i++)
    {
        var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
        messages.Add(message);
    }
    await this.client.SendBatchAsync(messages);
}

下面的代码展示了一个消费者服务实例以事件驱动的方式从队列中获取消息。ReceiveMessages函数的参数参数processMessageTask是一个代理,当消费者接收到消息的时候就会执行。方法是以异步的方式执行的。

private ManualResetEvent pauseProcessingEvent;
...
public void ReceiveMessages(Func<BrokeredMessage, Task> processMessageTask)
{
    // Set up the options for the message pump.
    var options = new OnMessageOptions();
    // When AutoComplete is disabled it is necessary to manually
    // complete or abandon the messages and handle any errors.
    options.AutoComplete = false;
    options.MaxConcurrentCalls = 10;
    options.ExceptionReceived += this.OptionsOnExceptionReceived;
    // Use of the Service Bus OnMessage message pump.
    // The OnMessage method must be called once, otherwise an exception will occur.
    this.client.OnMessageAsync(
        async (msg) =>
        {
            // Will block the current thread if Stop is called.
            this.pauseProcessingEvent.WaitOne();
            // Execute processing task here.
            await processMessageTask(msg);
        }, 
        options
    );
}
...
private void OptionsOnExceptionReceived(object sender,
    ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    ...
}

另外,Windows Azure还支持自动伸缩的特性,可以用来在队列的长度发生波动的时候启动和关闭消费者实例。关于更多的信息,可以参考Autoscaling Guidance。另外,并非一定需要维持一对一的生产者消费者关系,可以存在一个生产实例,多个消费实例的情况。关于更多的信息,可以参考Compute Resource Consolidation模式

相关的其他模式

在实现Competing-Consumers模式的时候,可以参考如下其他模式: