diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-06 07:32:01 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-01-06 07:32:01 +0100 |
commit | 9830ef4b45b59a2dca227e2e8756bde627ac5046 (patch) | |
tree | 02d6cba4f4dcebdd9d1c21385748ee724f3ac1ad /messagebus/src | |
parent | 01271a56e795c06cb22ff7042e7a9f802ddebdf0 (diff) |
Revert "First pass at a scalable blocking send."
Diffstat (limited to 'messagebus/src')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 135 | ||||
-rw-r--r-- | messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java | 90 |
2 files changed, 37 insertions, 188 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 554b093df83..12ec4186513 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -4,9 +4,7 @@ package com.yahoo.messagebus; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.routing.RoutingTable; -import com.yahoo.text.Utf8String; -import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -16,7 +14,7 @@ import java.util.logging.Logger; * * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> */ -public final class SourceSession implements ReplyHandler, Runnable { +public final class SourceSession implements ReplyHandler { private static Logger log = Logger.getLogger(SourceSession.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -28,9 +26,8 @@ public final class SourceSession implements ReplyHandler, Runnable { private final ThrottlePolicy throttlePolicy; private volatile double timeout; private volatile int pendingCount = 0; - private volatile boolean closed = false; - private final Queue<BlockedMessage> blockedQ = new LinkedList<>(); - private final Thread blockedMessageSender; + private int blockedCount = 0; + private boolean closed = false; /** * <p>The default constructor requires values for all final member variables @@ -51,9 +48,6 @@ public final class SourceSession implements ReplyHandler, Runnable { replyHandler = params.getReplyHandler(); throttlePolicy = params.getThrottlePolicy(); timeout = params.getTimeout(); - blockedMessageSender = new Thread(this); - blockedMessageSender.setDaemon(true); - blockedMessageSender.start(); } @Override @@ -132,17 +126,10 @@ public final class SourceSession implements ReplyHandler, Runnable { * @return The result of <i>initiating</i> sending of this message. */ public Result send(Message msg) { - - return sendInternal(updateTiming(msg)); - } - private Message updateTiming(Message msg) { msg.setTimeReceivedNow(); if (msg.getTimeRemaining() <= 0) { - msg.setTimeRemaining((long) (timeout * 1000)); + msg.setTimeRemaining((long)(timeout * 1000)); } - return msg; - } - private Result sendInternal(Message msg) { synchronized (lock) { if (closed) { return new Result(ErrorCode.SEND_QUEUE_CLOSED, @@ -168,86 +155,6 @@ public final class SourceSession implements ReplyHandler, Runnable { return Result.ACCEPTED; } - @Override - public void run() { - while (!closed) { - sendBlockedMessages(); - try { - Thread.sleep(10); - } catch (InterruptedException e) { - return; - } - } - } - - private class BlockedMessage { - private final Message msg; - private Result result = null; - BlockedMessage(Message msg) { - this.msg = msg; - } - - private synchronized void notifyFailure(Result result) { - synchronized (this) { - this.result = result; - notify(); - } - } - private void notifySuccess(Result result) { - synchronized (this) { - this.result = result; - notify(); - } - } - - Message getMessage() { return msg; } - - boolean sendOrExpire() { - if (msg.isExpired()) { - Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ"); - notifyFailure(new Result(error)); - replyHandler.handleReply(new SendTimeoutReply(msg, error)); - } else { - Result res = sendInternal(msg); - if ( ! isSendQFull(res) ) { - notifySuccess(res); - } else { - return false; - } - } - return true; - } - - Result waitComplete() throws InterruptedException { - synchronized (this) { - this.wait(); - } - return result; - } - } - - static class SendTimeoutReply extends Reply { - - SendTimeoutReply(Message msg, Error error) { - setMessage(msg); - addError(error); - } - - @Override - public Utf8String getProtocol() { - return null; - } - - @Override - public int getType() { - return 0; - } - } - - static private boolean isSendQFull(Result res) { - return ! (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL); - } - /** * <p>This is a blocking proxy to the {@link #send(Message)} method. This * method blocks until the message is accepted by the send queue. Note that @@ -260,23 +167,19 @@ public final class SourceSession implements ReplyHandler, Runnable { * @throws InterruptedException Thrown if the calling thread is interrupted. */ public Result sendBlocking(Message msg) throws InterruptedException { - Result res = send(msg); - if (isSendQFull(res)) { - BlockedMessage blockedMessage = new BlockedMessage(msg); - synchronized (lock) { - blockedQ.add(blockedMessage); + while (true) { + Result res = send(msg); + if (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL) { + return res; } - res = blockedMessage.waitComplete(); - } - return res; - } - - private void sendBlockedMessages() { - synchronized (lock) { - for (boolean success = true; success && !blockedQ.isEmpty(); ) { - success = blockedQ.element().sendOrExpire(); - if (success) { - blockedQ.remove(); + synchronized (lock) { + try { + blockedCount++; + while (!closed && !throttlePolicy.canSend(msg, pendingCount)) { + lock.wait(10); + } + } finally { + blockedCount--; } } } @@ -295,7 +198,11 @@ public final class SourceSession implements ReplyHandler, Runnable { throttlePolicy.processReply(reply); } done = (closed && pendingCount == 0); - sendBlockedMessages(); + if (blockedCount < 10) { + lock.notifyAll(); + } else { + lock.notify(); + } } if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) { reply.getTrace().trace(TraceLevel.COMPONENT, diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java index 9db0bda04af..296e724a502 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java @@ -1,7 +1,19 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus.network.local; -import com.yahoo.messagebus.*; +import com.yahoo.messagebus.DestinationSession; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.IntermediateSession; +import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; import com.yahoo.messagebus.routing.Hop; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.test.SimpleMessage; @@ -15,9 +27,7 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; /** * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> @@ -81,68 +91,6 @@ public class LocalNetworkTest { server.mbus.destroy(); } - @Test - public void requireThatBlockingSendTimeOutInSendQ() throws InterruptedException { - final LocalWire wire = new LocalWire(); - - final Server serverA = new Server(wire); - final SourceSession source = serverA.newSourceSession(new StaticThrottlePolicy().setMaxPendingCount(1)); - - final Server serverB = new Server(wire); - final IntermediateSession intermediate = serverB.newIntermediateSession(); - - final Server serverC = new Server(wire); - final DestinationSession destination = serverC.newDestinationSession(); - - Message msg = new SimpleMessage("foo"); - msg.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec())) - .addHop(Hop.parse(destination.getConnectionSpec()))); - assertThat(source.sendBlocking(msg).isAccepted(), is(true)); - Message msg2 = new SimpleMessage("foo2"); - msg2.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec())) - .addHop(Hop.parse(destination.getConnectionSpec()))); - long TIMEOUT = 1000; - msg2.setTimeRemaining(TIMEOUT); - long start = System.currentTimeMillis(); - Result res = source.sendBlocking(msg2); - assertThat(res.isAccepted(), is(false)); - assertEquals(ErrorCode.TIMEOUT, res.getError().getCode()); - assertTrue(res.getError().getMessage().endsWith("Timed out in sendQ")); - long end = System.currentTimeMillis(); - assertTrue(end - start > TIMEOUT); - assertTrue(end - start < 2*TIMEOUT); - - msg = serverB.messages.poll(60, TimeUnit.SECONDS); - assertThat(msg, instanceOf(SimpleMessage.class)); - assertThat(((SimpleMessage)msg).getValue(), is("foo")); - intermediate.forward(msg); - - msg = serverC.messages.poll(60, TimeUnit.SECONDS); - assertThat(msg, instanceOf(SimpleMessage.class)); - assertThat(((SimpleMessage)msg).getValue(), is("foo")); - Reply reply = new SimpleReply("bar"); - reply.swapState(msg); - destination.reply(reply); - - reply = serverB.replies.poll(60, TimeUnit.SECONDS); - assertThat(reply, instanceOf(SimpleReply.class)); - assertThat(((SimpleReply)reply).getValue(), is("bar")); - intermediate.forward(reply); - - reply = serverA.replies.poll(60, TimeUnit.SECONDS); - assertEquals(ErrorCode.TIMEOUT, reply.getError(0).getCode()); - assertTrue(reply.getError(0).getMessage().endsWith("Timed out in sendQ")); - - reply = serverA.replies.poll(60, TimeUnit.SECONDS); - assertThat(reply, instanceOf(SimpleReply.class)); - assertThat(((SimpleReply)reply).getValue(), is("bar")); - - serverA.mbus.destroy(); - serverB.mbus.destroy(); - serverC.mbus.destroy(); - - } - private static class Server implements MessageHandler, ReplyHandler { final MessageBus mbus; @@ -156,16 +104,10 @@ public class LocalNetworkTest { } SourceSession newSourceSession() { - return mbus.createSourceSession(new SourceSessionParams() - .setTimeout(600.0) - .setReplyHandler(this)); - } - SourceSession newSourceSession(ThrottlePolicy throttlePolicy) { - return mbus.createSourceSession(new SourceSessionParams() - .setTimeout(600.0) - .setReplyHandler(this) - .setThrottlePolicy(throttlePolicy)); + return mbus.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setReplyHandler(this)); } + IntermediateSession newIntermediateSession() { return mbus.createIntermediateSession(new IntermediateSessionParams() .setMessageHandler(this) |