diff options
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 14 |
1 files changed, 8 insertions, 6 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index ad06aed43ec..53fd49dcf58 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -34,7 +34,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked void dec() { count --; } boolean enough() { return count > 5; } } - private static ThreadLocal<Counter> sendBlockedRecurseLevel = ThreadLocal.withInitial(Counter::new); + private static final ThreadLocal<Counter> sendBlockedRecurseLevel = ThreadLocal.withInitial(Counter::new); /** * The default constructor requires values for all final member variables @@ -139,13 +139,15 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked if (closed) { return new Result(ErrorCode.SEND_QUEUE_CLOSED, "Source session is closed."); } - if (throttlePolicy != null && ! throttlePolicy.canSend(message, pendingCount)) { - return new Result(ErrorCode.SEND_QUEUE_FULL, - "Too much pending data (" + pendingCount + " messages)."); - } - message.pushHandler(replyHandler); if (throttlePolicy != null) { + if (! throttlePolicy.canSend(message, pendingCount)) { + return new Result(ErrorCode.SEND_QUEUE_FULL, + "Too much pending data (" + pendingCount + " messages)."); + } + message.pushHandler(replyHandler); throttlePolicy.processMessage(message); + } else { + message.pushHandler(replyHandler); } ++pendingCount; } |