`
阿尔萨斯
  • 浏览: 4171526 次
社区版块
存档分类
最新评论

关于ArrayBlockingQueue队列的一些问题

 
阅读更多

背景:最近接手一个新的应用-商机快递,主要是给用户发送营销邮件,由于不定期会有漏发的情况,所以在里面加了一些逻辑来修复这个问题,由于系统采用了多线程的方式,改之前考虑的不周全,最后会导致重发的现象。

 // 创建一个阻塞队列,容量为maxThread*2
            ArrayBlockingQueue blockQueue = new ArrayBlockingQueue(maxThread * 2);
            exec = new BizExpressThreadPoolExecutor(maxThread, maxThread, 60, TimeUnit.SECONDS, blockQueue,
                                                    maxThread * 2);
            exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

            // 该方法可以给当前的进程注册一个清理线程,当进程退出的时候,会执行线程中的代码。
            Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
            while (!isShutDown) {
                exec.execute(new BizExpressMailRun());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
            }
 /**
     * @param corePoolSize 线程池维护线程的最少数量
     * @param maximumPoolSize 线程池维护线程的最大数量
     * @param keepAliveTime 线程池维护线程所允许的空闲时间
     * @param unit 线程池维护线程所允许的空闲时间的单位
     * @param workQueue 线程池所使用的缓冲队列
     * @param semaphore
     */
    public BizExpressThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, int semaphore){
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        available = new Semaphore(semaphore, true);
    }

系统初始时会实例一个ThreadPoolExecutor对象(exec)。ThreadPoolExecutor是并发包中一个提供线程池的服务,可以很容易将一个实现了Runnable接口的任务放入线程池中执行。具体的execute(Runnable)方法执行过程为:

首先判断传入的Runnable对象是否为null,如果为null直接抛出NullPointException异常。如果不为空执行下面步骤

如果当前的线程数小于配置的corePoolSize,则调用addIfUnderCorePoolSize方法进而会调用mainLock锁。

如果当前的线程数小于配置的corePoolSize并且线程处于RUNNING状态,调用addThread增加线程,

addThread方法首先创建Worker对象,然后调用threadFactory( Thread newThread(Runnable r); )创建新的线程,如果创建新的线程不为null时,将Worker对象的thread属性指向此创建出来的线程,并将此Worker对象放入到workers中,最后增加当前线程池中的线程数。

用代码描述为:

    private List workers;
 private int count;//当前线程池中的线程数
 public void addThread(Runnable r){
 Worker worker=new Worker();
 Thread tt=threadFactory.newThread(r);
 if(rr!=null){
 worker.setThread(tt);
 }
 workers.add(worker);
 count++;
 }

--------------------------

 while (!isShutDown) {
     exec.execute(new BizExpressMailRun());
       try {
             Thread.sleep(2000);
        } catch (InterruptedException e) {
      }
 }
这是阻塞队列的执行入口,是一个循环过程,中间会休眠2秒,队列的长度是初始化时的corePoolSize,消费和生产无序进行。

原来的做方法是

public void doPerform() {
        List<BizExpressDailyDO> bizexpresses = bizExpressDailyDAO.fetchSomeBizExpressDaily(
                                                                                           BizExpressConfig.getServerIp(),
                                                                                           BizExpressConfig.getBuildEachFetchNum());
        // 如果没有取到数据,则休眠5秒,避免反复读取数据库。
        if (bizexpresses == null || bizexpresses.size() == 0) {
            try {
                Thread.currentThread().sleep(5000);
                // 补发发送失败邮件
                doContinue();
            } catch (InterruptedException e) {
            }
            return;
        }
        takeCareOf(bizexpresses);
    }

一个线程执行doPerform方法,会改biz_express_daily表中的记录的ip,然后再执行takeCareOf方法来发邮件,发送成功后会将记录的status字段值改掉,但是关键的是在完成status改掉之前会有一段时间。

另外的线程调用doPerform方法,此时bizexpresses 为空,进入到if分支,调用doContinue方法。

public void doContinue() {
        // 获取当日被选出,但未被处理过的纪录,最多取100条
        List<BizExpressDailyDO> bizexpresses = bizExpressDailyDAO.fetchLastUnbuiltBizExpressDaily(
                                                                                                  BizExpressConfig.getServerIp(),
                                                                                                  BizExpressConfig.getBuildEachFetchNum());
        if (bizexpresses.size() > 0) {
            takeCareOf(bizexpresses);
        } else {
            return;
        }

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
        }

        doContinue();
    }

此时的bizexpresses 是有值的,而且取出来的正好是刚才第一个线程里面的值,然后同样执行takeCareOf方法发送邮件。

这样就会造成邮件的重发,当然根据线程抢占的激烈程度,会导致重发邮件的数量也不一致。

分享到:
评论

相关推荐

    Java可阻塞队列-ArrayBlockingQueue

    在前面的的文章,写了一个带有缓冲区的队列,是用JAVA的Lock下的Condition实现的,但是JAVA类中提供了这项功能,是ArrayBlockingQueue,  ArrayBlockingQueue是由数组支持的有界阻塞队列,次队列按照FIFO(先进先...

    阻塞队列阻塞队列阻塞队列

    java中,常用的阻塞式队列Demo。包含:ArrayBlockingQueue、LinkedQueue、PriorityBlockingQueue

    Java源码解析阻塞队列ArrayBlockingQueue常用方法

    今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue常用方法,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧

    Java源码解析阻塞队列ArrayBlockingQueue介绍

    今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue介绍,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧

    Java源码解析阻塞队列ArrayBlockingQueue功能简介

    今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue功能简介,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧

    Java concurrency集合之ArrayBlockingQueue_动力节点Java学院整理

    ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。下面通过本文给大家介绍Java concurrency集合之ArrayBlockingQueue的相关知识,感兴趣的朋友一起看看吧

    java 队列使用

    java 队列使用,次例子是一个模拟网络爬虫工作大致流程的小例子,里面没有具体的爬取的实现,只是对爬取的流程的模拟,使用到了java 的 ArrayBlockingQueue、ConcurrentHashMap、 这2个类和java 的 volatile 关键字...

    一个小的java Demo , 非常适合Java初学者学习阅读.rar

    数组阻塞队列ArrayBlockingQueue,延迟队列DelayQueue, 链阻塞队列 LinkedBlockingQueue,具有优先级的阻塞队列 PriorityBlockingQueue, 同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 ...

    blockingQueues:简单,高性能,goroutine安全队列,可用作资源池或作业队列

    ArrayBlockingQueue :由切片支持的有界阻塞队列 LinkedBlockingQueue :由容器/列表支持的有界阻塞队列 ConcurrentRingBuffer :由片支持的有界无锁队列 安装 go get - u github . com / theodesp / blockingQueues...

    java并发工具包详解

    3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. 阻塞双端队列 BlockingDeque 9...

    java集合框架 arrayblockingqueue应用分析

    ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素

    java并发工具包 java.util.concurrent中文版用户指南pdf

    3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链...

    【Java】Queue、BlockingQueue和队列实现生产者消费者模式

    BlockingQueue接口 – 阻塞队列2.1 ArrayBlockingQueue类(有界阻塞队列)2.2 LinkedBlockingQueue类(无界阻塞队列)3. 源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. ...

    java并发包资源

    3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. ...

    blocking-queue:COEN 283 的阻塞队列项目

    阻塞队列 COEN 283 的阻塞队列项目 通过阻塞队列传递消息 使用同步的消息缓冲区使用队列来实现阻塞队列,并在队列为空/满时进行自旋... 使用 ArrayBlockingQueue 的消息缓冲区 线程观察者监控线程生产者和消费者的效率

    Java容器.xmind

    container Collection 标记: 顶级接口 List 标记: interface ...ArrayBlockingQueue 数组结构实现,有界队列,手工固定上限 LinkedBlockingQueue 链表结构实现,无界队列(默认上限Integer.MAX_VALUE)

    JavaInterview:最开源的Java技术知识点,以及Java源码分析。为开源贡献自己的一份力

    该项目主要分享一些个人经验,以及一些个人项目中遇到的问题;还有就是一些读书笔记。 如果大家觉得该项目还不错,可以帮忙star或者fork下,你的star就是我的动力,谢谢! 为开源贡献自己的一份力量。 :blue_book: ...

    第7章-JUC多线程v1.1.pdf

    ArrayBlockingQueue LinkedBlockingQueue SynchronousQueue Handler :拒绝处理策略, 线程数量大于最大线程数量就会拒绝处理策略, 四种策略为 ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出...

Global site tag (gtag.js) - Google Analytics