summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-12 15:46:04 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-12 15:46:04 +0200
commit751002293a4bb389c7db69a3af28b467c0f55478 (patch)
treeae2ef16f431495de3a577a1a7806dc98ca437d60 /messagebus
parenta9e3a26a04f795c20b4ed6ca3f8731b3fbcbe73f (diff)
A collection of code cleanup in messagebus. And a bonus of catching missing shutdown of config subscription.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/abi-spec.json2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/CallStack.java3
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/ConfigAgent.java1
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java4
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java4
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java2
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java2
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/MessageHandler.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java2
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/Trace.java2
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/TraceNode.java6
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java4
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTarget.java12
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java14
14 files changed, 25 insertions, 35 deletions
diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json
index e8e752127b1..876bf4a6960 100644
--- a/messagebus/abi-spec.json
+++ b/messagebus/abi-spec.json
@@ -114,7 +114,7 @@
"public void <init>(com.yahoo.concurrent.Timer)",
"public double getWindowSizeIncrement()",
"public double getWindowSizeBackOff()",
- "public void setMaxThroughput(double)",
+ "public com.yahoo.messagebus.DynamicThrottlePolicy setMaxThroughput(double)",
"public boolean canSend(com.yahoo.messagebus.Message, int)",
"public void processMessage(com.yahoo.messagebus.Message)",
"public void processReply(com.yahoo.messagebus.Reply)",
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/CallStack.java b/messagebus/src/main/java/com/yahoo/messagebus/CallStack.java
index 4a280317692..717d2a97fe2 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/CallStack.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/CallStack.java
@@ -3,7 +3,6 @@ package com.yahoo.messagebus;
import java.util.ArrayDeque;
import java.util.Deque;
-import java.util.Stack;
/**
* An wrapper around a stack of frame objects that is aware of the message that owns it. It contains functionality to
@@ -75,7 +74,7 @@ public class CallStack {
private final ReplyHandler handler;
private final Object context;
- public StackFrame(ReplyHandler handler, Object context) {
+ StackFrame(ReplyHandler handler, Object context) {
this.handler = handler;
this.context = context;
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ConfigAgent.java b/messagebus/src/main/java/com/yahoo/messagebus/ConfigAgent.java
index c88d3f1308d..f0c7d712f8d 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/ConfigAgent.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/ConfigAgent.java
@@ -3,7 +3,6 @@ package com.yahoo.messagebus;
import com.yahoo.config.subscription.ConfigSubscriber;
import com.yahoo.config.subscription.ConfigURI;
-import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.routing.HopSpec;
import com.yahoo.messagebus.routing.RouteSpec;
import com.yahoo.messagebus.routing.RoutingSpec;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
index c0b1ee179c6..1f5d60a630a 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
@@ -1,10 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
-import com.yahoo.log.LogLevel;
-
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Logger;
/**
* A session supporting receiving and replying to messages. A destination is expected to reply to every message
@@ -14,7 +11,6 @@ import java.util.logging.Logger;
*/
public final class DestinationSession implements MessageHandler {
- private static Logger log = Logger.getLogger(DestinationSession.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final String name;
private final boolean broadcastName;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
index dae20543b34..525d7ae8867 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
@@ -59,8 +59,9 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
return windowSizeBackOff;
}
- public void setMaxThroughput(double maxThroughput) {
+ public DynamicThrottlePolicy setMaxThroughput(double maxThroughput) {
this.maxThroughput = maxThroughput;
+ return this;
}
@Override
@@ -112,7 +113,6 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
double efficiency = throughput*period/windowSize;
if (efficiency < efficiencyThreshold) {
- double newSize = Math.min(windowSize,throughput * period);
windowSize = Math.min(windowSize * windowSizeBackOff, windowSize - 2* windowSizeIncrement);
localMaxThroughput = 0;
} else {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java b/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java
index 75534605153..8286404aa23 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java
@@ -2,7 +2,6 @@
package com.yahoo.messagebus;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Logger;
/**
* A session which supports receiving, forwarding and acknowledgement of messages. An intermediate session is expacted
@@ -12,7 +11,6 @@ import java.util.logging.Logger;
*/
public final class IntermediateSession implements MessageHandler, ReplyHandler {
- private static final Logger log = Logger.getLogger(IntermediateSession.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final String name;
private final boolean broadcastName;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java
index 06b2af7ea92..7f55401cf43 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBusParams.java
@@ -15,7 +15,7 @@ import java.util.List;
*/
public class MessageBusParams {
- private final List<Protocol> protocols = new ArrayList<Protocol>();
+ private final List<Protocol> protocols = new ArrayList<>();
private RetryPolicy retryPolicy;
private int maxPendingCount;
private int maxPendingSize;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageHandler.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageHandler.java
index 3f18e36b076..5f90a220605 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageHandler.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageHandler.java
@@ -15,5 +15,5 @@ public interface MessageHandler {
*
* @param message The message that arrived.
*/
- public void handleMessage(Message message);
+ void handleMessage(Message message);
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java b/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java
index df60fc5bdbf..ddddaf23299 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java
@@ -24,7 +24,7 @@ public class SendProxy implements MessageHandler, ReplyHandler {
private final Resender resender;
private Message msg = null;
private boolean logTrace = false;
- private long sendTime = 0;
+ private final long sendTime;
/**
* Constructs a new instance of this class to maintain sending of a single message.
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Trace.java b/messagebus/src/main/java/com/yahoo/messagebus/Trace.java
index 85b066318ff..87f58f6ce05 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/Trace.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Trace.java
@@ -1,8 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
-import java.util.Date;
-
/**
* A Trace object contains ad-hoc string notes organized in a strict-loose tree. A Trace object consists of a trace
* level indicating which trace notes should be included and a TraceTree object containing the tree structure and
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/TraceNode.java b/messagebus/src/main/java/com/yahoo/messagebus/TraceNode.java
index 7b40b683227..38963645d54 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/TraceNode.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/TraceNode.java
@@ -25,7 +25,7 @@ public class TraceNode implements Comparable<TraceNode> {
private TraceNode parent = null;
private boolean strict = true;
private String note = null;
- private List<TraceNode> children = new ArrayList<TraceNode>();
+ private List<TraceNode> children = new ArrayList<>();
/**
* Create an empty trace tree.
@@ -48,7 +48,7 @@ public class TraceNode implements Comparable<TraceNode> {
*
* @param rhs The tree to copy.
*/
- TraceNode(TraceNode rhs) {
+ private TraceNode(TraceNode rhs) {
strict = rhs.strict;
note = rhs.note;
addChildren(rhs.children);
@@ -148,7 +148,7 @@ public class TraceNode implements Comparable<TraceNode> {
return this;
}
List<TraceNode> tmp = this.children;
- this.children = new ArrayList<TraceNode>();
+ this.children = new ArrayList<>();
for (TraceNode child : tmp) {
child.compact();
if (child.isEmpty()) {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
index daa31ae2701..1cc45eeb2d8 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
@@ -113,7 +113,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
private void doRequestDone(Request req) {
SendContext ctx = (SendContext)req.getContext();
String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName();
- Reply reply = null;
+ Reply reply;
Error error = null;
if (!req.checkReturnTypes(getReturnSpec())) {
// Map all known JRT errors to the appropriate message bus error.
@@ -269,7 +269,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
final Request request;
final Version version;
- public ReplyContext(Request request, Version version) {
+ ReplyContext(Request request, Version version) {
this.request = request;
this.version = version;
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTarget.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTarget.java
index 6179097e5d9..07a33a646d9 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTarget.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTarget.java
@@ -43,7 +43,7 @@ public class RPCTarget implements RequestWaiter {
*
* @return The target.
*/
- public Target getJRTTarget() {
+ Target getJRTTarget() {
return target;
}
@@ -54,7 +54,7 @@ public class RPCTarget implements RequestWaiter {
*
* @see #subRef()
*/
- public void addRef() {
+ void addRef() {
ref.incrementAndGet();
}
@@ -65,7 +65,7 @@ public class RPCTarget implements RequestWaiter {
*
* @see #addRef()
*/
- public void subRef() {
+ void subRef() {
if (ref.decrementAndGet() == 0) {
target.close();
}
@@ -77,7 +77,7 @@ public class RPCTarget implements RequestWaiter {
*
* @return The number of references in use.
*/
- public int getRefCount() {
+ int getRefCount() {
return ref.get();
}
@@ -90,7 +90,7 @@ public class RPCTarget implements RequestWaiter {
* @param timeout The timeout for the request in seconds.
* @param handler The handler to be called once the version is available.
*/
- public void resolveVersion(double timeout, VersionHandler handler) {
+ void resolveVersion(double timeout, VersionHandler handler) {
boolean hasVersion = false;
boolean shouldInvoke = false;
boolean shouldLog = log.isLoggable(LogLevel.DEBUG);
@@ -168,6 +168,6 @@ public class RPCTarget implements RequestWaiter {
*
* @param ver The version of corresponding target, or null.
*/
- public void handleVersion(Version ver);
+ void handleVersion(Version ver);
}
}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java
index 432524ea2a9..f6185f1d410 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java
@@ -133,8 +133,8 @@ public class ThrottlerTestCase {
CustomTimer timer = new CustomTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
- policy.setWindowSizeIncrement(5);
- policy.setResizeRate(1);
+ policy.setWindowSizeIncrement(5)
+ .setResizeRate(1);
double windowSize = getWindowSize(policy, timer, 100);
assertTrue(windowSize >= 90 && windowSize <= 110);
@@ -157,8 +157,8 @@ public class ThrottlerTestCase {
CustomTimer timer = new CustomTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
- policy.setWindowSizeIncrement(5);
- policy.setResizeRate(1);
+ policy.setWindowSizeIncrement(5)
+ .setResizeRate(1);
double windowSize = getWindowSize(policy, timer, 100);
assertTrue(windowSize >= 90 && windowSize <= 110);
@@ -183,9 +183,9 @@ public class ThrottlerTestCase {
CustomTimer timer = new CustomTimer();
DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
- policy.setWindowSizeIncrement(5);
- policy.setResizeRate(1);
- policy.setMinWindowSize(150);
+ policy.setWindowSizeIncrement(5)
+ .setResizeRate(1)
+ .setMinWindowSize(150);
double windowSize = getWindowSize(policy, timer, 200);
assertTrue(windowSize >= 150 && windowSize <= 210);