DelayQueue的使用

Item item1 = new Item("item1", 5, TimeUnit.SECONDS);
Item item2 = new Item("item2", 10, TimeUnit.SECONDS);
Item item3 = new Item("item3", 15, TimeUnit.SECONDS);
DelayQueue<Item> queue = new DelayQueue<>();
queue.put(item1);
queue.put(item2);
queue.put(item3);
System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
for (int i = 0; i < 3; i++) {
    Item take = queue.take();
    System.out.format("name:{%s}, time:{%s}\n", take.name,LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
}
  • Item需实现Delayed接口
  • Delayed.getDelay()方法获取延时的时间

DelayQueue的实现

DelayQueue属性

  • private final PriorityQueue<E> q = new PriorityQueue();保存延时队列元素的优先级队列,通过元素的compareTo方法决定元素在优先级队列中的位置
  • private final transient ReentrantLock lock = new ReentrantLock();延时队列锁对象,ReentrantLock通过aqs实现
  • private final Condition available;
    • aqs的ConditionObject
    • 可以向该对象中添加ConditionWaiter
    • 并且可通过available的doSignal方法唤醒waiter
    • 具体唤醒方法从waiter上获取线程,并通过LockSupport.unpark(var1.thread)唤醒该线程

put

  • DelayQueue.put(item1)向队列中添加元素
  • 具体即为向优先级队列q中添加元素
  • 在向q添加元素前后分别执行lock.lock()和lock.unlock()
  • 如果该元素添加后在优先级队列的首位,则发出signal通知,唤醒线程

take

  • Item take = queue.take()从延时队列中获取元素
  • 在while死循环中从优先级队列q中获取元素
  • 如果获取到元素调用返回元素的getDelay()方法,并与0进行比较,如果小于等于0,则将该属性从q中删除并返回
  • 如果大于0,并且已存在等待的线程(this.leader),则通过available.await()方法挂起线程
  • 如果大于0,并且不存在等待的线程,则通过available.awaitNanos(var3)方法挂起delay time,具体实现为LockSupport.parkNanos(this, nanosTimeout)
  • available.awaitNanos(var3)到达时间后,会继续运行线程将this.leader设置为null,然后再次循环获取到元素并返回
  • 返回元素退出while循环,在finally中通过this.available.signal()在AbstractQueuedSynchronizer队列中获取第一个node并通过unpark唤醒