小规模的流处理框架.Part 1: thread pools

2019年12月06日 阅读数:156
这篇文章主要向大家介绍小规模的流处理框架.Part 1: thread pools,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

原文连接 做者:Tomasz Nurkiewicz 译者:simonwang
(译者:强力推荐这篇文章,做者设计了一个用于小流量的流式数据处理框架,并详细给出了每个须要注意的设计细节,对比了不一样设计方案的优缺点,可以让你对流处理过程,某些设计模式和设计原则以及指标度量工具备一个更深入的认识!)
GeeCON 2016上我为个人公司准备了一个编程竞赛,此次的任务是设计并实现一个可以知足如下要求的系统:
html

系统可以每秒处理1000个任务,每个Event至少有2个属性:java

  • clientId-咱们但愿每一秒有多个任务是在同一个客户端下处理的(译者:不一样的clientId对应不一样的ClientProjection,即对应不一样的一系列操做)
  • UUID-全局惟一的

消费一个任务要花费10毫秒,为这样的流设计一个消费者:算法

  1. 可以实时的处理任务
  2. 和同一个客户端有关的任务应该被有序地处理,例如你不能对拥有同一个clientId的任务序列使用并行处理
  3. 若是10秒内出现了重复的UUID,丢弃它。假设10秒后不会重复

有几个关于以上要求的重要细节:数据库

  1. 1000events/s的任务量,消耗一个event要10ms,1s内能消耗100个event,那么为了保证明时性,就须要10个并发的消费者。
  2. events拥有汇集的ID(clientId),在1s内咱们但愿多个event可以被指定到同一个给定的client上,而且咱们不可以并发地或无序地处理这些event。
  3. 咱们必须以某种方式忽略重复的信息,最可能的方法就是记住最近10s内全部的ID,这就须要暂时保存一万个UUID。

在这篇文章中,我会引导大家使用一些成功的方案并作一些小小的突破,你将要学习如何使用精确地有针对性的度量器来解决问题。编程

Naive sequential processing

咱们能够在迭代器中处理这个问题,首先咱们能够对API作一些假设,想象一下它会是这个样子:设计模式

01 interface EventStream {
02  
03     void consume(EventConsumer consumer);
04  
05 }
06  
07 @FunctionalInterface
08 interface EventConsumer {
09     Event consume(Event event);
10 }
11  
12 @Value
13 class Event {
14  
15     private final Instant created = Instant.now();
16     private final int clientId;
17     private final UUID uuid;
18  
19 }

一个典型的推送式API,和JMS很像。须要注意的是EventConsumer是阻塞的,这就意味着它不会返回新的Event,除非前一个已经被处理完毕了。这仅仅是我作出的一个假设,并且它没有太大的违反以前的要求,这也是JMS中消息侦听者的工做机制。下面是一个简单的实现,这个实现只是简单的添加了一个工做间隔为10ms的侦听器:安全

1 class ClientProjection implements EventConsumer {
2  
3     @Override
4     public Event consume(Event event) {
5         Sleeper.randSleep(101);//译者:这里只是用睡眠来代替实际编程中一些耗时的操做
6         return event;
7     }
8  
9 }

固然在现实生活中这个consumer可能会在数据库中作一些存储操做,或者进行远程调用等等。我在睡眠时间的分布上添加了一些随机性,目的是使得手动测试更加贴近实际状况(译者:实际状况中耗时操做的用时不尽相同,因此要随机化):数据结构

01 class Sleeper {
02  
03     private static final Random RANDOM = new Random();
04  
05     static void randSleep(double mean, double stdDev) {
06         final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);
07         try {
08             TimeUnit.MICROSECONDS.sleep((long) micros);
09         catch (InterruptedException e) {
10             throw new RuntimeException(e);
11         }
12     }
13  
14 }
15  
16 //...
17  
18 EventStream es = new EventStream();  //some real implementation here
19 es.consume(new ClientProjection());

以上的代码可以编译并运行,但为了知足设计要求咱们必需要插入一些度量器。最重要的度量器就是有关于信息消费的潜伏期,这个潜伏期指的是从信息的产生到开始处理的这段时间。咱们使用 Dropwizard Metrics来实现这个潜伏期的度量:多线程

