summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorArne H Juul <arnej@yahoo-inc.com>2017-01-12 13:13:24 +0100
committerArne H Juul <arnej@yahoo-inc.com>2017-01-12 13:13:24 +0100
commitc6d2a4893a1e620325f2fd363b3065546efc2e64 (patch)
treeb2160ceb7bf9d4a5d322606d7e614e5b1645368c /vespaclient-core
parentc3e78ef7a72efc0ccd419acf6ac719eb6effdc3d (diff)
simplify SharedSender API
* require ResultCallback owners to keep their own Pending number * no longer send numPending in handleReply() * remove unused remove() method * get rid of activeOwners map and associated monitor * waitForPending() now just delegates to the Pending number
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java133
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java7
2 files changed, 64 insertions, 76 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index 97b9ec93f56..a29693aee1c 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -23,12 +23,8 @@ public class SharedSender implements ReplyHandler {
public static final Logger log = Logger.getLogger(SharedSender.class.getName());
private SendSession sender;
- private final Object monitor = new Object();
private RouteMetricSet metrics;
- // Maps from filename to number of pending requests
- private Map<ResultCallback, Integer> activeOwners = new HashMap<>();
-
/**
* Creates a new shared sender.
* If oldsender != null, we copy that status information from that sender.
@@ -49,21 +45,8 @@ public class SharedSender implements ReplyHandler {
return metrics;
}
- public void remove(ResultCallback owner) {
- synchronized (monitor) {
- activeOwners.remove(owner);
- }
- }
-
public void shutdown() {
- try {
- synchronized (monitor) {
- while ( ! activeOwners.isEmpty()) {
- monitor.wait(180 * 1000);
- }
- }
- } catch (InterruptedException e) {
- }
+ // XXX may need to wait here?
sender.close();
}
@@ -76,36 +59,15 @@ public class SharedSender implements ReplyHandler {
* @return true if there were no more pending, or false if the timeout expired.
*/
public boolean waitForPending(ResultCallback owner, long timeoutMs) {
- long timeStart = SystemTimer.INSTANCE.milliTime();
- long timeLeft = timeoutMs;
-
try {
- while (timeoutMs == -1 || timeLeft > 0) {
- synchronized (monitor) {
- Integer count = activeOwners.get(owner);
- if (count == null || count == 0) {
- return true;
- } else if (timeLeft > 0) {
- monitor.wait(timeLeft);
- } else {
- monitor.wait();
- }
- }
-
- timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart);
- }
+ return owner.getPending().waitForZero(timeoutMs);
} catch (InterruptedException e) {
+ return false;
}
-
- return false;
}
public int getPendingCount(ResultCallback owner) {
- Integer count = activeOwners.get(owner);
- if (count == null) {
- return 0;
- }
- return count;
+ return owner.getPending().val();
}
/**
@@ -152,17 +114,8 @@ public class SharedSender implements ReplyHandler {
return;
}
- synchronized (monitor) {
- Integer count = activeOwners.get(owner);
-
- if (count == null) {
- activeOwners.put(owner, 1);
- } else {
- activeOwners.put(owner, count + 1);
- }
- }
-
msg.setContext(owner);
+ owner.getPending().inc();
try {
com.yahoo.messagebus.Result r = sender.send(msg, blockingQueue);
@@ -185,44 +138,76 @@ public class SharedSender implements ReplyHandler {
*/
@Override
public void handleReply(Reply r) {
- synchronized (monitor) {
- ResultCallback owner = (ResultCallback) r.getContext();
-
- if (owner != null) {
- metrics.addReply(r);
+ ResultCallback owner = (ResultCallback) r.getContext();
- Integer count = activeOwners.get(owner);
-
- if (count != null) {
- if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count);
- }
+ if (owner != null) {
+ metrics.addReply(r);
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + owner.getPending().val());
+ }
+ if (owner.isAborted()) {
+ log.log(LogLevel.WARNING, "Received reply for file " + owner.toString() + " which is aborted");
+ owner.getPending().clear();
+ return;
+ }
+ if (owner.handleReply(r)) {
+ owner.getPending().dec();
+ } else {
+ log.log(LogLevel.WARNING, "Received reply for file " + owner.toString() + " which wants to abort");
+ owner.getPending().clear();
+ }
+ } else {
+ log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
+ }
+ }
- if ( ! owner.handleReply(r, count - 1)) {
- activeOwners.remove(owner);
- } else {
- activeOwners.put(owner, count - 1);
+ public static class Pending {
+ private int value = 0;
+ public synchronized void inc() { ++value; }
+ public synchronized void dec() { if (--value == 0) notifyAll(); }
+ public synchronized void clear() { value = 0; notifyAll(); }
+ public synchronized int val() { return value; }
+ public synchronized boolean waitForZero() throws InterruptedException {
+ while (value > 0) {
+ wait();
+ }
+ return true;
+ }
+ public boolean waitForZero(long timeoutMs) throws InterruptedException {
+ if (timeoutMs == -1) {
+ return waitForZero();
+ } else {
+ long timeStart = SystemTimer.INSTANCE.milliTime();
+ long timeLeft = timeoutMs;
+ while (timeLeft > 0) {
+ synchronized(this) {
+ if (value > 0) {
+ wait(timeLeft);
+ } else {
+ return true;
+ }
}
+ long elapsed = SystemTimer.INSTANCE.milliTime() - timeStart;
+ timeLeft = timeoutMs - elapsed;
}
- } else {
- log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
+ return false;
}
-
- monitor.notifyAll();
}
}
public interface ResultCallback {
- /** Return true if we should continue waiting for replies for this sender. */
- boolean handleReply(Reply r, int numPending);
+ /** get the associated Pending number */
+ public Pending getPending();
+
+ /** Return true unless we should abort this sender. */
+ boolean handleReply(Reply r);
/**
* Returns true if feeding has been aborted. No more feeding is allowed with this
* callback after that.
*/
boolean isAborted();
-
}
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
index 7492a48414b..738b4d3303a 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
@@ -35,6 +35,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
private final RouteMetricSet metrics;
private boolean abortOnError = false;
private boolean isAborted = false;
+ private final SharedSender.Pending pendingNumber = new SharedSender.Pending();
public FeedResponse(RouteMetricSet metrics) {
super(com.yahoo.jdisc.http.HttpResponse.Status.OK);
@@ -101,7 +102,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
return "";
}
- public boolean handleReply(Reply reply, int numPending) {
+ public boolean handleReply(Reply reply) {
metrics.addReply(reply);
if (reply.getTrace().getLevel() > 0) {
String str = reply.getTrace().toString();
@@ -128,9 +129,11 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
return false;
}
}
- return numPending > 0;
+ return true;
}
+ public SharedSender.Pending getPending() { return pendingNumber; }
+
public void done() {
metrics.done();
}