From 27dfcb0237f8be5b806d3d17a8aa875b586d2854 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 9 Jan 2017 11:57:46 +0100 Subject: Git commit -a -m "Implment an AbstractQueuedSynchronizer do handle up/down counting. --- .../main/java/com/yahoo/feedapi/SharedSender.java | 82 +++++++++++++--------- 1 file changed, 49 insertions(+), 33 deletions(-) (limited to 'vespaclient-core/src') diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index cb2fe2dee83..2a7fe3e61ad 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -7,7 +7,7 @@ import com.yahoo.log.LogLevel; import com.yahoo.messagebus.*; import com.yahoo.clientmetrics.RouteMetricSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.logging.Logger; /** @@ -23,8 +23,6 @@ public class SharedSender implements ReplyHandler { private SendSession sender; private RouteMetricSet metrics; - private static final int REACT_LATENCY_ON_RACE = 5; - private static final int REACT_LATENCY_ON_WATERMARK = 5; private ConcurrentHashMap activeOwners = new ConcurrentHashMap<>(); @@ -203,63 +201,81 @@ public class SharedSender implements ReplyHandler { } } - public static class OwnerState { + private static final class Sync extends AbstractQueuedSynchronizer { + Sync() { + setState(0); + } - final AtomicInteger numPending = new AtomicInteger(0); + int getCount() { + return getState(); + } - int addPending(int count) { - return numPending.addAndGet(count); + @Override + protected int tryAcquireShared(int acquires) { + return (getState() == 0) ? 1 : -1; } - int decPending(int count) { - int newValue = numPending.addAndGet(-count); - if (newValue <= 0) { - synchronized (numPending) { - numPending.notify(); - } + @Override + protected boolean tryReleaseShared(int releases) { + // Decrement count; signal when transition to zero + for (;;) { + int c = getState(); + if (c == 0) + return false; + int nextc = (c > releases) ? c - releases : 0; + if (compareAndSetState(c, nextc)) + return nextc == 0; } - return newValue; + } + } + + public static class OwnerState { + private static final long MS2NS = 1000000; + private static final long REACT_LATENCY_ON_RACE = 5 *MS2NS; + private static final long REACT_LATENCY_ON_WATERMARK = 5 *MS2NS; + + private final Sync sync = new Sync(); + + void addPending(int count) { + sync.releaseShared(-count); + } + + void decPending(int count) { + sync.releaseShared(count); } void waitMaxPendingbelow(int limit) { try { - synchronized (numPending) { - while (numPending.get() > limit) { - numPending.wait(REACT_LATENCY_ON_WATERMARK); - } + while (getNumPending() > limit) { + sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_WATERMARK); } } catch (InterruptedException e) { } } int getNumPending() { - return numPending.get(); + return sync.getCount(); } void clearPending() { - numPending.set(0); - synchronized (numPending) { - numPending.notify(); - } + sync.releaseShared(0); } boolean waitPending(long timeoutMS) throws InterruptedException { long timeStart = SystemTimer.INSTANCE.milliTime(); long timeLeft = timeoutMS; - synchronized (numPending) { - while ((numPending.get() > 0) && (timeLeft > 0)) { - numPending.wait(REACT_LATENCY_ON_RACE); - timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart); - } + + while ((getNumPending() > 0) && (timeLeft > 0)) { + sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_RACE); + timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart); } - return numPending.get() <= 0; + + return getNumPending() == 0; } void waitPending() throws InterruptedException { - synchronized (numPending) { - while (numPending.get() > 0) { - numPending.wait(REACT_LATENCY_ON_RACE); - } + while (getNumPending() > 0) { + sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_RACE); } } } -- cgit v1.2.3