aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-06 11:44:33 +0100
committerGitHub <noreply@github.com>2017-01-06 11:44:33 +0100
commit24e355c7f1386efa15a81da8135934fa82d4fa3e (patch)
treecd69e650b73d0cf1b4b9ffa7e0c1d4128b753efd /messagebus
parentaa1ad6b1ca7f6f4d874e54f60178dddc3a0e3208 (diff)
Revert "Revert "First pass at a scalable blocking send.""
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java135
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java90
2 files changed, 188 insertions, 37 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 12ec4186513..554b093df83 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -4,7 +4,9 @@ package com.yahoo.messagebus;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingTable;
+import com.yahoo.text.Utf8String;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -14,7 +16,7 @@ import java.util.logging.Logger;
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public final class SourceSession implements ReplyHandler {
+public final class SourceSession implements ReplyHandler, Runnable {
private static Logger log = Logger.getLogger(SourceSession.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
@@ -26,8 +28,9 @@ public final class SourceSession implements ReplyHandler {
private final ThrottlePolicy throttlePolicy;
private volatile double timeout;
private volatile int pendingCount = 0;
- private int blockedCount = 0;
- private boolean closed = false;
+ private volatile boolean closed = false;
+ private final Queue<BlockedMessage> blockedQ = new LinkedList<>();
+ private final Thread blockedMessageSender;
/**
* <p>The default constructor requires values for all final member variables
@@ -48,6 +51,9 @@ public final class SourceSession implements ReplyHandler {
replyHandler = params.getReplyHandler();
throttlePolicy = params.getThrottlePolicy();
timeout = params.getTimeout();
+ blockedMessageSender = new Thread(this);
+ blockedMessageSender.setDaemon(true);
+ blockedMessageSender.start();
}
@Override
@@ -126,10 +132,17 @@ public final class SourceSession implements ReplyHandler {
* @return The result of <i>initiating</i> sending of this message.
*/
public Result send(Message msg) {
+
+ return sendInternal(updateTiming(msg));
+ }
+ private Message updateTiming(Message msg) {
msg.setTimeReceivedNow();
if (msg.getTimeRemaining() <= 0) {
- msg.setTimeRemaining((long)(timeout * 1000));
+ msg.setTimeRemaining((long) (timeout * 1000));
}
+ return msg;
+ }
+ private Result sendInternal(Message msg) {
synchronized (lock) {
if (closed) {
return new Result(ErrorCode.SEND_QUEUE_CLOSED,
@@ -155,6 +168,86 @@ public final class SourceSession implements ReplyHandler {
return Result.ACCEPTED;
}
+ @Override
+ public void run() {
+ while (!closed) {
+ sendBlockedMessages();
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+
+ private class BlockedMessage {
+ private final Message msg;
+ private Result result = null;
+ BlockedMessage(Message msg) {
+ this.msg = msg;
+ }
+
+ private synchronized void notifyFailure(Result result) {
+ synchronized (this) {
+ this.result = result;
+ notify();
+ }
+ }
+ private void notifySuccess(Result result) {
+ synchronized (this) {
+ this.result = result;
+ notify();
+ }
+ }
+
+ Message getMessage() { return msg; }
+
+ boolean sendOrExpire() {
+ if (msg.isExpired()) {
+ Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ");
+ notifyFailure(new Result(error));
+ replyHandler.handleReply(new SendTimeoutReply(msg, error));
+ } else {
+ Result res = sendInternal(msg);
+ if ( ! isSendQFull(res) ) {
+ notifySuccess(res);
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ Result waitComplete() throws InterruptedException {
+ synchronized (this) {
+ this.wait();
+ }
+ return result;
+ }
+ }
+
+ static class SendTimeoutReply extends Reply {
+
+ SendTimeoutReply(Message msg, Error error) {
+ setMessage(msg);
+ addError(error);
+ }
+
+ @Override
+ public Utf8String getProtocol() {
+ return null;
+ }
+
+ @Override
+ public int getType() {
+ return 0;
+ }
+ }
+
+ static private boolean isSendQFull(Result res) {
+ return ! (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL);
+ }
+
/**
* <p>This is a blocking proxy to the {@link #send(Message)} method. This
* method blocks until the message is accepted by the send queue. Note that
@@ -167,19 +260,23 @@ public final class SourceSession implements ReplyHandler {
* @throws InterruptedException Thrown if the calling thread is interrupted.
*/
public Result sendBlocking(Message msg) throws InterruptedException {
- while (true) {
- Result res = send(msg);
- if (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL) {
- return res;
- }
+ Result res = send(msg);
+ if (isSendQFull(res)) {
+ BlockedMessage blockedMessage = new BlockedMessage(msg);
synchronized (lock) {
- try {
- blockedCount++;
- while (!closed && !throttlePolicy.canSend(msg, pendingCount)) {
- lock.wait(10);
- }
- } finally {
- blockedCount--;
+ blockedQ.add(blockedMessage);
+ }
+ res = blockedMessage.waitComplete();
+ }
+ return res;
+ }
+
+ private void sendBlockedMessages() {
+ synchronized (lock) {
+ for (boolean success = true; success && !blockedQ.isEmpty(); ) {
+ success = blockedQ.element().sendOrExpire();
+ if (success) {
+ blockedQ.remove();
}
}
}
@@ -198,11 +295,7 @@ public final class SourceSession implements ReplyHandler {
throttlePolicy.processReply(reply);
}
done = (closed && pendingCount == 0);
- if (blockedCount < 10) {
- lock.notifyAll();
- } else {
- lock.notify();
- }
+ sendBlockedMessages();
}
if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
reply.getTrace().trace(TraceLevel.COMPONENT,
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
index 296e724a502..9db0bda04af 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
@@ -1,19 +1,7 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.network.local;
-import com.yahoo.messagebus.DestinationSession;
-import com.yahoo.messagebus.DestinationSessionParams;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.IntermediateSession;
-import com.yahoo.messagebus.IntermediateSessionParams;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.MessageHandler;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.SourceSession;
-import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.*;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -27,7 +15,9 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
/**
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
@@ -91,6 +81,68 @@ public class LocalNetworkTest {
server.mbus.destroy();
}
+ @Test
+ public void requireThatBlockingSendTimeOutInSendQ() throws InterruptedException {
+ final LocalWire wire = new LocalWire();
+
+ final Server serverA = new Server(wire);
+ final SourceSession source = serverA.newSourceSession(new StaticThrottlePolicy().setMaxPendingCount(1));
+
+ final Server serverB = new Server(wire);
+ final IntermediateSession intermediate = serverB.newIntermediateSession();
+
+ final Server serverC = new Server(wire);
+ final DestinationSession destination = serverC.newDestinationSession();
+
+ Message msg = new SimpleMessage("foo");
+ msg.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec()))
+ .addHop(Hop.parse(destination.getConnectionSpec())));
+ assertThat(source.sendBlocking(msg).isAccepted(), is(true));
+ Message msg2 = new SimpleMessage("foo2");
+ msg2.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec()))
+ .addHop(Hop.parse(destination.getConnectionSpec())));
+ long TIMEOUT = 1000;
+ msg2.setTimeRemaining(TIMEOUT);
+ long start = System.currentTimeMillis();
+ Result res = source.sendBlocking(msg2);
+ assertThat(res.isAccepted(), is(false));
+ assertEquals(ErrorCode.TIMEOUT, res.getError().getCode());
+ assertTrue(res.getError().getMessage().endsWith("Timed out in sendQ"));
+ long end = System.currentTimeMillis();
+ assertTrue(end - start > TIMEOUT);
+ assertTrue(end - start < 2*TIMEOUT);
+
+ msg = serverB.messages.poll(60, TimeUnit.SECONDS);
+ assertThat(msg, instanceOf(SimpleMessage.class));
+ assertThat(((SimpleMessage)msg).getValue(), is("foo"));
+ intermediate.forward(msg);
+
+ msg = serverC.messages.poll(60, TimeUnit.SECONDS);
+ assertThat(msg, instanceOf(SimpleMessage.class));
+ assertThat(((SimpleMessage)msg).getValue(), is("foo"));
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ destination.reply(reply);
+
+ reply = serverB.replies.poll(60, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(SimpleReply.class));
+ assertThat(((SimpleReply)reply).getValue(), is("bar"));
+ intermediate.forward(reply);
+
+ reply = serverA.replies.poll(60, TimeUnit.SECONDS);
+ assertEquals(ErrorCode.TIMEOUT, reply.getError(0).getCode());
+ assertTrue(reply.getError(0).getMessage().endsWith("Timed out in sendQ"));
+
+ reply = serverA.replies.poll(60, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(SimpleReply.class));
+ assertThat(((SimpleReply)reply).getValue(), is("bar"));
+
+ serverA.mbus.destroy();
+ serverB.mbus.destroy();
+ serverC.mbus.destroy();
+
+ }
+
private static class Server implements MessageHandler, ReplyHandler {
final MessageBus mbus;
@@ -104,10 +156,16 @@ public class LocalNetworkTest {
}
SourceSession newSourceSession() {
- return mbus.createSourceSession(
- new SourceSessionParams().setTimeout(600.0).setReplyHandler(this));
+ return mbus.createSourceSession(new SourceSessionParams()
+ .setTimeout(600.0)
+ .setReplyHandler(this));
+ }
+ SourceSession newSourceSession(ThrottlePolicy throttlePolicy) {
+ return mbus.createSourceSession(new SourceSessionParams()
+ .setTimeout(600.0)
+ .setReplyHandler(this)
+ .setThrottlePolicy(throttlePolicy));
}
-
IntermediateSession newIntermediateSession() {
return mbus.createIntermediateSession(new IntermediateSessionParams()
.setMessageHandler(this)