aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-09-29 11:03:57 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-09-29 11:03:57 +0200
commit77b683a0d5d02420cfaf3e3a1b08d52dfef144f9 (patch)
tree0c7294d609bcdbf1851fed153df76c907e1d4110 /messagebus
parent9192aa0bad75c648ebfe52b3d8fb310b084f076a (diff)
Cleanup
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java43
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java49
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java4
3 files changed, 66 insertions, 30 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java
new file mode 100644
index 00000000000..24e177f1fbf
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java
@@ -0,0 +1,43 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.messagebus.network.Network;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The combination of a messagebus and a network over which it may send data.
+ *
+ * @author bratseth
+ */
+public class NetworkMessageBus {
+
+ private final Network network;
+ private final MessageBus messageBus;
+
+ private final AtomicBoolean destroyed = new AtomicBoolean(false);
+
+ public NetworkMessageBus(Network network, MessageBus messageBus) {
+ this.network = network;
+ this.messageBus = messageBus;
+ }
+
+ /** Returns the contained message bus object */
+ public MessageBus getMessageBus() { return messageBus; }
+
+ /** Returns the network of this as a Network */
+ public Network getNetwork() { return network; }
+
+ /**
+ * Irreversibly destroys the content of this.
+ *
+ * @return whether this destroyed anything, or if it was already destroyed
+ */
+ public boolean destroy() {
+ if ( destroyed.getAndSet(true)) return false;
+
+ getMessageBus().destroy();
+ return true;
+ }
+
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
index d767e197b11..cfa50a35549 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
@@ -3,6 +3,7 @@ package com.yahoo.messagebus;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -17,12 +18,9 @@ import java.util.logging.Logger;
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public class RPCMessageBus {
+public class RPCMessageBus extends NetworkMessageBus {
private static final Logger log = Logger.getLogger(RPCMessageBus.class.getName());
- private final AtomicBoolean destroyed = new AtomicBoolean(false);
- private final MessageBus mbus;
- private final RPCNetwork net;
private final ConfigAgent configAgent;
/**
@@ -33,9 +31,16 @@ public class RPCMessageBus {
* @param routingCfgId The config id for message bus routing specs.
*/
public RPCMessageBus(MessageBusParams mbusParams, RPCNetworkParams rpcParams, String routingCfgId) {
- net = new RPCNetwork(rpcParams);
- mbus = new MessageBus(net, mbusParams);
- configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", mbus);
+ this(mbusParams, new RPCNetwork(rpcParams), routingCfgId);
+ }
+
+ private RPCMessageBus(MessageBusParams mbusParams, RPCNetwork network, String routingCfgId) {
+ this(new MessageBus(network, mbusParams), network, routingCfgId);
+ }
+
+ private RPCMessageBus(MessageBus messageBus, RPCNetwork network, String routingCfgId) {
+ super(network, messageBus);
+ configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", messageBus);
configAgent.subscribe();
}
@@ -80,33 +85,17 @@ public class RPCMessageBus {
* Sets the destroyed flag to true. The very first time this method is called, it cleans up all its dependencies.
* Even if you retain a reference to this object, all of its content is allowed to be garbage collected.
*
- * @return True if content existed and was destroyed.
+ * @return true if content existed and was destroyed.
*/
+ @Override
public boolean destroy() {
- if (!destroyed.getAndSet(true)) {
+ boolean destroyed = super.destroy();
+ if (destroyed)
configAgent.shutdown();
- mbus.destroy();
- return true;
- }
- return false;
+ return destroyed;
}
- /**
- * Returns the contained message bus object.
- *
- * @return Message bus.
- */
- public MessageBus getMessageBus() {
- return mbus;
- }
-
- /**
- * Returns the contained rpc network object.
- *
- * @return RPC network.
- */
- public RPCNetwork getRPCNetwork() {
- return net;
- }
+ /** Returns the network of this as a RPCNetwork */
+ public RPCNetwork getRPCNetwork() { return (RPCNetwork)getNetwork(); }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
index bbbb49e4370..78cf352cfbf 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
@@ -32,6 +32,10 @@ public class LocalNetwork implements Network {
private final String hostId;
private volatile NetworkOwner owner;
+ public LocalNetwork() {
+ this(new LocalWire());
+ }
+
public LocalNetwork(LocalWire wire) {
this.wire = wire;
this.hostId = wire.newHostId();