summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-09-27 14:09:40 +0200
committerGitHub <noreply@github.com>2021-09-27 14:09:40 +0200
commite23ba7cc2020e5949543aa4db1af8b23373cb9df (patch)
tree3473a584f3705fa3b67ebe45b9e742e19914e40e /messagebus
parentf06a5e8921ea16da7b64591223e4533f04e858df (diff)
parentf144f291504fe451e5220257ecb2b6dcbd30fecc (diff)
Merge pull request #19285 from vespa-engine/jonmv/shut-down-RPC-when-no-message-bus-owner-remain
Shut down network when all owners also have detached
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java35
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java6
2 files changed, 25 insertions, 16 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
index e75b5d0934a..4bd9bbf7f7f 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
@@ -7,10 +7,9 @@ import com.yahoo.text.Utf8Array;
import java.util.Deque;
import java.util.Map;
-import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
/**
@@ -33,15 +32,16 @@ public class NetworkMultiplexer implements NetworkOwner {
private final Network net;
private final Deque<NetworkOwner> owners = new ConcurrentLinkedDeque<>();
private final Map<String, Deque<NetworkOwner>> sessions = new ConcurrentHashMap<>();
- private final boolean shared;
+ private final AtomicBoolean disowned;
private NetworkMultiplexer(Network net, boolean shared) {
net.attach(this);
this.net = net;
- this.shared = shared;
+ this.disowned = new AtomicBoolean( ! shared);
}
- /** Returns a network multiplexer which will be shared between several {@link NetworkOwner}s. */
+ /** Returns a network multiplexer which will be shared between several {@link NetworkOwner}s,
+ * and will shut down when all these have detached, and {@link #disown()} has been called, in any order. */
public static NetworkMultiplexer shared(Network net) {
return new NetworkMultiplexer(net, true);
}
@@ -100,6 +100,7 @@ public class NetworkMultiplexer implements NetworkOwner {
owner.deliverMessage(message, session);
}
+ /** Attach the network owner to this, allowing this to forward messages to it. */
public void attach(NetworkOwner owner) {
if (owners.contains(owner))
throw new IllegalArgumentException(owner + " is already attached to this");
@@ -107,23 +108,27 @@ public class NetworkMultiplexer implements NetworkOwner {
owners.add(owner);
}
+ /** Detach the network owner from this, no longer allowing messages to it, and shutting down this is ownerless. */
public void detach(NetworkOwner owner) {
if ( ! owners.remove(owner))
throw new IllegalArgumentException(owner + " not attached to this");
- if ( ! shared && owners.isEmpty())
- net.shutdown();
+ destroyIfOwnerless();
}
- public void destroy() {
- if ( ! shared)
- throw new UnsupportedOperationException("Destroy called on a dedicated multiplexer; " +
- "this automatically shuts down when detached from");
+ /** Signal that external ownership of this is relinquished, allowing destruction on last owner detachment. */
+ public void disown() {
+ if (disowned.getAndSet(true))
+ throw new IllegalStateException("Destroy called on a dedicated multiplexer--" +
+ "this automatically shuts down when detached from--or " +
+ "called multiple times on a shared multiplexer");
- if ( ! owners.isEmpty())
- log.warning("NetworkMultiplexer destroyed before all owners detached: " + this);
+ destroyIfOwnerless();
+ }
- net.shutdown();
+ private void destroyIfOwnerless() {
+ if (disowned.get() && owners.isEmpty())
+ net.shutdown();
}
public Network net() {
@@ -136,7 +141,7 @@ public class NetworkMultiplexer implements NetworkOwner {
"net=" + net +
", owners=" + owners +
", sessions=" + sessions +
- ", shared=" + shared +
+ ", destructible=" + disowned +
'}';
}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
index 3bf754f29c7..6451a692fb6 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
@@ -86,7 +86,11 @@ public class NetworkMultiplexerTest {
shared.detach(owner2);
assertFalse(net.shutDown.get());
- shared.destroy();
+ shared.attach(owner2);
+ shared.disown();
+ assertFalse(net.shutDown.get());
+
+ shared.detach(owner2);
assertTrue(net.shutDown.get());
}