summaryrefslogtreecommitdiffstats
path: root/jdisc_core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-10-10 13:27:45 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2021-10-10 13:27:45 +0200
commit78fbd3b29837066e5d1b43efd70df6c01ed2a34f (patch)
tree1e2be218dbf6e7d5494a9f0f32b6f450d2ba2575 /jdisc_core
parent165142127b8d40621fdd29a2fcac74084eeb4d9c (diff)
Make timeout manager thread safe
Diffstat (limited to 'jdisc_core')
-rw-r--r--jdisc_core/src/main/java/com/yahoo/jdisc/core/TimeoutManagerImpl.java40
1 files changed, 25 insertions, 15 deletions
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/core/TimeoutManagerImpl.java b/jdisc_core/src/main/java/com/yahoo/jdisc/core/TimeoutManagerImpl.java
index fb72f3c845a..49b5f685a7c 100644
--- a/jdisc_core/src/main/java/com/yahoo/jdisc/core/TimeoutManagerImpl.java
+++ b/jdisc_core/src/main/java/com/yahoo/jdisc/core/TimeoutManagerImpl.java
@@ -18,6 +18,8 @@ import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -28,12 +30,12 @@ public class TimeoutManagerImpl {
private static final ContentChannel IGNORED_CONTENT = new IgnoredContent();
private static final Logger log = Logger.getLogger(TimeoutManagerImpl.class.getName());
- private final ScheduledQueue schedules[] = new ScheduledQueue[Runtime.getRuntime().availableProcessors()];
+ private final ScheduledQueue [] schedules = new ScheduledQueue[Runtime.getRuntime().availableProcessors()];
private final Thread thread;
private final Timer timer;
- private volatile int nextScheduler = 0;
- private volatile int queueSize = 0;
- private volatile boolean done = false;
+ private final AtomicInteger nextScheduler = new AtomicInteger(0);
+ private final AtomicInteger queueSize = new AtomicInteger(0);
+ private final AtomicBoolean done = new AtomicBoolean(false);
@Inject
public TimeoutManagerImpl(ThreadFactory factory, Timer timer) {
@@ -52,7 +54,7 @@ public class TimeoutManagerImpl {
}
public void shutdown() {
- done = true;
+ done.set(true);
}
public RequestHandler manageHandler(RequestHandler handler) {
@@ -60,7 +62,7 @@ public class TimeoutManagerImpl {
}
int queueSize() {
- return queueSize; // unstable snapshot, only for test purposes
+ return queueSize.get(); // unstable snapshot, only for test purposes
}
Timer timer() {
@@ -94,14 +96,22 @@ public class TimeoutManagerImpl {
private class ManagerTask implements Runnable {
+ boolean oneMoreCheck(int timeoutMS) {
+ synchronized (done) {
+ if (!done.get()) {
+ try {
+ done.wait(timeoutMS);
+ } catch (InterruptedException e) {
+ log.log(Level.WARNING, "Ignoring interrupt signal in timeout manager.", e);
+ }
+ }
+ }
+ return ! done.get();
+ }
+
@Override
public void run() {
- while (!done) {
- try {
- Thread.sleep(ScheduledQueue.MILLIS_PER_SLOT);
- } catch (InterruptedException e) {
- log.log(Level.WARNING, "Ignoring interrupt signal in timeout manager.", e);
- }
+ while (oneMoreCheck(ScheduledQueue.MILLIS_PER_SLOT)) {
checkTasks(timer.currentTimeMillis());
}
}
@@ -185,10 +195,10 @@ public class TimeoutManagerImpl {
return;
}
if (timeoutQueueEntry == null) {
- timeoutQueueEntry = schedules[(++nextScheduler & 0xffff) % schedules.length].newEntry(this);
+ timeoutQueueEntry = schedules[(nextScheduler.incrementAndGet() & 0xffff) % schedules.length].newEntry(this);
}
timeoutQueueEntry.scheduleAt(request.creationTime(TimeUnit.MILLISECONDS) + request.getTimeout(TimeUnit.MILLISECONDS));
- ++queueSize;
+ queueSize.incrementAndGet();
}
synchronized void unscheduleTimeout() {
@@ -198,7 +208,7 @@ public class TimeoutManagerImpl {
//followed by unscheduling in another thread from TimeoutHandler.handleResponse
timeoutQueueEntry = null;
}
- --queueSize;
+ queueSize.decrementAndGet();
}
@Override