Pipe和Filter模式

Pipe和Filter模式将执行复杂处理的任务分解为可重复使用的一系列离散元素。这种模式可以提高性能,可扩展性和可重用性,允许执行部署和缩放独立处理的任务元素。

问题

一个应用程序可能根据其处理的不同的信息需要执行各种复杂的任务。一个简单的,但不灵活的方法就是可以将应用的处理作为一个单独的模块。但是,如果部分相同的处理需要在应用程序的其他地方,这种方法可能会减少代码重构,重用,优化的机会。

下图1说明了单独模块方法处理数据的流程。应用程序接收和处理来自两个源的数据。每个源的数据由一个单独的模块处理,它们执行一系列任务来转换这些数据,然后将结果传递给应用程序的业务逻辑。

图1. 实现独立模块的解决方案

很多独立模块执行的一些任务在功能上非常相似,但是因为这些模块是分开设计的,实现任务的代码都是紧密耦合在一个模块中,其中的重复部分无法得到重用来提高扩展性和重用性。

然而,每个模块执行的处理任务,或每个任务的部署要求,可能都会随着业务需求的修改而改变。有些任务可能是计算密集型的,可能会受益于运行在强大的硬件,而其他任务可能不需要这样昂贵的资源。此外,在未来可能需要执行一些额外的处理,或命令执行的任务可能会改变。所以需要一个技能解决这些问题,同时也能增加代码重用的解决方案。

解决方案

将每个数据流所需的处理分解成一组离散的组件(或过滤器),然后由每个组件来执行一个任务是一种不错的解决方案。通过标准化每个组件接收和发出的数据的格式,这些组件(过滤器)可以组合成一个管道。这种解决方案有助于避免重复代码,并且在需求发生变化的时候,可以很容易地删除,替换或集成额外的组件来实现功能。图2显示了这种结构的一个例子。

图2 通过管道和过滤器的解决方案

处理单个请求的时间取决于管道中最慢的过滤器的速度。尤其在大量请求发送到组件的时候,某个或者某些组件就可能成为系统的性能瓶颈。管道结构的一个主要优点就在于,它为运行缓慢的过滤器提供了使用并行实例的机会,这样使系统能够均衡负载,以提高吞吐量。

组成管道的过滤器完全可以运行在不同的机器上,并且它们可以利用许多云环境提供的弹性能够来实现独立扩展。一个计算密集型的过滤器可以运行在高性能硬件上,而其他要求较低的过滤器可以运行在稍差的硬件上。过滤器甚至不必在相同的数据中心或不在同一个地点,管道解决方案允许管道中的每个元素可以在接近其所需资源的环境中运行。

图3展示了一个应用了管道处理数据流的例子:

图3 管道中组件的负载分担

如果一个过滤器的输入和输出被结构化为一个流,它就有可能对多个过滤器进行并行处理。管道中的第一个过滤器可以开始它的工作,并开始分发它的处理结果,这是直接传递到下一个过滤器的序列之前,第一个过滤器已完成其工作。

另一个好处是管道过滤器模式可以提供很好的跳转。如果过滤器失败或正在运行的机器不再可用,则管道可以重新安排过滤器正在执行的工作,并将此工作直接指向组件的另一实例。一个过滤器的故障并不会导致整个管道的故障。

使用管道和过滤器模式与事务补偿模式相结合,可以提供一种替代的方法来实现分布式事务。分布式事务可以分解成独立的有偿任务,每一个都可以通过使用一个过滤器,还实现了事务补偿模式。管道中的过滤器可以作为独立的托管任务来执行,这些任务还可以在物理位置上接近他们所维护的数据,以降低网络代价。

实现管道过滤器模式需要考虑的问题

开发者在考虑实现Pipe和Filter模式的时候,需要考虑以下一些方面:

何时使用该模式

在以下一些场景的时候,可以考虑使用管道过滤器模式:

d管道过滤器模式在以下一些场景可能不是十分适用:

使用举例

开发者可以选择使用消息队列架构来实现管道过滤器模式。消息队列不断的接收那些没有处理的消息。然后令所有的过滤器组件监听消息队列的消息。当有新的消息来得时候,执行其工作,然后不断的消费消息,并且将处理后的消息添加到下一个队列之中。然后另一个类似的任务同样监听消息队列的消息进行处理,直到整个管道中的过滤器都完成了处理为止。

图4 通过消息队列实现管道过滤器模式

如果开发者是基于Windows Azure来开发的话,可以使用Windows Azure服务总线队列来为解决方案提供一个可靠的可扩展的队列机制。下面的ServiceBusPipeFilter类就是这样的一个例子。下面的代码展示了一个过滤器如何从队列中接收消息,处理消息,并将结果发送到另一个队列之中。