01 class ClientProjection implements EventConsumer {
02  
03     private final ProjectionMetrics metrics;
04  
05     ClientProjection(ProjectionMetrics metrics) {
06         this.metrics = metrics;
07     }
08  
09     @Override
10     public Event consume(Event event) {
11         metrics.latency(Duration.between(event.getCreated(), Instant.now()));
12         Sleeper.randSleep(101);
13         return event;
14     }
15  
16 }

ProjectionMetrics类的功能以下(主要就是将event的潜伏期用柱状图的形式表现出来):并发

01 import com.codahale.metrics.Histogram;
02 import com.codahale.metrics.MetricRegistry;
03 import com.codahale.metrics.Slf4jReporter;
04 import lombok.extern.slf4j.Slf4j;
05  
06 import java.time.Duration;
07 import java.util.concurrent.TimeUnit;
08  
09 @Slf4j
10 class ProjectionMetrics {
11  
12     private final Histogram latencyHist;
13  
14     ProjectionMetrics(MetricRegistry metricRegistry) {
15         final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry)
16                 .outputTo(log)
17                 .convertRatesTo(TimeUnit.SECONDS)
18                 .convertDurationsTo(TimeUnit.MILLISECONDS)
19                 .build();
20         reporter.start(1, TimeUnit.SECONDS);
21         latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class"latency"));
22     }
23  
24     void latency(Duration duration) {
25         latencyHist.update(duration.toMillis());
26     }
27 }

如今当你运行这个解决方案时,你很快就会发现潜伏期的中值和第99.9%的值(分别指的是第count/2个值和第99.9%*count个值)都在无限增加:

1 type=HISTOGRAM, [...] count=84,   min=0,  max=795,   mean=404.88540608274104, [...]
2     median=414.0,   p75=602.0,   p95=753.0,   p98=783.0,   p99=795.0,   p999=795.0
3 type=HISTOGRAM, [...] count=182,  min=0,  max=1688,  mean=861.1706371990878,  [...]
4     median=869.0,   p75=1285.0,  p95=1614.0,  p98=1659.0,  p99=1678.0,  p999=1688.0
5  
6 [...30 seconds later...]
7  
8 type=HISTOGRAM, [...] count=2947, min=14, max=26945, mean=15308.138585757424, [...]
9     median=16150.0, p75=21915.0, p95=25978.0, p98=26556.0, p99=26670.0, p999=26945.0

在运行了30s以后咱们的应用程序处理event会出现平均15s的延迟,所以它并不具有完整的实时性,显然缺乏并发才是缘由所在。咱们的ClientProjection事件消费者会花费10ms去完成事件处理,因此它每秒最多能够处理100个event,然而咱们须要更多的处理量。咱们必需要加强ClientProjection同时不违反其余的设计要求!

Naive thread pool

最显而易见的解决方法是对EventConsumer使用多线程技术,最简单的实现途径就是利用ExecutorService:

01 import java.util.concurrent.ExecutorService;
02 import java.util.concurrent.Executors;
03  
04 class NaivePool implements EventConsumer, Closeable {
05  
06     private final EventConsumer downstream;
07     private final ExecutorService executorService;
08  
09     NaivePool(int size, EventConsumer downstream) {
10         this.executorService = Executors.newFixedThreadPool(size);
11         this.downstream = downstream;
12     }
13  
14     @Override
15     public Event consume(Event event) {
16         executorService.submit(() -> downstream.consume(event));
17         return event;
18     }
19  
20     @Override
21     public void close() throws IOException {
22         executorService.shutdown();
23     }
24 }

这里咱们使用了装饰者模式。最初的ClientProjection实现EventConsumer是可行的,但咱们利用加入了并发的另外一个EventConsumer实现对ClientProjection进行包装。这就容许咱们可以将更复杂的行为组合起来而不用更改ClientProjection自己,这种设计能够:

  • 解耦:不一样的EventConsumer互不影响,但它们却能够自由地组合在一块儿,在同一个线程池中工做
  • 单一职责:每一个EventConsumer只作一项工做,并将本身委托给下一个组件即线程池
  • 开放/关闭原则:咱们能够改变系统的行为却不用修改现有实现

