aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-01-05 12:29:17 +0100
committerGitHub <noreply@github.com>2021-01-05 12:29:17 +0100
commit75b2e4c11ea6463c335f1c77dab3fdb5493e5600 (patch)
tree1c01f7e4d6492ed9e508d8841f975e2c1fc177c6 /messagebus
parent25f217ff7f0c4f8ae921b71788a083c6b6acf2cc (diff)
Revert "Jonmv/remove storage policy"
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java8
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/Error.java8
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java52
3 files changed, 43 insertions, 25 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
index a4b5422d502..1f5d60a630a 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
@@ -57,7 +57,7 @@ public final class DestinationSession implements MessageHandler {
}
/**
- * Convenience method for acknowledging a message for its sender.
+ * Conveniece method for acknowledging a message back to the sender.
*
* This is equivalent to:
* <pre>
@@ -69,7 +69,7 @@ public final class DestinationSession implements MessageHandler {
* Messages should be acknowledged when
* <ul>
* <li>this destination has safely and permanently applied the message, or
- * <li>an intermediate determines that the purpose of the message is fulfilled without forwarding the message.
+ * <li>an intermediate determines that the purpose of the message is fullfilled without forwarding the message
* </ul>
*
* @param msg The message to acknowledge back to the sender.
@@ -82,8 +82,8 @@ public final class DestinationSession implements MessageHandler {
}
/**
- * Sends a reply to a message. The reply will propagate back to the original sender, preferring the same route as it
- * used to reach the destination.
+ * Sends a reply to a message. The reply will propagate back to the original sender, prefering the same route as it
+ * used to reach the detination.
*
* @param reply The reply, created from the message this is a reply to.
*/
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Error.java b/messagebus/src/main/java/com/yahoo/messagebus/Error.java
index 475dea3d7ac..4aa85c1ea87 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/Error.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Error.java
@@ -78,9 +78,9 @@ public final class Error {
@Override
public String toString() {
String name = ErrorCode.getName(code);
- return "[" +
- name + " @ " +
- (service != null ? service : "localhost") +
- "]: " + message;
+ return "[" +
+ (name != null ? name : code) + " @ " +
+ (service != null ? service : "localhost") +
+ "]: " + message;
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
index f3c6422fbfd..88a6d5376b1 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
@@ -97,25 +97,36 @@ public class LocalNetwork implements Network {
msg.setRetryEnabled(envelope.msg.getRetryEnabled());
msg.setRetry(envelope.msg.getRetry());
msg.setTimeRemaining(envelope.msg.getTimeRemainingNow());
- msg.pushHandler(reply -> new ReplyEnvelope(LocalNetwork.this, envelope, reply).send());
- owner.deliverMessage(msg, ((LocalServiceAddress) envelope.recipient.getServiceAddress()).getSessionName());
+ msg.pushHandler(new ReplyHandler() {
+
+ @Override
+ public void handleReply(Reply reply) {
+ new ReplyEnvelope(LocalNetwork.this, envelope, reply).send();
+ }
+ });
+ owner.deliverMessage(msg, LocalServiceAddress.class.cast(envelope.recipient.getServiceAddress())
+ .getSessionName());
}
});
}
private void receiveLater(ReplyEnvelope envelope) {
byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply);
- executor.execute(() -> {
- Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class);
- reply.setRetryDelay(envelope.reply.getRetryDelay());
- reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode()));
- for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) {
- Error error = envelope.reply.getError(i);
- reply.addError(new Error(error.getCode(),
- error.getMessage(),
- error.getService() != null ? error.getService() : envelope.sender.hostId));
+ executor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class);
+ reply.setRetryDelay(envelope.reply.getRetryDelay());
+ reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode()));
+ for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) {
+ Error error = envelope.reply.getError(i);
+ reply.addError(new Error(error.getCode(),
+ error.getMessage(),
+ error.getService() != null ? error.getService() : envelope.sender.hostId));
+ }
+ owner.deliverReply(reply, envelope.parent.recipient);
}
- owner.deliverReply(reply, envelope.parent.recipient);
});
}
@@ -126,16 +137,23 @@ public class LocalNetwork implements Network {
return owner.getProtocol(protocolName).encode(Vtag.currentVersion, toEncode);
}
+ @SuppressWarnings("unchecked")
private <T extends Routable> T decode(Utf8String protocolName, byte[] toDecode, Class<T> clazz) {
- return clazz.cast(toDecode.length == 0 ? new EmptyReply()
- : owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode));
+ if (toDecode.length == 0) {
+ return clazz.cast(new EmptyReply());
+ }
+ return clazz.cast(owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode));
}
@Override
- public void sync() { }
+ public void sync() {
+
+ }
@Override
- public void shutdown() { }
+ public void shutdown() {
+
+ }
@Override
public String getConnectionSpec() {
@@ -160,7 +178,7 @@ public class LocalNetwork implements Network {
}
void send() {
- ((LocalServiceAddress) recipient.getServiceAddress()).getNetwork().receiveLater(this);
+ LocalServiceAddress.class.cast(recipient.getServiceAddress()).getNetwork().receiveLater(this);
}
}