Java线程池如何优雅地等待所有任务执行完?

https://blog.csdn.net/flycp/article/details/106337294

https://blog.csdn.net/qq_25806863/article/details/71214033

随着项目的体量越来越大,对代码的执行效率要求越来越高,在实际应用过程中我们会经常使用线程池。

那么如果线程池嵌入在业务代码中,如何正确的等待线程池执行完,在执行后续操作呢?或者想要获取执行结果有应该怎么处理呢?

下面走一下场景:

package com.example.demo1.entity;

/**

* create by c-pown on 2019-12-06

*/

public class Student {

private String name;

private Integer age;

private Integer heigh;

private String hoby;

public Student(String name, Integer age, Integer heigh, String hoby) {

this.name = name;

this.age = age;

this.heigh = heigh;

this.hoby = hoby;

}

static String getAllname(){

return "张三";

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public Integer getAge() {

return age;

}

public void setAge(Integer age) {

this.age = age;

}

public Integer getHeigh() {

return heigh;

}

public void setHeigh(Integer heigh) {

this.heigh = heigh;

}

public String getHoby() {

return hoby;

}

public void setHoby(String hoby) {

this.hoby = hoby;

}

@Override

public String toString() {

return "Student{" +

"name='" + name + '\'' +

", age=" + age +

", heigh=" + heigh +

", hoby='" + hoby + '\'' +

'}';

}

}

package com.example.demo1.service.TestThreadPool;

import com.example.demo1.entity.Student;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.*;

/**

* create by c-pown on 2020-05-25

*/

public class TestThreadPool {

/**

* 手动创建线程池

*/

private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20,25,100L,

TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());

public static void main(String[] args) {

Student student = null;

List<Student> students = new ArrayList<>();

//添加五十万个学生元素

for (int i = 0; i < 500000; i++) {

student = new Student("name"+i,20,183,"玩");

students.add(student);

}

for (Student student1 : students) {

/**

* 给元素添加后缀

*/

executor.submit(()-> student1.setName(student1.getName()+"这是后缀"));

}

//查看添加情况

System.out.println("添加数量:"+students.stream().filter(x->x.getName().contains("这是后缀")).count());

}

}

我们给List里面添加500000个学生元素,然后使用线程池给name属性添加后缀,看一下执行结果:

添加数量:475371

我们发现添加成功的数量是少了两万多,这是由于线程池中的子线程任务没有执行完,而主线程已经开始执行业务代码,导致成功数量变少。

下面我们修改一下代码:

一、使用CountDownLatch

package com.example.demo1.service.TestThreadPool;

import com.example.demo1.entity.Student;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.*;

/**

* create by c-pown on 2020-05-25

*/

public class TestThreadPool {

/**

* 手动创建线程池

*/

private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20,25,100L,

TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());

public static void main(String[] args) {

Student student = null;

List<Student> students = new ArrayList<>();

//添加五十万个学生元素

for (int i = 0; i < 500000; i++) {

student = new Student("name"+i,20,183,"玩");

students.add(student);

}

CountDownLatch countDownLatch = new CountDownLatch(students.size());

for (Student student1 : students) {

/**

* 给元素添加后缀

*/

executor.submit(()-> {

try {

student1.setName(student1.getName()+"这是后缀");

} catch (Exception e) {

e.printStackTrace();

}finally {

countDownLatch.countDown();

}

});

}

try {

countDownLatch.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

//查看添加情况

System.out.println("添加数量:"+students.stream().filter(x->x.getName().contains("这是后缀")).count());

}

}

结果:

添加数量:500000

1

这是一个计数器操作,在线程池执行之前,给计数器指定数值(与要执行代码的次数一致)也就是students.size(),在线程池执行代码体里面要加上countDownLatch.countDown();代表每执行一次数值减少一,最后在循环体外边写上countDownLatch.await();代表等待计数器归零。

可以查看下源码介绍:

/**

* Decrements the count of the latch, releasing all waiting threads if

* the count reaches zero.

*

* <p>If the current count is greater than zero then it is decremented.

* If the new count is zero then all waiting threads are re-enabled for

* thread scheduling purposes.

*

* <p>If the current count equals zero then nothing happens.

*/

public void countDown() {

sync.releaseShared(1);

}

/**

* Causes the current thread to wait until the latch has counted down to

* zero, unless the thread is {@linkplain Thread#interrupt interrupted}.

*

* <p>If the current count is zero then this method returns immediately.

*

* <p>If the current count is greater than zero then the current

* thread becomes disabled for thread scheduling purposes and lies

* dormant until one of two things happen:

* <ul>

* <li>The count reaches zero due to invocations of the

* {@link #countDown} method; or

* <li>Some other thread {@linkplain Thread#interrupt interrupts}

* the current thread.

* </ul>

*

* <p>If the current thread:

* <ul>

* <li>has its interrupted status set on entry to this method; or

* <li>is {@linkplain Thread#interrupt interrupted} while waiting,

* </ul>

* then {@link InterruptedException} is thrown and the current thread's

* interrupted status is cleared.

*

* @throws InterruptedException if the current thread is interrupted

* while waiting

*/

public void await() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}

介绍中写到等待计数器数量减少直至为0为止。也可以给await()设置超时时间

countDownLatch.await(300,TimeUnit.SECONDS);

1

如果超过300s(也可以是时,分)则不再等待,直接执行下面代码。

二、使用Future.get()

package com.example.demo1.service.TestThreadPool;

import com.example.demo1.entity.Student;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.*;

/**

* create by c-pown on 2020-05-25

*/

public class TestThreadPool {

/**

* 手动创建线程池

*/

private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20,25,100L,

TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ThreadPoolExecutor.CallerRunsPolicy());

public static void main(String[] args) {

Student student = null;

List<Student> students = new ArrayList<>();

//添加五十万个学生元素

for (int i = 0; i < 500000; i++) {

student = new Student("name"+i,20,183,"玩");

students.add(student);

}

List<Future> futures = new ArrayList<>();

for (Student student1 : students) {

/**

* 给元素添加后缀

*/

Future future = executor.submit(()-> {

try {

student1.setName(student1.getName()+"这是后缀");

} catch (Exception e) {

e.printStackTrace();

}

});

futures.add(future);

}

for (Future future : futures) {

try {

future.get();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

}

}

//查看添加情况

System.out.println("添加数量:"+students.stream().filter(x->x.getName().contains("这是后缀")).count());

}

}

结果:

添加数量:500000

1

Future.get()可以同步等待线程执行完成,并且可以监听执行结果

/**

* Waits if necessary for the computation to complete, and then

* retrieves its result.

*

* @return the computed result

* @throws CancellationException if the computation was cancelled

* @throws ExecutionException if the computation threw an

* exception

* @throws InterruptedException if the current thread was interrupted

* while waiting

*/

V get() throws InterruptedException, ExecutionException;

源码中可以看出方法是有返回值得,可以监听线程池子线程执行状态及执行结果。

直接return 结果 Future<?>添加泛型即可。

同样的 Future.get()也是可以指定超时时间的,超过等待时间可以直接执行后续代码。

最后 如果线程池是方法内部创建的,可以直接使用shutdown()也会等待线程池的执行结果。同时会关闭线程池资源。

executor.shutdown();

try {

executor.awaitTermination(300,TimeUnit.SECONDS);

} catch (InterruptedException e) {

e.printStackTrace();

}

—————————————————————————————————————

原文链接:https://blog.csdn.net/flycp/article/details/106337294