开放/关闭原则一般能够经过注入策略模式和模板方法模式来实现,这很简单。总体的代码以下:

01 MetricRegistry metricRegistry =
02         new MetricRegistry();
03 ProjectionMetrics metrics =
04         new ProjectionMetrics(metricRegistry);
05 ClientProjection clientProjection =
06         new ClientProjection(metrics);
07 NaivePool naivePool =
08         new NaivePool(10, clientProjection);
09 EventStream es = new EventStream();
10 es.consume(naivePool);

咱们写的度量器显示这种改良的方案确实表现的更好:

1 type=HISToOGRAM, count=838, min=1, max=422, mean=38.80768197277468, [...]
2     median=37.0, p75=45.0, p95=51.0, p98=52.0, p99=52.0, p999=422.0
3 type=HISTOGRAM, count=1814, min=1, max=281, mean=47.82642776789085, [...]
4     median=51.0, p75=57.0, p95=61.0, p98=62.0, p99=63.0, p999=65.0
5  
6 [...30 seconds later...]
7  
8 type=HISTOGRAM, count=30564, min=5, max=3838, mean=364.2904915942238, [...]
9     median=352.0, p75=496.0, p95=568.0, p98=574.0, p99=1251.0, p999=3531.0

咱们能够看到延迟虽然也在增加但规模却小得多,30s后潜伏期达到了364ms。这种潜伏期增加是系统问题,咱们须要更多的度量器。注意到NaivePool(你会明白为何这里是naive-初级的)会开启10条线程,这应该足以处理1000个event,每一个要花费10ms。在实际状况下,咱们须要一点额外的处理容量来避免因垃圾回收或小规模峰值负荷所带来的问题。为了证实线程池才是咱们的瓶颈,咱们要监控它内部的队列,这须要一点小小的工做量:

01 class NaivePool implements EventConsumer, Closeable {
02  
03     private final EventConsumer downstream;
04     private final ExecutorService executorService;
05  
06     NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
07         LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
08         String name = MetricRegistry.name(ProjectionMetrics.class"queue");
09         Gauge<Integer> gauge = queue::size;
10         metricRegistry.register(name, gauge);
11         this.executorService =
12                 new ThreadPoolExecutor(
13                         size, size, 0L, TimeUnit.MILLISECONDS, queue);
14         this.downstream = downstream;
15     }
16  
17     @Override
18     public Event consume(Event event) {
19         executorService.submit(() -> downstream.consume(event));
20         return event;
21     }
22  
23     @Override
24     public void close() throws IOException {
25         executorService.shutdown();
26     }
27 }

这里使用ThreadPoolExecutor的目的是为了可以提供自定义的LinkedBlockingQueue实例,接下来就能够监控队列的长度(see:ExecutorService – 10 tips and tricks)。Gauge会周期性地调用queue::size,你须要的时候就会提供队列的长度。度量器显示线程池的大小确实是一个问题:

1 type=GAUGE, name=[...].queue, value=35
2 type=GAUGE, name=[...].queue, value=52
3  
4 [...30 seconds later...]
5  
6 type=GAUGE, name=[...].queue, value=601

不断增加的队列长度进一步加重了队列内正在等待着的task的潜伏期,将线程池的大小增长到10到20之间,最终队列的长度显示合理而且没有失控。然而咱们仍然没有解决重复ID问题,而且也没有解决同一个clientId可能会对它的events进行并发处理的问题。

Obscure locking

让咱们从避免对拥有相同clientId的events使用并行处理开始。若是两个有相同clientId的event一个接一个地来,相继进入线程池队列,那么NaivePool会几乎同时将它们取出队列实现并行处理。开始的时候咱们可能会想到对每个clientId加一个Lock:

01 @Slf4j
02 class FailOnConcurrentModification implements EventConsumer {
03  
04     private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();
05     private final EventConsumer downstream;
06  
07     FailOnConcurrentModification(EventConsumer downstream) {
08         this.downstream = downstream;
09     }
10  
11     @Override
12     public Event consume(Event event) {
13         Lock lock = findClientLock(event);
14         if (lock.tryLock()) {
15             try {
16                 downstream.consume(event);
17             finally {
18                 lock.unlock();
19             }
20         else {
21