summaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java56
1 files changed, 35 insertions, 21 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index 096c0c0b485..96a0a5fecef 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -2,17 +2,28 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
-import com.yahoo.component.VersionSpecification;
import com.yahoo.component.Vtag;
import com.yahoo.concurrent.ThreadFactoryFactory;
-import com.yahoo.jrt.*;
+import com.yahoo.jrt.Acceptor;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.MethodHandler;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.StringValue;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Task;
+import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.NetworkOwner;
@@ -22,8 +33,16 @@ import com.yahoo.messagebus.routing.RoutingNode;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -44,7 +63,7 @@ public class RPCNetwork implements Network, MethodHandler {
private final Acceptor listener;
private final Mirror mirror;
private final Register register;
- private final Map<VersionSpecification, RPCSendAdapter> sendAdapters = new HashMap<>();
+ private final TreeMap<Version, RPCSendAdapter> sendAdapters = new TreeMap<>();
private NetworkOwner owner;
private final SlobrokConfigSubscriber slobroksConfig;
private final LinkedHashMap<String, Route> lruRouteMap = new LinkedHashMap<>(10000, 0.5f, true);
@@ -162,9 +181,10 @@ public class RPCNetwork implements Network, MethodHandler {
}
this.owner = owner;
- RPCSendAdapter adapter = new RPCSendV1();
- addSendAdapter(new VersionSpecification(5), adapter);
- addSendAdapter(new VersionSpecification(6), adapter);
+ RPCSendAdapter adapter1 = new RPCSendV1();
+ RPCSendAdapter adapter2 = new RPCSendV2();
+ addSendAdapter(new Version(5), adapter1);
+ addSendAdapter(new Version(6,149), adapter2);
}
@Override
@@ -234,11 +254,9 @@ public class RPCNetwork implements Network, MethodHandler {
*/
private void send(SendContext ctx) {
if (destroyed.get()) {
- replyError(ctx, ErrorCode.NETWORK_SHUTDOWN,
- "Network layer has performed shutdown.");
+ replyError(ctx, ErrorCode.NETWORK_SHUTDOWN, "Network layer has performed shutdown.");
} else if (ctx.hasError) {
- replyError(ctx, ErrorCode.HANDSHAKE_FAILED,
- "An error occured while resolving version.");
+ replyError(ctx, ErrorCode.HANDSHAKE_FAILED, "An error occured while resolving version.");
} else {
sendService.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx));
}
@@ -315,7 +333,7 @@ public class RPCNetwork implements Network, MethodHandler {
* @param version The version for which to register an adapter.
* @param adapter The adapter to register.
*/
- private void addSendAdapter(VersionSpecification version, RPCSendAdapter adapter) {
+ private void addSendAdapter(Version version, RPCSendAdapter adapter) {
adapter.attach(this);
sendAdapters.put(version, adapter);
}
@@ -327,13 +345,9 @@ public class RPCNetwork implements Network, MethodHandler {
* @param version The version for which to return an adapter.
* @return The compatible adapter.
*/
- private RPCSendAdapter getSendAdapter(Version version) {
- for (Map.Entry<VersionSpecification, RPCSendAdapter> entry : sendAdapters.entrySet()) {
- if (entry.getKey().matches(version)) {
- return entry.getValue();
- }
- }
- return null;
+ public RPCSendAdapter getSendAdapter(Version version) {
+ Map.Entry<Version, RPCSendAdapter> lower = sendAdapters.floorEntry(version);
+ return (lower != null) ? lower.getValue() : null;
}
/**