diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-09-29 11:03:57 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-09-29 11:03:57 +0200 |
commit | 77b683a0d5d02420cfaf3e3a1b08d52dfef144f9 (patch) | |
tree | 0c7294d609bcdbf1851fed153df76c907e1d4110 /messagebus | |
parent | 9192aa0bad75c648ebfe52b3d8fb310b084f076a (diff) |
Cleanup
Diffstat (limited to 'messagebus')
3 files changed, 66 insertions, 30 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java new file mode 100644 index 00000000000..24e177f1fbf --- /dev/null +++ b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java @@ -0,0 +1,43 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.messagebus.network.Network; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The combination of a messagebus and a network over which it may send data. + * + * @author bratseth + */ +public class NetworkMessageBus { + + private final Network network; + private final MessageBus messageBus; + + private final AtomicBoolean destroyed = new AtomicBoolean(false); + + public NetworkMessageBus(Network network, MessageBus messageBus) { + this.network = network; + this.messageBus = messageBus; + } + + /** Returns the contained message bus object */ + public MessageBus getMessageBus() { return messageBus; } + + /** Returns the network of this as a Network */ + public Network getNetwork() { return network; } + + /** + * Irreversibly destroys the content of this. + * + * @return whether this destroyed anything, or if it was already destroyed + */ + public boolean destroy() { + if ( destroyed.getAndSet(true)) return false; + + getMessageBus().destroy(); + return true; + } + +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java index d767e197b11..cfa50a35549 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java @@ -3,6 +3,7 @@ package com.yahoo.messagebus; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.Network; import com.yahoo.messagebus.network.rpc.RPCNetwork; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; @@ -17,12 +18,9 @@ import java.util.logging.Logger; * * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> */ -public class RPCMessageBus { +public class RPCMessageBus extends NetworkMessageBus { private static final Logger log = Logger.getLogger(RPCMessageBus.class.getName()); - private final AtomicBoolean destroyed = new AtomicBoolean(false); - private final MessageBus mbus; - private final RPCNetwork net; private final ConfigAgent configAgent; /** @@ -33,9 +31,16 @@ public class RPCMessageBus { * @param routingCfgId The config id for message bus routing specs. */ public RPCMessageBus(MessageBusParams mbusParams, RPCNetworkParams rpcParams, String routingCfgId) { - net = new RPCNetwork(rpcParams); - mbus = new MessageBus(net, mbusParams); - configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", mbus); + this(mbusParams, new RPCNetwork(rpcParams), routingCfgId); + } + + private RPCMessageBus(MessageBusParams mbusParams, RPCNetwork network, String routingCfgId) { + this(new MessageBus(network, mbusParams), network, routingCfgId); + } + + private RPCMessageBus(MessageBus messageBus, RPCNetwork network, String routingCfgId) { + super(network, messageBus); + configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", messageBus); configAgent.subscribe(); } @@ -80,33 +85,17 @@ public class RPCMessageBus { * Sets the destroyed flag to true. The very first time this method is called, it cleans up all its dependencies. * Even if you retain a reference to this object, all of its content is allowed to be garbage collected. * - * @return True if content existed and was destroyed. + * @return true if content existed and was destroyed. */ + @Override public boolean destroy() { - if (!destroyed.getAndSet(true)) { + boolean destroyed = super.destroy(); + if (destroyed) configAgent.shutdown(); - mbus.destroy(); - return true; - } - return false; + return destroyed; } - /** - * Returns the contained message bus object. - * - * @return Message bus. - */ - public MessageBus getMessageBus() { - return mbus; - } - - /** - * Returns the contained rpc network object. - * - * @return RPC network. - */ - public RPCNetwork getRPCNetwork() { - return net; - } + /** Returns the network of this as a RPCNetwork */ + public RPCNetwork getRPCNetwork() { return (RPCNetwork)getNetwork(); } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java index bbbb49e4370..78cf352cfbf 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java @@ -32,6 +32,10 @@ public class LocalNetwork implements Network { private final String hostId; private volatile NetworkOwner owner; + public LocalNetwork() { + this(new LocalWire()); + } + public LocalNetwork(LocalWire wire) { this.wire = wire; this.hostId = wire.newHostId(); |