diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-12 15:46:04 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-12 15:46:04 +0200 |
commit | 751002293a4bb389c7db69a3af28b467c0f55478 (patch) | |
tree | ae2ef16f431495de3a577a1a7806dc98ca437d60 /container-messagebus | |
parent | a9e3a26a04f795c20b4ed6ca3f8731b3fbcbe73f (diff) |
A collection of code cleanup in messagebus. And a bonus of catching missing shutdown of config subscription.
Diffstat (limited to 'container-messagebus')
3 files changed, 51 insertions, 98 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java index 9e76f1bf651..89d4aa05200 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java @@ -20,9 +20,7 @@ public class MbusClientProvider implements Provider<MbusClient> { private final MbusClient client; - private static MbusClient createSourceClient( - SessionCache sessionCache, - boolean setAllPassThrottlePolicy) { + private static MbusClient createSourceClient(SessionCache sessionCache, boolean setAllPassThrottlePolicy) { final SourceSessionParams sourceSessionParams = new SourceSessionParams(); if (setAllPassThrottlePolicy) { sourceSessionParams.setThrottlePolicy(new AllPassThrottlePolicy()); diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index 1e60050375e..113d99f77f9 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -14,7 +14,6 @@ import com.yahoo.jdisc.ResourceReference; import com.yahoo.jdisc.SharedResource; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.ConfigAgent; -import com.yahoo.messagebus.DestinationSessionParams; import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.IntermediateSessionParams; import com.yahoo.messagebus.MessageBusParams; @@ -24,7 +23,6 @@ import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.ThrottlePolicy; import com.yahoo.messagebus.network.Identity; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import com.yahoo.messagebus.shared.SharedDestinationSession; import com.yahoo.messagebus.shared.SharedIntermediateSession; import com.yahoo.messagebus.shared.SharedMessageBus; import com.yahoo.messagebus.shared.SharedSourceSession; @@ -60,10 +58,6 @@ public final class SessionCache extends AbstractComponent { private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>(); private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator(); - private final Object destinationLock = new Object(); - private final Map<String, SharedDestinationSession> destinations = new HashMap<>(); - private final DestinationSessionCreator destinationsCreator = new DestinationSessionCreator(); - private final Object sourceLock = new Object(); private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>(); private final SourceSessionCreator sourcesCreator = new SourceSessionCreator(); @@ -85,6 +79,13 @@ public final class SessionCache extends AbstractComponent { this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager()); } + public void deconstruct() { + if (configAgent != null) { + configAgent.shutdown(); + } + } + + private void start() { ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId); if (documentManagerConfigId != null) { @@ -98,6 +99,7 @@ public final class SessionCache extends AbstractComponent { configAgent.subscribe(); } + private boolean isStarted() { return messageBus != null; } @@ -136,7 +138,7 @@ public final class SessionCache extends AbstractComponent { (((double) (maxPendingSize / 1024L)) / 1024.0d) + " pending megabytes.")); } - public ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) { + ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) { synchronized (this) { if (!isStarted()) { start(); @@ -145,15 +147,6 @@ public final class SessionCache extends AbstractComponent { return intermediatesCreator.retain(intermediateLock, intermediates, p); } - public ReferencedResource<SharedDestinationSession> retainDestination(final DestinationSessionParams p) { - synchronized (this) { - if (!isStarted()) { - start(); - } - } - return destinationsCreator.retain(destinationLock, destinations, p); - } - public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) { synchronized (this) { if (!isStarted()) { @@ -203,27 +196,6 @@ public final class SessionCache extends AbstractComponent { } - private class DestinationSessionCreator - extends SessionCreator<DestinationSessionParams, String, SharedDestinationSession> { - - @Override - SharedDestinationSession create(DestinationSessionParams p) { - log.log(LogLevel.DEBUG, "Creating new destination session " + p.getName() + ""); - return messageBus.newDestinationSession(p); - } - - @Override - String buildKey(DestinationSessionParams p) { - return p.getName(); - } - - @Override - void logReuse(SharedDestinationSession session) { - log.log(LogLevel.DEBUG, "Reusing destination session " + session.name() + ""); - } - - } - private class SourceSessionCreator extends SessionCreator<SourceSessionParams, SourceSessionKey, SharedSourceSession> { @@ -377,7 +349,6 @@ public final class SessionCache extends AbstractComponent { } static class UnknownThrottlePolicySignature extends ThrottlePolicySignature { - private final ThrottlePolicy policy; UnknownThrottlePolicySignature(final ThrottlePolicy policy) { @@ -419,10 +390,7 @@ public final class SessionCache extends AbstractComponent { @Override public String toString() { - return "SourceSessionKey{" + - "timeout=" + timeout + - ", policy=" + policy + - '}'; + return "SourceSessionKey{" + "timeout=" + timeout + ", policy=" + policy + '}'; } @Override diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java index 1b814208d26..89bc5b9cecd 100644 --- a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java +++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java @@ -25,22 +25,16 @@ public class MbusSessionKeyTestCase { public final void staticThrottlePolicySignature() { final StaticThrottlePolicy base = new StaticThrottlePolicy(); final StaticThrottlePolicy other = new StaticThrottlePolicy(); - other.setMaxPendingCount(500); - other.setMaxPendingSize(500 * 1000 * 1000); - base.setMaxPendingCount(1); - base.setMaxPendingSize(1000); - final StaticThrottlePolicySignature sigBase = new StaticThrottlePolicySignature( - base); - final StaticThrottlePolicySignature sigOther = new StaticThrottlePolicySignature( - other); - assertFalse("The policies are different, but signatures are equal.", - sigBase.equals(sigOther)); + other.setMaxPendingCount(500).setMaxPendingSize(500 * 1000 * 1000); + base.setMaxPendingCount(1).setMaxPendingSize(1000); + final StaticThrottlePolicySignature sigBase = new StaticThrottlePolicySignature(base); + final StaticThrottlePolicySignature sigOther = new StaticThrottlePolicySignature(other); + assertFalse("The policies are different, but signatures are equal.", sigBase.equals(sigOther)); assertTrue("Sigs created from same policy evaluated as different.", sigBase.equals(new StaticThrottlePolicySignature(base))); other.setMaxPendingCount(1); other.setMaxPendingSize(1000); - assertTrue( - "Sigs created from different policies with same settings evaluated as different.", + assertTrue("Sigs created from different policies with same settings evaluated as different.", sigBase.equals(new StaticThrottlePolicySignature(other))); } @@ -49,52 +43,46 @@ public class MbusSessionKeyTestCase { public final void dynamicThrottlePolicySignature() { final DynamicThrottlePolicy base = new DynamicThrottlePolicy(); final DynamicThrottlePolicy other = new DynamicThrottlePolicy(); - base.setEfficiencyThreshold(5); - base.setMaxPendingCount(3); - base.setMaxPendingSize(3 * 100); - base.setMaxThroughput(1e6); - base.setMaxWindowSize(1e9); - base.setMinWindowSize(1e5); - base.setWeight(1.0); - base.setWindowSizeBackOff(.6); - base.setWindowSizeIncrement(500); - other.setEfficiencyThreshold(5 + 1); - other.setMaxPendingCount(3 + 1); - other.setMaxPendingSize(3 * 100 + 1); - other.setMaxThroughput(1e6 + 1); - other.setMaxWindowSize(1e9 + 1); - other.setMinWindowSize(1e5 + 1); - other.setWeight(1.0 + 1); - other.setWindowSizeBackOff(.6 + 1); - other.setWindowSizeIncrement(500 + 1); - final DynamicThrottlePolicySignature sigBase = new DynamicThrottlePolicySignature( - base); - final DynamicThrottlePolicySignature sigOther = new DynamicThrottlePolicySignature( - other); - assertFalse("The policies are different, but signatures are equal.", - sigBase.equals(sigOther)); + base.setEfficiencyThreshold(5) + .setMaxPendingCount(3) + .setMaxWindowSize(1e9) + .setMinWindowSize(1e5) + .setWeight(1.0) + .setWindowSizeBackOff(.6) + .setWindowSizeIncrement(500) + .setMaxThroughput(1e6) + .setMaxPendingSize(3 * 100); + other.setEfficiencyThreshold(5 + 1) + .setMaxPendingCount(3 + 1) + .setMaxThroughput(1e6 + 1) + .setMaxWindowSize(1e9 + 1) + .setMinWindowSize(1e5 + 1) + .setWeight(1.0 + 1) + .setWindowSizeBackOff(.6 + 1) + .setWindowSizeIncrement(500 + 1) + .setMaxPendingSize(3 * 100 + 1); + final DynamicThrottlePolicySignature sigBase = new DynamicThrottlePolicySignature(base); + final DynamicThrottlePolicySignature sigOther = new DynamicThrottlePolicySignature(other); + assertFalse("The policies are different, but signatures are equal.", sigBase.equals(sigOther)); assertTrue("Sigs created from same policy evaluated as different.", sigBase.equals(new DynamicThrottlePolicySignature(base))); - other.setEfficiencyThreshold(5); - other.setMaxPendingCount(3); - other.setMaxPendingSize(3 * 100); - other.setMaxThroughput(1e6); - other.setMaxWindowSize(1e9); - other.setMinWindowSize(1e5); - other.setWeight(1.0); - other.setWindowSizeBackOff(.6); - other.setWindowSizeIncrement(500); - assertTrue( - "Sigs created from different policies with same settings evaluated as different.", + other.setEfficiencyThreshold(5) + .setMaxPendingCount(3) + .setMaxThroughput(1e6) + .setMaxWindowSize(1e9) + .setMinWindowSize(1e5) + .setWeight(1.0) + .setWindowSizeBackOff(.6) + .setWindowSizeIncrement(500) + .setMaxPendingSize(3 * 100); + assertTrue("Sigs created from different policies with same settings evaluated as different.", sigBase.equals(new DynamicThrottlePolicySignature(other))); } @Test public final void unknownThrottlePolicySignature() { - final UnknownThrottlePolicySignature baseSig = new UnknownThrottlePolicySignature( - new StaticThrottlePolicy()); - final UnknownThrottlePolicySignature otherSig = new UnknownThrottlePolicySignature( - new StaticThrottlePolicy()); + final UnknownThrottlePolicySignature baseSig = new UnknownThrottlePolicySignature(new StaticThrottlePolicy()); + final UnknownThrottlePolicySignature otherSig = new UnknownThrottlePolicySignature(new StaticThrottlePolicy()); assertEquals(baseSig, baseSig); assertFalse(otherSig.equals(baseSig)); } @@ -107,7 +95,6 @@ public class MbusSessionKeyTestCase { final SourceSessionParams other = new SourceSessionParams(); assertEquals(new SourceSessionKey(base), new SourceSessionKey(other)); other.setTimeout(other.getTimeout() + 1); - assertFalse(new SourceSessionKey(base).equals(new SourceSessionKey( - other))); + assertFalse(new SourceSessionKey(base).equals(new SourceSessionKey(other))); } } |