java-并发编程

1、并发编程

  并发通常能提高单处理器的程序性能。可是,看到这句话有些违背直觉。多线程的运行增加了线程间切换的开销,仅仅从这个角度看,单线程运行总比多线程的性能好。但是,程序的阻塞会使得结果不一样,当某个线程阻塞时候,其它线程仍然可以执行,因此程序仍保持运行。充分利用cpu的时间提高的性能远大于线程间的切换带来的性能下降。

何为阻塞:程序中某个线程由于在不满足某些条件的情况下而导致不能够继续执行的现象

2、基本线程机制:

  一个程序可以分为多个独立运行任务,每个独立任务由线程驱动执行。线程并发执行,表面看起来是同时执行,好像各自都有一个自己的CPU一样。实际上,底层机制是CPU将时间片分给各个线程,一个时间只能有一个线程获得CPU的时间片,也就是线程独占CPU资源。

3、定义任务类、定义线程类

  • 定义任务
package com.duoxiancheng;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {

        for (int i=0; i<2; i++) {
          new Thread(new LiftOff()).start();
        }
    }
}

class LiftOff implements Runnable{
    private int countDown=3;
    private static int taskCount=0;
    private final int id=taskCount++;

    @Override
    public void run() {
        while (countDown-->0) {
            System.out.println("#>));
            Thread.yield();
        }
    }
}

输出结果

#id=0 countDown=2
#id=1 countDown=2
#id=0 countDown=1
#id=1 countDown=1
#id=0 countDown=LiftOff!
#id=1 countDown=LiftOff!
  • 定义线程类
package com.duoxiancheng;

public class Main {

    public static void main(String[] args) {

        for (int i=0; i<2; i++) {
          new LiftOff().start();
        }
    }
}

class LiftOff extends Thread{
    private int countDown=3;
    private static int taskCount=0;
    private final int id=taskCount++;

    @Override
    public void run() {
        while (countDown-->0) {
            System.out.println("#>));
            Thread.yield();
        }
    }
}

4、线程池

待续...

5、从任务中产生返回值---Callable<T>接口

package com.duoxiancheng;

import java.util.concurrent.*;

public class Main1 {
    public static void main(String[] args) throws Exception{

        ExecutorService executorService= Executors.newCachedThreadPool();
        for (int i=0;i<50;i++){
            Future future = executorService.submit(new TaskWithResult(i));
            Object result = future.get();
            System.out.println(result);
        }
    }
}
class TaskWithResult implements Callable{
    private int id=0;
    public TaskWithResult(int id){
        this.id=id;
    }
    @Override
    public Object call() throws Exception {
        return id;
    }
}

其中 future.get()是阻塞的方法;如果想想立即阻塞任务的等待,则可以使用 result = exec.submit(aCallable).get(); 形式

6、常用方法

休眠---sleep()

让步---yield()

加入一个线程---join()

优先级--setPriority()/getPriority()

后台线程--setDaemon()/isDaemon()

...

7、捕获线程中的异常

线程中抛出异常,会传播到控制台,除非采用特殊手段。

public interface Runnable {
    public abstract void run();
}
  • 在run()方法内部try-catch-finally捕获异常
  • 使用异常处理器捕获异常--异常处理器实现Thread.UncaughtExceptionHandler接口

以下分析自定义异常处理器:

为线程设置异常处理器。具体做法可以是以下几种:
(1)Thread.setUncaughtExceptionHandler设置当前线程的异常处理器;
(2)Thread.setDefaultUncaughtExceptionHandler为整个程序设置默认的异常处理器;
如果当前线程有异常处理器(默认没有),则优先使用该UncaughtExceptionHandler类;否则,如果当前线程所属的线程组有异常处理器,则使用线程组的
UncaughtExceptionHandler;否则,使用全局默认的DefaultUncaughtExceptionHandler;如果都没有的话,子线程就会退出
package com.duoxiancheng;

import java.util.concurrent.ThreadFactory;

public class Main2 {

    public static void main(String[] args) {

        Thread thread=new Thread(new ExceptionThread());
        thread.setUncaughtExceptionHandler(new MyExceptionHandler());
        thread.start();
    }
}

/**
 * 任务类
 */
class ExceptionThread implements Runnable{

