diff options
14 files changed, 72 insertions, 45 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlobrokClient.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlobrokClient.java index bb3d7e049d1..825e6b033cd 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlobrokClient.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlobrokClient.java @@ -206,7 +206,7 @@ public class SlobrokClient implements NodeLookup { String service = (st.hasMoreTokens() ? st.nextToken() : ""); // skip assert(addressType.equals("storage")); Node n = new Node(nodeType, nodeIndex); - result.put(n, new SlobrokData(n, entry.getSpec())); + result.put(n, new SlobrokData(n, entry.getSpecString())); } return result; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternPolicy.java index e81ac4ae05e..e000364e0d4 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternPolicy.java @@ -105,7 +105,7 @@ public class ExternPolicy implements DocumentProtocolRoutingPolicy { recipients.clear(); List<Mirror.Entry> arr = mirror.lookup(pattern); for (Mirror.Entry entry : arr) { - recipients.add(Hop.parse(entry.getSpec() + session)); + recipients.add(Hop.parse(entry.getSpecString() + session)); } } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java index 3d129684465..ae4606adaaf 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java @@ -58,7 +58,7 @@ public class LoadBalancerPolicy extends SlobrokPolicy { if (node != null) { context.setContext(node); Route route = new Route(context.getRoute()); - route.setHop(0, Hop.parse(node.entry.getSpec() + "/" + session)); + route.setHop(0, Hop.parse(node.entry.getSpecString() + "/" + session)); context.addChild(route); } else { context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE, "Could not resolve any nodes to send to in pattern " + pattern); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LocalServicePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LocalServicePolicy.java index d4ebd4ecd81..2db45ed0472 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LocalServicePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LocalServicePolicy.java @@ -86,7 +86,7 @@ public class LocalServicePolicy implements DocumentProtocolRoutingPolicy { List<Mirror.Entry> arr = ctx.getMirror().lookup(ctx.getHopPrefix() + "*" + ctx.getHopSuffix()); String self = localAddress != null ? localAddress : toAddress(ctx.getMessageBus().getConnectionSpec()); for (Mirror.Entry item : arr) { - if (self.equals(toAddress(item.getSpec()))) { + if (self.equals(toAddress(item.getSpecString()))) { entry.recipients.add(Hop.parse(item.getName())); } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java index 552f47ec5a6..adf9d5ee912 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java @@ -144,12 +144,12 @@ public class StoragePolicy extends SlobrokPolicy { if (arr.isEmpty()) return null; if (distributor != null) { if (arr.size() == 1) { - return convertSlobrokNameToSessionName(arr.get(0).getSpec()); + return convertSlobrokNameToSessionName(arr.get(0).getSpecString()); } else { log.log(LogLevel.WARNING, "Got " + arr.size() + " matches for a distributor."); } } else { - return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpec()); + return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString()); } return null; } diff --git a/jrt/src/com/yahoo/jrt/Spec.java b/jrt/src/com/yahoo/jrt/Spec.java index 7e4f6d987fa..f4b70bd4f9e 100644 --- a/jrt/src/com/yahoo/jrt/Spec.java +++ b/jrt/src/com/yahoo/jrt/Spec.java @@ -3,15 +3,15 @@ package com.yahoo.jrt; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Objects; /** * A Spec is a network address used for either listening or * connecting. */ -public class Spec { +public class Spec implements Comparable<Spec> { - private final SocketAddress address; private final String host; private final int port; private final boolean malformed; @@ -57,13 +57,11 @@ public class Spec { port = portNum; malformed = ! correct; host = correct ? hostStr : null; - address = correct ? createAddress(host, port) : null; asString = correct ? createString(host, port) : "MALFORMED"; } else { malformed = true; port = 0; host = null; - address = null; asString = "MALFORMED"; } } @@ -79,7 +77,6 @@ public class Spec { this.port = port; malformed = false; asString = createString(host, port); - address = createAddress(host, port); } /** @@ -130,7 +127,7 @@ public class Spec { * @return socket address */ SocketAddress address() { - return address; + return !malformed ? createAddress(host, port) : null; } /** @@ -143,4 +140,34 @@ public class Spec { return asString; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Spec spec = (Spec) o; + return port == spec.port && + malformed == spec.malformed && + Objects.equals(host, spec.host); + } + + @Override + public int hashCode() { + return Objects.hash(host, port, malformed); + } + + @Override + public int compareTo(Spec o) { + int cmp = 0; + if ((host != null) && (o.host != null)) { + cmp = host.compareTo(o.host); + } else if (host != null) { + return -1; + } else if (o.host != null) { + return 1; + } + return (cmp == 0) + ? Integer.compare(port, o.port) + : cmp; + + } } diff --git a/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java b/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java index 9ed8eafbd30..31d7543304a 100644 --- a/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java +++ b/jrt/src/com/yahoo/jrt/slobrok/api/Mirror.java @@ -315,12 +315,12 @@ public class Mirror implements IMirror { public static final class Entry implements Comparable<Entry> { private final String name; - private final String spec; + private final Spec spec; private final char [] nameArray; public Entry(String name, String spec) { this.name = name; - this.spec = spec; + this.spec = new Spec(spec); this.nameArray = name.toCharArray(); } @@ -343,7 +343,8 @@ public class Mirror implements IMirror { char [] getNameArray() { return nameArray; } public String getName() { return name; } - public String getSpec() { return spec; } + public Spec getSpec() { return spec; } + public String getSpecString() { return spec.toString(); } } diff --git a/jrt/tests/com/yahoo/jrt/SlobrokTest.java b/jrt/tests/com/yahoo/jrt/SlobrokTest.java index 20266b0826a..552780dd0fd 100644 --- a/jrt/tests/com/yahoo/jrt/SlobrokTest.java +++ b/jrt/tests/com/yahoo/jrt/SlobrokTest.java @@ -88,14 +88,14 @@ public class SlobrokTest { err(" { EMPTY }"); } for (Entry e : actual) { - err(" {" + e.getName() + ", " + e.getSpec() + "}"); + err(" {" + e.getName() + ", " + e.getSpecString() + "}"); } err("expected values:"); if (expect.isEmpty()) { err(" { EMPTY }"); } for (Entry e : expect) { - err(" {" + e.getName() + ", " + e.getSpec() + "}"); + err(" {" + e.getName() + ", " + e.getSpecString() + "}"); } } @@ -119,9 +119,7 @@ public class SlobrokTest { assertFalse(one.equals(null)); assertFalse(one.equals(register)); assertTrue(one.getName().equals(wantName)); - assertTrue(one.getSpec().equals(mySpec)); - int wantHC = mySpec.hashCode() + wantName.hashCode(); - assertTrue(one.hashCode() == wantHC); + assertTrue(one.getSpecString().equals(mySpec)); register.registerName("B/x"); check("B/x", new SpecList().add("B/x", mySpec)); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java index 7c404207737..fb3c4cf9971 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java @@ -53,7 +53,10 @@ public class RPCService { addressList = mirror.lookup(pattern); } if (addressList != null && !addressList.isEmpty()) { - addressIdx = ++addressIdx % addressList.size(); + ++addressIdx; + if (addressIdx >= addressList.size()) { + addressIdx = 0; + } Mirror.Entry entry = addressList.get(addressIdx); return new RPCServiceAddress(entry.getName(), entry.getSpec()); } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java index 8a1b99073e7..0a6a58d4e89 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java @@ -23,15 +23,16 @@ public class RPCServiceAddress implements ServiceAddress { * @param serviceName The full service name of the address. * @param connectionSpec The connection specification. */ - public RPCServiceAddress(String serviceName, String connectionSpec) { + public RPCServiceAddress(String serviceName, Spec connectionSpec) { this.serviceName = serviceName; int pos = serviceName.lastIndexOf('/'); - if (pos > 0 && pos < serviceName.length() - 1) { - sessionName = serviceName.substring(pos + 1); - } else { - sessionName = null; - } - this.connectionSpec = new Spec(connectionSpec); + sessionName = (pos > 0 && pos < serviceName.length() - 1) + ? serviceName.substring(pos + 1) + : null; + this.connectionSpec = connectionSpec; + } + public RPCServiceAddress(String serviceName, String connectionSpec) { + this(serviceName, new Spec(connectionSpec)); } @Override @@ -97,7 +98,7 @@ public class RPCServiceAddress implements ServiceAddress { } /** - * Sets the RPC target to be used when communicating with the remove service. + * Sets the RPC target to be used when communicating with the remote service. * * @param target The target to set. */ diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java index 04d9f4ade68..7f5721ee2bf 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java @@ -18,7 +18,7 @@ import java.util.Map; */ public class RPCTargetPool { - private final Map<String, Entry> targets = new HashMap<>(); + private final Map<Spec, Entry> targets = new HashMap<>(); private final Timer timer; private final long expireMillis; private final int numTargetsPerSpec; @@ -86,33 +86,32 @@ public class RPCTargetPool { */ public RPCTarget getTarget(Supervisor orb, RPCServiceAddress address) { Spec spec = address.getConnectionSpec(); - String key = spec.toString(); long now = timer.milliTime(); synchronized (this) { - Entry entry = targets.get(key); + Entry entry = targets.get(spec); if (entry != null) { RPCTarget target = entry.getTarget(now); if (target != null) { return target; } - dropTarget(entry, key); + dropTarget(entry, spec); } - return createAndAddTarget(orb, spec, key, now); + return createAndAddTarget(orb, spec, now); } } - private void dropTarget(Entry entry, String key) { + private void dropTarget(Entry entry, Spec key) { entry.close(); targets.remove(key); } - private RPCTarget createAndAddTarget(Supervisor orb, Spec spec, String key, long now) { + private RPCTarget createAndAddTarget(Supervisor orb, Spec spec, long now) { RPCTarget [] tmpTargets = new RPCTarget[numTargetsPerSpec]; for (int i=0; i < tmpTargets.length; i++) { tmpTargets[i] = new RPCTarget(spec, orb); } Entry entry = new Entry(tmpTargets, now); - targets.put(key, entry); + targets.put(spec, entry); return entry.getTarget(now); } diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SlobrokTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SlobrokTestCase.java index dd779fd84c0..886833b8b14 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SlobrokTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SlobrokTestCase.java @@ -11,9 +11,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -69,7 +67,7 @@ public class SlobrokTestCase { System.out.printf(" { EMPTY }\n"); } else { for (Mirror.Entry entry : actual) { - System.out.printf(" { %s, %s }\n", entry.getName(), entry.getSpec()); + System.out.printf(" { %s, %s }\n", entry.getName(), entry.getSpecString()); } } System.out.printf("expected values:\n"); @@ -77,7 +75,7 @@ public class SlobrokTestCase { System.out.printf(" { EMPTY }\n"); } else { for (Mirror.Entry entry : expect) { - System.out.printf(" { %s, %s }\n", entry.getName(), entry.getSpec()); + System.out.printf(" { %s, %s }\n", entry.getName(), entry.getSpecString()); } } assertTrue(false); diff --git a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/resources/InstanceResource.java b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/resources/InstanceResource.java index 65ef1c9eaf0..bd265ed39e4 100644 --- a/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/resources/InstanceResource.java +++ b/orchestrator/src/main/java/com/yahoo/vespa/orchestrator/resources/InstanceResource.java @@ -105,7 +105,7 @@ public class InstanceResource { List<Mirror.Entry> entries = slobrokApi.lookup(applicationId, pattern); return entries.stream() - .map(entry -> new SlobrokEntryResponse(entry.getName(), entry.getSpec())) + .map(entry -> new SlobrokEntryResponse(entry.getName(), entry.getSpecString())) .collect(Collectors.toList()); } diff --git a/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/resources/InstanceResourceTest.java b/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/resources/InstanceResourceTest.java index 61102461bf0..ef6e26d2e99 100644 --- a/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/resources/InstanceResourceTest.java +++ b/orchestrator/src/test/java/com/yahoo/vespa/orchestrator/resources/InstanceResourceTest.java @@ -28,8 +28,8 @@ public class InstanceResourceTest { private static final ApplicationId APPLICATION_ID = ApplicationId.from( "tenant", "app", "instance"); private static final List<Mirror.Entry> ENTRIES = Arrays.asList( - new Mirror.Entry("name1", "spec1"), - new Mirror.Entry("name2", "spec2")); + new Mirror.Entry("name1", "tcp/spec:1"), + new Mirror.Entry("name2", "tcp/spec:2")); private static final ClusterId CLUSTER_ID = new ClusterId("cluster-id"); private final SlobrokApi slobrokApi = mock(SlobrokApi.class); @@ -85,7 +85,7 @@ public class InstanceResourceTest { ObjectMapper mapper = new ObjectMapper(); String actualJson = mapper.writeValueAsString(response); assertEquals( - "[{\"name\":\"name1\",\"spec\":\"spec1\"},{\"name\":\"name2\",\"spec\":\"spec2\"}]", + "[{\"name\":\"name1\",\"spec\":\"tcp/spec:1\"},{\"name\":\"name2\",\"spec\":\"tcp/spec:2\"}]", actualJson); } }
\ No newline at end of file |