From c90866eff408818f0735dad286e8eb4b14990d41 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 9 Jan 2017 13:46:52 +0100 Subject: Simplify since there is no longer any race. --- .../main/java/com/yahoo/feedapi/SharedSender.java | 52 +++++++++------------- 1 file changed, 21 insertions(+), 31 deletions(-) (limited to 'vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java') diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index 2a7fe3e61ad..85e4d700fce 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -7,6 +7,7 @@ import com.yahoo.log.LogLevel; import com.yahoo.messagebus.*; import com.yahoo.clientmetrics.RouteMetricSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.logging.Logger; @@ -143,15 +144,15 @@ public class SharedSender implements ReplyHandler { if (state == null) { OwnerState newState = new OwnerState(); state = activeOwners.putIfAbsent(owner, newState); - if (state == null) { - state = newState; - } + } - if (maxPendingPerOwner != -1 && blockingQueue) { - state.waitMaxPendingbelow(maxPendingPerOwner); + if (state != null) { + if (maxPendingPerOwner != -1 && blockingQueue) { + state.waitMaxPendingbelow(maxPendingPerOwner); + } + state.addPending(1); } - state.addPending(1); msg.setContext(owner); try { @@ -202,8 +203,8 @@ public class SharedSender implements ReplyHandler { } private static final class Sync extends AbstractQueuedSynchronizer { - Sync() { - setState(0); + Sync(int count) { + setState(1); } int getCount() { @@ -217,24 +218,23 @@ public class SharedSender implements ReplyHandler { @Override protected boolean tryReleaseShared(int releases) { - // Decrement count; signal when transition to zero - for (;;) { + // Increment/Decrement count; signal when transition downwards to zero. + while ( true ) { int c = getState(); - if (c == 0) - return false; + if ((c == 0) && (releases >= 0)) { return false; } int nextc = (c > releases) ? c - releases : 0; - if (compareAndSetState(c, nextc)) + if (compareAndSetState(c, nextc)) { return nextc == 0; + } } } } - public static class OwnerState { - private static final long MS2NS = 1000000; - private static final long REACT_LATENCY_ON_RACE = 5 *MS2NS; - private static final long REACT_LATENCY_ON_WATERMARK = 5 *MS2NS; + private static final class OwnerState { - private final Sync sync = new Sync(); + private static final long REACT_LATENCY_ON_WATERMARK_MS = 5; + + private final Sync sync = new Sync(1); void addPending(int count) { sync.releaseShared(-count); @@ -247,7 +247,7 @@ public class SharedSender implements ReplyHandler { void waitMaxPendingbelow(int limit) { try { while (getNumPending() > limit) { - sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_WATERMARK); + sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(REACT_LATENCY_ON_WATERMARK_MS)); } } catch (InterruptedException e) { } @@ -262,21 +262,11 @@ public class SharedSender implements ReplyHandler { } boolean waitPending(long timeoutMS) throws InterruptedException { - long timeStart = SystemTimer.INSTANCE.milliTime(); - long timeLeft = timeoutMS; - - while ((getNumPending() > 0) && (timeLeft > 0)) { - sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_RACE); - timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart); - } - - return getNumPending() == 0; + return sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(timeoutMS)); } void waitPending() throws InterruptedException { - while (getNumPending() > 0) { - sync.tryAcquireSharedNanos(1, REACT_LATENCY_ON_RACE); - } + sync.tryAcquireShared(1); } } -- cgit v1.2.3