    @Override
    public void run() {
        Thread t = Thread.currentThread();
        System.out.println("ExceptionThread 当前线程信息:"+t.toString());
        System.out.println("当前线程ExceptionThread的异常处理器"
                +t.getUncaughtExceptionHandler());
        throw new RuntimeException();
    }
}

/**
 * 线程异常处理器
 */
class MyExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.println("抛出的异常是:"+e);
    }
}

8、共享资源

  • 共享资源竞争:

  导致线程安全问题

  • 解决思想:

  多人(线程)都希望单独使用浴室(共享资源)。为了使用浴室,一个人先敲门,看能不能使用。如果没人回话,他就进入浴室并锁上门(获得锁)。这时候,其它人想使用浴室的话,就会被阻挡在外面(不能获取锁),直到浴室可以使用。浴室外面的人没有排队,浴室门打开(前一个人释放锁),离门最近的人优先进入使用(获得锁,设置优先级和yield方法可以建议某个优先使用)。

  • 解决方式:

  Synchronized 、Lock锁同步以及Voliate修饰符和原子类

  线程本地存储---ThreadLocal

9、线程之间协作

  • 生产者与消费者
package com.duoxiancheng;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 *  生产者-消费者
 */
public class Main3 {

    public static void main(String[ ] args) throws InterruptedException {
        Restaurant restaurant=new Restaurant();
        new Thread(new ProductorThread(restaurant)).start();
        Thread.sleep(20);
        new Thread(new ConsumerThread(restaurant)).start();
    }
}
class Restaurant {

    Lock lock=new ReentrantLock();//锁
    Condition condition1=lock.newCondition();//条件1
    Condition condition2=lock.newCondition();//条件2

    private int count;//已做好的餐

    private int count2;

