diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-01-03 12:24:44 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2023-01-03 12:24:44 +0100 |
commit | acaed07c3d10f2109935ed5d83bd5c415ab2d943 (patch) | |
tree | bce4e236700ea46ef68a1f8e997fe42b95dc849a /messagebus | |
parent | 11eb110513640129fc617f67d2e3322fb0fb8674 (diff) |
We have relied on dynamic throttling for 12 years or so.
Time to let the old one go.
Diffstat (limited to 'messagebus')
3 files changed, 12 insertions, 67 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 0c3f3168568..4b911d7c38e 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -72,7 +72,6 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, private final Messenger msn; private final Resender resender; private int maxPendingCount; - private int maxPendingSize; private int pendingCount = 0; private int pendingSize = 0; private final Thread careTaker = new Thread(this::sendBlockedMessages); @@ -142,7 +141,6 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, public MessageBus(NetworkMultiplexer net, MessageBusParams params) { // Add all known protocols to the repository. maxPendingCount = params.getMaxPendingCount(); - maxPendingSize = params.getMaxPendingSize(); for (int i = 0, len = params.getNumProtocols(); i < len; ++i) { protocolRepository.putProtocol(params.getProtocol(i)); } @@ -375,7 +373,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } private boolean doAccounting() { - return (maxPendingCount > 0 || maxPendingSize > 0); + return (maxPendingCount > 0); } /** * <p>This method handles choking input data so that message bus does not @@ -392,8 +390,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, if (doAccounting()) { synchronized (this) { - busy = ((maxPendingCount > 0 && pendingCount >= maxPendingCount) || - (maxPendingSize > 0 && pendingSize >= maxPendingSize)); + busy = (maxPendingCount > 0 && pendingCount >= maxPendingCount); if (!busy) { pendingCount++; pendingSize += size; @@ -487,7 +484,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * * @return The resender. */ - @Deprecated // Remove on 9 + @Deprecated (forRemoval = true)// Remove on 9 public Resender getResender() { return resender; } @@ -520,7 +517,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * * @param maxCount The max count. */ - @Deprecated // Remove on 9 + @Deprecated(forRemoval = true) // Remove on 9 public void setMaxPendingCount(int maxCount) { maxPendingCount = maxCount; } @@ -529,7 +526,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * Gets maximum number of messages that can be received without being * replied to yet. */ - @Deprecated // Remove on 9 + @Deprecated (forRemoval = true)// Remove on 9 public int getMaxPendingCount() { return maxPendingCount; } @@ -540,18 +537,18 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * * @param maxSize The max size. */ - @Deprecated // Remove on 9 + @Deprecated (forRemoval = true)// Remove on 9 public void setMaxPendingSize(int maxSize) { - maxPendingSize = maxSize; + } /** * Gets maximum combined size of messages that can be received without * being replied to yet. */ - @Deprecated // Remove on 9 + @Deprecated (forRemoval = true)// Remove on 9 public int getMaxPendingSize() { - return maxPendingSize; + return Integer.MAX_VALUE; } /** diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java index 1b18178d638..198c562e26a 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java @@ -18,7 +18,6 @@ public class MessageBusParams { private final List<Protocol> protocols = new ArrayList<>(); private RetryPolicy retryPolicy; private int maxPendingCount; - private int maxPendingSize; private MessagebusConfig config; /** @@ -27,7 +26,6 @@ public class MessageBusParams { public MessageBusParams() { retryPolicy = new RetryTransientErrorsPolicy(); maxPendingCount = 1024; - maxPendingSize = 128 * 1024 * 1024; config = null; } @@ -40,7 +38,6 @@ public class MessageBusParams { protocols.addAll(params.protocols); retryPolicy = params.retryPolicy; maxPendingCount = params.maxPendingCount; - maxPendingSize = params.maxPendingSize; config = params.config; } @@ -132,8 +129,9 @@ public class MessageBusParams { * * @return The size limit. */ + @Deprecated(forRemoval = true) public int getMaxPendingSize() { - return maxPendingSize; + return Integer.MAX_VALUE; } /** @@ -142,8 +140,8 @@ public class MessageBusParams { * @param maxSize The size limit to set. * @return This, to allow chaining. */ + @Deprecated(forRemoval = true) public MessageBusParams setMaxPendingSize(int maxSize) { - this.maxPendingSize = maxSize; return this; } diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ChokeTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ChokeTestCase.java index a8a1fc33284..ff9fc099923 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/ChokeTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/ChokeTestCase.java @@ -99,56 +99,6 @@ public class ChokeTestCase { assertEquals(0, dstServer.mb.getPendingCount()); } - @Test - @SuppressWarnings("deprecation") - void testMaxSize() { - int size = createMessage("msg").getApproxSize(); - int max = size * 10; - dstServer.mb.setMaxPendingSize(max); - List<Message> lst = new ArrayList<>(); - for (int i = 0; i < max * 2; i += size) { - if (i < max) { - assertEquals(i, dstServer.mb.getPendingSize()); - } else { - assertEquals(max, dstServer.mb.getPendingSize()); - } - assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); - if (i < max) { - Message msg = ((Receptor) dstSession.getMessageHandler()).getMessage(60); - assertNotNull(msg); - lst.add(msg); - } else { - Reply reply = ((Receptor) srcSession.getReplyHandler()).getReply(60); - assertNotNull(reply); - assertEquals(1, reply.getNumErrors()); - assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode()); - } - } - for (int i = 0; i < 5; ++i) { - Message msg = lst.remove(0); - dstSession.acknowledge(msg); - - Reply reply = ((Receptor) srcSession.getReplyHandler()).getReply(60); - assertNotNull(reply); - assertFalse(reply.hasErrors()); - assertNotNull(msg = reply.getMessage()); - assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted()); - - assertNotNull(msg = ((Receptor) dstSession.getMessageHandler()).getMessage(60)); - lst.add(msg); - } - while (!lst.isEmpty()) { - assertEquals(size * lst.size(), dstServer.mb.getPendingSize()); - Message msg = lst.remove(0); - dstSession.acknowledge(msg); - - Reply reply = ((Receptor) srcSession.getReplyHandler()).getReply(60); - assertNotNull(reply); - assertFalse(reply.hasErrors()); - } - assertEquals(0, dstServer.mb.getPendingSize()); - } - private static Message createMessage(String msg) { Message ret = new SimpleMessage(msg); ret.getTrace().setLevel(9); |