public class ServiceBusPipeFilter
{
    ...
    private readonly string inQueuePath;
    private readonly string outQueuePath;
    ...
    private QueueClient inQueue;
    private QueueClient outQueue;
    ...
    public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
    {
        ...
        this.inQueuePath = inQueuePath;
        this.outQueuePath = outQueuePath;
    }
    public void Start()
    {
        ...
        // Create the outbound filter queue if it does not exist.
        ...
        this.outQueue = QueueClient.CreateFromConnectionString(...);
        ...
        // Create the inbound and outbound queue clients.
        this.inQueue = QueueClient.CreateFromConnectionString(...);
    }
    public void OnPipeFilterMessageAsync(
        Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...)
    {
        ...
        this.inQueue.OnMessageAsync(
        async (msg) =>
        {
            ...
            // Process the filter and send the output to the
            // next queue in the pipeline.
            var outMessage = await asyncFilterTask(msg);
            // Send the message from the filter processor
            // to the next queue in the pipeline.
            if (outQueue != null)
            {
                await outQueue.SendAsync(outMessage);
            }
            // Note: There is a chance that the same message could be sent twice
            // or that a message may be processed by an upstream or downstream
            // filter at the same time.
            // This would happen in a situation where processing of a message was
            // completed, it was sent to the next pipe/queue, and then failed
            // to complete when using the PeekLock method.
            // Idempotent message processing and concurrency should be considered
            // in a real-world implementation.
        }, options);
    }
    
    public async Task Close(TimeSpan timespan)
    {
        // Pause the processing threads.
        this.pauseProcessingEvent.Reset();
        // There is no clean approach for waiting for the threads to complete
        // the processing. This example simply stops any new processing, waits
        // for the existing thread to complete, then closes the message pump
        // and finally returns.
        Thread.Sleep(timespan);
        this.inQueue.Close();
        ...
    }
    ...
}

ServiceBusPipeFilter类中的Start方法连接了一个输入和输出的队列,而Close方法则解除了相应的连接。OnPipeFilterMessageAsync方法则对消息进行了实际的处理,其中asyncFilterTask参数则用来指定要进行的处理。OnPipeFilterMessageAsync方法会持续等待输入队列的消息,然后会基于asyncFilterTask参数来对接收到的消息进行处理,然后将结果发布到输出队列。队列本身都是通过构造函数来指定的。

一般的解决方案都是将过滤器当做一组工作单元的方式来实现的。每一个工作单元都可以独立的进行伸缩,这都取决于业务的复杂性和其所需要执行处理需要的资源的多少。而且,可以通过对过滤器的并行执行来提高吞吐。下面的代码展示了一个名为PipeFilterARoleEntryWindows Azure的工作单元。

public class PipeFilterARoleEntry : RoleEntryPoint
{
    ...
    private ServiceBusPipeFilter pipeFilterA;
    public override bool OnStart()
    {
        ...
        this.pipeFilterA = new ServiceBusPipeFilter(
        ...,
        Constants.QueueAPath,
        Constants.QueueBPath);
        this.pipeFilterA.Start();
        ...
    }
    public override void Run()
    {
        this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
        {
            // Clone the message and update it.
            // Properties set by the broker (Deliver count, enqueue time, ...)
            // are not cloned and must be copied over if required.
            var newMsg = msg.Clone();
            await Task.Delay(500); // DOING WORK
            Trace.TraceInformation(“Filter A processed message:{0} at {1}”,
                msg.MessageId, DateTime.UtcNow);
            newMsg.Properties.Add(Constants.FilterAMessageKey, “Complete”);
            return newMsg;
        });
        ...
    }
    ...
}

该过滤器中包含了一个ServiceBusPipeFilter对象。其中的OnStart方法会连接接收消息的输入队列和接收处理完成消息的输出队列(定义在Constats类中)。而Run方法会调用OnPipeFilterMessagesAsync来对每一个接收到的消息进行一些处理(在该示例中,只是简单的等待一段时间)。当处理完成的时候,会生成一个新的消息(在该示例中,只是在属性中增加了部分信息),然后发送到输出队列。

开发者也可以根据示例代码定义新的RoleEntryPoint实现。在实现上,基本不会有太多的差别,只是在其中的Run方法中处理略有不同。然后将不同的工作单元连接起来就是一个管道过滤器模式的实现了。参考如下代码:

public class FinalReceiverRoleEntry : RoleEntryPoint
{
    ...
    // Final queue/pipe in the pipeline from which to process data.
    private ServiceBusPipeFilter queueFinal;
    public override bool OnStart()
    {
        ...
        // Set up the queue.
        this.queueFinal = new ServiceBusPipeFilter(...,
            Constants.QueueFinalPath);
        this.queueFinal.Start();
        ...
    }
    public override void Run()
    {
        this.queueFinal.OnPipeFilterMessageAsync(
            async (msg) =>
            {
                await Task.Delay(500); // DOING WORK
                // The pipeline message was received.
                Trace.TraceInformation(
                "Pipeline Message Complete - FilterA:{0} FilterB:{1}",
                msg.Properties[Constants.FilterAMessageKey],
                msg.Properties[Constants.FilterBMessageKey]);
                return null;
            }
        );
        ...
    }
    ...
}

相关的其他模式

在实现管道过滤器模式的时候,可以参考如下一些其他的模式: