thinking in java笔记 21 并发

***并发的目的

1 通常是提高运行在单处理器上的程序的性能。(因为阻塞的存在)

2 在单CPU机器上使用多任务的程序在任意时刻只在执行一项工作,因此,可以不用并发。并发的好处是:程序设计可以极大的简化。某些类型的问题,如仿真,没有并发很难解决。

**困难

最基本的困难在于协调不同线程驱动的任务之间对这些资源的使用,以使得这些资源不会同时被多个任务访问。

***阻塞

如果程序中的某个任务因为改程序控制范围之外的某些条件而导致不能继续执行,则这个任务或现场就阻塞了。如果没有并发,整个程序都会停下来,直至外部条件发生变化。但如果使用并发来编写程序,那么当一个任务阻塞时,程序中的其他任务还可以继续执行。

***进程

实现并发最直接的方式是在操作系统级别使用进程。进程是运行在它自己的地址空间内的自包容的程序。操作系统通常会降进程互相隔离开,因此他们不会彼此干涉,使得用进程编程会容易些。

***基本的线程机制

并发编程使得我们可以将程序划分为多个分离的,独立运行的任务,通过使用多线程机制,这些独立任务中的每一个都将由执行线程来驱动,一个线程就是在进程中的一个单一的程序控制流。

线程模型简化了在单一程序中同时交织在一起的多个操作的处理,在使用线程时,CPU将轮流给每个任务分配其占用时间。线程的一大好处是可以从CPU层次抽身出来,即代码不必知道它是运行在一个还是多个CPU的机器上,线程机制是一种建立透明的,可扩展的程序的方法.

***定义任务

