summaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/main/java/com/yahoo/messagebus/shared
diff options
context:
space:
mode:
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/messagebus/shared')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java14
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java73
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java22
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java85
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java104
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java68
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java58
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java8
8 files changed, 432 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java
new file mode 100644
index 00000000000..0964a254cf2
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java
@@ -0,0 +1,14 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.SharedResource;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Result;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public interface ClientSession extends SharedResource {
+
+ public Result sendMessage(Message msg);
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
new file mode 100644
index 00000000000..ad58d6b9a5e
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
@@ -0,0 +1,73 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.NetworkOwner;
+import com.yahoo.messagebus.routing.RoutingNode;
+
+import java.util.List;
+
+/**
+ * <p>Used by SharedMessageBus as a network when the container runs in LocalApplication with no network services.</p>
+ *
+ * @author <a href="mailto:vegardh@yahoo-inc.com">Vegard Havdal</a>
+ */
+class NullNetwork implements Network {
+
+ @Override
+ public boolean waitUntilReady(double seconds) {
+ return true;
+ }
+
+ @Override
+ public void attach(NetworkOwner owner) {
+
+ }
+
+ @Override
+ public void registerSession(String session) {
+
+ }
+
+ @Override
+ public void unregisterSession(String session) {
+
+ }
+
+ @Override
+ public boolean allocServiceAddress(RoutingNode recipient) {
+ return false;
+ }
+
+ @Override
+ public void freeServiceAddress(RoutingNode recipient) {
+
+ }
+
+ @Override
+ public void send(Message msg, List<RoutingNode> recipients) {
+
+ }
+
+ @Override
+ public void sync() {
+
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public String getConnectionSpec() {
+ return null;
+ }
+
+ @Override
+ public IMirror getMirror() {
+ return null;
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
new file mode 100644
index 00000000000..56713815c7a
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
@@ -0,0 +1,22 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.SharedResource;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public interface ServerSession extends SharedResource {
+
+ public MessageHandler getMessageHandler();
+
+ public void setMessageHandler(MessageHandler msgHandler);
+
+ public void sendReply(Reply reply);
+
+ public String connectionSpec();
+
+ public String name();
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
new file mode 100644
index 00000000000..7da164757cd
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
@@ -0,0 +1,85 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.ResourceReference;
+import java.util.logging.Level;
+import com.yahoo.messagebus.DestinationSession;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedDestinationSession extends AbstractResource implements MessageHandler, ServerSession {
+
+ private static final Logger log = Logger.getLogger(SharedDestinationSession.class.getName());
+ private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>();
+ private final DestinationSession session;
+ private final ResourceReference mbusReference;
+
+ SharedDestinationSession(SharedMessageBus mbus, DestinationSessionParams params) {
+ this.msgHandler.set(params.getMessageHandler());
+ this.session = mbus.messageBus().createDestinationSession(params.setMessageHandler(this));
+ this.mbusReference = mbus.refer();
+ }
+
+ public DestinationSession session() {
+ return session;
+ }
+
+ @Override
+ public void sendReply(Reply reply) {
+ session.reply(reply);
+ }
+
+ @Override
+ public MessageHandler getMessageHandler() {
+ return msgHandler.get();
+ }
+
+ @Override
+ public void setMessageHandler(MessageHandler msgHandler) {
+ if (!this.msgHandler.compareAndSet(null, msgHandler)) {
+ throw new IllegalStateException("Message handler already registered.");
+ }
+ }
+
+ @Override
+ public void handleMessage(Message msg) {
+ MessageHandler msgHandler = this.msgHandler.get();
+ if (msgHandler == null) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet."));
+ sendReply(reply);
+ return;
+ }
+ msgHandler.handleMessage(msg);
+ }
+
+ @Override
+ public String connectionSpec() {
+ return session.getConnectionSpec();
+ }
+
+ @Override
+ public String name() {
+ return session.getName();
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(Level.FINE, "Destroying shared destination session.");
+ session.destroy();
+ mbusReference.close();
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
new file mode 100644
index 00000000000..5c9fab46e34
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
@@ -0,0 +1,104 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.ResourceReference;
+import java.util.logging.Level;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.IntermediateSession;
+import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Result;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedIntermediateSession extends AbstractResource
+ implements ClientSession, ServerSession, MessageHandler, ReplyHandler
+{
+
+ private static final Logger log = Logger.getLogger(SharedIntermediateSession.class.getName());
+ private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>();
+ private final IntermediateSession session;
+ private final ResourceReference mbusReference;
+
+ public SharedIntermediateSession(SharedMessageBus mbus, IntermediateSessionParams params) {
+ if (params.getReplyHandler() != null) {
+ throw new IllegalArgumentException("Reply handler must be null.");
+ }
+ this.msgHandler.set(params.getMessageHandler());
+ this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this)
+ .setMessageHandler(this));
+ this.mbusReference = mbus.refer();
+ }
+
+ public IntermediateSession session() {
+ return session;
+ }
+
+ @Override
+ public Result sendMessage(Message msg) {
+ session.forward(msg);
+ return Result.ACCEPTED;
+ }
+
+ @Override
+ public void sendReply(Reply reply) {
+ session.forward(reply);
+ }
+
+ @Override
+ public MessageHandler getMessageHandler() {
+ return msgHandler.get();
+ }
+
+ @Override
+ public void setMessageHandler(MessageHandler msgHandler) {
+ if (!this.msgHandler.compareAndSet(null, msgHandler)) {
+ throw new IllegalStateException("Message handler already registered.");
+ }
+ }
+
+ @Override
+ public void handleMessage(Message msg) {
+ MessageHandler msgHandler = this.msgHandler.get();
+ if (msgHandler == null) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet."));
+ sendReply(reply);
+ return;
+ }
+ msgHandler.handleMessage(msg);
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ reply.popHandler().handleReply(reply);
+ }
+
+ @Override
+ public String connectionSpec() {
+ return session.getConnectionSpec();
+ }
+
+ @Override
+ public String name() {
+ return session.getName();
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(Level.FINE, "Destroying shared intermediate session.");
+ session.destroy();
+ mbusReference.close();
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
new file mode 100644
index 00000000000..dd135a51378
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
@@ -0,0 +1,68 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.config.subscription.ConfigGetter;
+import com.yahoo.jdisc.AbstractResource;
+import java.util.logging.Level;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.cloud.config.SlobroksConfig;
+
+import java.util.logging.Logger;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedMessageBus extends AbstractResource {
+
+ private static final Logger log = Logger.getLogger(SharedMessageBus.class.getName());
+ private final MessageBus mbus;
+
+ public SharedMessageBus(MessageBus mbus) {
+ mbus.getClass(); // throws NullPointerException
+ this.mbus = mbus;
+ }
+
+ public MessageBus messageBus() {
+ return mbus;
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(Level.FINE, "Destroying shared message bus.");
+ mbus.destroy();
+ }
+
+ public SharedSourceSession newSourceSession(SourceSessionParams params) {
+ return new SharedSourceSession(this, params);
+ }
+
+ public SharedIntermediateSession newIntermediateSession(IntermediateSessionParams params) {
+ return new SharedIntermediateSession(this, params);
+ }
+
+ public SharedDestinationSession newDestinationSession(DestinationSessionParams params) {
+ return new SharedDestinationSession(this, params);
+ }
+
+ public static SharedMessageBus newInstance(MessageBusParams mbusParams, RPCNetworkParams netParams) {
+ return new SharedMessageBus(new MessageBus(newNetwork(netParams), mbusParams));
+ }
+
+ private static Network newNetwork(RPCNetworkParams params) {
+ SlobroksConfig cfg = params.getSlobroksConfig();
+ if (cfg == null) {
+ cfg = ConfigGetter.getConfig(SlobroksConfig.class, params.getSlobrokConfigId());
+ }
+ if (cfg.slobrok().isEmpty()) {
+ return new NullNetwork(); // for LocalApplication
+ }
+ return new RPCNetwork(params);
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java
new file mode 100644
index 00000000000..56071682349
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java
@@ -0,0 +1,58 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.ResourceReference;
+import java.util.logging.Level;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Result;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
+
+import java.util.logging.Logger;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedSourceSession extends AbstractResource implements ClientSession, ReplyHandler {
+
+ private static final Logger log = Logger.getLogger(SharedSourceSession.class.getName());
+ private final SourceSession session;
+ private final ResourceReference mbusReference;
+
+ public SharedSourceSession(SharedMessageBus mbus, SourceSessionParams params) {
+ if (params.getReplyHandler() != null) {
+ throw new IllegalArgumentException("Reply handler must be null.");
+ }
+ this.session = mbus.messageBus().createSourceSession(params.setReplyHandler(this));
+ this.mbusReference = mbus.refer();
+ }
+
+ public SourceSession session() {
+ return session;
+ }
+
+ @Override
+ public Result sendMessage(Message msg) {
+ return session.send(msg);
+ }
+
+ public Result sendMessageBlocking(Message msg) throws InterruptedException {
+ return session.sendBlocking(msg);
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ reply.popHandler().handleReply(reply);
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(Level.FINE, "Destroying shared source session.");
+ session.close();
+ mbusReference.close();
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java
new file mode 100644
index 00000000000..941a0dc4c5c
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java
@@ -0,0 +1,8 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * Not a public API, exported for use in internal components.
+ */
+@ExportPackage
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.osgi.annotation.ExportPackage;