理解Distuptor原理

Java内置队列

基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要是通过加锁的方式来保证线程安全;基于链表的线程安全队列分为LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也是通过锁的方式来实现线程安全,而后者是通过CAS这种不加锁的方式来实现的(LinkedTransferQueue也是通过CAS实现的)。

通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确切的范围内);而加锁的方式,可以实现有界队列。在稳定性特别高的系统中,为了防止生产这生产速度过快,导致内存溢出,只能选择有界队列;同时为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列只有ArrayBlockingQueue。

ArrayBlockingQueue伪共享的问题

什么是共享

如下图计算的基本结构,L1、L2、L3分别代表一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。所以L1缓存很小但很快,并且紧靠着在使用它的CPU内核;L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;L3更大、更慢,并且被单个插槽上的所有CPU核共享;最后是主存,由全部插槽上的所有CPU核共享

2021-08-20-10-14-08

当CPU执行运算的时候,它先去L1查找所需的数据、再去L2、然后是L3,如果最后这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。所以如果你在做一些很频繁的事,你要尽量确保数据在L1缓存中(这个比较好理解)。

另外,线程之间共享一份数据的时候,需要一个线程把数据写回主存,而另一个线程访问主存中相应的数据(这个比较好理解)。

缓存行

Cache是由很多个cache line组成的。每个cache line通常是64字节,并且它有效地引用主内存中的一块儿地址。一个Java的long类型变量是8字节,因此在一个缓存行中可以存8个long类型的变量。

CPU每次从主存中拉取数据时,会把相邻的数据也存入同一个cache line(当前位置之后的)。

在访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个。因此你能非常快的遍历这个数组。事实上,你可以非常快速的遍历在连续内存块中分配的任意数据结构。

测试利用cache line和不使用cache line特定的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

public class Main {

    static long[][] arr;

    public static void main(String[] args) {
        arr = new long[1024 * 1024][];
        for (int i = 0; i < 1024 * 1024; i++) {
            arr[i] = new long[8];
            for (int j = 0; j < 8; j++) {
                arr[i][j] = 0L;
            }
        }

        long sum = 0L;
        long marked = System.currentTimeMillis();
        for (int i = 0; i < 1024 * 1024; i++) {
            for (int j = 0; j < 8; j++) {
                sum += arr[i][j];
            }
        }
        System.out.println("Loop times: " + (System.currentTimeMillis() - marked) + "ms");

        marked = System.currentTimeMillis();
        for (int i = 0; i < 8; i++) {
            for (int j = 0; j < 1024 * 1024; j++) {
                sum += arr[j][i];
            }
        }
        System.out.println("Loop times: " + (System.currentTimeMillis() - marked) + "ms");
    }
}

什么是伪共享

ArrayBlockingQueue有三个成员变量: - takeIndex:需要被取走的元素下标 - putIndex:可被元素插入的位置的下标 - count:队列中元素的数量

这三个变量很容易放到一个缓存行中,但是之间修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果(因为一个线程修改后,其他线程需要重新从主存读取)。

如上图所示,当生产者线程put一个元素到ArrayBlockingQueue时,putIndex会修改,从而导致消费者线程的缓存中的缓存行无效,需要从主存中重新读取(本来我只需要从主存中只重新获取putIndex、count的,现在好了,我takeIndex也需要重新获取)。

这种无法充分使用缓存行特性的现象,称为伪共享。

对于伪共享,一般的解决方案是,增大数组元素的间隔使得由不同线程存取的元素位于不同的缓存行上,以空间换时间。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103

public class Main2 {
    // 用于表明执行次数
    public final static long ITERATIONS = 500L * 1000L * 100L;

    private final int arrayIndex;

    private static ValuePadding[] longs;
    private static ValueNoPadding[] longs2;

    public Main2(final int arrayIndex) {
        this.arrayIndex = arrayIndex;
    }

    public static void main(String[] args) throws Exception {

        for (int i = 0; i < 10; i++) {
            System.gc();
            long start = System.currentTimeMillis();
            runTest2(i);
            System.out.println("Thread run " + i + " duration = " + (System.currentTimeMillis() - start));
        }

        for (int i = 0; i < 10; i++) {
            System.gc();
            long start = System.currentTimeMillis();
            runTest(i);
            System.out.println("Thread run " + i + " duration = " + (System.currentTimeMillis() - start));
        }
    }

