aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-08-12 16:42:28 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-08-16 21:01:03 +0200
commit079c23411d250e47b0a9d321906bea56cd27bbd4 (patch)
tree1da160fc78735c7c01a648f45f78b9cec5030a3e /messagebus
parent2e25cf88e1e25c81841c6a55c5130e6ea2c36d78 (diff)
Reduce entanglement
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java1
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java11
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java4
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java8
6 files changed, 12 insertions, 16 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 925968b9ade..6c1b7bc10be 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -423,7 +423,6 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
return getProtocol((Utf8Array)name);
}
- @Override
public void deliverReply(Reply reply, ReplyHandler handler) {
msn.deliverReply(reply, handler);
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java
index 2b55bf5b901..c4185d67251 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java
@@ -24,7 +24,7 @@ public interface NetworkOwner {
* @param name The name of the protocol to return.
* @return The named protocol.
*/
- public Protocol getProtocol(Utf8Array name);
+ Protocol getProtocol(Utf8Array name);
/**
* All messages that arrive in the network layer is passed to its owner through this function.
@@ -32,13 +32,6 @@ public interface NetworkOwner {
* @param message The message that just arrived from the network.
* @param session The name of the session that is the recipient of the request.
*/
- public void deliverMessage(Message message, String session);
+ void deliverMessage(Message message, String session);
- /**
- * All replies that arrive in the network layer is passed through this to unentangle it from the network thread.
- *
- * @param reply The reply that just arrived from the network.
- * @param handler The handler that is to receive the reply.
- */
- public void deliverReply(Reply reply, ReplyHandler handler);
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
index f3c6422fbfd..06b29ed524c 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
@@ -115,7 +115,7 @@ public class LocalNetwork implements Network {
error.getMessage(),
error.getService() != null ? error.getService() : envelope.sender.hostId));
}
- owner.deliverReply(reply, envelope.parent.recipient);
+ envelope.parent.recipient.handleReply(reply);
});
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index a7efe6e0a8a..e95883d8dda 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -348,7 +348,7 @@ public class RPCNetwork implements Network, MethodHandler {
Reply reply = new EmptyReply();
reply.getTrace().setLevel(ctx.traceLevel);
reply.addError(new Error(errCode, errMsg));
- owner.deliverReply(reply, recipient);
+ recipient.handleReply(reply);
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
index 5fc1f64866f..68eadfbf542 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
@@ -80,7 +80,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
}
Reply reply = new EmptyReply();
reply.getTrace().swap(ctx.trace);
- net.getOwner().deliverReply(reply, recipient);
+ recipient.handleReply(reply);
} else {
req.setContext(ctx);
address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this);
@@ -147,7 +147,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
if (error != null) {
reply.addError(error);
}
- ctx.owner.deliverReply(reply, ctx.recipient);
+ ctx.recipient.handleReply(reply);
}
protected final class Params {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
index 05fc6f62236..42d1ff9ba2d 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
@@ -805,9 +805,13 @@ public class RoutingNode implements ReplyHandler {
this.serviceAddress = serviceAddress;
}
+ /** Proxy through message bus in case it was destroyed in the meantime. */
@Override
public void handleReply(Reply reply) {
- setReply(reply);
- notifyParent();
+ mbus.deliverReply(reply, r -> {
+ setReply(reply);
+ notifyParent();
+ });
}
+
}