summaryrefslogtreecommitdiffstats
path: root/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java57
1 files changed, 14 insertions, 43 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index b9aeb2e6e69..52d21897a15 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -23,24 +23,17 @@ public class SharedSender implements ReplyHandler {
public static final Logger log = Logger.getLogger(SharedSender.class.getName());
- private SendSession sender;
- private RouteMetricSet metrics;
- private Pending globalPending = new Pending();
+ private final SendSession sender;
+ private final RouteMetricSet metrics;
+ private final Pending globalPending = new Pending();
/**
* Creates a new shared sender.
* If oldsender != null, we copy that status information from that sender.
*/
- public SharedSender(String route, SessionFactory factory, SharedSender oldSender, Metric metric) {
- if (factory != null) {
- sender = factory.createSendSession(this, metric);
- }
-
- if (oldSender != null) {
- this.metrics = oldSender.metrics;
- } else {
- metrics = new RouteMetricSet(route, null);
- }
+ SharedSender(String route, SessionFactory factory, SharedSender oldSender, Metric metric) {
+ sender = (factory != null) ? factory.createSendSession(this, metric) : null;
+ metrics = (oldSender != null) ? oldSender.metrics : new RouteMetricSet(route, null);
}
public RouteMetricSet getMetrics() {
@@ -63,7 +56,7 @@ public class SharedSender implements ReplyHandler {
* @param timeoutMs The number of milliseconds to wait, or -1 to wait indefinitely.
* @return true if there were no more pending, or false if the timeout expired.
*/
- public boolean waitForPending(ResultCallback owner, long timeoutMs) {
+ boolean waitForPending(ResultCallback owner, long timeoutMs) {
try {
return owner.getPending().waitForZero(timeoutMs);
} catch (InterruptedException e) {
@@ -71,30 +64,6 @@ public class SharedSender implements ReplyHandler {
}
}
- public int getPendingCount(ResultCallback owner) {
- return owner.getPending().val();
- }
-
- /**
- * Returns true if the given result callback has any pending messages with this
- * sender.
- *
- * @param owner The callback to check
- * @return True if there are any pending, false if not.
- */
- public boolean hasPending(ResultCallback owner) {
- return getPendingCount(owner) > 0;
- }
-
- /**
- * Waits until the given file has no pending documents.
- *
- * @param owner the file to check for pending documents
- */
- public void waitForPending(ResultCallback owner) {
- waitForPending(owner, -1);
- }
-
/**
* Sends a message
*
@@ -153,7 +122,9 @@ public class SharedSender implements ReplyHandler {
ResultCallback owner = (ResultCallback) r.getContext();
if (owner != null) {
- metrics.addReply(r);
+ synchronized (metrics) {
+ metrics.addReply(r);
+ }
if (log.isLoggable(LogLevel.SPAM)) {
log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + owner.getPending().val());
}
@@ -176,16 +147,16 @@ public class SharedSender implements ReplyHandler {
public static class Pending {
private int value = 0;
public synchronized void inc() { ++value; }
- public synchronized void dec() { if (--value == 0) notifyAll(); }
+ synchronized void dec() { if (--value == 0) notifyAll(); }
public synchronized void clear() { value = 0; notifyAll(); }
public synchronized int val() { return value; }
- public synchronized boolean waitForZero() throws InterruptedException {
+ synchronized boolean waitForZero() throws InterruptedException {
while (value > 0) {
wait();
}
return true;
}
- public boolean waitForZero(long timeoutMs) throws InterruptedException {
+ boolean waitForZero(long timeoutMs) throws InterruptedException {
if (timeoutMs == -1) {
return waitForZero();
} else {
@@ -210,7 +181,7 @@ public class SharedSender implements ReplyHandler {
public interface ResultCallback {
/** get the associated Pending number */
- public Pending getPending();
+ Pending getPending();
/** Return true unless we should abort this sender. */
boolean handleReply(Reply r);