summaryrefslogtreecommitdiffstats
path: root/messagebus/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-06 07:32:01 +0100
committerGitHub <noreply@github.com>2017-01-06 07:32:01 +0100
commit9830ef4b45b59a2dca227e2e8756bde627ac5046 (patch)
tree02d6cba4f4dcebdd9d1c21385748ee724f3ac1ad /messagebus/src
parent01271a56e795c06cb22ff7042e7a9f802ddebdf0 (diff)
Revert "First pass at a scalable blocking send."
Diffstat (limited to 'messagebus/src')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java135
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java90
2 files changed, 37 insertions, 188 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 554b093df83..12ec4186513 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -4,9 +4,7 @@ package com.yahoo.messagebus;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingTable;
-import com.yahoo.text.Utf8String;
-import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -16,7 +14,7 @@ import java.util.logging.Logger;
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public final class SourceSession implements ReplyHandler, Runnable {
+public final class SourceSession implements ReplyHandler {
private static Logger log = Logger.getLogger(SourceSession.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
@@ -28,9 +26,8 @@ public final class SourceSession implements ReplyHandler, Runnable {
private final ThrottlePolicy throttlePolicy;
private volatile double timeout;
private volatile int pendingCount = 0;
- private volatile boolean closed = false;
- private final Queue<BlockedMessage> blockedQ = new LinkedList<>();
- private final Thread blockedMessageSender;
+ private int blockedCount = 0;
+ private boolean closed = false;
/**
* <p>The default constructor requires values for all final member variables
@@ -51,9 +48,6 @@ public final class SourceSession implements ReplyHandler, Runnable {
replyHandler = params.getReplyHandler();
throttlePolicy = params.getThrottlePolicy();
timeout = params.getTimeout();
- blockedMessageSender = new Thread(this);
- blockedMessageSender.setDaemon(true);
- blockedMessageSender.start();
}
@Override
@@ -132,17 +126,10 @@ public final class SourceSession implements ReplyHandler, Runnable {
* @return The result of <i>initiating</i> sending of this message.
*/
public Result send(Message msg) {
-
- return sendInternal(updateTiming(msg));
- }
- private Message updateTiming(Message msg) {
msg.setTimeReceivedNow();
if (msg.getTimeRemaining() <= 0) {
- msg.setTimeRemaining((long) (timeout * 1000));
+ msg.setTimeRemaining((long)(timeout * 1000));
}
- return msg;
- }
- private Result sendInternal(Message msg) {
synchronized (lock) {
if (closed) {
return new Result(ErrorCode.SEND_QUEUE_CLOSED,
@@ -168,86 +155,6 @@ public final class SourceSession implements ReplyHandler, Runnable {
return Result.ACCEPTED;
}
- @Override
- public void run() {
- while (!closed) {
- sendBlockedMessages();
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- return;
- }
- }
- }
-
- private class BlockedMessage {
- private final Message msg;
- private Result result = null;
- BlockedMessage(Message msg) {
- this.msg = msg;
- }
-
- private synchronized void notifyFailure(Result result) {
- synchronized (this) {
- this.result = result;
- notify();
- }
- }
- private void notifySuccess(Result result) {
- synchronized (this) {
- this.result = result;
- notify();
- }
- }
-
- Message getMessage() { return msg; }
-
- boolean sendOrExpire() {
- if (msg.isExpired()) {
- Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ");
- notifyFailure(new Result(error));
- replyHandler.handleReply(new SendTimeoutReply(msg, error));
- } else {
- Result res = sendInternal(msg);
- if ( ! isSendQFull(res) ) {
- notifySuccess(res);
- } else {
- return false;
- }
- }
- return true;
- }
-
- Result waitComplete() throws InterruptedException {
- synchronized (this) {
- this.wait();
- }
- return result;
- }
- }
-
- static class SendTimeoutReply extends Reply {
-
- SendTimeoutReply(Message msg, Error error) {
- setMessage(msg);
- addError(error);
- }
-
- @Override
- public Utf8String getProtocol() {
- return null;
- }
-
- @Override
- public int getType() {
- return 0;
- }
- }
-
- static private boolean isSendQFull(Result res) {
- return ! (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL);
- }
-
/**
* <p>This is a blocking proxy to the {@link #send(Message)} method. This
* method blocks until the message is accepted by the send queue. Note that
@@ -260,23 +167,19 @@ public final class SourceSession implements ReplyHandler, Runnable {
* @throws InterruptedException Thrown if the calling thread is interrupted.
*/
public Result sendBlocking(Message msg) throws InterruptedException {
- Result res = send(msg);
- if (isSendQFull(res)) {
- BlockedMessage blockedMessage = new BlockedMessage(msg);
- synchronized (lock) {
- blockedQ.add(blockedMessage);
+ while (true) {
+ Result res = send(msg);
+ if (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL) {
+ return res;
}
- res = blockedMessage.waitComplete();
- }
- return res;
- }
-
- private void sendBlockedMessages() {
- synchronized (lock) {
- for (boolean success = true; success && !blockedQ.isEmpty(); ) {
- success = blockedQ.element().sendOrExpire();
- if (success) {
- blockedQ.remove();
+ synchronized (lock) {
+ try {
+ blockedCount++;
+ while (!closed && !throttlePolicy.canSend(msg, pendingCount)) {
+ lock.wait(10);
+ }
+ } finally {
+ blockedCount--;
}
}
}
@@ -295,7 +198,11 @@ public final class SourceSession implements ReplyHandler, Runnable {
throttlePolicy.processReply(reply);
}
done = (closed && pendingCount == 0);
- sendBlockedMessages();
+ if (blockedCount < 10) {
+ lock.notifyAll();
+ } else {
+ lock.notify();
+ }
}
if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
reply.getTrace().trace(TraceLevel.COMPONENT,
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
index 9db0bda04af..296e724a502 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
@@ -1,7 +1,19 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.network.local;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.DestinationSession;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.IntermediateSession;
+import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -15,9 +27,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
/**
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
@@ -81,68 +91,6 @@ public class LocalNetworkTest {
server.mbus.destroy();
}
- @Test
- public void requireThatBlockingSendTimeOutInSendQ() throws InterruptedException {
- final LocalWire wire = new LocalWire();
-
- final Server serverA = new Server(wire);
- final SourceSession source = serverA.newSourceSession(new StaticThrottlePolicy().setMaxPendingCount(1));
-
- final Server serverB = new Server(wire);
- final IntermediateSession intermediate = serverB.newIntermediateSession();
-
- final Server serverC = new Server(wire);
- final DestinationSession destination = serverC.newDestinationSession();
-
- Message msg = new SimpleMessage("foo");
- msg.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec()))
- .addHop(Hop.parse(destination.getConnectionSpec())));
- assertThat(source.sendBlocking(msg).isAccepted(), is(true));
- Message msg2 = new SimpleMessage("foo2");
- msg2.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec()))
- .addHop(Hop.parse(destination.getConnectionSpec())));
- long TIMEOUT = 1000;
- msg2.setTimeRemaining(TIMEOUT);
- long start = System.currentTimeMillis();
- Result res = source.sendBlocking(msg2);
- assertThat(res.isAccepted(), is(false));
- assertEquals(ErrorCode.TIMEOUT, res.getError().getCode());
- assertTrue(res.getError().getMessage().endsWith("Timed out in sendQ"));
- long end = System.currentTimeMillis();
- assertTrue(end - start > TIMEOUT);
- assertTrue(end - start < 2*TIMEOUT);
-
- msg = serverB.messages.poll(60, TimeUnit.SECONDS);
- assertThat(msg, instanceOf(SimpleMessage.class));
- assertThat(((SimpleMessage)msg).getValue(), is("foo"));
- intermediate.forward(msg);
-
- msg = serverC.messages.poll(60, TimeUnit.SECONDS);
- assertThat(msg, instanceOf(SimpleMessage.class));
- assertThat(((SimpleMessage)msg).getValue(), is("foo"));
- Reply reply = new SimpleReply("bar");
- reply.swapState(msg);
- destination.reply(reply);
-
- reply = serverB.replies.poll(60, TimeUnit.SECONDS);
- assertThat(reply, instanceOf(SimpleReply.class));
- assertThat(((SimpleReply)reply).getValue(), is("bar"));
- intermediate.forward(reply);
-
- reply = serverA.replies.poll(60, TimeUnit.SECONDS);
- assertEquals(ErrorCode.TIMEOUT, reply.getError(0).getCode());
- assertTrue(reply.getError(0).getMessage().endsWith("Timed out in sendQ"));
-
- reply = serverA.replies.poll(60, TimeUnit.SECONDS);
- assertThat(reply, instanceOf(SimpleReply.class));
- assertThat(((SimpleReply)reply).getValue(), is("bar"));
-
- serverA.mbus.destroy();
- serverB.mbus.destroy();
- serverC.mbus.destroy();
-
- }
-
private static class Server implements MessageHandler, ReplyHandler {
final MessageBus mbus;
@@ -156,16 +104,10 @@ public class LocalNetworkTest {
}
SourceSession newSourceSession() {
- return mbus.createSourceSession(new SourceSessionParams()
- .setTimeout(600.0)
- .setReplyHandler(this));
- }
- SourceSession newSourceSession(ThrottlePolicy throttlePolicy) {
- return mbus.createSourceSession(new SourceSessionParams()
- .setTimeout(600.0)
- .setReplyHandler(this)
- .setThrottlePolicy(throttlePolicy));
+ return mbus.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(this));
}
+
IntermediateSession newIntermediateSession() {
return mbus.createIntermediateSession(new IntermediateSessionParams()
.setMessageHandler(this)