diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-09 17:26:13 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-01-09 17:26:13 +0100 |
commit | cc161c73c27a5c9e7ddad25102dd1a64910cabdf (patch) | |
tree | 33c3eb6a868c1ee4518c8add342457cfccf1c1a0 /vespaclient-core/src | |
parent | b99468d847b444a3ad7f4aeba0f2eac1906c74cb (diff) | |
parent | e3f1192b39150b70372b508032dc1204b747eaf6 (diff) |
Merge pull request #1457 from yahoo/revert-1456-revert-1453-revert-1452-revert-1451-revert-1450-revert-1449-revert-1448-revert-1447-balder/avoid-costly-notifyall-in-sharedsender
Revert "Revert "Revert "Revert "Revert "Revert "Revert "Revert "Balder/avoid costly notifyall in sharedsender""""""""
Diffstat (limited to 'vespaclient-core/src')
-rwxr-xr-x | vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java | 199 |
1 files changed, 122 insertions, 77 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index 48ff0eae36b..08243c4e0ec 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -6,9 +6,9 @@ import com.yahoo.jdisc.Metric; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.*; import com.yahoo.clientmetrics.RouteMetricSet; - -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.logging.Logger; /** @@ -23,11 +23,9 @@ public class SharedSender implements ReplyHandler { public static final Logger log = Logger.getLogger(SharedSender.class.getName()); private SendSession sender; - private final Object monitor = new Object(); private RouteMetricSet metrics; - // Maps from filename to number of pending requests - private Map<ResultCallback, Integer> activeOwners = new HashMap<>(); + private ConcurrentHashMap<ResultCallback, OwnerState> activeOwners = new ConcurrentHashMap<>(); /** * Creates a new shared sender. @@ -50,17 +48,16 @@ public class SharedSender implements ReplyHandler { } public void remove(ResultCallback owner) { - synchronized (monitor) { - activeOwners.remove(owner); + OwnerState state = activeOwners.remove(owner); + if (state != null) { + state.clearPending(); } } public void shutdown() { try { - synchronized (monitor) { - while ( ! activeOwners.isEmpty()) { - monitor.wait(180 * 1000); - } + while ( ! activeOwners.isEmpty()) { + Thread.sleep(10); } } catch (InterruptedException e) { } @@ -76,36 +73,21 @@ public class SharedSender implements ReplyHandler { * @return true if there were no more pending, or false if the timeout expired. */ public boolean waitForPending(ResultCallback owner, long timeoutMs) { - long timeStart = SystemTimer.INSTANCE.milliTime(); - long timeLeft = timeoutMs; - - try { - while (timeoutMs == -1 || timeLeft > 0) { - synchronized (monitor) { - Integer count = activeOwners.get(owner); - if (count == null || count == 0) { - return true; - } else if (timeLeft > 0) { - monitor.wait(timeLeft); - } else { - monitor.wait(); - } - } - - timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart); + OwnerState state = activeOwners.get(owner); + if (state != null) { + try { + return state.waitPending(timeoutMs); + } catch (InterruptedException e) { + return false; } - } catch (InterruptedException e) { } - return false; + return true; } public int getPendingCount(ResultCallback owner) { - Integer count = activeOwners.get(owner); - if (count == null) { - return 0; - } - return count; + OwnerState state = activeOwners.get(owner); + return (state != null) ? state.getNumPending() : 0; } /** @@ -125,7 +107,12 @@ public class SharedSender implements ReplyHandler { * @param owner the file to check for pending documents */ public void waitForPending(ResultCallback owner) { - waitForPending(owner, -1); + OwnerState state = activeOwners.get(owner); + if (state != null) { + try { + state.waitPending(); + } catch (InterruptedException e) { } + } } /** @@ -153,31 +140,17 @@ public class SharedSender implements ReplyHandler { return; } - try { - synchronized (monitor) { - if (maxPendingPerOwner != -1 && blockingQueue) { - while (true) { - Integer count = activeOwners.get(owner); - - if (count != null && count >= maxPendingPerOwner) { - log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies"); - monitor.wait(10000); - } else { - break; - } - } - } - - Integer count = activeOwners.get(owner); + OwnerState state = activeOwners.get(owner); + if (state == null) { + OwnerState newState = new OwnerState(); + state = activeOwners.putIfAbsent(owner, newState); - if (count == null) { - activeOwners.put(owner, 1); - } else { - activeOwners.put(owner, count + 1); - } + } + if (state != null) { + if (maxPendingPerOwner != -1 && blockingQueue) { + state.waitMaxPendingBelow(maxPendingPerOwner); } - } catch (InterruptedException e) { - return; + state.addPending(1); } msg.setContext(owner); @@ -203,30 +176,102 @@ public class SharedSender implements ReplyHandler { */ @Override public void handleReply(Reply r) { - synchronized (monitor) { - ResultCallback owner = (ResultCallback) r.getContext(); + ResultCallback owner = (ResultCallback) r.getContext(); + if (owner == null) { + log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); + return; + } - if (owner != null) { - metrics.addReply(r); + metrics.addReply(r); + OwnerState state = activeOwners.get(owner); - Integer count = activeOwners.get(owner); + if (state == null) { + // TODO: should be debug level if at all + log.log(LogLevel.WARNING, "Owner " + owner.toString() + " is not active"); + return; + } - if (count != null) { - if (log.isLoggable(LogLevel.SPAM)) { - log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count); - } + int numPending = state.getNumPending() - 1; + boolean noMorePending = state.decPending(1); + if (noMorePending) { + numPending = 0; + } + boolean active = owner.handleReply(r, numPending); + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + ", count was " + state.getNumPending()); + } + if (!active) { + state.clearPending(); + activeOwners.remove(owner); + } + } - if ( ! owner.handleReply(r, count - 1)) { - activeOwners.remove(owner); - } else { - activeOwners.put(owner, count - 1); - } + private static final class Sync extends AbstractQueuedSynchronizer { + Sync(int initialCount) { + setState(initialCount); + } + + int getCount() { + return getState(); + } + + @Override + protected int tryAcquireShared(int acquires) { + return (getState() == 0) ? 1 : -1; + } + + @Override + protected boolean tryReleaseShared(int releases) { + // Increment/Decrement count; signal when transition downwards to zero. + // releases == 0 means unblock all + while ( true ) { + int c = getState(); + if ((c == 0) && (releases >= 0)) { return false; } + int nextc = (c > releases) ? ((releases != 0) ? c - releases : 0) : 0; + if (compareAndSetState(c, nextc)) { + return nextc == 0; } - } else { - log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); } + } + } + + private static final class OwnerState { + + private static final long REACT_LATENCY_ON_WATERMARK_MS = 5; + + private final Sync sync = new Sync(1); + + void addPending(int count) { + sync.releaseShared(-count); + } + + boolean decPending(int count) { + return sync.releaseShared(count); + } + + void waitMaxPendingBelow(int limit) { + try { + while (getNumPending() > limit) { + sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(REACT_LATENCY_ON_WATERMARK_MS)); + } + } catch (InterruptedException e) { + } + } + + int getNumPending() { + return sync.getCount(); + } + + void clearPending() { + sync.releaseShared(0); + } + + boolean waitPending(long timeoutMS) throws InterruptedException { + return sync.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(timeoutMS)); + } - monitor.notifyAll(); + void waitPending() throws InterruptedException { + sync.tryAcquireShared(1); } } |