【JAVA并发编程实战】10、并发程序的测试

package cn.study.concurrency.ch12;


public class Util {
    public static int xorShift(int y)
    {
        //进行左移和无符号右移,最后异或操作(异或,当两个位数据不同的时候为1,否则为0)
        y ^= (y << 6);
        y ^= (y >>> 21);
        y ^= (y << 7);
        return y;//y初始值是随机种子
    }
    
    public static void main(String[] args) {
        for(int i = 0; i < 10; ++i)
        {
            
            System.out.println(xorShift((int) System.nanoTime()));
        }
    }
}

2、缓存队列

package cn.study.concurrency.ch12;

import java.util.concurrent.Semaphore;

public class BoundedBuffer<E> {
    //信号量
    private final Semaphore availableItems, availableSpaces;
    private final E[] items;
    private int putPosition=0, takePosition=0;
    
    public BoundedBuffer(int capacity)
    {
        availableItems = new Semaphore(0);
        availableSpaces = new Semaphore(capacity);
        items = (E[]) new Object[capacity];
    }
    
    public boolean isEmpty()
    {
        //这个表示已经是空的了
        return availableItems.availablePermits() == 0;
    }
    
    public boolean isFull()
    {
        //表明这个是满的队列
        return availableSpaces.availablePermits() == 0;
    }
    
    //放入一个对象,首先向availableSpaces请求一个信号量,然后结束之后返回一个availableItems信号
    public void put(E x) throws InterruptedException
    {
        //减少一个许可
        availableSpaces.acquire();
        doInsert(x);
        //添加一个许可
        availableItems.release();
    }
    
    //释放一个数据对象
    public E take() throws InterruptedException
    {
        //当释放一个对象的时候,减少一个连接许可
        availableItems.acquire();
        E item = doExtract();
        availableSpaces.release();//取出数据之后,吧能插入的可能添加一个
        return item;
    }
    
    private synchronized void doInsert(E x)
    {
        int i = putPosition;
        items[i] = x;
        putPosition = (++i == items.length) ? 0 : i;
    }
    
    //不论是取数据,还是获取数据,都是循环取得
    private synchronized E doExtract()
    {
        int i = takePosition;
        E x = items[i];
        items[i] = null;
        takePosition = (++i == items.length) ? 0: i;
        return x;
    }
    
    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
        bb.take();
        if(bb.isEmpty())
        {
            System.out.println("空");
        }
        
        if(bb.isFull())
        {
            System.out.println("满");
        }
    }
}

3、测试方法

package cn.study.concurrency.ch12;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PutTakeTest {
    //创建线程池
    private static final ExecutorService pool = Executors.newCachedThreadPool();
    //原子型的int数据,最后用来统计线程设计是否合理,是否线程安全
    private final AtomicInteger putSum = new AtomicInteger(0);
    private final AtomicInteger takeSum = new AtomicInteger(0);
    private final CyclicBarrier barrier;    //栅栏
    private final BoundedBuffer<Integer> bb;    //队列
    private final int nTrials, nPairs;    //要创建的队列数量和栅栏的屏障点
    
    //构造函数
    public PutTakeTest(int capacity, int npairs, int ntrials) {
        this.bb = new BoundedBuffer<Integer>(capacity);
        this.nTrials = ntrials;
        this.nPairs = npairs;
        //+1是为了,在所有的线程都建立结束之后,最后编码手动决定什么时候启动所有线程
        this.barrier = new CyclicBarrier(2 * npairs + 1);
    }
    
    //生产者线程
    class Producer implements Runnable
    {
        @Override
        public void run() {
            try {
                //设计随机种子,异或操作
                int seed = (this.hashCode() ^ (int)System.nanoTime());
                int sum = 0;
                barrier.await();//进行栅栏等待
                for(int i = nTrials; i > 0; --i)
                {
                    bb.put(seed);
                    sum += seed;
                    seed = Util.xorShift(seed);//获取随机值
                }
                //获取值,并且添加到putsum中
                putSum.getAndAdd(sum);
                barrier.await();//添加一个栅栏,标识运行结束
            } catch (Exception e) {
                System.out.println("????producer");
            }
        }
    }
    
    //消费者线程
    class Consumer implements Runnable
    {
        @Override
        public void run() {
            try {
                barrier.await();//进行栅栏等待
                int sum = 0;
                for(int i = nTrials; i > 0; --i)
                {
                    sum += bb.take();
                }
                //获取值,并且添加到putsum中
                takeSum.getAndAdd(sum);
                barrier.await();//添加一个栅栏,标识运行结束
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(Thread.currentThread().getName() + "????consumer");
            }
        }
    }
    
    //测试函数
    public void test()
    {
        try {
            for(int i = 0; i < nPairs; ++i)
            {
                pool.execute(new Producer());
                pool.execute(new Consumer());
            }
            //当现场全部添加好了之后,打开栅栏
            barrier.await();    //第2n+1个
            //然后线程执行之后,每个线程执行完又调用了一次await,当所有的都执行完了之后
            barrier.await();
            //全部结束之后判断是否结果对等
            if(putSum.get() == takeSum.get())
            {
                System.out.println("程序是OK的");
            }
            else
            {
                System.out.println("程序有安全漏洞");
            }
        } catch (Exception e) {
            System.out.println("????test");
        }
    }
    
    public static void main(String[] args) {
        //队列容量是10,每个线程起10个,一共放100000个数据
        new PutTakeTest(10, 2, 100).test();
        pool.shutdown();//执行完,结束线程
    }
}