aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-04 18:31:48 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-01-04 18:31:48 +0100
commit3df8e56d3bead686aae9c362deb4b351415cca81 (patch)
treeb005826cf4c5af77eada6de2e253c375c513a38c /messagebus
parentb949690fd9d8e803b8dcae8360fc4376d72e8cc2 (diff)
First pass at a scalable blocking send.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java86
1 files changed, 74 insertions, 12 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 12ec4186513..b9b78d18359 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -4,11 +4,15 @@ package com.yahoo.messagebus;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingTable;
+import com.yahoo.text.Utf8String;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
+import static java.lang.System.nanoTime;
+
/**
* <p>A session supporting sending new messages.</p>
*
@@ -26,8 +30,8 @@ public final class SourceSession implements ReplyHandler {
private final ThrottlePolicy throttlePolicy;
private volatile double timeout;
private volatile int pendingCount = 0;
- private int blockedCount = 0;
private boolean closed = false;
+ private final Queue<BlockingHandler> blockedQ = new LinkedList<>();
/**
* <p>The default constructor requires values for all final member variables
@@ -155,6 +159,52 @@ public final class SourceSession implements ReplyHandler {
return Result.ACCEPTED;
}
+ static private class BlockingHandler {
+
+ private boolean complete = false;
+ private final Message msg;
+ BlockingHandler(Message msg) {
+ this.msg = msg;
+ }
+
+ void sendComplete() {
+ synchronized (this) {
+ complete = true;
+ notify();
+ }
+ }
+
+ Message getMessage() { return msg; }
+ boolean waitComplete() throws InterruptedException {
+
+ synchronized (this) {
+
+ while( !complete && !msg.isExpired() ) {
+ wait(msg.getTimeRemainingNow());
+ }
+ return complete;
+ }
+ }
+ }
+
+ static public class SendTimeoutReply extends Reply {
+
+ SendTimeoutReply(Message msg) {
+ setMessage(msg);
+ addError(new Error(ErrorCode.TIMEOUT, "Timed out in sendQ"));
+ }
+
+ @Override
+ public Utf8String getProtocol() {
+ return null;
+ }
+
+ @Override
+ public int getType() {
+ return 0;
+ }
+ }
+
/**
* <p>This is a blocking proxy to the {@link #send(Message)} method. This
* method blocks until the message is accepted by the send queue. Note that
@@ -172,15 +222,31 @@ public final class SourceSession implements ReplyHandler {
if (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL) {
return res;
}
+ BlockingHandler sendHandler = null;
synchronized (lock) {
- try {
- blockedCount++;
- while (!closed && !throttlePolicy.canSend(msg, pendingCount)) {
- lock.wait(10);
+ if (!closed && !throttlePolicy.canSend(msg, pendingCount)) {
+ sendHandler = new BlockingHandler(msg);
+ blockedQ.add(sendHandler);
+ }
+ }
+ if ( (sendHandler != null) && !sendHandler.waitComplete() ) {
+ replyHandler.handleReply(new SendTimeoutReply(msg));
+ }
+ }
+ }
+
+ private void drainQ() {
+ synchronized (lock) {
+ while (!blockedQ.isEmpty() ) {
+ BlockingHandler handler = blockedQ.element();
+ if ( ! handler.getMessage().isExpired() ) {
+ if (throttlePolicy.canSend(handler.getMessage(), pendingCount)) {
+ handler.sendComplete();
+ } else {
+ return;
}
- } finally {
- blockedCount--;
}
+ blockedQ.remove();
}
}
}
@@ -198,11 +264,7 @@ public final class SourceSession implements ReplyHandler {
throttlePolicy.processReply(reply);
}
done = (closed && pendingCount == 0);
- if (blockedCount < 10) {
- lock.notifyAll();
- } else {
- lock.notify();
- }
+ drainQ();
}
if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
reply.getTrace().trace(TraceLevel.COMPONENT,