From c6d2a4893a1e620325f2fd363b3065546efc2e64 Mon Sep 17 00:00:00 2001 From: Arne H Juul Date: Thu, 12 Jan 2017 13:13:24 +0100 Subject: simplify SharedSender API * require ResultCallback owners to keep their own Pending number * no longer send numPending in handleReply() * remove unused remove() method * get rid of activeOwners map and associated monitor * waitForPending() now just delegates to the Pending number --- .../main/java/com/yahoo/feedapi/SharedSender.java | 133 +++++++++------------ .../java/com/yahoo/feedhandler/FeedResponse.java | 7 +- 2 files changed, 64 insertions(+), 76 deletions(-) (limited to 'vespaclient-core') diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index 97b9ec93f56..a29693aee1c 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -23,12 +23,8 @@ public class SharedSender implements ReplyHandler { public static final Logger log = Logger.getLogger(SharedSender.class.getName()); private SendSession sender; - private final Object monitor = new Object(); private RouteMetricSet metrics; - // Maps from filename to number of pending requests - private Map activeOwners = new HashMap<>(); - /** * Creates a new shared sender. * If oldsender != null, we copy that status information from that sender. @@ -49,21 +45,8 @@ public class SharedSender implements ReplyHandler { return metrics; } - public void remove(ResultCallback owner) { - synchronized (monitor) { - activeOwners.remove(owner); - } - } - public void shutdown() { - try { - synchronized (monitor) { - while ( ! activeOwners.isEmpty()) { - monitor.wait(180 * 1000); - } - } - } catch (InterruptedException e) { - } + // XXX may need to wait here? sender.close(); } @@ -76,36 +59,15 @@ public class SharedSender implements ReplyHandler { * @return true if there were no more pending, or false if the timeout expired. */ public boolean waitForPending(ResultCallback owner, long timeoutMs) { - long timeStart = SystemTimer.INSTANCE.milliTime(); - long timeLeft = timeoutMs; - try { - while (timeoutMs == -1 || timeLeft > 0) { - synchronized (monitor) { - Integer count = activeOwners.get(owner); - if (count == null || count == 0) { - return true; - } else if (timeLeft > 0) { - monitor.wait(timeLeft); - } else { - monitor.wait(); - } - } - - timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart); - } + return owner.getPending().waitForZero(timeoutMs); } catch (InterruptedException e) { + return false; } - - return false; } public int getPendingCount(ResultCallback owner) { - Integer count = activeOwners.get(owner); - if (count == null) { - return 0; - } - return count; + return owner.getPending().val(); } /** @@ -152,17 +114,8 @@ public class SharedSender implements ReplyHandler { return; } - synchronized (monitor) { - Integer count = activeOwners.get(owner); - - if (count == null) { - activeOwners.put(owner, 1); - } else { - activeOwners.put(owner, count + 1); - } - } - msg.setContext(owner); + owner.getPending().inc(); try { com.yahoo.messagebus.Result r = sender.send(msg, blockingQueue); @@ -185,44 +138,76 @@ public class SharedSender implements ReplyHandler { */ @Override public void handleReply(Reply r) { - synchronized (monitor) { - ResultCallback owner = (ResultCallback) r.getContext(); - - if (owner != null) { - metrics.addReply(r); + ResultCallback owner = (ResultCallback) r.getContext(); - Integer count = activeOwners.get(owner); - - if (count != null) { - if (log.isLoggable(LogLevel.SPAM)) { - log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count); - } + if (owner != null) { + metrics.addReply(r); + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + owner.getPending().val()); + } + if (owner.isAborted()) { + log.log(LogLevel.WARNING, "Received reply for file " + owner.toString() + " which is aborted"); + owner.getPending().clear(); + return; + } + if (owner.handleReply(r)) { + owner.getPending().dec(); + } else { + log.log(LogLevel.WARNING, "Received reply for file " + owner.toString() + " which wants to abort"); + owner.getPending().clear(); + } + } else { + log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); + } + } - if ( ! owner.handleReply(r, count - 1)) { - activeOwners.remove(owner); - } else { - activeOwners.put(owner, count - 1); + public static class Pending { + private int value = 0; + public synchronized void inc() { ++value; } + public synchronized void dec() { if (--value == 0) notifyAll(); } + public synchronized void clear() { value = 0; notifyAll(); } + public synchronized int val() { return value; } + public synchronized boolean waitForZero() throws InterruptedException { + while (value > 0) { + wait(); + } + return true; + } + public boolean waitForZero(long timeoutMs) throws InterruptedException { + if (timeoutMs == -1) { + return waitForZero(); + } else { + long timeStart = SystemTimer.INSTANCE.milliTime(); + long timeLeft = timeoutMs; + while (timeLeft > 0) { + synchronized(this) { + if (value > 0) { + wait(timeLeft); + } else { + return true; + } } + long elapsed = SystemTimer.INSTANCE.milliTime() - timeStart; + timeLeft = timeoutMs - elapsed; } - } else { - log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); + return false; } - - monitor.notifyAll(); } } public interface ResultCallback { - /** Return true if we should continue waiting for replies for this sender. */ - boolean handleReply(Reply r, int numPending); + /** get the associated Pending number */ + public Pending getPending(); + + /** Return true unless we should abort this sender. */ + boolean handleReply(Reply r); /** * Returns true if feeding has been aborted. No more feeding is allowed with this * callback after that. */ boolean isAborted(); - } } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java index 7492a48414b..738b4d3303a 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java @@ -35,6 +35,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res private final RouteMetricSet metrics; private boolean abortOnError = false; private boolean isAborted = false; + private final SharedSender.Pending pendingNumber = new SharedSender.Pending(); public FeedResponse(RouteMetricSet metrics) { super(com.yahoo.jdisc.http.HttpResponse.Status.OK); @@ -101,7 +102,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res return ""; } - public boolean handleReply(Reply reply, int numPending) { + public boolean handleReply(Reply reply) { metrics.addReply(reply); if (reply.getTrace().getLevel() > 0) { String str = reply.getTrace().toString(); @@ -128,9 +129,11 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res return false; } } - return numPending > 0; + return true; } + public SharedSender.Pending getPending() { return pendingNumber; } + public void done() { metrics.done(); } -- cgit v1.2.3