    /**
     * 消费者方法
     */
    public void comsumer(){
        lock.lock();
        try {
            if (count==0) {
                System.out.println(Thread.currentThread().getId()+"客户 想要一份快餐!");
                condition2.signalAll();
                System.out.println(Thread.currentThread().getId()+"客户 等待一份快餐!");
                condition1.await();
            }
            count--;
            System.out.println(Thread.currentThread().getId()+ "客户 消费了一份快餐!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    /**
     * 生产者方法
     */
    public void productor(){
        lock.lock();
        try {
            condition2.await();
            count++;//生产一份快餐
            System.out.println(Thread.currentThread().getId()+ "厨师 制作了一份快餐!");
            condition1.signalAll();
            System.out.println(Thread.currentThread().getId()+"厨师 通知客户使用");
        }catch (InterruptedException e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

/**
 * 消费者
 */
class ConsumerThread implements Runnable{

    private Restaurant restaurant;

    public ConsumerThread(Restaurant restaurant){
        this.restaurant=restaurant;
    }

    @Override
    public void run() {
        restaurant.comsumer();
    }
}

/**
 * 生产者
 */
class ProductorThread implements Runnable{

    private Restaurant restaurant;

    public ProductorThread(Restaurant restaurant){
        this.restaurant=restaurant;
    }

    @Override
    public void run() {
        restaurant.productor();
    }
}

输出结果:

11客户 想要一份快餐!
11客户 等待一份快餐!
10厨师 制作了一份快餐!
10厨师 通知客户使用
11客户 消费了一份快餐!
  • 生产者与消费者 和 队列

  使用wait()、notifyAll() 是一种解决任务互操作问题非常低级的方式。使用同步队列来解决任务协作问题,同步队列在任何时刻只允许一个任务插入或移除。

java.util.concurrent.BlockingQueue接口提供了这个同步队列,其有大量的实现。通常可以使用LinkedBlockingDeque(无界队列) 和 ArrayBlockingDeque(固定尺寸队列)。

消费者任务试图从队列中获取对象,而该队列此时为空,那这些队列还可以挂起消费者任务(阻塞);当有更多元素可用时恢复消费者任务。阻塞队列可以解决非常大量的问题。

package com.duoxiancheng;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * 生产者-消费者与队列
 */
public class Main4 {

    public static void main(String[] args) {

        ExecutorService executorService= Executors.newCachedThreadPool();

        LinkedBlockingDeque<Toast> toastQueue=new LinkedBlockingDeque<>();
        LinkedBlockingDeque<Toast> butteredQueue=new LinkedBlockingDeque<>();
        LinkedBlockingDeque<Toast> jammedQueue=new LinkedBlockingDeque<>();

        executorService.execute(new Toaster(toastQueue));
        executorService.execute(new Butterer(toastQueue, butteredQueue));
        executorService.execute(new Jammed(butteredQueue, jammedQueue));
        executorService.execute(new Eater(jammedQueue));

    }
}

class Toast{

    private Status status=Status.DRY;
    private final int id;
    public Toast(int id) {
        this.id = id;
    }
    public int getId() {
        return id;
    }
    public Status getStatus() {
        return status;
    }
    public void addButtered(){
        status=Status.BUTTERED;
    }
    public void addJammed(){
        status=Status.JAMMED;
    }
    @Override
    public String toString() {
        return "Toast "+id+" : "+status;
    }

    /**
     * 枚举类型
     */
    public enum Status{
        DRY,BUTTERED,JAMMED
    }
}

/**
 * 制作吐司
 */
class Toaster implements Runnable{
    private LinkedBlockingDeque<Toast> toastQueue;
    private int count;
    public Toaster(LinkedBlockingDeque toastQueue){
        this.toastQueue=toastQueue;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                Toast toast = new Toast(count++);
                System.out.println(toast);
                toastQueue.put(toast);
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/**
 * 添加黄油
 */
class Butterer implements Runnable{
    private LinkedBlockingDeque<Toast> toastQueue;
    private LinkedBlockingDeque<Toast> butteredQueue;
    public Butterer(LinkedBlockingDeque toastQueue,LinkedBlockingDeque butteredQueue){
        this.toastQueue=toastQueue;
        this.butteredQueue=butteredQueue;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                Toast toast = toastQueue.take();
                toast.addButtered();
                System.out.println("添加黄油, " + toast);
                butteredQueue.put(toast);
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

/**
 * 添加果酱
 */
class Jammed implements Runnable{
    private LinkedBlockingDeque<Toast> butteredQueue;
    private LinkedBlockingDeque<Toast> jammedQueue;
    public Jammed(LinkedBlockingDeque butteredQueue, LinkedBlockingDeque jammedQueue){
        this.butteredQueue=butteredQueue;
        this.jammedQueue=jammedQueue;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                Toast toast = butteredQueue.take();
                toast.addJammed();
                System.out.println("添加果酱, " + toast);
                jammedQueue.put(toast);
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

/**
 * 消费吐司
 */
class Eater implements Runnable{
    private LinkedBlockingDeque<Toast> jammedQueue;
    public Eater(LinkedBlockingDeque jammedQueue){
        this.jammedQueue=jammedQueue;
    }
    private int counter;
    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                Toast toast = jammedQueue.take();
                if (toast.getStatus() != Toast.Status.JAMMED) {
                    System.out.println("=====ERROR=====");
                } else {
                    System.out.println("消费了一个吐司");
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

输出结果:

Toast 0 : DRY
添加黄油, Toast 0 : BUTTERED
添加果酱, Toast 0 : JAMMED
消费了一个吐司
Toast 1 : DRY
添加黄油, Toast 1 : BUTTERED
添加果酱, Toast 1 : JAMMED
消费了一个吐司
Toast 2 : DRY
添加黄油, Toast 2 : BUTTERED
添加果酱, Toast 2 : JAMMED
消费了一个吐司
Toast 3 : DRY
添加黄油, Toast 3 : BUTTERED
添加果酱, Toast 3 : JAMMED
消费了一个吐司
Toast 4 : DRY
添加黄油, Toast 4 : BUTTERED
添加果酱, Toast 4 : JAMMED
消费了一个吐司

10、死锁

某个线程在等待另一个线程释放锁,而后者又在等别的线程的所,这样一直下去,直到整个链条上的线程又在等第一个线程释放锁。这样线程之间的相互等待的连续循环,导致没有哪个线程能继续执行的现象。

  • 死锁四大条件:

请求与保持

不可剥夺

互斥条件

循环等待

  • 防止死锁:

发生死锁必须全部满足四大条件,要避免死锁,只需破坏其中一个即可。