1 无返回值 class Run implements Runnable{

Run a=new Run();

a.run();

直接调用run方法并不产生内在的线程能力。必须显式地将一个任务附着到线程上。

2 有返回值 class Call implements Callable<String>{

<>内为返回值类型,必须实现call(),调用方法如下: submit产生Future对象。可以调用isDone()查看Future是否已经完成,当任务完成时,可以使用get()

        ExecutorService service=Executors.newCachedThreadPool();
for(int i=0;i<3;i++){
Future<String> s=service.submit(new Call());
try {
System.out.println(s.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

在任务的内部,使用Thread. currentThread ()获得对于驱动该对象的Thread对象的引用。

可直接继承自Thread类。

    class KK extends Thread{
KK(){
start();
}
public void run(){
System.out.println("xx");
}
}

构造函数中调用start()存在一个问题,另一个任务可能会在构造器结束之前开始执行,意味着该任务能够访问处于不稳定状态的对象,应优选Executor而不是显式地创建Thread类。

***Thread类

Thread.yield() 对线程调度器的让步建议,已经执行完重要部分,让步给其他任务执行。在多任务之间产生分布良好的处理机制,但不能依赖它。

Thread t1=new Thread(new LiftOff());

t1.start();

启动了另外一个线程,两边同时执行。

***Executor类

在客户端和任务执行之间提供了一个间接层,与客户端直接执行任务不同,这个中介对象将执行任务。它允许你管理异步任务的执行,而无需显式地管理线程的生命周期。是优选方法。常见的情况是,单个的Executor被用来创建和管理系统中所有的任务。

对shutDown()的调用可以防止新任务被提交给这个Executor。

1

 ExecutorService service=Executors. newCachedThreadPool ();
for(int i=0;i<3;i++){
service.execute(new LiftOff());
}

Creates a thread pool that creates new threads as needed, but

* will reuse previously constructed threads when they are

* available. These pools will typically improve the performance

* of programs that execute many short-lived asynchronous tasks.

程序执行过程中创建与所需数量相同的线程,然后在回收旧线程时停止创建新线程,是合理的Executors的首选。当这种方式出现问题时,再切换至FixedThreadPool。

2 Executors. newFixedThreadPool (3); 使用有限的线程集来执行所提交的任务。 FixedThreadPool可以一次性预先执行代价高昂的线程分配,可以限制线程数量。这可以节省时间,不用为每个任务都固定地付出创建线程的开销。在事件驱动的系统中,可以使用它创建线程池,需要线程的事件处理器直接从池中获取线程。

3 newSingleThreadExecutor是线程数量为1的FixedThreadPool。若希望在另一个线程中连续运行一个任务,则可用它,如监听进入的套接字连接的任务。对于希望在线程中运行的短任务也方便,如更新本地或远程日志的小任务,或事件分发线程。

如果向它提交了多个任务,它们将排队,每个任务在下一个任务开始之前运行结束,newSingleThreadExecutor会序列化所有提交给它的任务,然后维护自己的悬挂任务队列。

4 public class DaemonThreadFactory implements ThreadFactory {

ExecutorService service=Executors. newCachedThreadPool ( new DaemonThreadFactory());

Executors的静态创建方法接受一个ThreadFactory 对象。

***sleep

TimeUnit. SECONDS .sleep(time); 使任务中止执行相应的时间.可以跑出InterruptedException异常,它在run中被捕获,因为异常不能跨线程传播回main(),所以必须在本地处理所有在任务内部产生的异常。

控制线程的执行顺序,不能依靠sleep来进行,因为不同os底层的线程机制有差异,最好使用同步控制或者不是与偶那个线程,编写自己的协作例程,然后这些例程按照指定的顺序互相传递控制权。

***优先级

线程的优先级将该线程的重要性传递给调度器,调度器倾向于优先权高的线程先执行。优先级较低的程序也会执行,只是执行频率较低。多数情况下,不应该操纵线程优先级。

***后台线程(daemon)

指在程序运行时在后台提供一种通用服务的线程,且此线程并非不可或缺。因此,当所有非后台线程结束时,程序终止并杀死所有后台线程。在线程启动之前设置为后台线程。

Thread daemon = new Thread(new SimpleDaemons());

daemon.setDaemon(true); // Must call before start()

daemon.start();

可用isDaemon()判断一个线程是否为后台线程,后台线程创建的任何线程都将被自动设置为后台线程。

后台进程在不执行finally子句的情况下就会终止其run方法。最后一个非后台线程终止时,后台线程会突然终止。非后台的Executor是一种更好的方式,其控制的所有任务可以同时被关闭。

***join() interrupt()

一个线程可以在其他线程上调用join(),效果是等待一段时间知道第二个线程结束才继续执行。join(time)表示如果目标线程在这段时间到期仍未结束的话,join方法自动结束。join()的调用可用interrupt()打断。

***捕获异常

run方法中抛出的异常,若不做处理会传播到方法外部,而在main中对Executor使用try catch是无法捕获异常的。

为了捕获异常,需要修改Executor产生线程的方式。如下:

    class KK implements Runnable{
public void run() {
throw new RuntimeException();
}
}
class myUnCaughtExceptionHandler implements UncaughtExceptionHandler{
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught:"+e);
}
}
class UnCaughtThreadFactory implements ThreadFactory{
public Thread newThread(Runnable r) {
Thread t=new Thread(r);
t.setUncaughtExceptionHandler(new myUnCaughtExceptionHandler());
return t;
}
}
public class Test1 {
public static void main(String[] args) {
ExecutorService s=Executors.newCachedThreadPool(new UnCaughtThreadFactory());
s.execute(new KK());
}
}

***共享受限资源

对于并发工作,需要某种方式来防止两个任务访问相同的资源。解决方法是在资源被一个任务使用时,在其上加锁。基本所有并发模式解决线程冲突问题时,都采用序列化访问共享资源的方案。锁语句产生互斥的效果,被称为互斥量(mutex).

何时使用同步:如果你正在写一个变量,它可能接下来将被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,那么必须使用同步。并且,读写线程都必须用相同的监视器锁同步。

每个访问临界共享资源的方法都必须被同步,否则不会正确的进行工作。

***synchronized

synchronized为防止资源冲突提供了内置支持。当任务要执行被synchronized保护的代码段时,它将检查锁是否可用,然后获取锁,执行代码,释放锁。

共享资源一般是以对象形式存在的内存片段,也可以是文件,输入输出端口,打印机等。要控制对共享资源的访问,首先将其包装进一个对象。然后把所有要访问这个资源的方法标记为synchronized。若某任务处于一个synchronized方法的调用中,则在这个线程从该方法返回之前,其他调用类中synchronized方法的线程都会被阻塞。

使用并发时,将域设置为private很重要,否则synchronized不能防止其他任务直接访问域,会产生冲突。

一个任务可以获得多个对象的锁,然后对其计数。

synchronized static方法可以在类的范围内防止对static数据的访问。

***Lock

使用显示的lock对象:

Lock对象必须被显式地创建锁定和释放,因此它没有内建的锁优雅。但解决一些问题更为灵活。

private Lock lock = new ReentrantLock();
public int next() {
lock.lock();
try {
++currentEvenValue;
Thread.yield(); // Cause failure faster
++currentEvenValue;
return currentEvenValue;
} finally {
lock.unlock();
}
}

unlock必须放在finally中。try中必须有return语句,以确保unlock不会过早发生。使用synchronized时,如果失败则会抛出异常, 没有机会做清理工作。使用lock,可以在finally中维护。应尽可能使用synchronized ,必要时使用lock,如尝试获取锁但失败,或尝试获取锁一段时间,然后放弃它。 tryLock() tryLock(time,timeUnit)尝试锁定,成功后返回true。若未获取锁,则可以暂时离开去做其他事情。显示的Lock在加锁和释放锁方面有更细粒度的控制力。这对于实现专有同步结构很有用,例如用于遍历链接列表中的节点的节节传递的加锁机制,这种遍历代码必须在释放当前节点的锁之前捕获下一节点的锁。

***原子性与易变性

不应该使用原子性来代替同步。

原子性可以应用于除double long之外的所有基本类型之上的简单操作。JVM将64位(double long)的读取和写入当做两个分离的32位操作来执行,这样会在读写操作中发生上下文切换,导致不同的任务可以看到不正确结果的可能性(字撕裂)。定义long double时,使用volatile会获得原子性。因此,原子操作可有线程机制来保证其不可中断,超级高手可以利用这点编写无锁的代码,这些代码不需要被同步。

volatile确保应用中的可视性。如果将一个域声明成volatile,只要产生写操作,其他读操作都可以看到修改。如果多个任务同时访问一个域,则应该是volatile的。否则,这个域就只能通过同步来访问,同步也会导致向主存中刷新,因此如果一个域完全由synchronized方法或语句块来防护,则不必设成volatile。当一个域的值依赖于它之前的值时(如计数器递增),volatile无法工作。如果某个域的值受到其他域的值的限制,volatile也无法工作。使用volatile而不是synchronized的唯一安全的情况是类中只有一个可变的域。

若将一个域定义为volatile,就会告诉编译器不要执行任何移除读取和写入操作的优化,这些操作的目的是用线程中的局部变量维护对这个域的精确同步。

***临界区

synchronized(syncObject){ }

又称同步控制块,防止多个线程同时访问代码段。进入此段代码前,必须得到syncObject对象的锁。如果其他线程得到,则必须等待其释放。同步控制块比对整个方法进行同步控制性能要高。

***在其他对象上同步

synchronized块必须给定一个在其上进行同步的对象,最好是使用其方法正在被调用的当前对象。synchronized(this).如果获得了synchronized块上的锁,则该对象其他的synchronized方法和临界区就不能被调用。因此,在this上同步,临界区的效果就会直接缩小在同步的范围内。

有时必须在另一个对象上同步,但必须确保所有相关的任务都是在同一个对象上同步的。两个任务可以同时进入同一个对象,只要这个对象上的方法是在不同的锁上同步。

***线程本地存储

防止任务在共享资源上冲突的第二种方法是根除对变量的共享。线程本地存储是一种自动化机制,可以为使用相同变量的每个不同的线程都创建不同的存储。使状态与线程关联起来。

ThreadLocal对象通常当做静态域存储,只能通过get set来访问该对象的内容。get返回与其线程相关联的对象的副本。set将参数插入到为其线程存储的对象中,并返回存储中原有的对象。

***线程状态

1 新建(new) 线程创建时,短暂处于此状态,此时它已经分配了必需的系统资源,并执行了初始化。此刻线程有资格获得CPU时间,之后调度器将它转变为可运行状态或阻塞状态。

2 就绪(Runnable) 此状态下,若调度器将时间片分配给线程,线程就可以运行。

3 阻塞(Blocked) 线程能够运行,但有某个条件阻止他的运行。此状态下,调度器将忽略线程,不会分配给线程任何CPU时间,直到线程重新进入了就绪状态,它才能执行操作。

4 死亡(Dead) 处于死亡或终止状态的线程不再可调度,也不会得到CPU时间,它的任务已结束,不再可运行,任务死亡的方式通常是从run()中返回。

***终结任务

ExecutorService shutDown() 关闭线程

Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.

ExecutorService service.awaitTermination(20,TimeUnit.MILLISECONDS)

Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

进入阻塞状态原因:

1 sleep()

2 wait()使线程挂起

3 任务在等待某个输入/输出完成

4 任务试图在某个对象上调用其同步控制方法,但是对象锁不可用。

在阻塞时终结:

sleep()有时会使任务从执行状态变为被阻塞状态,有时必须终止被阻塞的任务。要终止处于阻塞状态的任务,必须强制这个任务跳出阻塞状态。

中断:

在任务的run()中间打断,会抛出异常,必须仔细考虑代码的执行路径,编写catch正确清除所有事物,回到良好的状态。

ExecutorService shutDownNow()

它将发送一个interrupt()给它启动的所有线程。当完成工程中的某个部分时,关闭某个 Executor的所有任务。

调用Sumit(),使用Future<?> cacel()中断由Executor启动的单个线程。

能够中断对sleep()的调用,但不能中断试图获取synchronized锁或者试图执行I/O操作的线程(意味着多线程程序有可能会被IO操作锁住),可以采取关闭阻塞任务对应的底层资源来解决。

一个任务应该能够调用在同一个对象中的其他synchronized方法。

无论何时,只要任务以不可中断的方式被阻塞,就有潜在的锁住程序的可能。而ReentrantLock上阻塞的任务具备可以被中断的能力。与I/O调用不同,Thread interrupt()可以打断被互斥所阻塞的调用。被设计用来响应interrupt()的类都必须建立一种策略,确保保持一致的状态。应在所有需要清理的对象创建操作的后面,都必须紧跟try-finally子句,从而使得无论run循环如何退出,清理都会发生。

***线程之间的协作

任务协作时,关键是任务之间的握手。使用互斥。互斥能够确保只有一个任务能响应某个信号,这样就可以根除其他的竞争条件。在互斥之上,有一种新途径,任务可以将自身挂起,直至某些外部条件发生变化。

wait()使你可以等待某个条件发生变化,而改变这个条件超出了当前方法的控制。wait会将任务挂起等待,直到notify() notifyAll()出现时,任务才被唤醒。wait()提供了一种在任务之间对活动同步的方式。sleep()和yield()调用时不释放锁,wait()会释放锁。

wait与sleep区别:

1 wait期间锁是释放的

2 可以通过notify() notifyAll() 或者令时间到期,从wait()中恢复执行。

wait() notify() notifyAll() 是基类Object的一部分,因为这些方法操作的锁也是所有对象的一部分,因此可以把wait()放进任何同步控制方法里。

notify() notifyAll()

notify()只会唤醒众多等待锁的任务中的一个,所有任务必须等待相同的条件,否则不知道是否唤醒了恰当的任务。当条件发生变化时,必须只有一个任务能够从中受益。这些限制对所有可能存在的子类都必须是起作用的。如果上述任一不满足,应使用 notifyAll()。 notifyAll()因某个特定锁被调用时,只有等待这个锁的任务才会被唤醒。

***生产者-消费者与队列

wait() notify()每次交互时都握手,较为低级。可采用同步队列来解决任务协作问题,同步队列在任何时刻都只允许一个任务插入或移除元素,通常可以使用LinkedBlockingQueue,它是无界队列。还可以用ArrayBlockingQueue,它又固定的尺寸,可以在它被阻塞之前,向其中放置有限数量的元素。当有消费者任务试图从空队列中获取对象,队列可以挂起消费者任务,当有更多元素时再恢复。阻塞队列能解决很多问题,切比wait() notify()更简单可靠。

***任务间使用管道进行输入、输出

通过输入、输出在线程间进行通信很有用,线程类库以管道的形式提供了支持。他们在java输入输出类库中的对应物是PipedWriter PipedReader类。管道基本上是一个阻塞队列,存在于BlockingQueue之前的java版本中。

***死锁

任务可以变成阻塞状态,所以可能出现:一个任务在等待另一个任务,另一个等待其他任务,其他任务在等待第一个任务,形成循环,无法继续。死锁的四个条件:

1 互斥条件,任务使用的资源中至少有一个不能共享。

2 至少有一个任务它必须持有一个资源且在等待 获取一个当前被别的任务持有的资源。

3 资源不能被任务抢占,任务必须把资源释放当做普通事件。

4 必须有循环等待。

***CountDownLatch

CountDownLatch如其所写,是一个倒计数的锁存器,当计数减至0时触发特定的事件。利用这种特性,可以让主线程等待子线程的结束。

*** CyclicBarrier

在实际应用中,有时候需要多个线程同时工作以完成同一件事情,而且在完成过程中,往往会等待其他线程都完成某一阶段后再执行,等所有线程都到达某一个阶段后再统一执行。

比如有几个旅行团需要途经深圳、广州、韶关、长沙最后到达武汉。旅行团中有自驾游的,有徒步的,有乘坐旅游大巴的;这些旅行团同时出发,并且每到一个目的地,都要等待其他旅行团到达此地后再同时出发,直到都到达终点站武汉。

这时候CyclicBarrier就可以派上用场。CyclicBarrier最重要的属性就是参与者个数,另外最要方法是await()。当所有线程都调用了await()后,就表示这些线程都可以继续执行,否则就会等待。

***DelayQueue

是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。

Delayed

一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。

此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。

***PriorityBlockingQueue

一个无界的阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞检索的操作。

我们只需要把放入该队列的对象实现Comparable接口就可以轻松实现线程优先级调度了。

***ScheduledThreadPoolExecutor

可定期或延期执行命令,在多线程环境下优于Timer

***Semaphore

正常的锁(lock synchronized)在任何时刻只允许一个任务访问一项资源,而计数信号量允许n个任务同时访问这个资源。还可以将信号量看作是在向外分发使用资源的许可证,尽管实际上没有使用任何许可证对象。可以使用它来实现对象池等。

***Exchanger

可以用来完成线程间的数据交换。

类java.util.concurrent.Exchanger提供了一个同步点,在这个同步点,一对线程可以交换数据。每个线程通过exchange()方法的入口提供数据给他的伙伴线程,并接收他的伙伴线程提供的数据,并返回。当两个线程通过Exchanger交换了对象,这个交换对于两个线程来说都是安全的。

***免锁容器

背后的通用策略:对容器的读写可以同时发生,只要读取者只能看到完成修改的结果即可。

CopyOnWriteArrayList:写入将导致创建整个底层数组的副本,源数组将保留在原地,是的赋值的数组在被修改时,读取操作可以安全的执行。当修改完成后,一个原子性的操作将把新的数组换入,使读方法可以看到修改。其好处之一是当多个迭代器同时遍历及修改这个列表时,不会抛出ConcurrentModificationExecption.

CopyOnWriteArraySet使用CopyOnWriteArrayList来实现其免锁行为。

ConcurrentHashmap ConcurrentLinkedQueue使用类似技术,允许并发的读写操作,但容器中只有部分内容可以被复制和修改,修改完成前不可视。