summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-12 14:47:09 +0100
committerGitHub <noreply@github.com>2017-01-12 14:47:09 +0100
commitd433b1cd3fdad87f4fab647eeb7e564159ab768d (patch)
treee0fba22dcfea11e5f6fb544ad725132bb3bb7581 /vespaclient-core
parent6923e3d0690056be227f17d10d44543db2ccf2e4 (diff)
parentc6d2a4893a1e620325f2fd363b3065546efc2e64 (diff)
Merge pull request #1496 from yahoo/arnej/simplify-sharedsender-api
Arnej/simplify sharedsender api
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java155
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java12
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java13
3 files changed, 73 insertions, 107 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
index 48ff0eae36b..a29693aee1c 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -23,12 +23,8 @@ public class SharedSender implements ReplyHandler {
public static final Logger log = Logger.getLogger(SharedSender.class.getName());
private SendSession sender;
- private final Object monitor = new Object();
private RouteMetricSet metrics;
- // Maps from filename to number of pending requests
- private Map<ResultCallback, Integer> activeOwners = new HashMap<>();
-
/**
* Creates a new shared sender.
* If oldsender != null, we copy that status information from that sender.
@@ -49,21 +45,8 @@ public class SharedSender implements ReplyHandler {
return metrics;
}
- public void remove(ResultCallback owner) {
- synchronized (monitor) {
- activeOwners.remove(owner);
- }
- }
-
public void shutdown() {
- try {
- synchronized (monitor) {
- while ( ! activeOwners.isEmpty()) {
- monitor.wait(180 * 1000);
- }
- }
- } catch (InterruptedException e) {
- }
+ // XXX may need to wait here?
sender.close();
}
@@ -76,36 +59,15 @@ public class SharedSender implements ReplyHandler {
* @return true if there were no more pending, or false if the timeout expired.
*/
public boolean waitForPending(ResultCallback owner, long timeoutMs) {
- long timeStart = SystemTimer.INSTANCE.milliTime();
- long timeLeft = timeoutMs;
-
try {
- while (timeoutMs == -1 || timeLeft > 0) {
- synchronized (monitor) {
- Integer count = activeOwners.get(owner);
- if (count == null || count == 0) {
- return true;
- } else if (timeLeft > 0) {
- monitor.wait(timeLeft);
- } else {
- monitor.wait();
- }
- }
-
- timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart);
- }
+ return owner.getPending().waitForZero(timeoutMs);
} catch (InterruptedException e) {
+ return false;
}
-
- return false;
}
public int getPendingCount(ResultCallback owner) {
- Integer count = activeOwners.get(owner);
- if (count == null) {
- return 0;
- }
- return count;
+ return owner.getPending().val();
}
/**
@@ -135,7 +97,7 @@ public class SharedSender implements ReplyHandler {
* @param owner A callback to send replies to when received from messagebus
*/
public void send(Message msg, ResultCallback owner) {
- send(msg, owner, -1, true);
+ send(msg, owner, true);
}
/**
@@ -144,43 +106,16 @@ public class SharedSender implements ReplyHandler {
*
* @param msg The message to send
* @param owner The callback to send replies to when received from messagebus
- * @param maxPendingPerOwner The maximum number of pending messages the callback
* @param blockingQueue If true, block until the message bus queue is available.
*/
- public void send(Message msg, ResultCallback owner, int maxPendingPerOwner, boolean blockingQueue) {
+ public void send(Message msg, ResultCallback owner, boolean blockingQueue) {
// Silently fail messages that are attempted sent after the callback aborted.
if (owner.isAborted()) {
return;
}
- try {
- synchronized (monitor) {
- if (maxPendingPerOwner != -1 && blockingQueue) {
- while (true) {
- Integer count = activeOwners.get(owner);
-
- if (count != null && count >= maxPendingPerOwner) {
- log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies");
- monitor.wait(10000);
- } else {
- break;
- }
- }
- }
-
- Integer count = activeOwners.get(owner);
-
- if (count == null) {
- activeOwners.put(owner, 1);
- } else {
- activeOwners.put(owner, count + 1);
- }
- }
- } catch (InterruptedException e) {
- return;
- }
-
msg.setContext(owner);
+ owner.getPending().inc();
try {
com.yahoo.messagebus.Result r = sender.send(msg, blockingQueue);
@@ -203,44 +138,76 @@ public class SharedSender implements ReplyHandler {
*/
@Override
public void handleReply(Reply r) {
- synchronized (monitor) {
- ResultCallback owner = (ResultCallback) r.getContext();
-
- if (owner != null) {
- metrics.addReply(r);
+ ResultCallback owner = (ResultCallback) r.getContext();
- Integer count = activeOwners.get(owner);
-
- if (count != null) {
- if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count);
- }
+ if (owner != null) {
+ metrics.addReply(r);
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + owner.getPending().val());
+ }
+ if (owner.isAborted()) {
+ log.log(LogLevel.WARNING, "Received reply for file " + owner.toString() + " which is aborted");
+ owner.getPending().clear();
+ return;
+ }
+ if (owner.handleReply(r)) {
+ owner.getPending().dec();
+ } else {
+ log.log(LogLevel.WARNING, "Received reply for file " + owner.toString() + " which wants to abort");
+ owner.getPending().clear();
+ }
+ } else {
+ log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
+ }
+ }
- if ( ! owner.handleReply(r, count - 1)) {
- activeOwners.remove(owner);
- } else {
- activeOwners.put(owner, count - 1);
+ public static class Pending {
+ private int value = 0;
+ public synchronized void inc() { ++value; }
+ public synchronized void dec() { if (--value == 0) notifyAll(); }
+ public synchronized void clear() { value = 0; notifyAll(); }
+ public synchronized int val() { return value; }
+ public synchronized boolean waitForZero() throws InterruptedException {
+ while (value > 0) {
+ wait();
+ }
+ return true;
+ }
+ public boolean waitForZero(long timeoutMs) throws InterruptedException {
+ if (timeoutMs == -1) {
+ return waitForZero();
+ } else {
+ long timeStart = SystemTimer.INSTANCE.milliTime();
+ long timeLeft = timeoutMs;
+ while (timeLeft > 0) {
+ synchronized(this) {
+ if (value > 0) {
+ wait(timeLeft);
+ } else {
+ return true;
+ }
}
+ long elapsed = SystemTimer.INSTANCE.milliTime() - timeStart;
+ timeLeft = timeoutMs - elapsed;
}
- } else {
- log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
+ return false;
}
-
- monitor.notifyAll();
}
}
public interface ResultCallback {
- /** Return true if we should continue waiting for replies for this sender. */
- boolean handleReply(Reply r, int numPending);
+ /** get the associated Pending number */
+ public Pending getPending();
+
+ /** Return true unless we should abort this sender. */
+ boolean handleReply(Reply r);
/**
* Returns true if feeding has been aborted. No more feeding is allowed with this
* callback after that.
*/
boolean isAborted();
-
}
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
index a9a08562c9d..242cbf9db17 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
@@ -84,19 +84,13 @@ public class SingleSender implements SimpleFeedAccess {
return m;
}
- public void send(Message m) {
- send(m, -1);
- }
-
/**
- * Sends the given message, allowing a maximum of maxPending messages to be
- * sent for this sender.
+ * Sends the given message.
*
* @param m The message to send
- * @param maxPending The number of pending messages to block on for this sender.
*/
- public void send(Message m, int maxPending) {
- sender.send(processMessage(m), owner, maxPending, blockingQueue);
+ public void send(Message m) {
+ sender.send(processMessage(m), owner, blockingQueue);
}
public void done() {
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
index cdd4ac76c4c..738b4d3303a 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
@@ -35,6 +35,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
private final RouteMetricSet metrics;
private boolean abortOnError = false;
private boolean isAborted = false;
+ private final SharedSender.Pending pendingNumber = new SharedSender.Pending();
public FeedResponse(RouteMetricSet metrics) {
super(com.yahoo.jdisc.http.HttpResponse.Status.OK);
@@ -101,7 +102,7 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
return "";
}
- public boolean handleReply(Reply reply, int numPending) {
+ public boolean handleReply(Reply reply) {
metrics.addReply(reply);
if (reply.getTrace().getLevel() > 0) {
String str = reply.getTrace().toString();
@@ -123,12 +124,16 @@ public final class FeedResponse extends HttpResponse implements SharedSender.Res
log.finest(str);
addError(str);
}
- isAborted = abortOnError;
- return !abortOnError;
+ if (abortOnError) {
+ isAborted = true;
+ return false;
+ }
}
- return numPending > 0;
+ return true;
}
+ public SharedSender.Pending getPending() { return pendingNumber; }
+
public void done() {
metrics.done();
}