summaryrefslogtreecommitdiffstats
path: root/container-messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-12 15:46:04 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-12 15:46:04 +0200
commit751002293a4bb389c7db69a3af28b467c0f55478 (patch)
treeae2ef16f431495de3a577a1a7806dc98ca437d60 /container-messagebus
parenta9e3a26a04f795c20b4ed6ca3f8731b3fbcbe73f (diff)
A collection of code cleanup in messagebus. And a bonus of catching missing shutdown of config subscription.
Diffstat (limited to 'container-messagebus')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java4
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java52
-rw-r--r--container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java93
3 files changed, 51 insertions, 98 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java
index 9e76f1bf651..89d4aa05200 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java
@@ -20,9 +20,7 @@ public class MbusClientProvider implements Provider<MbusClient> {
private final MbusClient client;
- private static MbusClient createSourceClient(
- SessionCache sessionCache,
- boolean setAllPassThrottlePolicy) {
+ private static MbusClient createSourceClient(SessionCache sessionCache, boolean setAllPassThrottlePolicy) {
final SourceSessionParams sourceSessionParams = new SourceSessionParams();
if (setAllPassThrottlePolicy) {
sourceSessionParams.setThrottlePolicy(new AllPassThrottlePolicy());
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index 1e60050375e..113d99f77f9 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -14,7 +14,6 @@ import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.SharedResource;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.ConfigAgent;
-import com.yahoo.messagebus.DestinationSessionParams;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
import com.yahoo.messagebus.MessageBusParams;
@@ -24,7 +23,6 @@ import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.ThrottlePolicy;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.shared.SharedDestinationSession;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
@@ -60,10 +58,6 @@ public final class SessionCache extends AbstractComponent {
private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator();
- private final Object destinationLock = new Object();
- private final Map<String, SharedDestinationSession> destinations = new HashMap<>();
- private final DestinationSessionCreator destinationsCreator = new DestinationSessionCreator();
-
private final Object sourceLock = new Object();
private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>();
private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
@@ -85,6 +79,13 @@ public final class SessionCache extends AbstractComponent {
this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager());
}
+ public void deconstruct() {
+ if (configAgent != null) {
+ configAgent.shutdown();
+ }
+ }
+
+
private void start() {
ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId);
if (documentManagerConfigId != null) {
@@ -98,6 +99,7 @@ public final class SessionCache extends AbstractComponent {
configAgent.subscribe();
}
+
private boolean isStarted() {
return messageBus != null;
}
@@ -136,7 +138,7 @@ public final class SessionCache extends AbstractComponent {
(((double) (maxPendingSize / 1024L)) / 1024.0d) + " pending megabytes."));
}
- public ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) {
+ ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) {
synchronized (this) {
if (!isStarted()) {
start();
@@ -145,15 +147,6 @@ public final class SessionCache extends AbstractComponent {
return intermediatesCreator.retain(intermediateLock, intermediates, p);
}
- public ReferencedResource<SharedDestinationSession> retainDestination(final DestinationSessionParams p) {
- synchronized (this) {
- if (!isStarted()) {
- start();
- }
- }
- return destinationsCreator.retain(destinationLock, destinations, p);
- }
-
public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) {
synchronized (this) {
if (!isStarted()) {
@@ -203,27 +196,6 @@ public final class SessionCache extends AbstractComponent {
}
- private class DestinationSessionCreator
- extends SessionCreator<DestinationSessionParams, String, SharedDestinationSession> {
-
- @Override
- SharedDestinationSession create(DestinationSessionParams p) {
- log.log(LogLevel.DEBUG, "Creating new destination session " + p.getName() + "");
- return messageBus.newDestinationSession(p);
- }
-
- @Override
- String buildKey(DestinationSessionParams p) {
- return p.getName();
- }
-
- @Override
- void logReuse(SharedDestinationSession session) {
- log.log(LogLevel.DEBUG, "Reusing destination session " + session.name() + "");
- }
-
- }
-
private class SourceSessionCreator
extends SessionCreator<SourceSessionParams, SourceSessionKey, SharedSourceSession> {
@@ -377,7 +349,6 @@ public final class SessionCache extends AbstractComponent {
}
static class UnknownThrottlePolicySignature extends ThrottlePolicySignature {
-
private final ThrottlePolicy policy;
UnknownThrottlePolicySignature(final ThrottlePolicy policy) {
@@ -419,10 +390,7 @@ public final class SessionCache extends AbstractComponent {
@Override
public String toString() {
- return "SourceSessionKey{" +
- "timeout=" + timeout +
- ", policy=" + policy +
- '}';
+ return "SourceSessionKey{" + "timeout=" + timeout + ", policy=" + policy + '}';
}
@Override
diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java
index 1b814208d26..89bc5b9cecd 100644
--- a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java
@@ -25,22 +25,16 @@ public class MbusSessionKeyTestCase {
public final void staticThrottlePolicySignature() {
final StaticThrottlePolicy base = new StaticThrottlePolicy();
final StaticThrottlePolicy other = new StaticThrottlePolicy();
- other.setMaxPendingCount(500);
- other.setMaxPendingSize(500 * 1000 * 1000);
- base.setMaxPendingCount(1);
- base.setMaxPendingSize(1000);
- final StaticThrottlePolicySignature sigBase = new StaticThrottlePolicySignature(
- base);
- final StaticThrottlePolicySignature sigOther = new StaticThrottlePolicySignature(
- other);
- assertFalse("The policies are different, but signatures are equal.",
- sigBase.equals(sigOther));
+ other.setMaxPendingCount(500).setMaxPendingSize(500 * 1000 * 1000);
+ base.setMaxPendingCount(1).setMaxPendingSize(1000);
+ final StaticThrottlePolicySignature sigBase = new StaticThrottlePolicySignature(base);
+ final StaticThrottlePolicySignature sigOther = new StaticThrottlePolicySignature(other);
+ assertFalse("The policies are different, but signatures are equal.", sigBase.equals(sigOther));
assertTrue("Sigs created from same policy evaluated as different.",
sigBase.equals(new StaticThrottlePolicySignature(base)));
other.setMaxPendingCount(1);
other.setMaxPendingSize(1000);
- assertTrue(
- "Sigs created from different policies with same settings evaluated as different.",
+ assertTrue("Sigs created from different policies with same settings evaluated as different.",
sigBase.equals(new StaticThrottlePolicySignature(other)));
}
@@ -49,52 +43,46 @@ public class MbusSessionKeyTestCase {
public final void dynamicThrottlePolicySignature() {
final DynamicThrottlePolicy base = new DynamicThrottlePolicy();
final DynamicThrottlePolicy other = new DynamicThrottlePolicy();
- base.setEfficiencyThreshold(5);
- base.setMaxPendingCount(3);
- base.setMaxPendingSize(3 * 100);
- base.setMaxThroughput(1e6);
- base.setMaxWindowSize(1e9);
- base.setMinWindowSize(1e5);
- base.setWeight(1.0);
- base.setWindowSizeBackOff(.6);
- base.setWindowSizeIncrement(500);
- other.setEfficiencyThreshold(5 + 1);
- other.setMaxPendingCount(3 + 1);
- other.setMaxPendingSize(3 * 100 + 1);
- other.setMaxThroughput(1e6 + 1);
- other.setMaxWindowSize(1e9 + 1);
- other.setMinWindowSize(1e5 + 1);
- other.setWeight(1.0 + 1);
- other.setWindowSizeBackOff(.6 + 1);
- other.setWindowSizeIncrement(500 + 1);
- final DynamicThrottlePolicySignature sigBase = new DynamicThrottlePolicySignature(
- base);
- final DynamicThrottlePolicySignature sigOther = new DynamicThrottlePolicySignature(
- other);
- assertFalse("The policies are different, but signatures are equal.",
- sigBase.equals(sigOther));
+ base.setEfficiencyThreshold(5)
+ .setMaxPendingCount(3)
+ .setMaxWindowSize(1e9)
+ .setMinWindowSize(1e5)
+ .setWeight(1.0)
+ .setWindowSizeBackOff(.6)
+ .setWindowSizeIncrement(500)
+ .setMaxThroughput(1e6)
+ .setMaxPendingSize(3 * 100);
+ other.setEfficiencyThreshold(5 + 1)
+ .setMaxPendingCount(3 + 1)
+ .setMaxThroughput(1e6 + 1)
+ .setMaxWindowSize(1e9 + 1)
+ .setMinWindowSize(1e5 + 1)
+ .setWeight(1.0 + 1)
+ .setWindowSizeBackOff(.6 + 1)
+ .setWindowSizeIncrement(500 + 1)
+ .setMaxPendingSize(3 * 100 + 1);
+ final DynamicThrottlePolicySignature sigBase = new DynamicThrottlePolicySignature(base);
+ final DynamicThrottlePolicySignature sigOther = new DynamicThrottlePolicySignature(other);
+ assertFalse("The policies are different, but signatures are equal.", sigBase.equals(sigOther));
assertTrue("Sigs created from same policy evaluated as different.",
sigBase.equals(new DynamicThrottlePolicySignature(base)));
- other.setEfficiencyThreshold(5);
- other.setMaxPendingCount(3);
- other.setMaxPendingSize(3 * 100);
- other.setMaxThroughput(1e6);
- other.setMaxWindowSize(1e9);
- other.setMinWindowSize(1e5);
- other.setWeight(1.0);
- other.setWindowSizeBackOff(.6);
- other.setWindowSizeIncrement(500);
- assertTrue(
- "Sigs created from different policies with same settings evaluated as different.",
+ other.setEfficiencyThreshold(5)
+ .setMaxPendingCount(3)
+ .setMaxThroughput(1e6)
+ .setMaxWindowSize(1e9)
+ .setMinWindowSize(1e5)
+ .setWeight(1.0)
+ .setWindowSizeBackOff(.6)
+ .setWindowSizeIncrement(500)
+ .setMaxPendingSize(3 * 100);
+ assertTrue("Sigs created from different policies with same settings evaluated as different.",
sigBase.equals(new DynamicThrottlePolicySignature(other)));
}
@Test
public final void unknownThrottlePolicySignature() {
- final UnknownThrottlePolicySignature baseSig = new UnknownThrottlePolicySignature(
- new StaticThrottlePolicy());
- final UnknownThrottlePolicySignature otherSig = new UnknownThrottlePolicySignature(
- new StaticThrottlePolicy());
+ final UnknownThrottlePolicySignature baseSig = new UnknownThrottlePolicySignature(new StaticThrottlePolicy());
+ final UnknownThrottlePolicySignature otherSig = new UnknownThrottlePolicySignature(new StaticThrottlePolicy());
assertEquals(baseSig, baseSig);
assertFalse(otherSig.equals(baseSig));
}
@@ -107,7 +95,6 @@ public class MbusSessionKeyTestCase {
final SourceSessionParams other = new SourceSessionParams();
assertEquals(new SourceSessionKey(base), new SourceSessionKey(other));
other.setTimeout(other.getTimeout() + 1);
- assertFalse(new SourceSessionKey(base).equals(new SourceSessionKey(
- other)));
+ assertFalse(new SourceSessionKey(base).equals(new SourceSessionKey(other)));
}
}