diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-08 20:57:30 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-01-08 20:57:30 +0100 |
commit | 1264780883ef73482d7b386b30a542dd07f10c6e (patch) | |
tree | 234a30d62d027917d47cb53e59189707af22d64c /vespaclient-core | |
parent | 3cbce5fb9a8100dd931aaeb9ed04ec47d2de9dbd (diff) |
Revert "Revert "Revert "Revert "Revert "Revert "Revert "Balder/avoid costly notifyall in sharedsender"""""""
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-x | vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java | 192 |
1 files changed, 78 insertions, 114 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index 00f5606b93a..48ff0eae36b 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -6,8 +6,9 @@ import com.yahoo.jdisc.Metric; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.*; import com.yahoo.clientmetrics.RouteMetricSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; + +import java.util.HashMap; +import java.util.Map; import java.util.logging.Logger; /** @@ -25,7 +26,8 @@ public class SharedSender implements ReplyHandler { private final Object monitor = new Object(); private RouteMetricSet metrics; - private ConcurrentHashMap<ResultCallback, OwnerState> activeOwners = new ConcurrentHashMap<>(); + // Maps from filename to number of pending requests + private Map<ResultCallback, Integer> activeOwners = new HashMap<>(); /** * Creates a new shared sender. @@ -48,13 +50,17 @@ public class SharedSender implements ReplyHandler { } public void remove(ResultCallback owner) { - activeOwners.remove(owner); + synchronized (monitor) { + activeOwners.remove(owner); + } } public void shutdown() { try { - while ( ! activeOwners.isEmpty()) { - Thread.sleep(10); + synchronized (monitor) { + while ( ! activeOwners.isEmpty()) { + monitor.wait(180 * 1000); + } } } catch (InterruptedException e) { } @@ -70,29 +76,36 @@ public class SharedSender implements ReplyHandler { * @return true if there were no more pending, or false if the timeout expired. */ public boolean waitForPending(ResultCallback owner, long timeoutMs) { - OwnerState state = activeOwners.get(owner); - if (state != null) { - try { - return state.waitPending(timeoutMs); - } catch (InterruptedException e) { - return false; - } - } + long timeStart = SystemTimer.INSTANCE.milliTime(); + long timeLeft = timeoutMs; - return true; - } + try { + while (timeoutMs == -1 || timeLeft > 0) { + synchronized (monitor) { + Integer count = activeOwners.get(owner); + if (count == null || count == 0) { + return true; + } else if (timeLeft > 0) { + monitor.wait(timeLeft); + } else { + monitor.wait(); + } + } - private OwnerState getNonNullState(ResultCallback owner) { - OwnerState state = activeOwners.get(owner); - if (state == null) { - throw new IllegalStateException("No active callback : " + owner.toString()); + timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart); + } + } catch (InterruptedException e) { } - return state; + + return false; } public int getPendingCount(ResultCallback owner) { - OwnerState state = activeOwners.get(owner); - return (state != null) ? state.getNumPending() : 0; + Integer count = activeOwners.get(owner); + if (count == null) { + return 0; + } + return count; } /** @@ -112,12 +125,7 @@ public class SharedSender implements ReplyHandler { * @param owner the file to check for pending documents */ public void waitForPending(ResultCallback owner) { - OwnerState state = activeOwners.get(owner); - if (state != null) { - try { - state.waitPending(); - } catch (InterruptedException e) { } - } + waitForPending(owner, -1); } /** @@ -145,16 +153,33 @@ public class SharedSender implements ReplyHandler { return; } - OwnerState state = activeOwners.get(owner); - if (state == null) { - state = new OwnerState(); - activeOwners.put(owner, state); - } - if (maxPendingPerOwner != -1 && blockingQueue) { - state.waitMaxPendingbelow(maxPendingPerOwner); + try { + synchronized (monitor) { + if (maxPendingPerOwner != -1 && blockingQueue) { + while (true) { + Integer count = activeOwners.get(owner); + + if (count != null && count >= maxPendingPerOwner) { + log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies"); + monitor.wait(10000); + } else { + break; + } + } + } + + Integer count = activeOwners.get(owner); + + if (count == null) { + activeOwners.put(owner, 1); + } else { + activeOwners.put(owner, count + 1); + } + } + } catch (InterruptedException e) { + return; } - state.addPending(1); msg.setContext(owner); try { @@ -178,91 +203,30 @@ public class SharedSender implements ReplyHandler { */ @Override public void handleReply(Reply r) { - ResultCallback owner = (ResultCallback) r.getContext(); + synchronized (monitor) { + ResultCallback owner = (ResultCallback) r.getContext(); - if (owner != null) { - metrics.addReply(r); - OwnerState state = activeOwners.get(owner); - boolean active = owner.handleReply(r, state.getNumPending() - 1); + if (owner != null) { + metrics.addReply(r); - if (state != null) { - if (log.isLoggable(LogLevel.SPAM)) { - log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + ", count was " + state.getNumPending()); - } - if (!active) { - state.clearPending(); - activeOwners.remove(owner); - } else { - state.decPending(1); - } - } else { - // TODO: should be debug level if at all. - log.log(LogLevel.WARNING, "Owner " + owner.toString() + " is not active"); - } - } else { - log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); - } - } - - - public static class OwnerState { - - final AtomicInteger numPending = new AtomicInteger(0); - - int addPending(int count) { - return numPending.addAndGet(count); - } - - int decPending(int count) { - int newValue = numPending.addAndGet(-count); - if (newValue <= 0) { - synchronized (numPending) { - numPending.notify(); - } - } - return newValue; - } + Integer count = activeOwners.get(owner); - void waitMaxPendingbelow(int limit) { - try { - synchronized (numPending) { - while (numPending.get() > limit) { - numPending.wait(5); + if (count != null) { + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count); } - } - } catch (InterruptedException e) { - } - } - int getNumPending() { - return numPending.get(); - } - - void clearPending() { - numPending.set(0); - synchronized (numPending) { - numPending.notify(); - } - } - - boolean waitPending(long timeoutMS) throws InterruptedException { - long timeStart = SystemTimer.INSTANCE.milliTime(); - long timeLeft = timeoutMS; - synchronized (numPending) { - while ((numPending.get() > 0) && (timeLeft > 0)) { - numPending.wait(timeLeft); - timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart); + if ( ! owner.handleReply(r, count - 1)) { + activeOwners.remove(owner); + } else { + activeOwners.put(owner, count - 1); + } } + } else { + log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); } - return numPending.get() <= 0; - } - void waitPending() throws InterruptedException { - synchronized (numPending) { - while (numPending.get() > 0) { - numPending.wait(); - } - } + monitor.notifyAll(); } } |