diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-06 11:44:33 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-01-06 11:44:33 +0100 |
commit | 24e355c7f1386efa15a81da8135934fa82d4fa3e (patch) | |
tree | cd69e650b73d0cf1b4b9ffa7e0c1d4128b753efd | |
parent | aa1ad6b1ca7f6f4d874e54f60178dddc3a0e3208 (diff) |
Revert "Revert "First pass at a scalable blocking send.""
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 135 | ||||
-rw-r--r-- | messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java | 90 |
2 files changed, 188 insertions, 37 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 12ec4186513..554b093df83 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -4,7 +4,9 @@ package com.yahoo.messagebus; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.routing.RoutingTable; +import com.yahoo.text.Utf8String; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -14,7 +16,7 @@ import java.util.logging.Logger; * * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> */ -public final class SourceSession implements ReplyHandler { +public final class SourceSession implements ReplyHandler, Runnable { private static Logger log = Logger.getLogger(SourceSession.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -26,8 +28,9 @@ public final class SourceSession implements ReplyHandler { private final ThrottlePolicy throttlePolicy; private volatile double timeout; private volatile int pendingCount = 0; - private int blockedCount = 0; - private boolean closed = false; + private volatile boolean closed = false; + private final Queue<BlockedMessage> blockedQ = new LinkedList<>(); + private final Thread blockedMessageSender; /** * <p>The default constructor requires values for all final member variables @@ -48,6 +51,9 @@ public final class SourceSession implements ReplyHandler { replyHandler = params.getReplyHandler(); throttlePolicy = params.getThrottlePolicy(); timeout = params.getTimeout(); + blockedMessageSender = new Thread(this); + blockedMessageSender.setDaemon(true); + blockedMessageSender.start(); } @Override @@ -126,10 +132,17 @@ public final class SourceSession implements ReplyHandler { * @return The result of <i>initiating</i> sending of this message. */ public Result send(Message msg) { + + return sendInternal(updateTiming(msg)); + } + private Message updateTiming(Message msg) { msg.setTimeReceivedNow(); if (msg.getTimeRemaining() <= 0) { - msg.setTimeRemaining((long)(timeout * 1000)); + msg.setTimeRemaining((long) (timeout * 1000)); } + return msg; + } + private Result sendInternal(Message msg) { synchronized (lock) { if (closed) { return new Result(ErrorCode.SEND_QUEUE_CLOSED, @@ -155,6 +168,86 @@ public final class SourceSession implements ReplyHandler { return Result.ACCEPTED; } + @Override + public void run() { + while (!closed) { + sendBlockedMessages(); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + return; + } + } + } + + private class BlockedMessage { + private final Message msg; + private Result result = null; + BlockedMessage(Message msg) { + this.msg = msg; + } + + private synchronized void notifyFailure(Result result) { + synchronized (this) { + this.result = result; + notify(); + } + } + private void notifySuccess(Result result) { + synchronized (this) { + this.result = result; + notify(); + } + } + + Message getMessage() { return msg; } + + boolean sendOrExpire() { + if (msg.isExpired()) { + Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ"); + notifyFailure(new Result(error)); + replyHandler.handleReply(new SendTimeoutReply(msg, error)); + } else { + Result res = sendInternal(msg); + if ( ! isSendQFull(res) ) { + notifySuccess(res); + } else { + return false; + } + } + return true; + } + + Result waitComplete() throws InterruptedException { + synchronized (this) { + this.wait(); + } + return result; + } + } + + static class SendTimeoutReply extends Reply { + + SendTimeoutReply(Message msg, Error error) { + setMessage(msg); + addError(error); + } + + @Override + public Utf8String getProtocol() { + return null; + } + + @Override + public int getType() { + return 0; + } + } + + static private boolean isSendQFull(Result res) { + return ! (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL); + } + /** * <p>This is a blocking proxy to the {@link #send(Message)} method. This * method blocks until the message is accepted by the send queue. Note that @@ -167,19 +260,23 @@ public final class SourceSession implements ReplyHandler { * @throws InterruptedException Thrown if the calling thread is interrupted. */ public Result sendBlocking(Message msg) throws InterruptedException { - while (true) { - Result res = send(msg); - if (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL) { - return res; - } + Result res = send(msg); + if (isSendQFull(res)) { + BlockedMessage blockedMessage = new BlockedMessage(msg); synchronized (lock) { - try { - blockedCount++; - while (!closed && !throttlePolicy.canSend(msg, pendingCount)) { - lock.wait(10); - } - } finally { - blockedCount--; + blockedQ.add(blockedMessage); + } + res = blockedMessage.waitComplete(); + } + return res; + } + + private void sendBlockedMessages() { + synchronized (lock) { + for (boolean success = true; success && !blockedQ.isEmpty(); ) { + success = blockedQ.element().sendOrExpire(); + if (success) { + blockedQ.remove(); } } } @@ -198,11 +295,7 @@ public final class SourceSession implements ReplyHandler { throttlePolicy.processReply(reply); } done = (closed && pendingCount == 0); - if (blockedCount < 10) { - lock.notifyAll(); - } else { - lock.notify(); - } + sendBlockedMessages(); } if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) { reply.getTrace().trace(TraceLevel.COMPONENT, diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java index 296e724a502..9db0bda04af 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java @@ -1,19 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus.network.local; -import com.yahoo.messagebus.DestinationSession; -import com.yahoo.messagebus.DestinationSessionParams; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.IntermediateSession; -import com.yahoo.messagebus.IntermediateSessionParams; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageBus; -import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.MessageHandler; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.SourceSession; -import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.*; import com.yahoo.messagebus.routing.Hop; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.test.SimpleMessage; @@ -27,7 +15,9 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> @@ -91,6 +81,68 @@ public class LocalNetworkTest { server.mbus.destroy(); } + @Test + public void requireThatBlockingSendTimeOutInSendQ() throws InterruptedException { + final LocalWire wire = new LocalWire(); + + final Server serverA = new Server(wire); + final SourceSession source = serverA.newSourceSession(new StaticThrottlePolicy().setMaxPendingCount(1)); + + final Server serverB = new Server(wire); + final IntermediateSession intermediate = serverB.newIntermediateSession(); + + final Server serverC = new Server(wire); + final DestinationSession destination = serverC.newDestinationSession(); + + Message msg = new SimpleMessage("foo"); + msg.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec())) + .addHop(Hop.parse(destination.getConnectionSpec()))); + assertThat(source.sendBlocking(msg).isAccepted(), is(true)); + Message msg2 = new SimpleMessage("foo2"); + msg2.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec())) + .addHop(Hop.parse(destination.getConnectionSpec()))); + long TIMEOUT = 1000; + msg2.setTimeRemaining(TIMEOUT); + long start = System.currentTimeMillis(); + Result res = source.sendBlocking(msg2); + assertThat(res.isAccepted(), is(false)); + assertEquals(ErrorCode.TIMEOUT, res.getError().getCode()); + assertTrue(res.getError().getMessage().endsWith("Timed out in sendQ")); + long end = System.currentTimeMillis(); + assertTrue(end - start > TIMEOUT); + assertTrue(end - start < 2*TIMEOUT); + + msg = serverB.messages.poll(60, TimeUnit.SECONDS); + assertThat(msg, instanceOf(SimpleMessage.class)); + assertThat(((SimpleMessage)msg).getValue(), is("foo")); + intermediate.forward(msg); + + msg = serverC.messages.poll(60, TimeUnit.SECONDS); + assertThat(msg, instanceOf(SimpleMessage.class)); + assertThat(((SimpleMessage)msg).getValue(), is("foo")); + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + destination.reply(reply); + + reply = serverB.replies.poll(60, TimeUnit.SECONDS); + assertThat(reply, instanceOf(SimpleReply.class)); + assertThat(((SimpleReply)reply).getValue(), is("bar")); + intermediate.forward(reply); + + reply = serverA.replies.poll(60, TimeUnit.SECONDS); + assertEquals(ErrorCode.TIMEOUT, reply.getError(0).getCode()); + assertTrue(reply.getError(0).getMessage().endsWith("Timed out in sendQ")); + + reply = serverA.replies.poll(60, TimeUnit.SECONDS); + assertThat(reply, instanceOf(SimpleReply.class)); + assertThat(((SimpleReply)reply).getValue(), is("bar")); + + serverA.mbus.destroy(); + serverB.mbus.destroy(); + serverC.mbus.destroy(); + + } + private static class Server implements MessageHandler, ReplyHandler { final MessageBus mbus; @@ -104,10 +156,16 @@ public class LocalNetworkTest { } SourceSession newSourceSession() { - return mbus.createSourceSession( - new SourceSessionParams().setTimeout(600.0).setReplyHandler(this)); + return mbus.createSourceSession(new SourceSessionParams() + .setTimeout(600.0) + .setReplyHandler(this)); + } + SourceSession newSourceSession(ThrottlePolicy throttlePolicy) { + return mbus.createSourceSession(new SourceSessionParams() + .setTimeout(600.0) + .setReplyHandler(this) + .setThrottlePolicy(throttlePolicy)); } - IntermediateSession newIntermediateSession() { return mbus.createIntermediateSession(new IntermediateSessionParams() .setMessageHandler(this) |