diff options
Diffstat (limited to 'jdisc_core/src/main/java/com/yahoo/jdisc/core/TimeoutManagerImpl.java')
-rw-r--r-- | jdisc_core/src/main/java/com/yahoo/jdisc/core/TimeoutManagerImpl.java | 40 |
1 files changed, 25 insertions, 15 deletions
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/core/TimeoutManagerImpl.java b/jdisc_core/src/main/java/com/yahoo/jdisc/core/TimeoutManagerImpl.java index fb72f3c845a..49b5f685a7c 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/core/TimeoutManagerImpl.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/core/TimeoutManagerImpl.java @@ -18,6 +18,8 @@ import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -28,12 +30,12 @@ public class TimeoutManagerImpl { private static final ContentChannel IGNORED_CONTENT = new IgnoredContent(); private static final Logger log = Logger.getLogger(TimeoutManagerImpl.class.getName()); - private final ScheduledQueue schedules[] = new ScheduledQueue[Runtime.getRuntime().availableProcessors()]; + private final ScheduledQueue [] schedules = new ScheduledQueue[Runtime.getRuntime().availableProcessors()]; private final Thread thread; private final Timer timer; - private volatile int nextScheduler = 0; - private volatile int queueSize = 0; - private volatile boolean done = false; + private final AtomicInteger nextScheduler = new AtomicInteger(0); + private final AtomicInteger queueSize = new AtomicInteger(0); + private final AtomicBoolean done = new AtomicBoolean(false); @Inject public TimeoutManagerImpl(ThreadFactory factory, Timer timer) { @@ -52,7 +54,7 @@ public class TimeoutManagerImpl { } public void shutdown() { - done = true; + done.set(true); } public RequestHandler manageHandler(RequestHandler handler) { @@ -60,7 +62,7 @@ public class TimeoutManagerImpl { } int queueSize() { - return queueSize; // unstable snapshot, only for test purposes + return queueSize.get(); // unstable snapshot, only for test purposes } Timer timer() { @@ -94,14 +96,22 @@ public class TimeoutManagerImpl { private class ManagerTask implements Runnable { + boolean oneMoreCheck(int timeoutMS) { + synchronized (done) { + if (!done.get()) { + try { + done.wait(timeoutMS); + } catch (InterruptedException e) { + log.log(Level.WARNING, "Ignoring interrupt signal in timeout manager.", e); + } + } + } + return ! done.get(); + } + @Override public void run() { - while (!done) { - try { - Thread.sleep(ScheduledQueue.MILLIS_PER_SLOT); - } catch (InterruptedException e) { - log.log(Level.WARNING, "Ignoring interrupt signal in timeout manager.", e); - } + while (oneMoreCheck(ScheduledQueue.MILLIS_PER_SLOT)) { checkTasks(timer.currentTimeMillis()); } } @@ -185,10 +195,10 @@ public class TimeoutManagerImpl { return; } if (timeoutQueueEntry == null) { - timeoutQueueEntry = schedules[(++nextScheduler & 0xffff) % schedules.length].newEntry(this); + timeoutQueueEntry = schedules[(nextScheduler.incrementAndGet() & 0xffff) % schedules.length].newEntry(this); } timeoutQueueEntry.scheduleAt(request.creationTime(TimeUnit.MILLISECONDS) + request.getTimeout(TimeUnit.MILLISECONDS)); - ++queueSize; + queueSize.incrementAndGet(); } synchronized void unscheduleTimeout() { @@ -198,7 +208,7 @@ public class TimeoutManagerImpl { //followed by unscheduling in another thread from TimeoutHandler.handleResponse timeoutQueueEntry = null; } - --queueSize; + queueSize.decrementAndGet(); } @Override |