diff options
Diffstat (limited to 'vespaclient-core')
3 files changed, 73 insertions, 107 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index 48ff0eae36b..a29693aee1c 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -23,12 +23,8 @@ public class SharedSender implements ReplyHandler { public static final Logger log = Logger.getLogger(SharedSender.class.getName()); private SendSession sender; - private final Object monitor = new Object(); private RouteMetricSet metrics; - // Maps from filename to number of pending requests - private Map<ResultCallback, Integer> activeOwners = new HashMap<>(); - /** * Creates a new shared sender. * If oldsender != null, we copy that status information from that sender. @@ -49,21 +45,8 @@ public class SharedSender implements ReplyHandler { return metrics; } - public void remove(ResultCallback owner) { - synchronized (monitor) { - activeOwners.remove(owner); - } - } - public void shutdown() { - try { - synchronized (monitor) { - while ( ! activeOwners.isEmpty()) { - monitor.wait(180 * 1000); - } - } - } catch (InterruptedException e) { - } + // XXX may need to wait here? sender.close(); } @@ -76,36 +59,15 @@ public class SharedSender implements ReplyHandler { * @return true if there were no more pending, or false if the timeout expired. */ public boolean waitForPending(ResultCallback owner, long timeoutMs) { - long timeStart = SystemTimer.INSTANCE.milliTime(); - long timeLeft = timeoutMs; - try { - while (timeoutMs == -1 || timeLeft > 0) { - synchronized (monitor) { - Integer count = activeOwners.get(owner); - if (count == null || count == 0) { - return true; - } else if (timeLeft > 0) { - monitor.wait(timeLeft); - } else { - monitor.wait(); - } - } - - timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart); - } + return owner.getPending().waitForZero(timeoutMs); } catch (InterruptedException e) { + return false; } - - return false; } public int getPendingCount(ResultCallback owner) { - Integer count = activeOwners.get(owner); - if (count == null) { - return 0; - } - return count; + return owner.getPending().val(); } /** @@ -135,7 +97,7 @@ public class SharedSender implements ReplyHandler { * @param owner A callback to send replies to when received from messagebus */ public void send(Message msg, ResultCallback owner) { - send(msg, owner, -1, true); + send(msg, owner, true); } /** @@ -144,43 +106,16 @@ public class SharedSender implements ReplyHandler { * * @param msg The message to send * @param owner The callback to send replies to when received from messagebus - * @param maxPendingPerOwner The maximum number of pending messages the callback * @param blockingQueue If true, block until the message bus queue is available. */ - public void send(Message msg, ResultCallback owner, int maxPendingPerOwner, boolean blockingQueue) { + public void send(Message msg, ResultCallback owner, boolean blockingQueue) { // Silently fail messages that are attempted sent after the callback aborted. if (owner.isAborted()) { return; } - try { - synchronized (monitor) { - if (maxPendingPerOwner != -1 && blockingQueue) { - while (true) { - Integer count = activeOwners.get(owner); - - if (count != null && count >= maxPendingPerOwner) { - log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies"); - monitor.wait(10000); - } else { - break; - } - } - } - - Integer count = activeOwners.get(owner); - - if (count == null) { - activeOwners.put(owner, 1); - } else { - activeOwners.put(owner, count + 1); - } - } - } catch (InterruptedException e) { - return; - } - msg.setContext(owner); + owner.getPending().inc(); try { com.yahoo.messagebus.Result r = sender.send(msg, blockingQueue); @@ -203,44 +138,76 @@ public class SharedSender implements ReplyHandler { */ @Override public void handleReply(Reply r) { - synchronized (monitor) { - ResultCallback owner = (ResultCallback) r.getContext(); - - if (owner != null) { - metrics.addReply(r); + ResultCallback owner = (ResultCallback) r.getContext(); - Integer count = activeOwners.get(owner); - - if (count != null) { - if (log.isLoggable(LogLevel.SPAM)) { - log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count); - } + if (owner != null) { + metrics.addReply(r); + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + owner.getPending().val()); + } + if (owner.isAborted()) { + log.log(LogLevel.WARNING, "Received reply for file " + owner.toString() + " which is aborted"); + owner.getPending().clear(); + return; + } + if (owner.handleReply(r)) { + owner.getPending().dec(); + } else { + log.log(LogLevel.WARNING, "Received reply for file " + owner.toString() + " which wants to abort"); + owner.getPending().clear(); + } + } else { + log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); + } + } - if ( ! owner.handleReply(r, count - 1)) { - activeOwners.remove(owner); - } else { - activeOwners.put(owner, count - 1); + public static class Pending { + private int value = 0; + public synchronized void inc() { ++value; } + public synchronized void dec() { if (--value == 0) notifyAll(); } + public synchronized void clear() { value = 0; notifyAll(); } + public synchronized int val() { return value; } + public synchronized boolean waitForZero() throws InterruptedException { + while (value > 0) { + wait(); + } + return true; + } + public boolean waitForZero(long timeoutMs) throws InterruptedException { + if (timeoutMs == -1) { + return waitForZero(); + } else { + long timeStart = SystemTimer.INSTANCE.milliTime(); + long timeLeft = timeoutMs; + while (timeLeft > 0) { + synchronized(this) { + if (value > 0) { + wait(timeLeft); + } else { + return true; + } } + long elapsed = SystemTimer.INSTANCE.milliTime() - timeStart; + timeLeft = timeoutMs - elapsed; } - } else { - log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context"); + return false; } - - monitor.notifyAll(); } } public interface ResultCallback { - /** Return true if we should continue waiting for replies for this sender. */ - boolean handleReply(Reply r, int numPending); + /** get the associated Pending number */ + public Pending getPending(); + + /** Return true unless we should abort this sender. */ + boolean handleReply(Reply r); /** * Returns true if feeding has been aborted. No more feeding is allowed with this * callback after that. */ boolean isAborted(); - } } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java index a9a08562c9d..242cbf9db17 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java @@ -84,19 +84,13 @@ public class SingleSender implements SimpleFeedAccess { return m; } - public void send(Message m) { - send(m, -1); - } - /** - * Sends the given message, allowing a maximum of maxPending messages to be - * sent for this sender. + * Sends the given message. * * @param m The message to send - * @param maxPending The number of pending messages to block on for this sender. */ - public void send(Message m, int maxPending) { - sender.send(processMessage(m), owner, maxPending, blockingQueue); + public void send(Message m) { + sender.send(processMessage(m), owner, blockingQueue); } public void done() { diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java index cdd4ac76c4c..738b4d3303a 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java @@ -35,6 +35,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res private final RouteMetricSet metrics; private boolean abortOnError = false; private boolean isAborted = false; + private final SharedSender.Pending pendingNumber = new SharedSender.Pending(); public FeedResponse(RouteMetricSet metrics) { super(com.yahoo.jdisc.http.HttpResponse.Status.OK); @@ -101,7 +102,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res return ""; } - public boolean handleReply(Reply reply, int numPending) { + public boolean handleReply(Reply reply) { metrics.addReply(reply); if (reply.getTrace().getLevel() > 0) { String str = reply.getTrace().toString(); @@ -123,12 +124,16 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res log.finest(str); addError(str); } - isAborted = abortOnError; - return !abortOnError; + if (abortOnError) { + isAborted = true; + return false; + } } - return numPending > 0; + return true; } + public SharedSender.Pending getPending() { return pendingNumber; } + public void done() { metrics.done(); } |