`
uule
  • 浏览: 6305591 次
  • 性别: Icon_minigender_1
  • 来自: 一片神奇的土地
社区版块
存档分类
最新评论

使用CompletionService获取多线程返回值

阅读更多

 

public interface CompletionService<V> {
    
    Future<V> submit(Callable<V> task);
    
    Future<V> submit(Runnable task, V result);

    /**
     * Retrieves and removes the Future representing the next
     * completed task, waiting if none are yet present.
     *
     * @return the Future representing the next completed task
     * @throws InterruptedException if interrupted while waiting
     * 阻塞/
    Future<V> take() throws InterruptedException;


    /**
     * Retrieves and removes the Future representing the next
     * completed task or <tt>null</tt> if none are present.
     *
     * @return the Future representing the next completed task, or
     *         <tt>null</tt> if none are present
     * 非阻塞/
    Future<V> poll();
}

 

CompletionService 也不是到处都能用,它不适合处理任务数量有限但个数不可知的场景。例如,要统计某个文件夹中的文件个数,在遍历子文件夹的时候也会“递归地”提交新的任务,但最后到底提交了多少,以及在什么时候提交完了所有任务,都是未知数,无论 CompletionService 还是线程池都无法进行判断。这种情况只能直接用线程池来处理。

 

CompletionService 接口的实例可以充当生产者和消费者的中间处理引擎,从而达到将提交任务和处理结果的代码进行解耦的目的。生产者调用submit 方法提交任务,而消费者调用 poll(非阻塞)或 take(阻塞)方法获取下一个结果:这一特征看起来和阻塞队列(BlockingQueue)类似,两者的区别在于 CompletionService 要负责任务的处理,而阻塞队列则不会。

 

 

在 JDK 中,该接口只有一个实现类 ExecutorCompletionService,该类使用创建时提供的 Executor 对象(通常是线程池)来执行任务,然后将结果放入一个阻塞队列中。

 

 Example1:

1. 背景

在Java5的多线程中,可以使用Callable接口来实现具有返回值的线程。使用线程池的submit方法提交Callable任务,利用submit方法返回的Future存根,调用此存根的get方法来获取整个线程池中所有任务的运行结果。

方法一:如果是自己写代码,应该是自己维护一个Collection保存submit方法返回的Future存根,然后在主线程中遍历这个Collection并调用Future存根的get()方法取到线程的返回值。

方法二:使用CompletionService类,它整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take方法获取线程的返回值。

2. 实现代码

package com.clzhang.sample.thread;

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPoolTest4 {
    // 具有返回值的测试线程
    class MyThread implements Callable<String> {
        private String name;
        public MyThread(String name) {
            this.name = name;
        }

        @Override
        public String call() {
            int sleepTime = new Random().nextInt(1000);
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 返回给调用者的值
            String str = name + " sleep time:" + sleepTime;
            System.out.println(name + " finished...");

            return str;
        }
    }

    private final int POOL_SIZE = 5;
    private final int TOTAL_TASK = 20;

    // 方法一,自己写集合来实现获取线程池中任务的返回结果
    public void testByQueue() throws Exception {
        // 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<Future<String>>();

        // 向里面扔任务
        for (int i = 0; i < TOTAL_TASK; i++) {
            Future<String> future = pool.submit(new MyThread("Thread" + i));
            queue.add(future);
        }

        // 检查线程池任务执行结果
        for (int i = 0; i < TOTAL_TASK; i++) {
            System.out.println("method1:" + queue.take().get());
        }

        // 关闭线程池
        pool.shutdown();
    }

    // 方法二,通过CompletionService来实现获取线程池中任务的返回结果
    public void testByCompetion() throws Exception {
        // 创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        CompletionService<String> cService = new ExecutorCompletionService<String>(pool);

        // 向里面扔任务
        for (int i = 0; i < TOTAL_TASK; i++) {
            cService.submit(new MyThread("Thread" + i));
        }

        // 检查线程池任务执行结果
        for (int i = 0; i < TOTAL_TASK; i++) {
            Future<String> future = cService.take();
            System.out.println("method2:" + future.get());
        }

        // 关闭线程池
        pool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolTest4 t = new ThreadPoolTest4();
        t.testByQueue();
        t.testByCompetion();
    }
}

 

 

部分输出:

...

Thread4 finished...
method1:Thread4 sleep time:833
method1:Thread5 sleep time:158
Thread6 finished...
method1:Thread6 sleep time:826
method1:Thread7 sleep time:185
Thread9 finished...
Thread8 finished...
method1:Thread8 sleep time:929
method1:Thread9 sleep time:575

...

Thread11 finished...
method2:Thread11 sleep time:952
Thread18 finished...
method2:Thread18 sleep time:793
Thread19 finished...
method2:Thread19 sleep time:763
Thread16 finished...
method2:Thread16 sleep time:990

...

3. 总结

使用方法一,自己创建一个集合来保存Future存根并循环调用其返回结果的时候,主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按加入线程池的顺序返回。因为take方法是阻塞方法,后面的任务完成了,前面的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原来后面的也完成了。

使用方法二,使用CompletionService来维护处理线程不的返回结果时,主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序。

 

 

Example2:

当向Executor提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask,你可以循环获取task,并用future.get()去获取结果,但是如果这个task没有完成,你就得阻塞在这里,这个实效性不高,其实在很多场合,其实你拿第一个任务结果时,此时结果并没有生成并阻塞,其实在阻塞在第一个任务时,第二个task的任务已经早就完成了,显然这种情况用future task不合适的,效率也不高的,实例如下:

 

实例一:用一个非complete Service完成的批量任务

 

public class NonCompleteServiceTest {
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Future<String>[] futures = new FutureTask[10];        
        /**
         * 产生一个随机数,模拟不同的任务的处理时间不同
         */
        for (int i = 0; i < 10; i++) {
            futures[i] = executorService.submit(new Callable<String>() {
                public String call(){
                    int rnt = new Random().nextInt(5);
                    
                    try {
                        Thread.sleep(rnt*1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println("run rnt = "+rnt);
                    return String.valueOf(rnt*1000);
                }
            });
        }
        
        /**
         * 获取结果时,如果任务没有完成,则阻塞,在顺序获取结果时,
         * 可能别的任务已经完成,显然效率不高
         */
        for (int i = 0; i < futures.length; i++) {
            System.out.println(futures[i].get());
        }
        executorService.shutdown();
    }

}

 

CompletionService:

 

JDK:

 

public interface CompletionService<V>

将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。

 

 

 

public class CompleteServiceTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    
    CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
    
    /**
     * 产生一个随机数,模拟不同的任务的处理时间不同
     */
    for (int i = 0; i < 10; i++) {
        completionService.submit(new Callable<String>() {
            public String call(){
                int rnt = new Random().nextInt(5);
                
                try {
                    Thread.sleep(rnt*1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println("run rnt = "+rnt);
                return String.valueOf(rnt*1000);
            }
        });
    }
    
    /**
     * 获取结果时,总是先拿到队列上已经存在的对象,这样不用依次等待结果
     * 显然效率更高
     */
    for (int i = 0; i < 10; i++) {
        Future<String> future = completionService.take();
        System.out.println(future.get());
    }
    executorService.shutdown();
}
}

 实例来源:http://tomyz0223.iteye.com/blog/1040300

JDK:http://www.cjsdn.net/Doc/JDK50/java/util/concurrent/CompletionService.html

 

  其实也可以不使用CompletionService,可以先创建一个装Future类型的集合,用Executor提交的任务返回值添加到集合中,最后遍历集合取出数据

 

 ================================================================================

 

 ================================================================================

接下来我们来看看CompletionService接口的具体实现:ExecutorCompletionService。

 

ExecutorCompletionService实现分析


 

成员变量 

 

ExecutorCompletionService有三个成员变量:

executor:执行task的线程池,创建CompletionService必须指定;

aes:主要用于创建待执行task;

completionQueue:存储已完成状态的task,默认是基于链表结构的阻塞队列LinkedBlockingQueue。

 

构造方法 


 

ExecutorCompletionService提供两个构造方法,具体的使用具体情况具体分析,使用者可以根据业务场景来进行选择。

 

task提交 

ExecutorCompletionService提供submit方法来提交Callable类型或者Runnable类型的task: 


 

具体的执行流程如下:

 

1、参数校验,不符合条件的task抛出异常,程序结束;

2、将Callable类型或者Runnable类型的task构造成FutureTask;

3、把构造好的FutureTask交由线程池executor执行。

 

看到这里可能大家会比较疑惑了,task调用submit方法可以提交,完成的task是什么时候被加入到completionQueue里的呢?

 

针对这个问题,从submit方法的源码可以看出,在提交到线程池的时候需要将FutureTask封装成QueueingFuture,我们来看看QueueingFuture的具体实现: 

 


 从源码可以看出,QueueingFuture是FutureTask的子类,实现了done方法,在task执行完成之后将当前task添加到completionQueue,done方法的具体调用在FutureTask的finishCompletion方法,上篇介绍FutureTask的文章已经做过具体的分析,在这里就不再赘述了。

 

已完成状态task获取 

CompletionService的take方法和poll方法都可以获取已完成状态的task,我们来看看具体的实现: 


 

从源码可以看出,take和poll都是调用BlockingQueue提供的方法。既然take和poll都可以获取到已完成状态的task,那么他们的区别是什么呢?

 

take()在获取并移除已完成状态的task时,如果目前暂时不存在这样的task,等待,直到存在这样的task;

poll()在获取并移除已完成状态的task时,如果目前暂时不存在这样的task,不等待,直接返回null。

 

 

  • 大小: 18 KB
  • 大小: 53.2 KB
  • 大小: 25.2 KB
  • 大小: 17.9 KB
  • 大小: 25 KB
分享到:
评论
1 楼 baojunhu99 2019-01-07  
private final int POOL_SIZE = 5; 
private final int TOTAL_TASK = 20;  这块的代码似乎有问题啊,一共创建了5个线程,但是你的for循环获取返回接口,为什么是TOTAL_TASK,循环20次呢

相关推荐

Global site tag (gtag.js) - Google Analytics