DelayQueue是一个无界阻塞队列,队列中的元素比较特殊,必须是实现了Delayed接口的元素。Delayed接口是一个混合接口,它继承了Comparator接口。它也具有PriorityBlockingQueue的特征,元素中优化级最高的元素是延迟时间最长的元素。队列头的元素是呆在队列时间最长的元素,它只有到时期,才能出队。即getDelay获取到的时间小于等于0时,否则返回null元素。 DelayQueue的并发控制同样使用ReentrantLock和它的Condition对象来实现。因为添加元素不阻塞,所以也只有一个Condition对象来实现等待/通知模式。DelayQueue同样不允许使用null元素。
一、DelayQueue的结构 DelayQueue的结构和PriorityBlockingQueue基本一致,它持有一个PriorityQueue的引用 各种方法实现也委托给了PriorityQueue对象来实现。另外还有一个ReentrantLock和它的Condition对象;队列中的元素是Delayed接口类型的元素。 Delayed接口定义:
1 2 3 public interface Delayed extends Comparable <Delayed > { long getDelay (TimeUnit unit) ; }
DelayQueue的主要成员变量:
1 2 3 private transient final ReentrantLock lock = new ReentrantLock();private transient final Condition available = lock.newCondition();private final PriorityQueue<E> q = new PriorityQueue<E>();
二、DelayQueue的主要方法实现 1、入队操作
入队操作因为是无界队列,所有不会出现阻塞,put/offer都正常添加到队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public boolean offer (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { E first = q.peek(); q.offer(e); if (first == null || e.compareTo(first) < 0 ) available.signalAll(); return true ; } finally { lock.unlock(); } }
2、出队操作 因为出队必须是到期的元素,如果获取不到元素,阻塞版本的take会阻塞等待到delay的时间到期,而超时版本的poll会返回null。具体的实现都大同小异,只看take方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null ) { available.await(); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0 ) { long tl = available.awaitNanos(delay); } else { E x = q.poll(); assert x != null ; if (q.size() != 0 ) available.signalAll(); return x; } } } } finally { lock.unlock(); } }
具体的实现流程如下: 1).获取锁,并加锁; 2).开始自旋,利用peek方法获取队列的头元素,如果获取失败,则释放锁,进入Condition的等待队列; 3).如果能获取到队列头的元素,则判断到期时间;如果还未到期,则继续在剩下的时间中await; 4).如果元素已经到时,则获取成功,poll出队,同时判断队列如果非空,则继续通知阻塞的线程; 5).最后返回获取到的元素;
三、使用DelayQueue DelayQueue可以用于很多场景,比如缓存过期管理、会话过期管理、连接超时管理等。下面的例子是使用DelayQueue来管理缓存中过期的元素。
1、保存数据的键值对类:
1 2 3 4 5 6 7 8 9 public class Pair <K , V > { public K first; public V second; public Pair () {} public Pair (K first,V second) { this .first = first; this .second = second; } }
2、实现Delayed接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class DelayItem <T > implements Delayed { private static long NANO_ORIGIN = System.nanoTime(); final static long now () { return System.nanoTime() - NANO_ORIGIN; } private static final AtomicLong sequencer = new AtomicLong(0 ); private long sequenceNumber; private final long time; private final T item; public DelayItem (T sumbmit,long timeout) { this .item = sumbmit; this .time = now() + timeout; this .sequenceNumber = sequencer.getAndIncrement(); } public T getItem () { return this .item; } @Override public int compareTo (Delayed other) { if (other == this ) return 0 ; if (other instanceof DelayItem){ DelayItem x = (DelayItem)other; long diff = time - x.time; if (diff < 0 ) return -1 ; else if (diff > 0 ) return 1 ; else if (sequenceNumber < x.sequenceNumber) return -1 ; else return 1 ; } long d = this .getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS); return d == 0 ? 0 : (d < 0 ) ? -1 : 1 ; } @Override public long getDelay (TimeUnit unit) { long d = unit.convert(time - now() , unit.NANOSECONDS); return d; } }
3、缓存实现和测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public class Cache <K ,V > { private final ConcurrentMap<K,V> cache = new ConcurrentHashMap<K,V>(); private final DelayQueue<DelayItem<Pair<K,V>>> q = new DelayQueue<DelayItem<Pair<K,V>>>(); private Thread daemonThread; public Cache () { Runnable checkTask = new Runnable(){ @Override public void run () { checkTimeout(); } }; daemonThread = new Thread(checkTask); daemonThread.setDaemon(true ); daemonThread.start(); } private void checkTimeout () { for (;;){ try { DelayItem<Pair<K,V>> item = q.take(); if (item != null ){ Pair<K,V> pair = item.getItem(); cache.remove(pair.first,pair.second); } } catch (InterruptedException e) { e.printStackTrace(); break ; } } } public void put (K key,V value,long time,TimeUnit unit) { V oldValue = cache.put(key, value); if (oldValue != null ) cache.remove(oldValue); long nanoTime = TimeUnit.NANOSECONDS.convert(time,unit); q.put(new DelayItem<Pair<K,V>>(new Pair<K,V>(key,value),nanoTime)); } public V get (K key) { return cache.get(key); } public static void main (String[] args) throws InterruptedException { Cache<Integer,String> cache = new Cache<Integer,String>(); cache.put(1 , "aaaa" , 3 , TimeUnit.SECONDS); Thread.sleep(2000 ); System.out.println(cache.get(1 )); Thread.sleep(2000 ); System.out.println(cache.get(1 )); } }