diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-06 07:32:01 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-01-06 07:32:01 +0100 |
commit | 9830ef4b45b59a2dca227e2e8756bde627ac5046 (patch) | |
tree | 02d6cba4f4dcebdd9d1c21385748ee724f3ac1ad /messagebus/src/main/java/com | |
parent | 01271a56e795c06cb22ff7042e7a9f802ddebdf0 (diff) |
Revert "First pass at a scalable blocking send."
Diffstat (limited to 'messagebus/src/main/java/com')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 135 |
1 files changed, 21 insertions, 114 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 554b093df83..12ec4186513 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -4,9 +4,7 @@ package com.yahoo.messagebus; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.routing.RoutingTable; -import com.yahoo.text.Utf8String; -import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -16,7 +14,7 @@ import java.util.logging.Logger; * * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> */ -public final class SourceSession implements ReplyHandler, Runnable { +public final class SourceSession implements ReplyHandler { private static Logger log = Logger.getLogger(SourceSession.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -28,9 +26,8 @@ public final class SourceSession implements ReplyHandler, Runnable { private final ThrottlePolicy throttlePolicy; private volatile double timeout; private volatile int pendingCount = 0; - private volatile boolean closed = false; - private final Queue<BlockedMessage> blockedQ = new LinkedList<>(); - private final Thread blockedMessageSender; + private int blockedCount = 0; + private boolean closed = false; /** * <p>The default constructor requires values for all final member variables @@ -51,9 +48,6 @@ public final class SourceSession implements ReplyHandler, Runnable { replyHandler = params.getReplyHandler(); throttlePolicy = params.getThrottlePolicy(); timeout = params.getTimeout(); - blockedMessageSender = new Thread(this); - blockedMessageSender.setDaemon(true); - blockedMessageSender.start(); } @Override @@ -132,17 +126,10 @@ public final class SourceSession implements ReplyHandler, Runnable { * @return The result of <i>initiating</i> sending of this message. */ public Result send(Message msg) { - - return sendInternal(updateTiming(msg)); - } - private Message updateTiming(Message msg) { msg.setTimeReceivedNow(); if (msg.getTimeRemaining() <= 0) { - msg.setTimeRemaining((long) (timeout * 1000)); + msg.setTimeRemaining((long)(timeout * 1000)); } - return msg; - } - private Result sendInternal(Message msg) { synchronized (lock) { if (closed) { return new Result(ErrorCode.SEND_QUEUE_CLOSED, @@ -168,86 +155,6 @@ public final class SourceSession implements ReplyHandler, Runnable { return Result.ACCEPTED; } - @Override - public void run() { - while (!closed) { - sendBlockedMessages(); - try { - Thread.sleep(10); - } catch (InterruptedException e) { - return; - } - } - } - - private class BlockedMessage { - private final Message msg; - private Result result = null; - BlockedMessage(Message msg) { - this.msg = msg; - } - - private synchronized void notifyFailure(Result result) { - synchronized (this) { - this.result = result; - notify(); - } - } - private void notifySuccess(Result result) { - synchronized (this) { - this.result = result; - notify(); - } - } - - Message getMessage() { return msg; } - - boolean sendOrExpire() { - if (msg.isExpired()) { - Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ"); - notifyFailure(new Result(error)); - replyHandler.handleReply(new SendTimeoutReply(msg, error)); - } else { - Result res = sendInternal(msg); - if ( ! isSendQFull(res) ) { - notifySuccess(res); - } else { - return false; - } - } - return true; - } - - Result waitComplete() throws InterruptedException { - synchronized (this) { - this.wait(); - } - return result; - } - } - - static class SendTimeoutReply extends Reply { - - SendTimeoutReply(Message msg, Error error) { - setMessage(msg); - addError(error); - } - - @Override - public Utf8String getProtocol() { - return null; - } - - @Override - public int getType() { - return 0; - } - } - - static private boolean isSendQFull(Result res) { - return ! (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL); - } - /** * <p>This is a blocking proxy to the {@link #send(Message)} method. This * method blocks until the message is accepted by the send queue. Note that @@ -260,23 +167,19 @@ public final class SourceSession implements ReplyHandler, Runnable { * @throws InterruptedException Thrown if the calling thread is interrupted. */ public Result sendBlocking(Message msg) throws InterruptedException { - Result res = send(msg); - if (isSendQFull(res)) { - BlockedMessage blockedMessage = new BlockedMessage(msg); - synchronized (lock) { - blockedQ.add(blockedMessage); + while (true) { + Result res = send(msg); + if (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL) { + return res; } - res = blockedMessage.waitComplete(); - } - return res; - } - - private void sendBlockedMessages() { - synchronized (lock) { - for (boolean success = true; success && !blockedQ.isEmpty(); ) { - success = blockedQ.element().sendOrExpire(); - if (success) { - blockedQ.remove(); + synchronized (lock) { + try { + blockedCount++; + while (!closed && !throttlePolicy.canSend(msg, pendingCount)) { + lock.wait(10); + } + } finally { + blockedCount--; } } } @@ -295,7 +198,11 @@ public final class SourceSession implements ReplyHandler, Runnable { throttlePolicy.processReply(reply); } done = (closed && pendingCount == 0); - sendBlockedMessages(); + if (blockedCount < 10) { + lock.notifyAll(); + } else { + lock.notify(); + } } if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) { reply.getTrace().trace(TraceLevel.COMPONENT, |