转载请标明出处:http://blog.csdn.net/lmj623565791/article/details/27250059
一般情况下,我们使用Runnable作为基本的任务表示形式,但是Runnable是一种有很大局限的抽象,run方法中只能记录日志,打印,或者把数据汇总入某个容器(一方面内存消耗大,另一方面需要控制同步,效率很大的限制),总之不能返回执行的结果;比如同时1000个任务去网络上抓取数据,然后将抓取到的数据进行处理(处理方式不定),我觉得最好的方式就是提供回调接口,把处理的方式最为回调传进去;但是现在我们有了更好的方式实现:CompletionService + Callable
Callable的call方法可以返回执行的结果;
CompletionService将Executor(线程池)和BlockingQueue(阻塞队列)结合在一起,同时使用Callable作为任务的基本单元,整个过程就是生产者不断把Callable任务放入阻塞对了,Executor作为消费者不断把任务取出来执行,并返回结果;
优势:
a、阻塞队列防止了内存中排队等待的任务过多,造成内存溢出(毕竟一般生产者速度比较快,比如爬虫准备好网址和规则,就去执行了,执行起来(消费者)还是比较慢的)
b、CompletionService可以实现,哪个任务先执行完成就返回,而不是按顺序返回,这样可以极大的提升效率;
1、CompletionService : Executor +BlockingQueue
下面看个例子:
package com.zhy.concurrency.completionService;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
/**
* 将Executor和BlockingQueue功能融合在一起,可以将Callable的任务提交给它来执行, 然后使用take()方法获得已经完成的结果
*
* @author zhy
*
*/
public class CompletionServiceDemo
{
public static void main(String[] args) throws InterruptedException,
ExecutionException
{
/**
* 内部维护11个线程的线程池
*/
ExecutorService exec = Executors.newFixedThreadPool(11);
/**
* 容量为10的阻塞队列
*/
final BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>(
10);
//实例化CompletionService
final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
exec, queue);
/**
* 模拟瞬间产生10个任务,且每个任务执行时间不一致
*/
for (int i = 0; i < 10; i++)
{
completionService.submit(new Callable<Integer>()
{
@Override
public Integer call() throws Exception
{
int ran = new Random().nextInt(1000);
Thread.sleep(ran);
System.out.println(Thread.currentThread().getName()
+ " 休息了 " + ran);
return ran;
}
});
}
/**
* 立即输出结果
*/
for (int i = 0; i < 10; i++)
{
try
{
//谁最先执行完成,直接返回
Future<Integer> f = completionService.take();
System.out.println(f.get());
} catch (InterruptedException e)
{
e.printStackTrace();
} catch (ExecutionException e)
{
e.printStackTrace();
}
}
exec.shutdown();
}
}
输出结果:
pool-1-thread-4 休息了 52
52
pool-1-thread-1 休息了 59
59
pool-1-thread-10 休息了 215
215
pool-1-thread-9 休息了 352
352
pool-1-thread-5 休息了 389
389
pool-1-thread-3 休息了 589
589
pool-1-thread-2 休息了 794
794
pool-1-thread-7 休息了 805
805
pool-1-thread-6 休息了 909
909
pool-1-thread-8 休息了 987
987
最先执行完成的直接返回,并不需要按任务提交的顺序执行,如果需要写个高并发的程序,且每个任务需要返回执行结果,这是个相当不错的选择!
2、ExecutorService.invokeAll
ExecutorService的invokeAll方法也能批量执行任务,并批量返回结果,但是呢,有个我觉得很致命的缺点,必须等待所有的任务执行完成后统一返回,一方面内存持有的时间长;另一方面响应性也有一定的影响,毕竟大家都喜欢看看刷刷的执行结果输出,而不是苦苦的等待;
下面看个例子:
package com.zhy.concurrency.executors;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestInvokeAll
{
public static void main(String[] args) throws InterruptedException,
ExecutionException
{
ExecutorService exec = Executors.newFixedThreadPool(10);
List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
Callable<Integer> task = null;
for (int i = 0; i < 10; i++)
{
task = new Callable<Integer>()
{
@Override
public Integer call() throws Exception
{
int ran = new Random().nextInt(1000);
Thread.sleep(ran);
System.out.println(Thread.currentThread().getName()+" 休息了 " + ran );
return ran;
}
};
tasks.add(task);
}
long s = System.currentTimeMillis();
List<Future<Integer>> results = exec.invokeAll(tasks);
System.out.println("执行任务消耗了 :" + (System.currentTimeMillis() - s) +"毫秒");
for (int i = 0; i < results.size(); i++)
{
try
{
System.out.println(results.get(i).get());
} catch (Exception e)
{
e.printStackTrace();
}
}
exec.shutdown();
}
}
执行结果:
pool-1-thread-10 休息了 1
pool-1-thread-5 休息了 59
pool-1-thread-6 休息了 128
pool-1-thread-1 休息了 146
pool-1-thread-3 休息了 158
pool-1-thread-7 休息了 387
pool-1-thread-9 休息了 486
pool-1-thread-8 休息了 606
pool-1-thread-4 休息了 707
pool-1-thread-2 休息了 817
执行任务消耗了 :819毫秒
146
817
158
707
59
128
387
606
486
1
我特意在任务提交完成打印了一个时间,然后invokeAll执行完成后打印了下时间,可以看出invokeAll返回是等待所有线程执行完毕的。这点来说,我觉得可用性不如CompletionService。
嗯,对于批量执行任务,且携带返回结果的案例就到这里~如果有疑问或者代码中存在错误请指出~
分享到:
相关推荐
NULL 博文链接:https://songjianyong.iteye.com/blog/2056990
先来看第一种实现方式,假设任务 A 由于参数原因,执行时间相对任务 B,C,D 都要长很多,但是按照程序的执行顺序,程序在 get() 任务 A 的执行结果会阻塞在那里,导致任务 B,C,D 的后续任务没办法执行。又因为每个...
Executor: 一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类,一般来说,...
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
16. 执行器服务 ExecutorService 17. 线程池执行者 ThreadPoolExecutor 18. 定时执行者服务 ScheduledExecutorService 19. 使用 ForkJoinPool 进行分叉和合并 20. 锁 Lock 21. 读写锁 ReadWriteLock 22. 原子性布尔 ...
Executors: 是java.util.concurrent包下的一个类,提供了若干个静态方法,用于生成不同类型的线程池。...它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。``
接口 java.util.concurrent.ExecutorService 表述了异步执行的机制,并且可以让任务在后台执行。壹個 ExecutorService 实例因此特别像壹個线程池。事实上,在 java.util.concurrent 包中的 ExecutorService 的实现...
ExecutorService的execute和submit方法
Java并发编程基础主要包括以下几个核心方面: 线程与线程状态:理解Java中线程的基本概念,包括线程的创建、启动、暂停、恢复和终止。熟悉线程的生命周期及其不同状态,如新建、就绪、运行、阻塞和死亡。 线程同步...
如何识别可并行执行的任务,如何提高单线程子系统的响应性,如何确保并发程序执行预期任务,如何提高并发代码的性能和可伸缩性等内容,最后介绍了一些高级主题,如显式锁、原子变量、非阻塞算法以及如何开发自定义的...
关闭时可使用如下代码 ... executorService.shutdown(); if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) { //超时后直接关闭 executorService.shutdownNow(); } } catch (In
运用JAVA的concurrent.ExecutorService线程池实现socket的TCP和UDP连接
ExecutorService方法案例文件.zip
25 JAVA8 与元数据.................................................................................................................................25 2.4. 垃圾回收与算法 .................................
高并发编程第三阶段18讲 CountDownLatch经典案例讲解如何给离散平行任务增加逻辑层次关系-下_.mp4 高并发编程第三阶段19讲 CyclicBarrier工具的使用场景介绍_.mp4 高并发编程第三阶段20讲 CyclicBarrier vs ...
主要介绍了线程并发ScheduledExecutorService类,设置 ScheduledExecutorService ,2秒后,在 1 分钟内每 10 秒钟蜂鸣一次
Java具有简单性、面向对象、分布式、健壮性、安全性、平台独立与可移植性、多线程、动态性等特点 。Java可以编写桌面应用程序、Web应用...Callable是需要使用java.util.concurrent.ExecutorService.submit(Callable)方
使用Java的ExecutorService类创建了一个固定大小的线程池,并将任务安排到线程池中执行。 通过调用`scheduleTask()`方法,可以指定要执行的任务、延迟时间和延迟时间单位。 任务将在指定的延迟时间后被安排到线程池...