summaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-06 07:32:01 +0100
committerGitHub <noreply@github.com>2017-01-06 07:32:01 +0100
commit9830ef4b45b59a2dca227e2e8756bde627ac5046 (patch)
tree02d6cba4f4dcebdd9d1c21385748ee724f3ac1ad /messagebus/src/main/java/com
parent01271a56e795c06cb22ff7042e7a9f802ddebdf0 (diff)
Revert "First pass at a scalable blocking send."
Diffstat (limited to 'messagebus/src/main/java/com')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java135
1 files changed, 21 insertions, 114 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 554b093df83..12ec4186513 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -4,9 +4,7 @@ package com.yahoo.messagebus;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingTable;
-import com.yahoo.text.Utf8String;
-import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -16,7 +14,7 @@ import java.util.logging.Logger;
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public final class SourceSession implements ReplyHandler, Runnable {
+public final class SourceSession implements ReplyHandler {
private static Logger log = Logger.getLogger(SourceSession.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
@@ -28,9 +26,8 @@ public final class SourceSession implements ReplyHandler, Runnable {
private final ThrottlePolicy throttlePolicy;
private volatile double timeout;
private volatile int pendingCount = 0;
- private volatile boolean closed = false;
- private final Queue<BlockedMessage> blockedQ = new LinkedList<>();
- private final Thread blockedMessageSender;
+ private int blockedCount = 0;
+ private boolean closed = false;
/**
* <p>The default constructor requires values for all final member variables
@@ -51,9 +48,6 @@ public final class SourceSession implements ReplyHandler, Runnable {
replyHandler = params.getReplyHandler();
throttlePolicy = params.getThrottlePolicy();
timeout = params.getTimeout();
- blockedMessageSender = new Thread(this);
- blockedMessageSender.setDaemon(true);
- blockedMessageSender.start();
}
@Override
@@ -132,17 +126,10 @@ public final class SourceSession implements ReplyHandler, Runnable {
* @return The result of <i>initiating</i> sending of this message.
*/
public Result send(Message msg) {
-
- return sendInternal(updateTiming(msg));
- }
- private Message updateTiming(Message msg) {
msg.setTimeReceivedNow();
if (msg.getTimeRemaining() <= 0) {
- msg.setTimeRemaining((long) (timeout * 1000));
+ msg.setTimeRemaining((long)(timeout * 1000));
}
- return msg;
- }
- private Result sendInternal(Message msg) {
synchronized (lock) {
if (closed) {
return new Result(ErrorCode.SEND_QUEUE_CLOSED,
@@ -168,86 +155,6 @@ public final class SourceSession implements ReplyHandler, Runnable {
return Result.ACCEPTED;
}
- @Override
- public void run() {
- while (!closed) {
- sendBlockedMessages();
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- return;
- }
- }
- }
-
- private class BlockedMessage {
- private final Message msg;
- private Result result = null;
- BlockedMessage(Message msg) {
- this.msg = msg;
- }
-
- private synchronized void notifyFailure(Result result) {
- synchronized (this) {
- this.result = result;
- notify();
- }
- }
- private void notifySuccess(Result result) {
- synchronized (this) {
- this.result = result;
- notify();
- }
- }
-
- Message getMessage() { return msg; }
-
- boolean sendOrExpire() {
- if (msg.isExpired()) {
- Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ");
- notifyFailure(new Result(error));
- replyHandler.handleReply(new SendTimeoutReply(msg, error));
- } else {
- Result res = sendInternal(msg);
- if ( ! isSendQFull(res) ) {
- notifySuccess(res);
- } else {
- return false;
- }
- }
- return true;
- }
-
- Result waitComplete() throws InterruptedException {
- synchronized (this) {
- this.wait();
- }
- return result;
- }
- }
-
- static class SendTimeoutReply extends Reply {
-
- SendTimeoutReply(Message msg, Error error) {
- setMessage(msg);
- addError(error);
- }
-
- @Override
- public Utf8String getProtocol() {
- return null;
- }
-
- @Override
- public int getType() {
- return 0;
- }
- }
-
- static private boolean isSendQFull(Result res) {
- return ! (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL);
- }
-
/**
* <p>This is a blocking proxy to the {@link #send(Message)} method. This
* method blocks until the message is accepted by the send queue. Note that
@@ -260,23 +167,19 @@ public final class SourceSession implements ReplyHandler, Runnable {
* @throws InterruptedException Thrown if the calling thread is interrupted.
*/
public Result sendBlocking(Message msg) throws InterruptedException {
- Result res = send(msg);
- if (isSendQFull(res)) {
- BlockedMessage blockedMessage = new BlockedMessage(msg);
- synchronized (lock) {
- blockedQ.add(blockedMessage);
+ while (true) {
+ Result res = send(msg);
+ if (res.isAccepted() || res.getError().getCode() != ErrorCode.SEND_QUEUE_FULL) {
+ return res;
}
- res = blockedMessage.waitComplete();
- }
- return res;
- }
-
- private void sendBlockedMessages() {
- synchronized (lock) {
- for (boolean success = true; success && !blockedQ.isEmpty(); ) {
- success = blockedQ.element().sendOrExpire();
- if (success) {
- blockedQ.remove();
+ synchronized (lock) {
+ try {
+ blockedCount++;
+ while (!closed && !throttlePolicy.canSend(msg, pendingCount)) {
+ lock.wait(10);
+ }
+ } finally {
+ blockedCount--;
}
}
}
@@ -295,7 +198,11 @@ public final class SourceSession implements ReplyHandler, Runnable {
throttlePolicy.processReply(reply);
}
done = (closed && pendingCount == 0);
- sendBlockedMessages();
+ if (blockedCount < 10) {
+ lock.notifyAll();
+ } else {
+ lock.notify();
+ }
}
if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
reply.getTrace().trace(TraceLevel.COMPONENT,