生产者-消费者模型的3种Java实现:synchronized,signal/notifyAll及BlockingQueue

我的技术博客经常被流氓网站恶意爬取转载。请移步原文:http://www.cnblogs.com/hamhog/p/3555111.html,享受整齐的排版、有效的链接、正确的代码缩进、更好的阅读体验。

【实现1:synchronized】

含测试函数main。

public class ProductManagerUsingSync {
    
    static final int MAX_AMOUNT = 1000;
    int currentAmount;

    /**
     * @param args
     */
    public static void main(String[] args) {
        ProductManagerUsingSync manager = new ProductManagerUsingSync(); 
        
        for (int i = 0; i < 5; i++){
            int consume = (int) Math.round(Math.random()*50) + 10;
            Thread consumerThread = new Thread(new ConsumerWithSync(consume, manager));
            consumerThread.start();
        }
        
        for (int i = 0; i < 10; i++){
            int produce = (int) Math.round(Math.random()*50) + 10;
            Thread producerThread = new Thread(new ProducerWithSync(produce, manager));
            producerThread.start();
        }
    }
    
    public ProductManagerUsingSync() {
        currentAmount = 0;
    }
    
    /**
     * Add product. If can't, return.
     * @param addAmount
     * @return if succeeded.
     */
    public boolean addProduct(int addAmount){
        if (currentAmount + addAmount > MAX_AMOUNT)
            return false;
        
        currentAmount += addAmount;
        System.out.println("produced: " + addAmount + " current: " + currentAmount); 
        return true;
    }

    /**
     * Take product. If can't, return.
     * @param takeAmount The amount of product to take.
* @return if succeeded. */ public boolean takeProduct(int takeAmount){ if (takeAmount > currentAmount) return false; currentAmount -= takeAmount; System.out.println("consumed: " + takeAmount + " current: " + currentAmount); return true; } } class ProducerWithSync implements Runnable { private int amount; private ProductManagerUsingSync manager; ProducerWithSync(int amount, ProductManagerUsingSync manager) { this.amount = amount; this.manager = manager; } @Override public void run() { while (true) { synchronized (manager) { if (manager.addProduct(amount)) return; } } } } class ConsumerWithSync implements Runnable { private int amount; private ProductManagerUsingSync manager; ConsumerWithSync(int amount, ProductManagerUsingSync manager) { this.amount = amount; this.manager = manager; } @Override public void run() { while (true) { synchronized (manager) { if (manager.takeProduct(amount)) return; } } } }

解释:Consumer类和Producer类在run方法中进行产品的生产和消费。重点在于:1. 在尝试生产、消费前会获取manager上的锁。由于所有的生产者、消费者中的manager都是同一个实例,因此消费、生产过程是保证线程安全(单线程串行)的。2. 在生产、消费失败的情况下,会进入死循环,反复再次尝试,直到成功为止。

这种实现方法下,暂时不能生产、消费时需要一直死循环,太占资源了;如果在每次循环之间sleep,则不一定能及时生产、消费。

【实现2:signal/notifyAll】

含测试函数main。

public class ProductManagerUsingSignal {

    static final int MAX_AMOUNT = 1000;
    int currentAmount;

    /**
     * @param args useless
     */
    public static void main(String[] args) {
        ProductManagerUsingSignal manager = new ProductManagerUsingSignal(); 
        
        for (int i = 0; i < 5; i++){
            int consume = (int) Math.round(Math.random()*50);
            Thread consumerThread = new Thread(new Consumer(consume, manager));
            consumerThread.start();
        }
        
        for (int i = 0; i < 10; i++){
            int produce = (int) Math.round(Math.random()*50);
            Thread producerThread = new Thread(new Producer(produce, manager));
            producerThread.start();
        }
    }
    
    public ProductManagerUsingSignal(){
        currentAmount = 0;
    }

    /**
     * Add product. If can't, wait. NotifyAll when finished.
     * @param addAmount The amount of product to add.
     */
    public synchronized void addProduct(int addAmount){
        while (currentAmount + addAmount > MAX_AMOUNT) { 
            try { 
                wait(); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            } 
        } 
        currentAmount += addAmount; 
        System.out.println("produced: " + addAmount + " current: " + currentAmount); 
        notifyAll(); 
    }

    /**
     * Take product. If can't, wait. NotifyAll when finished.
     * @param takeAmount The amount of product to take.
     */
    public synchronized void takeProduct(int takeAmount){
        while (takeAmount > currentAmount) { 
            try { 
                wait(); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            } 
        } 
        currentAmount -= takeAmount; 
        System.out.println("consumed: " + takeAmount + " current: " + currentAmount); 
        notifyAll();
    }

}

class Producer implements Runnable { 
    private int amount;
    private ProductManagerUsingSignal manager; 

    Producer(int amount, ProductManagerUsingSignal manager) { 
        this.amount = amount; 
        this.manager = manager; 
    } 

    @Override
    public void run() {
        manager.addProduct(amount); 
    } 
} 

class Consumer implements Runnable { 
    private int amount;
    private ProductManagerUsingSignal manager;

    Consumer(int amount, ProductManagerUsingSignal manager) { 
        this.amount = amount; 
        this.manager = manager; 
    } 

    @Override
    public void run() { 
        manager.takeProduct(amount); 
    } 
}

解释:这种实现同样用synchronized保证线程安全;它的重点在于,当生产、消费失败时,会进入wait状态,让位给其他线程;而完成一次成功的生产或消费后,会调用notifyAll方法,唤醒之前等待状态的进程。这种实现在效率上要好于第一种。

【实现3:BlockingQueue】

含测试函数main。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProductManagerUsingBlockingQueue {
BlockingQueue<Integer> sharedQueue;
/**
     * @param args
     */
    public static void main(String[] args) {
        sharedQueue = new LinkedBlockingQueue<Integer>();

        for (int i = 0; i < 10; i++){
            Thread consumerThread = new Thread(new ConsumerWithBlockingQueue(sharedQueue));
            consumerThread.start();
        }

        for (int i = 0; i < 10; i++){
            Thread producerThread = new Thread(new ProducerWithBlockingQueue(i, sharedQueue));
            producerThread.start();
        }
    }

}

class ProducerWithBlockingQueue implements Runnable {

    private int amount;
    private final BlockingQueue<Integer> sharedQueue;

    public ProducerWithBlockingQueue (int amount, BlockingQueue<Integer> sharedQueue) {
        this.amount = amount;
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {

        try {
            sharedQueue.put(amount);
            System.out.println("produced: " + amount);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

class ConsumerWithBlockingQueue implements Runnable{

    private final BlockingQueue<Integer> sharedQueue;

    public ConsumerWithBlockingQueue (BlockingQueue<Integer> sharedQueue) {
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        try {
            System.out.println("consumed: " + sharedQueue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

解释:这种方法借助数据结构BlockingQueue(初始化好像应该放在构造函数里,暂时来不及改了),底层原理与signal/notifyAll类似,但代码实现就简洁了许多。

【总结】

在需要实现生产者-消费者模式的场景下,我们可以优先考虑用BlockingQueue来实现。