summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-01-05 14:50:46 +0100
committerJon Marius Venstad <venstad@gmail.com>2021-01-05 14:50:46 +0100
commitbe5ea0ad39c15c13fb85a70d9990165499a92896 (patch)
treee92462f5f130fa68f40175ec7d987c661dd9ae0f /messagebus
parent6382cb8513ab166e4e4184e0ddebd60f97fb6bb3 (diff)
Revert "Revert "Jonmv/remove storage policy""
This reverts commit 75b2e4c11ea6463c335f1c77dab3fdb5493e5600.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java8
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/Error.java8
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java52
3 files changed, 25 insertions, 43 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
index 1f5d60a630a..a4b5422d502 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
@@ -57,7 +57,7 @@ public final class DestinationSession implements MessageHandler {
}
/**
- * Conveniece method for acknowledging a message back to the sender.
+ * Convenience method for acknowledging a message for its sender.
*
* This is equivalent to:
* <pre>
@@ -69,7 +69,7 @@ public final class DestinationSession implements MessageHandler {
* Messages should be acknowledged when
* <ul>
* <li>this destination has safely and permanently applied the message, or
- * <li>an intermediate determines that the purpose of the message is fullfilled without forwarding the message
+ * <li>an intermediate determines that the purpose of the message is fulfilled without forwarding the message.
* </ul>
*
* @param msg The message to acknowledge back to the sender.
@@ -82,8 +82,8 @@ public final class DestinationSession implements MessageHandler {
}
/**
- * Sends a reply to a message. The reply will propagate back to the original sender, prefering the same route as it
- * used to reach the detination.
+ * Sends a reply to a message. The reply will propagate back to the original sender, preferring the same route as it
+ * used to reach the destination.
*
* @param reply The reply, created from the message this is a reply to.
*/
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Error.java b/messagebus/src/main/java/com/yahoo/messagebus/Error.java
index 4aa85c1ea87..475dea3d7ac 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/Error.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Error.java
@@ -78,9 +78,9 @@ public final class Error {
@Override
public String toString() {
String name = ErrorCode.getName(code);
- return "[" +
- (name != null ? name : code) + " @ " +
- (service != null ? service : "localhost") +
- "]: " + message;
+ return "[" +
+ name + " @ " +
+ (service != null ? service : "localhost") +
+ "]: " + message;
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
index 88a6d5376b1..f3c6422fbfd 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
@@ -97,36 +97,25 @@ public class LocalNetwork implements Network {
msg.setRetryEnabled(envelope.msg.getRetryEnabled());
msg.setRetry(envelope.msg.getRetry());
msg.setTimeRemaining(envelope.msg.getTimeRemainingNow());
- msg.pushHandler(new ReplyHandler() {
-
- @Override
- public void handleReply(Reply reply) {
- new ReplyEnvelope(LocalNetwork.this, envelope, reply).send();
- }
- });
- owner.deliverMessage(msg, LocalServiceAddress.class.cast(envelope.recipient.getServiceAddress())
- .getSessionName());
+ msg.pushHandler(reply -> new ReplyEnvelope(LocalNetwork.this, envelope, reply).send());
+ owner.deliverMessage(msg, ((LocalServiceAddress) envelope.recipient.getServiceAddress()).getSessionName());
}
});
}
private void receiveLater(ReplyEnvelope envelope) {
byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply);
- executor.execute(new Runnable() {
-
- @Override
- public void run() {
- Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class);
- reply.setRetryDelay(envelope.reply.getRetryDelay());
- reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode()));
- for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) {
- Error error = envelope.reply.getError(i);
- reply.addError(new Error(error.getCode(),
- error.getMessage(),
- error.getService() != null ? error.getService() : envelope.sender.hostId));
- }
- owner.deliverReply(reply, envelope.parent.recipient);
+ executor.execute(() -> {
+ Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class);
+ reply.setRetryDelay(envelope.reply.getRetryDelay());
+ reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode()));
+ for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) {
+ Error error = envelope.reply.getError(i);
+ reply.addError(new Error(error.getCode(),
+ error.getMessage(),
+ error.getService() != null ? error.getService() : envelope.sender.hostId));
}
+ owner.deliverReply(reply, envelope.parent.recipient);
});
}
@@ -137,23 +126,16 @@ public class LocalNetwork implements Network {
return owner.getProtocol(protocolName).encode(Vtag.currentVersion, toEncode);
}
- @SuppressWarnings("unchecked")
private <T extends Routable> T decode(Utf8String protocolName, byte[] toDecode, Class<T> clazz) {
- if (toDecode.length == 0) {
- return clazz.cast(new EmptyReply());
- }
- return clazz.cast(owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode));
+ return clazz.cast(toDecode.length == 0 ? new EmptyReply()
+ : owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode));
}
@Override
- public void sync() {
-
- }
+ public void sync() { }
@Override
- public void shutdown() {
-
- }
+ public void shutdown() { }
@Override
public String getConnectionSpec() {
@@ -178,7 +160,7 @@ public class LocalNetwork implements Network {
}
void send() {
- LocalServiceAddress.class.cast(recipient.getServiceAddress()).getNetwork().receiveLater(this);
+ ((LocalServiceAddress) recipient.getServiceAddress()).getNetwork().receiveLater(this);
}
}