summaryrefslogtreecommitdiffstats
path: root/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java')
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java60
1 files changed, 60 insertions, 0 deletions
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java
new file mode 100644
index 00000000000..2ce76a50b73
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java
@@ -0,0 +1,60 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Result;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
+
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SharedSourceSession extends AbstractResource implements ClientSession, ReplyHandler {
+
+ private static final Logger log = Logger.getLogger(SharedSourceSession.class.getName());
+ private final SharedMessageBus mbus;
+ private final SourceSession session;
+ private final ResourceReference mbusReference;
+
+ public SharedSourceSession(SharedMessageBus mbus, SourceSessionParams params) {
+ if (params.getReplyHandler() != null) {
+ throw new IllegalArgumentException("Reply handler must be null.");
+ }
+ this.mbus = mbus;
+ this.session = mbus.messageBus().createSourceSession(params.setReplyHandler(this));
+ this.mbusReference = mbus.refer();
+ }
+
+ public SourceSession session() {
+ return session;
+ }
+
+ @Override
+ public Result sendMessage(Message msg) {
+ return session.send(msg);
+ }
+
+ public Result sendMessageBlocking(Message msg) throws InterruptedException {
+ return session.sendBlocking(msg);
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ reply.popHandler().handleReply(reply);
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(LogLevel.DEBUG, "Destroying shared source session.");
+ session.close();
+ mbusReference.close();
+ }
+
+}