From 920f825d169bc05ff00ad7f2661305589cf70994 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Sat, 7 Jan 2017 19:01:23 +0100 Subject: Revert "Revert "Revert "Revert "Balder/avoid costly notifyall in sharedsender"""" --- .../messagebus/network/local/LocalNetworkTest.java | 2 +- .../com/yahoo/storage/searcher/GetSearcher.java | 4 +- .../main/java/com/yahoo/feedapi/SharedSender.java | 194 ++++++++++++--------- .../java/com/yahoo/feedhandler/FeedResponse.java | 4 +- 4 files changed, 120 insertions(+), 84 deletions(-) diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java index d9f8b05e4e9..95cf7639d09 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java @@ -109,7 +109,7 @@ public class LocalNetworkTest { assertEquals(ErrorCode.TIMEOUT, res.getError().getCode()); assertTrue(res.getError().getMessage().endsWith("Timed out in sendQ")); long end = System.currentTimeMillis(); - assertTrue(end - start >= TIMEOUT); + assertTrue(end - start >= (TIMEOUT*0.98)); // Different clocks are used.... assertTrue(end - start < 2*TIMEOUT); msg = serverB.messages.poll(60, TimeUnit.SECONDS); diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/storage/searcher/GetSearcher.java b/vespaclient-container-plugin/src/main/java/com/yahoo/storage/searcher/GetSearcher.java index 42ca9d45599..5723483603b 100755 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/storage/searcher/GetSearcher.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/storage/searcher/GetSearcher.java @@ -97,7 +97,7 @@ public class GetSearcher extends Searcher { return sw.toString(); } - public boolean handleReply(Reply reply, int numPending) { + public boolean handleReply(Reply reply) { if ((reply.getTrace().getLevel() > 0) && log.isLoggable(LogLevel.DEBUG)) { String str = reply.getTrace().toString(); log.log(LogLevel.DEBUG, str); @@ -119,7 +119,7 @@ public class GetSearcher extends Searcher { } } - return (numPending > 0); + return true; } private void addDocumentHit(Reply reply) { diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index 48ff0eae36b..b0e52357a64 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -6,9 +6,8 @@ import com.yahoo.jdisc.Metric; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.*; import com.yahoo.clientmetrics.RouteMetricSet; - -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; /** @@ -26,8 +25,7 @@ public class SharedSender implements ReplyHandler { private final Object monitor = new Object(); private RouteMetricSet metrics; - // Maps from filename to number of pending requests - private Map activeOwners = new HashMap<>(); + private ConcurrentHashMap activeOwners = new ConcurrentHashMap<>(); /** * Creates a new shared sender. @@ -50,17 +48,13 @@ public class SharedSender implements ReplyHandler { } public void remove(ResultCallback owner) { - synchronized (monitor) { - activeOwners.remove(owner); - } + activeOwners.remove(owner); } public void shutdown() { try { - synchronized (monitor) { - while ( ! activeOwners.isEmpty()) { - monitor.wait(180 * 1000); - } + while ( ! activeOwners.isEmpty()) { + Thread.sleep(10); } } catch (InterruptedException e) { } @@ -76,36 +70,29 @@ public class SharedSender implements ReplyHandler { * @return true if there were no more pending, or false if the timeout expired. */ public boolean waitForPending(ResultCallback owner, long timeoutMs) { - long timeStart = SystemTimer.INSTANCE.milliTime(); - long timeLeft = timeoutMs; - - try { - while (timeoutMs == -1 || timeLeft > 0) { - synchronized (monitor) { - Integer count = activeOwners.get(owner); - if (count == null || count == 0) { - return true; - } else if (timeLeft > 0) { - monitor.wait(timeLeft); - } else { - monitor.wait(); - } - } - - timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart); + OwnerState state = activeOwners.get(owner); + if (state != null) { + try { + return state.waitPending(timeoutMs); + } catch (InterruptedException e) { + return false; } - } catch (InterruptedException e) { } - return false; + return true; } - public int getPendingCount(ResultCallback owner) { - Integer count = activeOwners.get(owner); - if (count == null) { - return 0; + private OwnerState getNonNullState(ResultCallback owner) { + OwnerState state = activeOwners.get(owner); + if (state == null) { + throw new IllegalStateException("No active callback : " + owner.toString()); } - return count; + return state; + } + + public int getPendingCount(ResultCallback owner) { + OwnerState state = activeOwners.get(owner); + return (state != null) ? state.getNumPending() : 0; } /** @@ -125,7 +112,12 @@ public class SharedSender implements ReplyHandler { * @param owner the file to check for pending documents */ public void waitForPending(ResultCallback owner) { - waitForPending(owner, -1); + OwnerState state = activeOwners.get(owner); + if (state != null) { + try { + state.waitPending(); + } catch (InterruptedException e) { } + } } /** @@ -153,33 +145,16 @@ public class SharedSender implements ReplyHandler { return; } - try { - synchronized (monitor) { - if (maxPendingPerOwner != -1 && blockingQueue) { - while (true) { - Integer count = activeOwners.get(owner); - - if (count != null && count >= maxPendingPerOwner) { - log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies"); - monitor.wait(10000); - } else { - break; - } - } - } - - Integer count = activeOwners.get(owner); - - if (count == null) { - activeOwners.put(owner, 1); - } else { - activeOwners.put(owner, count + 1); - } - } - } catch (InterruptedException e) { - return; + OwnerState state = activeOwners.get(owner); + if (state == null) { + state = new OwnerState(); + activeOwners.put(owner, state); + } + if (maxPendingPerOwner != -1 && blockingQueue) { + state.waitMaxPendingbelow(maxPendingPerOwner); } + state.addPending(1); msg.setContext(owner); try { @@ -203,37 +178,98 @@ public class SharedSender implements ReplyHandler { */ @Override public void handleReply(Reply r) { - synchronized (monitor) { - ResultCallback owner = (ResultCallback) r.getContext(); + ResultCallback owner = (ResultCallback) r.getContext(); - if (owner != null) { - metrics.addReply(r); + if (owner != null) { + metrics.addReply(r); + boolean active = owner.handleReply(r); + OwnerState state = activeOwners.get(owner); - Integer count = activeOwners.get(owner); + if (state != null) { + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + ", count was " + state.getNumPending()); + } + if (!active) { + state.clearPending(); + activeOwners.remove(owner); + } else if ((state.decPending(1) <= 0)) { + activeOwners.remove(owner); + } + } else { + // TODO: should be debug level if at all. + log.log(LogLevel.WARNING, "Owner " + owner.toString() + " is not active"); + } + } else { + log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); + } + } - if (count != null) { - if (log.isLoggable(LogLevel.SPAM)) { - log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count); - } - if ( ! owner.handleReply(r, count - 1)) { - activeOwners.remove(owner); - } else { - activeOwners.put(owner, count - 1); + public static class OwnerState { + + final AtomicInteger numPending = new AtomicInteger(0); + + int addPending(int count) { + return numPending.addAndGet(count); + } + + int decPending(int count) { + int newValue = numPending.addAndGet(-count); + if (newValue <= 0) { + synchronized (numPending) { + numPending.notify(); + } + } + return newValue; + } + + void waitMaxPendingbelow(int limit) { + try { + synchronized (numPending) { + while (numPending.get() > limit) { + numPending.wait(5); } } - } else { - log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); + } catch (InterruptedException e) { } + } + + int getNumPending() { + return numPending.get(); + } + + void clearPending() { + numPending.set(0); + synchronized (numPending) { + numPending.notify(); + } + } - monitor.notifyAll(); + boolean waitPending(long timeoutMS) throws InterruptedException { + long timeStart = SystemTimer.INSTANCE.milliTime(); + long timeLeft = timeoutMS; + synchronized (numPending) { + while ((numPending.get() > 0) && (timeLeft > 0)) { + numPending.wait(timeLeft); + timeLeft = timeoutMS - (SystemTimer.INSTANCE.milliTime() - timeStart); + } + } + return numPending.get() <= 0; + } + + void waitPending() throws InterruptedException { + synchronized (numPending) { + while (numPending.get() > 0) { + numPending.wait(); + } + } } } public interface ResultCallback { /** Return true if we should continue waiting for replies for this sender. */ - boolean handleReply(Reply r, int numPending); + boolean handleReply(Reply r); /** * Returns true if feeding has been aborted. No more feeding is allowed with this diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java index cdd4ac76c4c..f5d717d35e0 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java @@ -101,7 +101,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res return ""; } - public boolean handleReply(Reply reply, int numPending) { + public boolean handleReply(Reply reply) { metrics.addReply(reply); if (reply.getTrace().getLevel() > 0) { String str = reply.getTrace().toString(); @@ -126,7 +126,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res isAborted = abortOnError; return !abortOnError; } - return numPending > 0; + return true; } public void done() { -- cgit v1.2.3