aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-06 00:43:50 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-01-06 00:43:50 +0100
commit236832a9528552422ad77d84dadb381052db5de6 (patch)
tree819825cbed96744fed9709651d423d2f365ff956 /messagebus
parent3df8e56d3bead686aae9c362deb4b351415cca81 (diff)
- Added a test for sendBlocking with and without blocking sendQ.
- Redesigned based on @bratseth comments. - Now you will be timed out either by the reply thread in mbus or by the new thread that will handle sending of leftover messages. Timeout is not honoured correctly if there is a variation in timeout and system overload. A short timeout message might not be expired if an earlier message with alonger timeout is still on the Q. But I will let factory have a go at it before completing.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java119
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java90
2 files changed, 149 insertions, 60 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index b9b78d18359..554b093df83 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -11,14 +11,12 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
-import static java.lang.System.nanoTime;
-
/**
* <p>A session supporting sending new messages.</p>
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public final class SourceSession implements ReplyHandler {
+public final class SourceSession implements ReplyHandler, Runnable {
private static Logger log = Logger.getLogger(SourceSession.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
@@ -30,8 +28,9 @@ public final class SourceSession implements ReplyHandler {
private final ThrottlePolicy throttlePolicy;
private volatile double timeout;
private volatile int pendingCount = 0;
- private boolean closed = false;
- private final Queue<BlockingHandler> blockedQ = new LinkedList<>();
+ private volatile boolean closed = false;
+ private final Queue<BlockedMessage> blockedQ = new LinkedList<>();
+ private final Thread blockedMessageSender;
/**
* <p>The default constructor requires values for all final member variables
@@ -52,6 +51,9 @@ public final class SourceSession implements ReplyHandler {
replyHandler = params.getReplyHandler();
throttlePolicy = params.getThrottlePolicy();
timeout = params.getTimeout();
+ blockedMessageSender = new Thread(this);
+ blockedMessageSender.setDaemon(true);
+ blockedMessageSender.start();
}
@Override
@@ -130,10 +132,17 @@ public final class SourceSession implements ReplyHandler {
* @return The result of <i>initiating</i> sending of this message.
*/
public Result send(Message msg) {
+
+ return sendInternal(updateTiming(msg));
+ }
+ private Message updateTiming(Message msg) {
msg.setTimeReceivedNow();
if (msg.getTimeRemaining() <= 0) {
- msg.setTimeRemaining((long)(timeout * 1000));
+ msg.setTimeRemaining((long) (timeout * 1000));
}
+ return msg;
+ }
+ private Result sendInternal(Message msg) {
synchronized (lock) {
if (closed) {
return new Result(ErrorCode.SEND_QUEUE_CLOSED,
@@ -159,39 +168,69 @@ public final class SourceSession implements ReplyHandler {
return Result.ACCEPTED;
}
- static private class BlockingHandler {
+ @Override
+ public void run() {
+ while (!closed) {
+ sendBlockedMessages();
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
- private boolean complete = false;
+ private class BlockedMessage {
private final Message msg;
- BlockingHandler(Message msg) {
+ private Result result = null;
+ BlockedMessage(Message msg) {
this.msg = msg;
}
- void sendComplete() {
+ private synchronized void notifyFailure(Result result) {
+ synchronized (this) {
+ this.result = result;
+ notify();
+ }
+ }
+ private void notifySuccess(Result result) {
synchronized (this) {
- complete = true;
+ this.result = result;
notify();
}
}
Message getMessage() { return msg; }
- boolean waitComplete() throws InterruptedException {
- synchronized (this) {
-
- while( !complete && !msg.isExpired() ) {
- wait(msg.getTimeRemainingNow());
+ boolean sendOrExpire() {
+ if (msg.isExpired()) {
+ Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ");
+ notifyFailure(new Result(error));
+ replyHandler.handleReply(new SendTimeoutReply(msg, error));
+ } else {
+ Result res = sendInternal(msg);
+ if ( ! isSendQFull(res) ) {
+ notifySuccess(res);
+ } else {
+ return false;
}
- return complete;
}
+ return true;
+ }
+
+ Result waitComplete() throws InterruptedException {
+ synchronized (this) {
+ this.wait();
+ }
+ return result;
}
}
- static public class SendTimeoutReply extends Reply {
+ static class SendTimeoutReply extends Reply {
- SendTimeoutReply(Message msg) {
+ SendTimeoutReply(Message msg, Error error) {
setMessage(msg);
- addError(new Error(ErrorCode.TIMEOUT, "Timed out in sendQ"));
+ addError(error);
}
@Override
@@ -205,6 +244,10 @@ public final class SourceSession implements ReplyHandler {
}
}
+ static private boolean isSendQFull(Result res) {
+ return ! (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL);
+ }
+
/**
* <p>This is a blocking proxy to the {@link #send(Message)} method. This
* method blocks until the message is accepted by the send queue. Note that
@@ -217,36 +260,24 @@ public final class SourceSession implements ReplyHandler {
* @throws InterruptedException Thrown if the calling thread is interrupted.
*/
public Result sendBlocking(Message msg) throws InterruptedException {
- while (true) {
- Result res = send(msg);
- if (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL) {
- return res;
- }
- BlockingHandler sendHandler = null;
+ Result res = send(msg);
+ if (isSendQFull(res)) {
+ BlockedMessage blockedMessage = new BlockedMessage(msg);
synchronized (lock) {
- if (!closed && !throttlePolicy.canSend(msg, pendingCount)) {
- sendHandler = new BlockingHandler(msg);
- blockedQ.add(sendHandler);
- }
- }
- if ( (sendHandler != null) && !sendHandler.waitComplete() ) {
- replyHandler.handleReply(new SendTimeoutReply(msg));
+ blockedQ.add(blockedMessage);
}
+ res = blockedMessage.waitComplete();
}
+ return res;
}
- private void drainQ() {
+ private void sendBlockedMessages() {
synchronized (lock) {
- while (!blockedQ.isEmpty() ) {
- BlockingHandler handler = blockedQ.element();
- if ( ! handler.getMessage().isExpired() ) {
- if (throttlePolicy.canSend(handler.getMessage(), pendingCount)) {
- handler.sendComplete();
- } else {
- return;
- }
+ for (boolean success = true; success && !blockedQ.isEmpty(); ) {
+ success = blockedQ.element().sendOrExpire();
+ if (success) {
+ blockedQ.remove();
}
- blockedQ.remove();
}
}
}
@@ -264,7 +295,7 @@ public final class SourceSession implements ReplyHandler {
throttlePolicy.processReply(reply);
}
done = (closed && pendingCount == 0);
- drainQ();
+ sendBlockedMessages();
}
if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
reply.getTrace().trace(TraceLevel.COMPONENT,
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
index 296e724a502..9db0bda04af 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
@@ -1,19 +1,7 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.network.local;
-import com.yahoo.messagebus.DestinationSession;
-import com.yahoo.messagebus.DestinationSessionParams;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.IntermediateSession;
-import com.yahoo.messagebus.IntermediateSessionParams;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.MessageHandler;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.SourceSession;
-import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.*;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -27,7 +15,9 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
/**
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
@@ -91,6 +81,68 @@ public class LocalNetworkTest {
server.mbus.destroy();
}
+ @Test
+ public void requireThatBlockingSendTimeOutInSendQ() throws InterruptedException {
+ final LocalWire wire = new LocalWire();
+
+ final Server serverA = new Server(wire);
+ final SourceSession source = serverA.newSourceSession(new StaticThrottlePolicy().setMaxPendingCount(1));
+
+ final Server serverB = new Server(wire);
+ final IntermediateSession intermediate = serverB.newIntermediateSession();
+
+ final Server serverC = new Server(wire);
+ final DestinationSession destination = serverC.newDestinationSession();
+
+ Message msg = new SimpleMessage("foo");
+ msg.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec()))
+ .addHop(Hop.parse(destination.getConnectionSpec())));
+ assertThat(source.sendBlocking(msg).isAccepted(), is(true));
+ Message msg2 = new SimpleMessage("foo2");
+ msg2.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec()))
+ .addHop(Hop.parse(destination.getConnectionSpec())));
+ long TIMEOUT = 1000;
+ msg2.setTimeRemaining(TIMEOUT);
+ long start = System.currentTimeMillis();
+ Result res = source.sendBlocking(msg2);
+ assertThat(res.isAccepted(), is(false));
+ assertEquals(ErrorCode.TIMEOUT, res.getError().getCode());
+ assertTrue(res.getError().getMessage().endsWith("Timed out in sendQ"));
+ long end = System.currentTimeMillis();
+ assertTrue(end - start > TIMEOUT);
+ assertTrue(end - start < 2*TIMEOUT);
+
+ msg = serverB.messages.poll(60, TimeUnit.SECONDS);
+ assertThat(msg, instanceOf(SimpleMessage.class));
+ assertThat(((SimpleMessage)msg).getValue(), is("foo"));
+ intermediate.forward(msg);
+
+ msg = serverC.messages.poll(60, TimeUnit.SECONDS);
+ assertThat(msg, instanceOf(SimpleMessage.class));
+ assertThat(((SimpleMessage)msg).getValue(), is("foo"));
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ destination.reply(reply);
+
+ reply = serverB.replies.poll(60, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(SimpleReply.class));
+ assertThat(((SimpleReply)reply).getValue(), is("bar"));
+ intermediate.forward(reply);
+
+ reply = serverA.replies.poll(60, TimeUnit.SECONDS);
+ assertEquals(ErrorCode.TIMEOUT, reply.getError(0).getCode());
+ assertTrue(reply.getError(0).getMessage().endsWith("Timed out in sendQ"));
+
+ reply = serverA.replies.poll(60, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(SimpleReply.class));
+ assertThat(((SimpleReply)reply).getValue(), is("bar"));
+
+ serverA.mbus.destroy();
+ serverB.mbus.destroy();
+ serverC.mbus.destroy();
+
+ }
+
private static class Server implements MessageHandler, ReplyHandler {
final MessageBus mbus;
@@ -104,10 +156,16 @@ public class LocalNetworkTest {
}
SourceSession newSourceSession() {
- return mbus.createSourceSession(
- new SourceSessionParams().setTimeout(600.0).setReplyHandler(this));
+ return mbus.createSourceSession(new SourceSessionParams()
+ .setTimeout(600.0)
+ .setReplyHandler(this));
+ }
+ SourceSession newSourceSession(ThrottlePolicy throttlePolicy) {
+ return mbus.createSourceSession(new SourceSessionParams()
+ .setTimeout(600.0)
+ .setReplyHandler(this)
+ .setThrottlePolicy(throttlePolicy));
}
-
IntermediateSession newIntermediateSession() {
return mbus.createIntermediateSession(new IntermediateSessionParams()
.setMessageHandler(this)