    private static void runTest(int NUM_THREADS) throws InterruptedException {

        // 创建包含1个到10个ValuePadding的ValuePadding数组
        longs = new ValuePadding[NUM_THREADS];
        for (int i = 0; i < longs.length; i++) {
            longs[i] = new ValuePadding();
        }

        // 创建包含1个到10个Thread的Thread数组
        Thread[] threads = new Thread[NUM_THREADS];
        for (int i = 0; i < threads.length; i++) {
            final int idxInLong = i;
            threads[i] = new Thread(() -> {
                long idx = ITERATIONS + 1;
                while (0 != --idx) {
                    longs[idxInLong].value = 0L;
                }
            });
        }

        for (Thread thread : threads) {
            thread.start();
        }

        for (Thread thread : threads) {
            thread.join();
        }
    }

    private static void runTest2(int NUM_THREADS) throws InterruptedException {

        // 创建包含1个到10个ValuePadding的ValuePadding数组
        longs2 = new ValueNoPadding[NUM_THREADS];
        for (int i = 0; i < longs2.length; i++) {
            longs2[i] = new ValueNoPadding();
        }

        // 创建包含1个到10个Thread的Thread数组
        Thread[] threads = new Thread[NUM_THREADS];
        for (int i = 0; i < threads.length; i++) {
            final int idxInLong = i;
            threads[i] = new Thread(() -> {
                long idx = ITERATIONS + 1;
                while (0 != --idx) {
                    longs2[idxInLong].value = 0L;
                }
            });
        }

        for (Thread thread : threads) {
            thread.start();
        }

        for (Thread thread : threads) {
            thread.join();
        }
    }


    public final static class ValuePadding {
        protected long p1, p2, p3, p4, p5, p6, p7;
        protected volatile long value = 0L;
        protected long p9, p10, p11, p12, p13, p14, p15;
    }

    public final static class ValueNoPadding {
        // protected long p1, p2, p3, p4, p5, p6, p7;
        protected volatile long value = 0L;
        // protected long p9, p10, p11, p12, p13, p14, p15;
    }
}

该实验的输出结果如下:


Thread run 0 duration = 0
Thread run 1 duration = 347
Thread run 2 duration = 1343
Thread run 3 duration = 2503
Thread run 4 duration = 3456
Thread run 5 duration = 2898
Thread run 6 duration = 3359
Thread run 7 duration = 3413
Thread run 8 duration = 3802
Thread run 9 duration = 3943

Thread run 0 duration = 0
Thread run 1 duration = 313
Thread run 2 duration = 322
Thread run 3 duration = 318
Thread run 4 duration = 417
Thread run 5 duration = 419
Thread run 6 duration = 467
Thread run 7 duration = 456
Thread run 8 duration = 513
Thread run 9 duration = 515

分析一下为什么该实验可以验证通过增加元素的间隔使得不同线程存取的元素位于不同的缓存行上,如下代码所示,这行代码是说我第一条线程将第一个longs数组中第一个ValuePadding的value设置成0L。在内存上,这些ValuePadding会位于不同的缓存行,所以对某个线程而言,即使其他线程修改其他的ValuePadding,也不会影响当前的线程的ValuePadding的缓存行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13

// 创建包含1个到10个Thread的Thread数组
Thread[] threads = new Thread[NUM_THREADS];
for (int i = 0; i < threads.length; i++) {
    final int idxInLong = i;
    threads[i] = new Thread(() -> {
        long idx = ITERATIONS + 1;
        while (0 != --idx) {
            longs[idxInLong].value = 0L;
        }
    });
}

而针对ValueNoPadding,由于没有缓存行,所以可能多个ValueNoPadding位于同一个缓存行中,这样就会导致B线程修改B线程的ValueNoPadding时,一定会导致A线程重新的去主存里加载自己的ValueNoPadding,这个时候就造成了伪缓存。

备注:在jdk1.8中,有专门的注解@Contended来避免伪共享,更优雅地解决问题。(这个我还没有研究和使用过)

Disruptor的设计方案

Disruptor采用了以下设计来解决队列速度慢的问题:

  1. 环形数组结构:为了可以避免垃圾回收,采用数组而非链表。且数组对处理器的缓存机制更加友好。

  2. 元素位置定位:数组长度2^n,通过位运算,加速定位的速度。下标采用递增的形式,不用担心index溢出的问题。index是long类型,即使100万的QPS的处理速度,也需要30万年才能用完。

  3. 无锁设计:每个生产者或者消费者线程会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

多个生产者

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方案时,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

(我能这么理解么:就是在Disruptor中申请空间的过程就是上面说的每个线程分配一段空间)

但是会引入一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与RingBuffer大小相同的buffer:available buffer。当某个位置写入成功的时候,便把available Buffer相应的位置置位,编辑为写入成功。读取的时候,会便利available Buffer,来判断元素是否已经就绪。

(哈哈,我已经猜到是上面的实现了)

(我现在产生了新的问题,如果一个线程申请了空间,确迟迟不写入,会不会导致整个队列被阻塞了)

参考资料

  1. 高性能队列——Disruptor