diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-04 18:31:48 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-04 18:31:48 +0100 |
commit | 3df8e56d3bead686aae9c362deb4b351415cca81 (patch) | |
tree | b005826cf4c5af77eada6de2e253c375c513a38c /messagebus | |
parent | b949690fd9d8e803b8dcae8360fc4376d72e8cc2 (diff) |
First pass at a scalable blocking send.
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 86 |
1 files changed, 74 insertions, 12 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 12ec4186513..b9b78d18359 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -4,11 +4,15 @@ package com.yahoo.messagebus; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.routing.RoutingTable; +import com.yahoo.text.Utf8String; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; +import static java.lang.System.nanoTime; + /** * <p>A session supporting sending new messages.</p> * @@ -26,8 +30,8 @@ public final class SourceSession implements ReplyHandler { private final ThrottlePolicy throttlePolicy; private volatile double timeout; private volatile int pendingCount = 0; - private int blockedCount = 0; private boolean closed = false; + private final Queue<BlockingHandler> blockedQ = new LinkedList<>(); /** * <p>The default constructor requires values for all final member variables @@ -155,6 +159,52 @@ public final class SourceSession implements ReplyHandler { return Result.ACCEPTED; } + static private class BlockingHandler { + + private boolean complete = false; + private final Message msg; + BlockingHandler(Message msg) { + this.msg = msg; + } + + void sendComplete() { + synchronized (this) { + complete = true; + notify(); + } + } + + Message getMessage() { return msg; } + boolean waitComplete() throws InterruptedException { + + synchronized (this) { + + while( !complete && !msg.isExpired() ) { + wait(msg.getTimeRemainingNow()); + } + return complete; + } + } + } + + static public class SendTimeoutReply extends Reply { + + SendTimeoutReply(Message msg) { + setMessage(msg); + addError(new Error(ErrorCode.TIMEOUT, "Timed out in sendQ")); + } + + @Override + public Utf8String getProtocol() { + return null; + } + + @Override + public int getType() { + return 0; + } + } + /** * <p>This is a blocking proxy to the {@link #send(Message)} method. This * method blocks until the message is accepted by the send queue. Note that @@ -172,15 +222,31 @@ public final class SourceSession implements ReplyHandler { if (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL) { return res; } + BlockingHandler sendHandler = null; synchronized (lock) { - try { - blockedCount++; - while (!closed && !throttlePolicy.canSend(msg, pendingCount)) { - lock.wait(10); + if (!closed && !throttlePolicy.canSend(msg, pendingCount)) { + sendHandler = new BlockingHandler(msg); + blockedQ.add(sendHandler); + } + } + if ( (sendHandler != null) && !sendHandler.waitComplete() ) { + replyHandler.handleReply(new SendTimeoutReply(msg)); + } + } + } + + private void drainQ() { + synchronized (lock) { + while (!blockedQ.isEmpty() ) { + BlockingHandler handler = blockedQ.element(); + if ( ! handler.getMessage().isExpired() ) { + if (throttlePolicy.canSend(handler.getMessage(), pendingCount)) { + handler.sendComplete(); + } else { + return; } - } finally { - blockedCount--; } + blockedQ.remove(); } } } @@ -198,11 +264,7 @@ public final class SourceSession implements ReplyHandler { throttlePolicy.processReply(reply); } done = (closed && pendingCount == 0); - if (blockedCount < 10) { - lock.notifyAll(); - } else { - lock.notify(); - } + drainQ(); } if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) { reply.getTrace().trace(TraceLevel.COMPONENT, |