summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-08-16 14:01:09 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-08-16 21:01:03 +0200
commitc5b3b3400d8cdd533008148cab04a1da0838bae1 (patch)
treebbbf82bb251a829aa2eec3be57609176f342b9de /messagebus
parentb919b72514821cb98028af232fab2eaec6bf414a (diff)
Unit test for multiplexer
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java187
1 files changed, 187 insertions, 0 deletions
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
new file mode 100644
index 00000000000..81b6e4cac27
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
@@ -0,0 +1,187 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network;
+
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.routing.RoutingNode;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.text.Utf8Array;
+import com.yahoo.text.Utf8String;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author jonmv
+ */
+public class NetworkMultiplexerTest {
+
+ @Test
+ public void testShared() {
+ MockNetwork net = new MockNetwork();
+ MockOwner owner1 = new MockOwner();
+ MockOwner owner2 = new MockOwner();
+ NetworkMultiplexer shared = NetworkMultiplexer.shared(net);
+ assertEquals(Set.of(shared), net.attached);
+ assertEquals(Set.of(), net.registered);
+ assertFalse(net.shutDown.get());
+
+ shared.attach(owner1);
+ shared.registerSession("s1", owner1);
+ try {
+ shared.registerSession("s1", owner1);
+ fail("Illegal to register same session multiple times with the same owner");
+ }
+ catch (IllegalArgumentException expected) {
+ assertEquals("Session 's1' with owner 'mock owner' already registered with this", expected.getMessage());
+ }
+ assertEquals(Set.of("s1"), net.registered);
+
+ shared.attach(owner2);
+ shared.registerSession("s1", owner2);
+ shared.registerSession("s2", owner2);
+ assertEquals(Set.of("s1", "s2"), net.registered);
+
+ Utf8String name = new Utf8String("protocol");
+ Protocol protocol1 = new SimpleProtocol();
+ Protocol protocol2 = new SimpleProtocol();
+ owner1.protocols.put(name, protocol1);
+ owner2.protocols.put(name, protocol2);
+ assertEquals(protocol1, shared.getProtocol(name));
+
+ Message message1 = new SimpleMessage("one");
+ Message message2 = new SimpleMessage("two");
+ Message message3 = new SimpleMessage("three");
+ shared.deliverMessage(message1, "s1");
+ shared.deliverMessage(message2, "s2");
+ shared.unregisterSession("s1", owner1);
+ shared.deliverMessage(message3, "s1");
+ assertEquals(Map.of("s1", List.of(message1)), owner1.messages);
+ assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3)), owner2.messages);
+
+ shared.detach(owner1);
+ assertEquals(protocol2, shared.getProtocol(name));
+
+ shared.detach(owner2);
+ assertFalse(net.shutDown.get());
+
+ shared.deconstruct();
+ assertTrue(net.shutDown.get());
+ }
+
+ @Test
+ public void testDedicated() {
+ MockNetwork net = new MockNetwork();
+ MockOwner owner = new MockOwner();
+ NetworkMultiplexer dedicated = NetworkMultiplexer.dedicated(net);
+ assertEquals(Set.of(dedicated), net.attached);
+ assertEquals(Set.of(), net.registered);
+ assertFalse(net.shutDown.get());
+
+ dedicated.attach(owner);
+ dedicated.detach(owner);
+ assertTrue(net.shutDown.get());
+ }
+
+ static class MockOwner implements NetworkOwner {
+
+ final Map<Utf8Array, Protocol> protocols = new HashMap<>();
+ final Map<String, List<Message>> messages = new HashMap<>();
+
+ @Override
+ public Protocol getProtocol(Utf8Array name) {
+ return protocols.get(name);
+ }
+
+ @Override
+ public void deliverMessage(Message message, String session) {
+ messages.computeIfAbsent(session, __ -> new ArrayList<>()).add(message);
+ }
+
+ @Override
+ public String toString() {
+ return "mock owner";
+ }
+
+ }
+
+ static class MockNetwork implements Network {
+
+ final Set<NetworkOwner> attached = new HashSet<>();
+ final Set<String> registered = new HashSet<>();
+ final AtomicBoolean shutDown = new AtomicBoolean();
+
+ @Override
+ public boolean waitUntilReady(double seconds) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void attach(NetworkOwner owner) {
+ assertTrue(attached.add(owner));
+ }
+
+ @Override
+ public void registerSession(String session) {
+ assertTrue(registered.add(session));
+ }
+
+ @Override
+ public void unregisterSession(String session) {
+ assertTrue(registered.remove(session));
+ }
+
+ @Override
+ public boolean allocServiceAddress(RoutingNode recipient) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void freeServiceAddress(RoutingNode recipient) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void send(Message msg, List<RoutingNode> recipients) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void sync() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void shutdown() {
+ assertFalse(shutDown.getAndSet(true));
+ }
+
+ @Override
+ public String getConnectionSpec() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IMirror getMirror() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+}