`
阿尔萨斯
  • 浏览: 4171130 次
社区版块
存档分类
最新评论

JAVA进阶-多线程(3)

 
阅读更多

1.以前的线程API(Thread/Runnable)在任务执行完后无法返回结果,Callable/Future/FutureTask正是为了解决这个问题,并且可以在获取结果时捕获任务执行过程中抛出的异常

-Callable<T>的call()方法返回类型为T的结果,也可以抛出异常;Callable不像Thread那样直接启动,
而是通过<T> Future<T> ExecutorService.submit(Callable<T> task)提交给线程池执行;
-Future 返回值,调用该接口的get()方法,可返回对应的对象
------------------

SalesCalculateSample.java>通过多线程计算矩阵每行结果并叠加;

/**
 * Totals a randomly generated sales matrix (one row per customer, one column
 * per month). Each row is summed by a {@link Callable} submitted to a fixed
 * thread pool, and the main thread adds the row totals together.
 *
 * 	@author Lean  @date:2014-9-30  
 */
public class SalesCalculateSample {
	
	private static final int NUMBER_OF_MONTH=12;
	private static final int NUMBER_OF_CUSTOMER=100;
	private static int[][] cells;
	
	/** Task that adds up every monthly figure of a single customer row. */
	static class Summer implements Callable<Integer>{

		public int customerID;
		
		public Summer(int companyId){
			customerID=companyId;
		}
		
		/**
		 * Sums the row selected by {@code customerID} and prints the result.
		 *
		 * @return the row total
		 */
		@Override
		public Integer call() throws Exception {
			int rowTotal=0;
			for (int monthly : cells[customerID]) {
				rowTotal+=monthly;
			}
			System.out.printf("customerID:%d ,sum:%d\n",customerID,rowTotal);
			return rowTotal;
		}
		
	}
	
	/**
	 * Builds the matrix, submits one {@link Summer} per customer to a pool of
	 * ten threads, then collects and prints the grand total.
	 */
	public static void main(String[] args) {
		generateMatrix();
		
		ExecutorService pool=Executors.newFixedThreadPool(10);
		Set<Future<Integer>> pending=new HashSet<Future<Integer>>();
		int customer=0;
		while (customer<NUMBER_OF_CUSTOMER) {
			pending.add(pool.submit(new Summer(customer)));
			customer++;
		}
		
		// get() blocks until the corresponding row has been summed
		int grandTotal=0;
		for (Future<Integer> rowResult : pending) {
			try {
				grandTotal+=rowResult.get();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		System.out.println("sum is>>"+grandTotal);
		pool.shutdown();
	}

	/** Fills {@code cells} with random values in the range 0..99. */
	private static void generateMatrix() {
		int[][] matrix=new int[NUMBER_OF_CUSTOMER][];
		for (int row = 0; row < matrix.length; row++) {
			int[] months=new int[NUMBER_OF_MONTH];
			for (int col = 0; col < months.length; col++) {
				months[col]=(int)(Math.random()*100);
			}
			matrix[row]=months;
		}
		cells=matrix;
	}
	
}

------------------


------------------

CanCancelProcessors>随机取消提交的订单(Future提供了可取消的结果执行)

/**
 * Demonstrates cancelling submitted work through {@link Future#cancel(boolean)}:
 * a batch of simulated orders is submitted to a thread pool while a second
 * thread walks the list of futures and tries to cancel them one by one.
 *
 * <p>The printed count is a snapshot taken when the pool has terminated or
 * after six seconds, whichever comes first; the cancelling thread may still
 * be running at that point.
 *
 * 	@author Lean  @date:2014-10-7  
 */
public class CanCancelProcessors {

	private static final ExecutorService service=Executors.newFixedThreadPool(100);
	private static final int ORDERS_COUNT=2000;
	private static final ArrayList<Future<Integer>> futures=new ArrayList<Future<Integer>>();
	
	/**
	 * Submits {@code ORDERS_COUNT} orders, starts the cancelling thread and
	 * reports how many orders ended up cancelled.
	 */
	public static void main(String[] args) {
		for (int i = 0; i <ORDERS_COUNT; i++) {
			futures.add(service.submit(new OrderExcutor(i)));
		}
		// shutdown() only rejects new submissions; already queued orders still
		// run. It has to precede awaitTermination(), which can only observe
		// termination after a shutdown request.
		service.shutdown();
		new Thread(new EvilThread(futures)).start();
		try {
			service.awaitTermination(6,TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			// keep the interrupt visible to callers and report what we have so far
			Thread.currentThread().interrupt();
		}
		int count=0;
		for (Future<Integer> future : futures) {
			if (future.isCancelled()) {
				count++;
			}
		}
		System.out.println("----------"+count+" orders canceled!---------");
	}
	
	/** Simulated order that takes one second to process and returns its id. */
	static class OrderExcutor implements Callable<Integer>{

		private final int mId;
		
		public OrderExcutor(int id){
			this.mId=id;
		}
		
		/**
		 * Processes the order.
		 *
		 * @return the order id
		 * @throws InterruptedException if the worker is interrupted while
		 *         processing, e.g. by {@code cancel(true)}; the order is then
		 *         not reported as successfully executed
		 */
		@Override
		public Integer call() throws Exception {
			// Let the interrupt propagate instead of swallowing it, otherwise a
			// cancelled order would still print the success message below.
			Thread.sleep(1000);
			System.out.println("successfully execute orderid : "+mId);
			return mId;
		}
		
	}
	
	/** Runnable that tries to cancel every order, one every 200 ms. */
	static class EvilThread implements Runnable{
		
		private final ArrayList<Future<Integer>> futures;
		
		public EvilThread(ArrayList<Future<Integer>> futures){
			this.futures=futures;
		}
		
		/**
		 * Cancels the futures in list order, printing whether each attempt
		 * succeeded. Stops early if this thread is interrupted.
		 */
		@Override
		public void run() {
			for (int i = 0; i < futures.size(); i++) {
				try {
					Thread.sleep(200);
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					return;
				}
				// cancel(true) also interrupts an order that is already running;
				// it returns false for orders that have already completed
				boolean flag=futures.get(i).cancel(true);
				System.out.println("cancel order >"+flag +" by id>> "+i);
			}
		}
		
	}
	
}
------------------

-FutureTask 集成了Runnable与Future接口的功能,因此拥有异步,返回数据的功能;
------------------

FutureTaskSample

/**
 * Same row-summing example as the Callable/Future version, but built directly
 * on {@link FutureTask}: each task wraps a {@link Callable}, is executed as a
 * {@link Runnable} on its own thread, and is later queried for its result.
 *
 * 	@author Lean  @date:2014-9-30  
 */
public class FutureTaskSample {
	
	private static final int NUMBER_OF_MONTH=12;
	private static final int NUMBER_OF_CUSTOMER=100;
	private static int[][] cells;
	
	/** Task that adds up the monthly figures of a single customer row. */
	static class Summer implements Callable<Integer>{

		public int customerID;
		
		public Summer(int companyId){
			this.customerID=companyId;
		}
		
		/**
		 * Sums the row selected by {@code customerID} and prints the result.
		 *
		 * @return the row total
		 */
		@Override
		public Integer call() throws Exception {
			int sum=0;
			for (int i = 0; i < NUMBER_OF_MONTH; i++) {
				sum+=cells[customerID][i];
			}
			System.out.printf("customerID:%d ,sum:%d\n",customerID,sum);
			return sum;
		}
		
	}
	
	/**
	 * Builds the matrix, starts one {@link FutureTask} per customer on its own
	 * thread, then waits for every task and prints the grand total.
	 */
	public static void main(String[] args) {
		generateMatrix();
		
		Set<FutureTask<Integer>> futures=new HashSet<FutureTask<Integer>>();
		for (int i = 0; i < NUMBER_OF_CUSTOMER; i++) {
			Callable<Integer> caller=new Summer(i);
			FutureTask<Integer> futureTask=new FutureTask<Integer>(caller);
			// Calling futureTask.run() here would execute the task on the main
			// thread, one after another. Handing it to a thread makes it
			// actually run asynchronously.
			new Thread(futureTask).start();
			futures.add(futureTask);
		}
		// calculate the sum; get() blocks until the task has finished
		int sum=0;
		for (FutureTask<Integer> future : futures) {
			try {
				sum+=future.get();
			} catch (Exception e) {
				e.printStackTrace();
			} 
		}
		System.out.println("sum is>>"+sum);
	}

	/** Fills {@code cells} with random values in the range 0..99. */
	private static void generateMatrix() {
		cells=new int[NUMBER_OF_CUSTOMER][NUMBER_OF_MONTH];
		for (int i = 0; i < NUMBER_OF_CUSTOMER; i++) {
			for (int j = 0; j < NUMBER_OF_MONTH; j++) {
				cells[i][j]=(int)(Math.random()*100);
			}
		}
	}
	
}
------------------
2.Executors
.newFixedThreadPool()创建固定线程池
.newCachedThreadPool()创建可缓存的线程池:线程数不固定,按需创建新线程并复用空闲线程,空闲超过60秒的线程会被回收
.newSingleThreadExecutor()创建单线程 该线程不会被销毁
.newScheduledThreadPool()创建可定时,延时执行任务的线程池(返回ScheduledExecutorService)
------------------

ScheduledCountSample>定时,延时读取数字的例子

/**
 * ScheduledExecutorService sample: prints an increasing counter once per
 * second from a scheduled thread pool and shuts the pool down when the
 * counter reaches its limit, so the JVM can exit.
 *
 * 	@author Lean  @date:2014-10-7  
 */
public class ScheduledCountSample {

	private static final ScheduledExecutorService mService=Executors.newScheduledThreadPool(10);
	private static final int AVG=4;
	private static final int mAllCount=400;
	// Written only by the periodic task. Its executions never overlap, and the
	// scheduler orders each run after the previous one.
	private static  int mCurrenCount=0;
	
	/** Schedules {@link EveryCount} with a one-second delay between runs. */
	public static void main(String[] args) {
		// Alternative demos, kept for reference:
		// one delayed task per batch of AVG numbers
//		int times = mAllCount/AVG;
//		for (int i = 0; i < times; i++) {
//			mService.schedule(new tryCount(i), i*1,TimeUnit.SECONDS);
//		}
		// fixed rate instead of fixed delay
//		mService.scheduleAtFixedRate(new EveryCount(), 0, 1, TimeUnit.SECONDS);
		mService.scheduleWithFixedDelay(new EveryCount(), 0, 1, TimeUnit.SECONDS);
	}
	
	/** Periodic task that prints the next counter value on every run. */
	static class EveryCount implements Runnable{

		/**
		 * Prints the current count and advances it. Once {@code mAllCount}
		 * values have been printed the scheduler is shut down, which stops
		 * further runs instead of firing a no-op task forever.
		 */
		@Override
		public void run() {
			if (mAllCount>mCurrenCount) {
				System.out.println("ThreadId>>"+Thread.currentThread().getId()+" and count >>"+mCurrenCount++);
			}
			if (mCurrenCount>=mAllCount) {
				mService.shutdown();
			}
		}
		
	}
	
	/** Task that prints one batch of {@code AVG} consecutive numbers. */
	static class tryCount implements Callable<Integer>{
		
		private final int Index;
		public tryCount(int index) {
			this.Index=index;
		}
		
		/**
		 * Prints the numbers {@code Index*AVG} to {@code Index*AVG+AVG-1}.
		 *
		 * @return the batch index this task was created with
		 */
		@Override
		public Integer call() throws Exception {
			int first=Index*AVG;
			for (int i = first; i < first+AVG; i++) {
				System.out.println("count >>"+i);
			}
			System.out.println("thread count end! ");
			return Index;
		}
		
	}
	
}
------------------

-ExecutorCompletionService
通常用Future.get()获取结果,该方法会一直阻塞到对应的任务完成;而ExecutorCompletionService.take()按任务完成的先后顺序返回已经结束的任务的Future(没有已完成的任务时才会等待),因此先完成的任务可以先被处理

------------------
GetResultRightNow

/**
 * Retrieves task results in completion order by using an
 * {@link ExecutorCompletionService}: {@code take()} hands back whichever
 * submitted task finishes next, so fast tasks are not held up behind slow
 * ones that were submitted earlier.
 *
 * 	@author Lean  @date:2014-10-7  
 */
public class GetResultRightNow {
	
	/**
	 * Submits five sleeping tasks to a two-thread pool and prints each result
	 * as soon as it becomes available.
	 */
	public static void main(String[] args) {
	
		int[] printNum={1000,200,200,30000,5000};
		ExecutorService executors = Executors.newFixedThreadPool(2);
		ExecutorCompletionService<Integer> service=new ExecutorCompletionService<Integer>(executors);
		try {
			for (int i : printNum) {
				service.submit(new getCurrnNum(i));
			}
			// No up-front awaitTermination(): without a prior shutdown() it can
			// only time out, and it would delay the results that take() is
			// meant to deliver as they complete.
			for (int i = 0; i < printNum.length; i++) {
				try {
					// take() blocks only until the next task has completed.
					// Calling get() on the futures in submission order would
					// instead wait on each task in turn, even while later
					// ones are already done.
					int num = service.take().get();
					System.out.println("num is:"+num);
				} catch (ExecutionException e) {
					// the task itself failed; report it and move on
					e.printStackTrace();
				}
			}
		} catch (InterruptedException e) {
			// stop waiting for further results but keep the interrupt visible
			Thread.currentThread().interrupt();
		} finally {
			executors.shutdown();
		}
	}
	
	/** Task that sleeps for the given number of milliseconds and returns it. */
	static class getCurrnNum implements Callable<Integer>{

		private final int printNum;
		
		public getCurrnNum(int i) {
			printNum=i;
		}

		/**
		 * Sleeps for {@code printNum} milliseconds.
		 *
		 * @return the number of milliseconds slept
		 * @throws InterruptedException if the sleep is interrupted; the task
		 *         then fails instead of pretending it completed
		 */
		@Override
		public Integer call() throws Exception {
			Thread.sleep(printNum);
			return printNum;
		}
		
		
	}
	
	
}
------------------



分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics