diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-09 11:57:46 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-09 11:57:46 +0100 |
commit | 27dfcb0237f8be5b806d3d17a8aa875b586d2854 (patch) | |
tree | f8d9f3afc6c8ca60e36e516b111d49c2d26d4fd8 /vespaclient-core | |
parent | ef63ebdf34df4b97d8cc54afedd32c55a3d9fabf (diff) |
Git commit -a -m "Implment an AbstractQueuedSynchronizer do handle up/down counting.
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-x | vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java | 82 |
1 files changed, 49 insertions, 33 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index cb2fe2dee83..2a7fe3e61ad 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -7,7 +7,7 @@ import com.yahoo.log.LogLevel; import com.yahoo.messagebus.*; import com.yahoo.clientmetrics.RouteMetricSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.logging.Logger; /** @@ -23,8 +23,6 @@ public class SharedSender implements ReplyHandler { private SendSession sender; private RouteMetricSet metrics; - private static final int REACT_LATENCY_ON_RACE = 5; - private static final int REACT_LATENCY_ON_WATERMARK = 5; private ConcurrentHashMap<ResultCallback, OwnerState> activeOwners = new ConcurrentHashMap<>(); @@ -203,63 +201,81 @@ public class SharedSender implements ReplyHandler { } } - public static class OwnerState { + private static final class Sync extends AbstractQueuedSynchronizer { + Sync() { + setState(0); + } - final AtomicInteger numPending = new AtomicInteger(0); + int getCount() { + return getState(); + } - int addPending(int count) { - return numPending.addAndGet(count); + @Override + protected int tryAcquireShared(int acquires) { + return (getState() == 0) ? 1 : -1; } - int decPending(int count) { - int newValue = numPending.addAndGet(-count); - if (newValue <= 0) { - synchronized (numPending) { - numPending.notify(); - } + @Override + protected boolean tryReleaseShared(int releases) { + // Decrement count; signal when transition to zero + for (;;) { + int c = getState(); + if (c == 0) + return false; + int nextc = (c > releases) ? c - releases : 0; + if (compareAndSetState(c, nextc)) + return nextc == 0; } - return newValue; + } + } + + public static class OwnerState { + private static final long MS2NS = 1000000; + private static final long REACT_LATENCY_ON_RACE = 5 *MS2NS; + private static final long REACT_LATENCY_ON_WATERMARK = 5 *MS2NS; + + private final Sync sync = new Sync(); + + void addPending(int count) { + sync.releaseShared(-count); + } + + void decPending(int count) { + sync.releaseShared(count); } void waitMaxPendingbelow(int limit) { try { - synchronized (numPending) { - while (numPending.get() > limit) { - numPending.wait(REACT_LATENCY_ON_WATERMARK); - } + while (getNumPending() > limit) { + sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_WATERMARK); } } catch (InterruptedException e) { } } int getNumPending() { - return numPending.get(); + return sync.getCount(); } void clearPending() { - numPending.set(0); - synchronized (numPending) { - numPending.notify(); - } + sync.releaseShared(0); } boolean waitPending(long timeoutMS) throws InterruptedException { long timeStart = SystemTimer.INSTANCE.milliTime(); long timeLeft = timeoutMS; - synchronized (numPending) { - while ((numPending.get() > 0) && (timeLeft > 0)) { - numPending.wait(REACT_LATENCY_ON_RACE); - timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart); - } + + while ((getNumPending() > 0) && (timeLeft > 0)) { + sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_RACE); + timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart); } - return numPending.get() <= 0; + + return getNumPending() == 0; } void waitPending() throws InterruptedException { - synchronized (numPending) { - while (numPending.get() > 0) { - numPending.wait(REACT_LATENCY_ON_RACE); - } + while (getNumPending() > 0) { + sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_RACE); } } } |