summaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/main/java
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-12 15:46:04 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-12 15:46:04 +0200
commit751002293a4bb389c7db69a3af28b467c0f55478 (patch)
treeae2ef16f431495de3a577a1a7806dc98ca437d60 /container-messagebus/src/main/java
parenta9e3a26a04f795c20b4ed6ca3f8731b3fbcbe73f (diff)
A collection of code cleanup in messagebus. And a bonus of catching missing shutdown of config subscription.
Diffstat (limited to 'container-messagebus/src/main/java')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java4
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java52
2 files changed, 11 insertions, 45 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java
index 9e76f1bf651..89d4aa05200 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java
@@ -20,9 +20,7 @@ public class MbusClientProvider implements Provider<MbusClient> {
private final MbusClient client;
- private static MbusClient createSourceClient(
- SessionCache sessionCache,
- boolean setAllPassThrottlePolicy) {
+ private static MbusClient createSourceClient(SessionCache sessionCache, boolean setAllPassThrottlePolicy) {
final SourceSessionParams sourceSessionParams = new SourceSessionParams();
if (setAllPassThrottlePolicy) {
sourceSessionParams.setThrottlePolicy(new AllPassThrottlePolicy());
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index 1e60050375e..113d99f77f9 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -14,7 +14,6 @@ import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.SharedResource;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.ConfigAgent;
-import com.yahoo.messagebus.DestinationSessionParams;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
import com.yahoo.messagebus.MessageBusParams;
@@ -24,7 +23,6 @@ import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.ThrottlePolicy;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.shared.SharedDestinationSession;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
@@ -60,10 +58,6 @@ public final class SessionCache extends AbstractComponent {
private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator();
- private final Object destinationLock = new Object();
- private final Map<String, SharedDestinationSession> destinations = new HashMap<>();
- private final DestinationSessionCreator destinationsCreator = new DestinationSessionCreator();
-
private final Object sourceLock = new Object();
private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>();
private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
@@ -85,6 +79,13 @@ public final class SessionCache extends AbstractComponent {
this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager());
}
+ public void deconstruct() {
+ if (configAgent != null) {
+ configAgent.shutdown();
+ }
+ }
+
+
private void start() {
ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId);
if (documentManagerConfigId != null) {
@@ -98,6 +99,7 @@ public final class SessionCache extends AbstractComponent {
configAgent.subscribe();
}
+
private boolean isStarted() {
return messageBus != null;
}
@@ -136,7 +138,7 @@ public final class SessionCache extends AbstractComponent {
(((double) (maxPendingSize / 1024L)) / 1024.0d) + " pending megabytes."));
}
- public ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) {
+ ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) {
synchronized (this) {
if (!isStarted()) {
start();
@@ -145,15 +147,6 @@ public final class SessionCache extends AbstractComponent {
return intermediatesCreator.retain(intermediateLock, intermediates, p);
}
- public ReferencedResource<SharedDestinationSession> retainDestination(final DestinationSessionParams p) {
- synchronized (this) {
- if (!isStarted()) {
- start();
- }
- }
- return destinationsCreator.retain(destinationLock, destinations, p);
- }
-
public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) {
synchronized (this) {
if (!isStarted()) {
@@ -203,27 +196,6 @@ public final class SessionCache extends AbstractComponent {
}
- private class DestinationSessionCreator
- extends SessionCreator<DestinationSessionParams, String, SharedDestinationSession> {
-
- @Override
- SharedDestinationSession create(DestinationSessionParams p) {
- log.log(LogLevel.DEBUG, "Creating new destination session " + p.getName() + "");
- return messageBus.newDestinationSession(p);
- }
-
- @Override
- String buildKey(DestinationSessionParams p) {
- return p.getName();
- }
-
- @Override
- void logReuse(SharedDestinationSession session) {
- log.log(LogLevel.DEBUG, "Reusing destination session " + session.name() + "");
- }
-
- }
-
private class SourceSessionCreator
extends SessionCreator<SourceSessionParams, SourceSessionKey, SharedSourceSession> {
@@ -377,7 +349,6 @@ public final class SessionCache extends AbstractComponent {
}
static class UnknownThrottlePolicySignature extends ThrottlePolicySignature {
-
private final ThrottlePolicy policy;
UnknownThrottlePolicySignature(final ThrottlePolicy policy) {
@@ -419,10 +390,7 @@ public final class SessionCache extends AbstractComponent {
@Override
public String toString() {
- return "SourceSessionKey{" +
- "timeout=" + timeout +
- ", policy=" + policy +
- '}';
+ return "SourceSessionKey{" + "timeout=" + timeout + ", policy=" + policy + '}';
}
@Override