diff options
Diffstat (limited to 'messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java | 56 |
1 files changed, 35 insertions, 21 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index 096c0c0b485..96a0a5fecef 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -2,17 +2,28 @@ package com.yahoo.messagebus.network.rpc; import com.yahoo.component.Version; -import com.yahoo.component.VersionSpecification; import com.yahoo.component.Vtag; import com.yahoo.concurrent.ThreadFactoryFactory; -import com.yahoo.jrt.*; +import com.yahoo.jrt.Acceptor; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.MethodHandler; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Task; +import com.yahoo.jrt.Transport; import com.yahoo.jrt.slobrok.api.IMirror; import com.yahoo.jrt.slobrok.api.Mirror; import com.yahoo.jrt.slobrok.api.Register; import com.yahoo.log.LogLevel; -import com.yahoo.messagebus.*; +import com.yahoo.messagebus.EmptyReply; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Protocol; +import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.network.Identity; import com.yahoo.messagebus.network.Network; import com.yahoo.messagebus.network.NetworkOwner; @@ -22,8 +33,16 @@ import com.yahoo.messagebus.routing.RoutingNode; import java.io.PrintWriter; import java.io.StringWriter; -import java.util.*; -import java.util.concurrent.*; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -44,7 +63,7 @@ public class RPCNetwork implements Network, MethodHandler { private final Acceptor listener; private final Mirror mirror; private final Register register; - private final Map<VersionSpecification, RPCSendAdapter> sendAdapters = new HashMap<>(); + private final TreeMap<Version, RPCSendAdapter> sendAdapters = new TreeMap<>(); private NetworkOwner owner; private final SlobrokConfigSubscriber slobroksConfig; private final LinkedHashMap<String, Route> lruRouteMap = new LinkedHashMap<>(10000, 0.5f, true); @@ -162,9 +181,10 @@ public class RPCNetwork implements Network, MethodHandler { } this.owner = owner; - RPCSendAdapter adapter = new RPCSendV1(); - addSendAdapter(new VersionSpecification(5), adapter); - addSendAdapter(new VersionSpecification(6), adapter); + RPCSendAdapter adapter1 = new RPCSendV1(); + RPCSendAdapter adapter2 = new RPCSendV2(); + addSendAdapter(new Version(5), adapter1); + addSendAdapter(new Version(6,149), adapter2); } @Override @@ -234,11 +254,9 @@ public class RPCNetwork implements Network, MethodHandler { */ private void send(SendContext ctx) { if (destroyed.get()) { - replyError(ctx, ErrorCode.NETWORK_SHUTDOWN, - "Network layer has performed shutdown."); + replyError(ctx, ErrorCode.NETWORK_SHUTDOWN, "Network layer has performed shutdown."); } else if (ctx.hasError) { - replyError(ctx, ErrorCode.HANDSHAKE_FAILED, - "An error occured while resolving version."); + replyError(ctx, ErrorCode.HANDSHAKE_FAILED, "An error occured while resolving version."); } else { sendService.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx)); } @@ -315,7 +333,7 @@ public class RPCNetwork implements Network, MethodHandler { * @param version The version for which to register an adapter. * @param adapter The adapter to register. */ - private void addSendAdapter(VersionSpecification version, RPCSendAdapter adapter) { + private void addSendAdapter(Version version, RPCSendAdapter adapter) { adapter.attach(this); sendAdapters.put(version, adapter); } @@ -327,13 +345,9 @@ public class RPCNetwork implements Network, MethodHandler { * @param version The version for which to return an adapter. * @return The compatible adapter. */ - private RPCSendAdapter getSendAdapter(Version version) { - for (Map.Entry<VersionSpecification, RPCSendAdapter> entry : sendAdapters.entrySet()) { - if (entry.getKey().matches(version)) { - return entry.getValue(); - } - } - return null; + public RPCSendAdapter getSendAdapter(Version version) { + Map.Entry<Version, RPCSendAdapter> lower = sendAdapters.floorEntry(version); + return (lower != null) ? lower.getValue() : null; } /** |