From 97a3166a34b4fc343bffc9f0f36eed6ff2c806d2 Mon Sep 17 00:00:00 2001 From: Arnstein Ressem Date: Mon, 9 Jan 2017 20:48:11 +0100 Subject: Revert "Revert "Revert "Revert "Revert "Revert "Revert "Revert "Revert "Balder/avoid costly notifyall in sharedsender""""""""" --- .../main/java/com/yahoo/feedapi/SharedSender.java | 199 ++++++++------------- 1 file changed, 77 insertions(+), 122 deletions(-) (limited to 'vespaclient-core/src/main') diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index 08243c4e0ec..48ff0eae36b 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -6,9 +6,9 @@ import com.yahoo.jdisc.Metric; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.*; import com.yahoo.clientmetrics.RouteMetricSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +import java.util.HashMap; +import java.util.Map; import java.util.logging.Logger; /** @@ -23,9 +23,11 @@ public class SharedSender implements ReplyHandler { public static final Logger log = Logger.getLogger(SharedSender.class.getName()); private SendSession sender; + private final Object monitor = new Object(); private RouteMetricSet metrics; - private ConcurrentHashMap activeOwners = new ConcurrentHashMap<>(); + // Maps from filename to number of pending requests + private Map activeOwners = new HashMap<>(); /** * Creates a new shared sender. @@ -48,16 +50,17 @@ public class SharedSender implements ReplyHandler { } public void remove(ResultCallback owner) { - OwnerState state = activeOwners.remove(owner); - if (state != null) { - state.clearPending(); + synchronized (monitor) { + activeOwners.remove(owner); } } public void shutdown() { try { - while ( ! activeOwners.isEmpty()) { - Thread.sleep(10); + synchronized (monitor) { + while ( ! activeOwners.isEmpty()) { + monitor.wait(180 * 1000); + } } } catch (InterruptedException e) { } @@ -73,21 +76,36 @@ public class SharedSender implements ReplyHandler { * @return true if there were no more pending, or false if the timeout expired. */ public boolean waitForPending(ResultCallback owner, long timeoutMs) { - OwnerState state = activeOwners.get(owner); - if (state != null) { - try { - return state.waitPending(timeoutMs); - } catch (InterruptedException e) { - return false; + long timeStart = SystemTimer.INSTANCE.milliTime(); + long timeLeft = timeoutMs; + + try { + while (timeoutMs == -1 || timeLeft > 0) { + synchronized (monitor) { + Integer count = activeOwners.get(owner); + if (count == null || count == 0) { + return true; + } else if (timeLeft > 0) { + monitor.wait(timeLeft); + } else { + monitor.wait(); + } + } + + timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart); } + } catch (InterruptedException e) { } - return true; + return false; } public int getPendingCount(ResultCallback owner) { - OwnerState state = activeOwners.get(owner); - return (state != null) ? state.getNumPending() : 0; + Integer count = activeOwners.get(owner); + if (count == null) { + return 0; + } + return count; } /** @@ -107,12 +125,7 @@ public class SharedSender implements ReplyHandler { * @param owner the file to check for pending documents */ public void waitForPending(ResultCallback owner) { - OwnerState state = activeOwners.get(owner); - if (state != null) { - try { - state.waitPending(); - } catch (InterruptedException e) { } - } + waitForPending(owner, -1); } /** @@ -140,17 +153,31 @@ public class SharedSender implements ReplyHandler { return; } - OwnerState state = activeOwners.get(owner); - if (state == null) { - OwnerState newState = new OwnerState(); - state = activeOwners.putIfAbsent(owner, newState); + try { + synchronized (monitor) { + if (maxPendingPerOwner != -1 && blockingQueue) { + while (true) { + Integer count = activeOwners.get(owner); + + if (count != null && count >= maxPendingPerOwner) { + log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies"); + monitor.wait(10000); + } else { + break; + } + } + } - } - if (state != null) { - if (maxPendingPerOwner != -1 && blockingQueue) { - state.waitMaxPendingBelow(maxPendingPerOwner); + Integer count = activeOwners.get(owner); + + if (count == null) { + activeOwners.put(owner, 1); + } else { + activeOwners.put(owner, count + 1); + } } - state.addPending(1); + } catch (InterruptedException e) { + return; } msg.setContext(owner); @@ -176,102 +203,30 @@ public class SharedSender implements ReplyHandler { */ @Override public void handleReply(Reply r) { - ResultCallback owner = (ResultCallback) r.getContext(); - if (owner == null) { - log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); - return; - } + synchronized (monitor) { + ResultCallback owner = (ResultCallback) r.getContext(); - metrics.addReply(r); - OwnerState state = activeOwners.get(owner); + if (owner != null) { + metrics.addReply(r); - if (state == null) { - // TODO: should be debug level if at all - log.log(LogLevel.WARNING, "Owner " + owner.toString() + " is not active"); - return; - } + Integer count = activeOwners.get(owner); - int numPending = state.getNumPending() - 1; - boolean noMorePending = state.decPending(1); - if (noMorePending) { - numPending = 0; - } - boolean active = owner.handleReply(r, numPending); - if (log.isLoggable(LogLevel.SPAM)) { - log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + ", count was " + state.getNumPending()); - } - if (!active) { - state.clearPending(); - activeOwners.remove(owner); - } - } + if (count != null) { + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count); + } - private static final class Sync extends AbstractQueuedSynchronizer { - Sync(int initialCount) { - setState(initialCount); - } - - int getCount() { - return getState(); - } - - @Override - protected int tryAcquireShared(int acquires) { - return (getState() == 0) ? 1 : -1; - } - - @Override - protected boolean tryReleaseShared(int releases) { - // Increment/Decrement count; signal when transition downwards to zero. - // releases == 0 means unblock all - while ( true ) { - int c = getState(); - if ((c == 0) && (releases >= 0)) { return false; } - int nextc = (c > releases) ? ((releases != 0) ? c - releases : 0) : 0; - if (compareAndSetState(c, nextc)) { - return nextc == 0; + if ( ! owner.handleReply(r, count - 1)) { + activeOwners.remove(owner); + } else { + activeOwners.put(owner, count - 1); + } } + } else { + log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); } - } - } - - private static final class OwnerState { - - private static final long REACT_LATENCY_ON_WATERMARK_MS = 5; - - private final Sync sync = new Sync(1); - - void addPending(int count) { - sync.releaseShared(-count); - } - - boolean decPending(int count) { - return sync.releaseShared(count); - } - - void waitMaxPendingBelow(int limit) { - try { - while (getNumPending() > limit) { - sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(REACT_LATENCY_ON_WATERMARK_MS)); - } - } catch (InterruptedException e) { - } - } - - int getNumPending() { - return sync.getCount(); - } - - void clearPending() { - sync.releaseShared(0); - } - - boolean waitPending(long timeoutMS) throws InterruptedException { - return sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(timeoutMS)); - } - void waitPending() throws InterruptedException { - sync.tryAcquireShared(1); + monitor.notifyAll(); } } -- cgit v1.2.3