summaryrefslogtreecommitdiffstats
path: root/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java133
1 files changed, 59 insertions, 74 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index 97b9ec93f56..a29693aee1c 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -23,12 +23,8 @@ public class SharedSender implements ReplyHandler {
public static final Logger log = Logger.getLogger(SharedSender.class.getName());
private SendSession sender;
- private final Object monitor = new Object();
private RouteMetricSet metrics;
- // Maps from filename to number of pending requests
- private Map<ResultCallback, Integer> activeOwners = new HashMap<>();
-
/**
* Creates a new shared sender.
* If oldsender != null, we copy that status information from that sender.
@@ -49,21 +45,8 @@ public class SharedSender implements ReplyHandler {
return metrics;
}
- public void remove(ResultCallback owner) {
- synchronized (monitor) {
- activeOwners.remove(owner);
- }
- }
-
public void shutdown() {
- try {
- synchronized (monitor) {
- while ( ! activeOwners.isEmpty()) {
- monitor.wait(180 * 1000);
- }
- }
- } catch (InterruptedException e) {
- }
+ // XXX may need to wait here?
sender.close();
}
@@ -76,36 +59,15 @@ public class SharedSender implements ReplyHandler {
* @return true if there were no more pending, or false if the timeout expired.
*/
public boolean waitForPending(ResultCallback owner, long timeoutMs) {
- long timeStart = SystemTimer.INSTANCE.milliTime();
- long timeLeft = timeoutMs;
-
try {
- while (timeoutMs == -1 || timeLeft > 0) {
- synchronized (monitor) {
- Integer count = activeOwners.get(owner);
- if (count == null || count == 0) {
- return true;
- } else if (timeLeft > 0) {
- monitor.wait(timeLeft);
- } else {
- monitor.wait();
- }
- }
-
- timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart);
- }
+ return owner.getPending().waitForZero(timeoutMs);
} catch (InterruptedException e) {
+ return false;
}
-
- return false;
}
public int getPendingCount(ResultCallback owner) {
- Integer count = activeOwners.get(owner);
- if (count == null) {
- return 0;
- }
- return count;
+ return owner.getPending().val();
}
/**
@@ -152,17 +114,8 @@ public class SharedSender implements ReplyHandler {
return;
}
- synchronized (monitor) {
- Integer count = activeOwners.get(owner);
-
- if (count == null) {
- activeOwners.put(owner, 1);
- } else {
- activeOwners.put(owner, count + 1);
- }
- }
-
msg.setContext(owner);
+ owner.getPending().inc();
try {
com.yahoo.messagebus.Result r = sender.send(msg, blockingQueue);
@@ -185,44 +138,76 @@ public class SharedSender implements ReplyHandler {
*/
@Override
public void handleReply(Reply r) {
- synchronized (monitor) {
- ResultCallback owner = (ResultCallback) r.getContext();
-
- if (owner != null) {
- metrics.addReply(r);
+ ResultCallback owner = (ResultCallback) r.getContext();
- Integer count = activeOwners.get(owner);
-
- if (count != null) {
- if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count);
- }
+ if (owner != null) {
+ metrics.addReply(r);
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + owner.getPending().val());
+ }
+ if (owner.isAborted()) {
+ log.log(LogLevel.WARNING, "Received reply for file " + owner.toString() + " which is aborted");
+ owner.getPending().clear();
+ return;
+ }
+ if (owner.handleReply(r)) {
+ owner.getPending().dec();
+ } else {
+ log.log(LogLevel.WARNING, "Received reply for file " + owner.toString() + " which wants to abort");
+ owner.getPending().clear();
+ }
+ } else {
+ log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
+ }
+ }
- if ( ! owner.handleReply(r, count - 1)) {
- activeOwners.remove(owner);
- } else {
- activeOwners.put(owner, count - 1);
+ public static class Pending {
+ private int value = 0;
+ public synchronized void inc() { ++value; }
+ public synchronized void dec() { if (--value == 0) notifyAll(); }
+ public synchronized void clear() { value = 0; notifyAll(); }
+ public synchronized int val() { return value; }
+ public synchronized boolean waitForZero() throws InterruptedException {
+ while (value > 0) {
+ wait();
+ }
+ return true;
+ }
+ public boolean waitForZero(long timeoutMs) throws InterruptedException {
+ if (timeoutMs == -1) {
+ return waitForZero();
+ } else {
+ long timeStart = SystemTimer.INSTANCE.milliTime();
+ long timeLeft = timeoutMs;
+ while (timeLeft > 0) {
+ synchronized(this) {
+ if (value > 0) {
+ wait(timeLeft);
+ } else {
+ return true;
+ }
}
+ long elapsed = SystemTimer.INSTANCE.milliTime() - timeStart;
+ timeLeft = timeoutMs - elapsed;
}
- } else {
- log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
+ return false;
}
-
- monitor.notifyAll();
}
}
public interface ResultCallback {
- /** Return true if we should continue waiting for replies for this sender. */
- boolean handleReply(Reply r, int numPending);
+ /** get the associated Pending number */
+ public Pending getPending();
+
+ /** Return true unless we should abort this sender. */
+ boolean handleReply(Reply r);
/**
* Returns true if feeding has been aborted. No more feeding is allowed with this
* callback after that.
*/
boolean isAborted();
-
}
}