diff options
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java')
-rwxr-xr-x | vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java | 57 |
1 files changed, 14 insertions, 43 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java index b9aeb2e6e69..52d21897a15 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java @@ -23,24 +23,17 @@ public class SharedSender implements ReplyHandler { public static final Logger log = Logger.getLogger(SharedSender.class.getName()); - private SendSession sender; - private RouteMetricSet metrics; - private Pending globalPending = new Pending(); + private final SendSession sender; + private final RouteMetricSet metrics; + private final Pending globalPending = new Pending(); /** * Creates a new shared sender. * If oldsender != null, we copy that status information from that sender. */ - public SharedSender(String route, SessionFactory factory, SharedSender oldSender, Metric metric) { - if (factory != null) { - sender = factory.createSendSession(this, metric); - } - - if (oldSender != null) { - this.metrics = oldSender.metrics; - } else { - metrics = new RouteMetricSet(route, null); - } + SharedSender(String route, SessionFactory factory, SharedSender oldSender, Metric metric) { + sender = (factory != null) ? factory.createSendSession(this, metric) : null; + metrics = (oldSender != null) ? oldSender.metrics : new RouteMetricSet(route, null); } public RouteMetricSet getMetrics() { @@ -63,7 +56,7 @@ public class SharedSender implements ReplyHandler { * @param timeoutMs The number of milliseconds to wait, or -1 to wait indefinitely. * @return true if there were no more pending, or false if the timeout expired. */ - public boolean waitForPending(ResultCallback owner, long timeoutMs) { + boolean waitForPending(ResultCallback owner, long timeoutMs) { try { return owner.getPending().waitForZero(timeoutMs); } catch (InterruptedException e) { @@ -71,30 +64,6 @@ public class SharedSender implements ReplyHandler { } } - public int getPendingCount(ResultCallback owner) { - return owner.getPending().val(); - } - - /** - * Returns true if the given result callback has any pending messages with this - * sender. - * - * @param owner The callback to check - * @return True if there are any pending, false if not. - */ - public boolean hasPending(ResultCallback owner) { - return getPendingCount(owner) > 0; - } - - /** - * Waits until the given file has no pending documents. - * - * @param owner the file to check for pending documents - */ - public void waitForPending(ResultCallback owner) { - waitForPending(owner, -1); - } - /** * Sends a message * @@ -153,7 +122,9 @@ public class SharedSender implements ReplyHandler { ResultCallback owner = (ResultCallback) r.getContext(); if (owner != null) { - metrics.addReply(r); + synchronized (metrics) { + metrics.addReply(r); + } if (log.isLoggable(LogLevel.SPAM)) { log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + owner.getPending().val()); } @@ -176,16 +147,16 @@ public class SharedSender implements ReplyHandler { public static class Pending { private int value = 0; public synchronized void inc() { ++value; } - public synchronized void dec() { if (--value == 0) notifyAll(); } + synchronized void dec() { if (--value == 0) notifyAll(); } public synchronized void clear() { value = 0; notifyAll(); } public synchronized int val() { return value; } - public synchronized boolean waitForZero() throws InterruptedException { + synchronized boolean waitForZero() throws InterruptedException { while (value > 0) { wait(); } return true; } - public boolean waitForZero(long timeoutMs) throws InterruptedException { + boolean waitForZero(long timeoutMs) throws InterruptedException { if (timeoutMs == -1) { return waitForZero(); } else { @@ -210,7 +181,7 @@ public class SharedSender implements ReplyHandler { public interface ResultCallback { /** get the associated Pending number */ - public Pending getPending(); + Pending getPending(); /** Return true unless we should abort this sender. */ boolean handleReply(Reply r); |