// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.core; import java.util.Objects; import java.util.Queue; /** * @author Haavard Pettersen * @author Simon Thoresen Hult */ class ScheduledQueue { public static final int MILLIS_PER_SLOT = 100; public static final int NUM_SLOTS = 512; public static final int NUM_SLOTS_UNDILATED = 3; public static final int SLOT_MASK = 511; // bitmask to modulo NUM_SLOTS public static final int ITER_SHIFT = 9; // number of bits to shift off SLOT_MASK private final Entry[] slots = new Entry[NUM_SLOTS + 1]; private final int[] counts = new int[NUM_SLOTS + 1]; private int currIter = 0; private int currSlot = 0; private long nextTick; public ScheduledQueue(long currentTimeMillis) { this.nextTick = currentTimeMillis + MILLIS_PER_SLOT; } public Entry newEntry(Object payload) { Objects.requireNonNull(payload, "payload"); return new Entry(payload); } public synchronized void drainTo(long currentTimeMillis, Queue out) { if (slots[NUM_SLOTS] == null && currentTimeMillis < nextTick) { return; } drainTo(NUM_SLOTS, 0, out); for (int i = 0; currentTimeMillis >= nextTick; i++, nextTick += MILLIS_PER_SLOT) { if (i < NUM_SLOTS_UNDILATED) { if (++currSlot >= NUM_SLOTS) { currSlot = 0; currIter++; } drainTo(currSlot, currIter, out); } } } private void drainTo(int slot, int iter, Queue out) { int cnt = counts[slot]; Entry entry = slots[slot]; for (int i = 0; i < cnt; i++) { Entry next = entry.next; if (entry.iter == iter) { linkOut(entry); out.add(entry.payload); } entry = next; } } private synchronized void scheduleAt(Entry entry, long expireAtMillis) { if (entry.next != null) { linkOut(entry); } long delayMillis = expireAtMillis - nextTick; if (delayMillis < 0) { entry.slot = NUM_SLOTS; entry.iter = 0; } else { long ticks = 1 + (int)((delayMillis + MILLIS_PER_SLOT / 2) / MILLIS_PER_SLOT); entry.slot = (int)((ticks + currSlot) & SLOT_MASK); entry.iter = currIter + (int)((ticks + currSlot) >> ITER_SHIFT); } linkIn(entry); } private synchronized void unschedule(Entry entry) { if (entry.next != null) { linkOut(entry); } } private void linkIn(Entry entry) { Entry head = slots[entry.slot]; if (head == null) { entry.next = entry; entry.prev = entry; slots[entry.slot] = entry; } else { entry.next = head; entry.prev = head.prev; head.prev.next = entry; head.prev = entry; } ++counts[entry.slot]; } private void linkOut(Entry entry) { Entry head = slots[entry.slot]; if (entry.next == entry) { slots[entry.slot] = null; } else { entry.prev.next = entry.next; entry.next.prev = entry.prev; if (head == entry) { slots[entry.slot] = entry.next; } } entry.next = null; entry.prev = null; --counts[entry.slot]; } public class Entry { private final Object payload; private int slot; private int iter; private Entry next; private Entry prev; private Entry(Object payload) { this.payload = payload; } public void scheduleAt(long expireAtMillis) { ScheduledQueue.this.scheduleAt(this, expireAtMillis); } public void unschedule() { ScheduledQueue.this.unschedule(this); } } }