diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2021-01-05 10:08:28 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-05 10:08:28 +0100 |
commit | f3d54a087fee498ac1669ad06da1724e2bdce4a8 (patch) | |
tree | ab3965013acf6098bd402282d58d3777106315c0 | |
parent | 9f68966bd023a659e0ff518ebc7ab04b2e3bbf49 (diff) | |
parent | 3e32d199fca6e9fd347bbec75d7cf30f56b99253 (diff) |
Merge pull request #15873 from vespa-engine/jonmv/remove-storage-policy
Jonmv/remove storage policy
35 files changed, 1060 insertions, 1242 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index fd887a4196b..2f769143e6e 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -217,12 +217,10 @@ public class Reindexer { static class Cluster { private final String name; - private final String configId; private final Map<DocumentType, String> documentBuckets; - Cluster(String name, String configId, Map<DocumentType, String> documentBuckets) { + Cluster(String name, Map<DocumentType, String> documentBuckets) { this.name = requireNonNull(name); - this.configId = requireNonNull(configId); this.documentBuckets = Map.copyOf(documentBuckets); } @@ -231,7 +229,7 @@ public class Reindexer { } String route() { - return "[Storage:cluster=" + name + ";clusterconfigid=" + configId + "]"; + return name + "-direct"; } String bucketSpaceOf(DocumentType documentType) { @@ -244,20 +242,18 @@ public class Reindexer { if (o == null || getClass() != o.getClass()) return false; Cluster cluster = (Cluster) o; return name.equals(cluster.name) && - configId.equals(cluster.configId) && documentBuckets.equals(cluster.documentBuckets); } @Override public int hashCode() { - return Objects.hash(name, configId, documentBuckets); + return Objects.hash(name, documentBuckets); } @Override public String toString() { return "Cluster{" + "name='" + name + '\'' + - ", configId='" + configId + '\'' + ", documentBuckets=" + documentBuckets + '}'; } diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 8668ed037ef..e62314429fb 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -141,7 +141,6 @@ public class ReindexingMaintainer extends AbstractComponent { return clusters.storage().stream() .filter(storage -> storage.name().equals(name)) .map(storage -> new Cluster(name, - storage.configid(), bucketSpaces.cluster(name) .documentType().entrySet().stream() .collect(toMap(entry -> manager.getDocumentType(entry.getKey()), diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 9a88d8aad1f..160022cc996 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -45,7 +45,7 @@ class ReindexerTest { final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); final DocumentTypeManager manager = new DocumentTypeManager(musicConfig); final DocumentType music = manager.getDocumentType("music"); - final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default")); + final Cluster cluster = new Cluster("cluster", Map.of(music, "default")); final MockMetric metric = new MockMetric(); final ManualClock clock = new ManualClock(Instant.EPOCH); @@ -59,7 +59,7 @@ class ReindexerTest { @Test void throwsWhenUnknownBuckets() { assertThrows(NullPointerException.class, - () -> new Reindexer(new Cluster("cluster", "id", Map.of()), Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2)); + () -> new Reindexer(new Cluster("cluster", Map.of()), Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2)); } @Test @@ -84,7 +84,7 @@ class ReindexerTest { assertEquals("music:[document]", parameters.getFieldSet()); assertSame(token, parameters.getResumeToken()); assertEquals("default", parameters.getBucketSpace()); - assertEquals("[Storage:cluster=cluster;clusterconfigid=id]", parameters.getRoute().toString()); + assertEquals("cluster-direct", parameters.getRoute().toString()); assertEquals("cluster", parameters.getRemoteDataHandler()); assertEquals("music", parameters.getDocumentSelection()); assertEquals(DocumentProtocol.Priority.NORMAL_3, parameters.getPriority()); diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java index afa68debadb..f69c5d28a01 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java @@ -43,7 +43,7 @@ class ReindexingMaintainerTest { .build(), manager)); - assertEquals(new Cluster("cluster", "configId", Map.of(manager.getDocumentType("music"), "default")), + assertEquals(new Cluster("cluster", Map.of(manager.getDocumentType("music"), "default")), parseCluster("cluster", new ClusterListConfig.Builder() .storage(new ClusterListConfig.Storage.Builder() diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index a70f59bb9fb..39d6898215c 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -1519,30 +1519,92 @@ ], "fields": [] }, - "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$ContentParameters": { - "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters", + "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$BucketIdCalculator": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>()" + ], + "fields": [] + }, + "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$DistributorSelectionLogic": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void destroy()" + ], + "fields": [] + }, + "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public", + "abstract" + ], + "methods": [ + "protected void <init>(int)", + "public abstract java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)", + "public void close()" + ], + "fields": [ + "protected final java.util.Random randomizer" + ] + }, + "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$Parameters": { + "superClass": "java.lang.Object", "interfaces": [], "attributes": [ "public" ], "methods": [ "public void <init>(java.util.Map)", - "public java.lang.String getDistributionConfigId()", - "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator createPatternGenerator()" + "public java.lang.String getClusterName()", + "public com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostPatternGenerator createPatternGenerator()", + "public com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher createHostFetcher(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy, int)", + "public com.yahoo.vdslib.distribution.Distribution createDistribution(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy)" + ], + "fields": [ + "protected final java.lang.String clusterName", + "protected final java.lang.String distributionConfigId", + "protected final com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostPatternGenerator slobrokHostPatternGenerator" + ] + }, + "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostFetcher": { + "superClass": "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public com.yahoo.jrt.slobrok.api.IMirror getMirror(com.yahoo.messagebus.routing.RoutingContext)", + "public java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)" ], "fields": [] }, "com.yahoo.documentapi.messagebus.protocol.ContentPolicy": { - "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy", + "superClass": "com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy", "interfaces": [], "attributes": [ "public" ], "methods": [ + "public void <init>(java.lang.String)", "public void <init>(java.util.Map)", - "public void <init>(java.lang.String)" + "public void <init>(com.yahoo.documentapi.messagebus.protocol.ContentPolicy$Parameters)", + "public void select(com.yahoo.messagebus.routing.RoutingContext)", + "public void merge(com.yahoo.messagebus.routing.RoutingContext)", + "public void destroy()" ], - "fields": [] + "fields": [ + "public static final java.lang.String owningBucketStates" + ] }, "com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage": { "superClass": "com.yahoo.documentapi.messagebus.protocol.DocumentMessage", @@ -2979,104 +3041,6 @@ ], "fields": [] }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$BucketIdCalculator": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void <init>()" - ], - "fields": [] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$DistributorSelectionLogic": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void destroy()" - ], - "fields": [] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public", - "abstract" - ], - "methods": [ - "protected void <init>(int)", - "public abstract java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)", - "public void close()" - ], - "fields": [ - "protected final java.util.Random randomizer" - ] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void <init>(java.util.Map)", - "public java.lang.String getClusterName()", - "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator createPatternGenerator()", - "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher createHostFetcher(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy, int)", - "public com.yahoo.vdslib.distribution.Distribution createDistribution(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy)" - ], - "fields": [ - "protected final java.lang.String clusterName", - "protected final java.lang.String distributionConfigId", - "protected final com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator slobrokHostPatternGenerator" - ] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostFetcher": { - "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public com.yahoo.jrt.slobrok.api.IMirror getMirror(com.yahoo.messagebus.routing.RoutingContext)", - "public java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)" - ], - "fields": [] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public java.lang.String getDistributorHostPattern(java.lang.Integer)" - ], - "fields": [] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy": { - "superClass": "com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void <init>(java.lang.String)", - "public void <init>(java.util.Map)", - "public void <init>(com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters)", - "public void select(com.yahoo.messagebus.routing.RoutingContext)", - "public void merge(com.yahoo.messagebus.routing.RoutingContext)", - "public void destroy()" - ], - "fields": [ - "public static final java.lang.String owningBucketStates" - ] - }, "com.yahoo.documentapi.messagebus.protocol.SubsetServicePolicy": { "superClass": "java.lang.Object", "interfaces": [ diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 257d491ea93..982a1c50b85 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -917,7 +917,7 @@ public class MessageBusVisitorSession implements VisitorSession { } } if (isErrorOfType(reply, DocumentProtocol.ERROR_WRONG_DISTRIBUTION)) { - handleWrongDistributionReply((WrongDistributionReply)reply); + handleWrongDistributionReply((WrongDistributionReply) reply); } else { if (shouldReportError(reply)) { reportVisitorError(message); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java index d6e20b9d57f..2d78497456d 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java @@ -1,43 +1,613 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi.messagebus.protocol; +import com.yahoo.concurrent.CopyOnWriteHashMap; +import com.yahoo.document.BucketId; +import com.yahoo.document.BucketIdFactory; +import com.yahoo.jrt.slobrok.api.IMirror; +import com.yahoo.jrt.slobrok.api.Mirror; +import java.util.logging.Level; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.routing.Hop; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingContext; +import com.yahoo.messagebus.routing.RoutingNodeIterator; +import com.yahoo.messagebus.routing.VerbatimDirective; +import com.yahoo.vdslib.distribution.Distribution; +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.Node; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vdslib.state.State; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; /** - * Policy to talk to content clusters. + * Routing policy to determine which distributor in a content cluster to send data to. + * Using different key=value parameters separated by semicolon (";"), the user can control which cluster to send to. + * + * cluster=[clusterName] (Mandatory, determines the cluster name) + * config=[config] (Optional, a comma separated list of config servers to use. Used to talk to clusters not defined in this vespa application) + * clusterconfigid=[id] (Optional, use given config id for distribution instead of default) + * + * @author Haakon Humberset */ -public class ContentPolicy extends StoragePolicy { +public class ContentPolicy extends SlobrokPolicy { + + private static final Logger log = Logger.getLogger(ContentPolicy.class.getName()); + public static final String owningBucketStates = "uim"; + private static final String upStates = "ui"; + + /** This class merely generates a slobrok host pattern for a given distributor. */ + static class SlobrokHostPatternGenerator { + + private final String base; + + SlobrokHostPatternGenerator(String clusterName) { + this.base = "storage/cluster." + clusterName + "/distributor/"; + } + + /** + * Find host pattern of the hosts that are valid targets for this request. + * + * @param distributor Set to null if any distributor is valid target. + */ + String getDistributorHostPattern(Integer distributor) { + return base + (distributor == null ? "*" : distributor) + "/default"; + } + + } + + /** Helper class to match a host pattern with node to use. */ + public abstract static class HostFetcher { + + private static class Targets { + private final List<Integer> list; + private final int total; + Targets() { + this(Collections.emptyList(), 1); + } + Targets(List<Integer> list, int total) { + this.list = list; + this.total = total; + } + } + + private final int requiredUpPercentageToSendToKnownGoodNodes; + private final AtomicReference<Targets> validTargets = new AtomicReference<>(new Targets()); + protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy. + + protected HostFetcher(int percent) { + requiredUpPercentageToSendToKnownGoodNodes = percent; + } + + void updateValidTargets(ClusterState state) { + List<Integer> validRandomTargets = new ArrayList<>(); + for (int i=0; i<state.getNodeCount(NodeType.DISTRIBUTOR); ++i) { + if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i); + } + validTargets.set(new Targets(new CopyOnWriteArrayList<>(validRandomTargets), state.getNodeCount(NodeType.DISTRIBUTOR))); + } + public abstract String getTargetSpec(Integer distributor, RoutingContext context); + String getRandomTargetSpec(RoutingContext context) { + Targets targets = validTargets.get(); + // Try to use list of random targets, if at least X % of the nodes are up + while ((targets.total != 0) && + (100 * targets.list.size() / targets.total >= requiredUpPercentageToSendToKnownGoodNodes)) + { + int randIndex = randomizer.nextInt(targets.list.size()); + String targetSpec = getTargetSpec(targets.list.get(randIndex), context); + if (targetSpec != null) { + context.trace(3, "Sending to random node seen up in cluster state"); + return targetSpec; + } + targets.list.remove(randIndex); + } + context.trace(3, "Too few nodes seen up in state. Sending totally random."); + return getTargetSpec(null, context); + } + public void close() {} + } + + /** Host fetcher using a slobrok mirror to find the hosts. */ + public static class SlobrokHostFetcher extends HostFetcher { + private final SlobrokHostPatternGenerator patternGenerator; + private final SlobrokPolicy policy; - public static class ContentParameters extends Parameters { + SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { + super(percent); + this.patternGenerator = patternGenerator; + this.policy = policy; + } - public ContentParameters(Map<String, String> parameters) { - super(parameters); + private List<Mirror.Entry> getEntries(String hostPattern, RoutingContext context) { + return policy.lookup(context, hostPattern); } + private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; } + + public IMirror getMirror(RoutingContext context) { return context.getMirror(); } + @Override - public String getDistributionConfigId() { - if (distributionConfigId != null) { - return distributionConfigId; + public String getTargetSpec(Integer distributor, RoutingContext context) { + List<Mirror.Entry> arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context); + if (arr.isEmpty()) return null; + if (distributor != null) { + if (arr.size() == 1) { + return convertSlobrokNameToSessionName(arr.get(0).getSpecString()); + } else { + log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor."); + } + } else { + return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString()); } - return clusterName; + return null; + } + } + + static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher { + + /** + * Distributor index to resolved RPC spec cache for a single given Slobrok + * update generation. Uses a thread safe COW map which will grow until stable. + */ + private static class GenerationCache { + private final int generation; + private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>(); + + GenerationCache(int generation) { + this.generation = generation; + } + + public int generation() { return this.generation; } + + public String get(Integer index) { + return targets.get(index); + } + public void put(Integer index, String target) { + targets.put(index, target); + } + } + + private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null); + + TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { + super(patternGenerator, policy, percent); } @Override + public String getTargetSpec(Integer distributor, RoutingContext context) { + GenerationCache cache = generationCache.get(); + int currentGeneration = getMirror(context).updates(); + // The below code might race with other threads during a generation change. That is OK, as the cache + // is thread safe and will quickly converge to a stable state for the new generation. + if (cache == null || currentGeneration != cache.generation()) { + cache = new GenerationCache(currentGeneration); + generationCache.set(cache); + } + if (distributor != null) { + return cachingGetTargetSpec(distributor, context, cache); + } + // Wildcard lookup case. Must not be cached. + return super.getTargetSpec(null, context); + } + + private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) { + String cachedTarget = cache.get(distributor); + if (cachedTarget != null) { + return cachedTarget; + } + // Mirror _may_ be at a higher version if we race with generation read, but that is OK since + // we'll either way get the most up-to-date mapping and the cache will be invalidated on the + // next invocation. + String resolvedTarget = super.getTargetSpec(distributor, context); + cache.put(distributor, resolvedTarget); + return resolvedTarget; + } + + } + + /** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */ + public static class Parameters { + protected final String clusterName; + protected final String distributionConfigId; + protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator; + + public Parameters(Map<String, String> params) { + clusterName = params.get("cluster"); + distributionConfigId = params.get("clusterconfigid"); // TODO jonmv: remove + slobrokHostPatternGenerator = createPatternGenerator(); + if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set"); + } + + String getDistributionConfigId() { + return distributionConfigId == null ? clusterName : distributionConfigId; + } + public String getClusterName() { + return clusterName; + } public SlobrokHostPatternGenerator createPatternGenerator() { - return new SlobrokHostPatternGenerator(getClusterName()) { - public String getDistributorHostPattern(Integer distributor) { - return "storage/cluster." + getClusterName() + "/distributor/" + (distributor == null ? "*" : distributor) + "/default"; + return new SlobrokHostPatternGenerator(getClusterName()); + } + public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { + return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent); + } + public Distribution createDistribution(SlobrokPolicy policy) { + return new Distribution(getDistributionConfigId()); + } + + /** + * When we have gotten this amount of failures from a node (Any kind of failures). We try to send to a random other node, just to see if the + * failure was related to node being bad. (Hard to detect from failure) + */ + int getAttemptRandomOnFailuresLimit() { return 5; } + + /** + * If we receive more than this number of wrong distribution replies with old cluster states, we throw the current cached state and takes the + * old one. This guards us against version resets. + */ + int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; } + + /** + * When getting new cluster states we update good nodes. If we have more than this percentage of up nodes, we send to up nodes instead of totally random. + * (To avoid hitting trashing bad nodes still in slobrok) + */ + int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; } + } + + /** Helper class to get the bucket identifier of a message. */ + public static class BucketIdCalculator { + private static final BucketIdFactory factory = new BucketIdFactory(); + + private BucketId getBucketId(Message msg) { + switch (msg.getType()) { + case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId()); + case DocumentProtocol.MESSAGE_GETDOCUMENT: return factory.getBucketId(((GetDocumentMessage)msg).getDocumentId()); + case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: return factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId()); + case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: return factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId()); + case DocumentProtocol.MESSAGE_GETBUCKETLIST: return ((GetBucketListMessage)msg).getBucketId(); + case DocumentProtocol.MESSAGE_STATBUCKET: return ((StatBucketMessage)msg).getBucketId(); + case DocumentProtocol.MESSAGE_CREATEVISITOR: return ((CreateVisitorMessage)msg).getBuckets().get(0); + case DocumentProtocol.MESSAGE_REMOVELOCATION: return ((RemoveLocationMessage)msg).getBucketId(); + default: + log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported."); + return null; + } + } + + BucketId handleBucketIdCalculation(RoutingContext context) { + BucketId id = getBucketId(context.getMessage()); + if (id == null || id.getRawId() == 0) { + Reply reply = new EmptyReply(); + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message.")); + context.setReply(reply); + } + return id; + } + } + + /** Class handling the logic of picking a distributor */ + public static class DistributorSelectionLogic { + /** Class that tracks a failure of a given type per node. */ + static class InstabilityChecker { + private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>(); + private final int failureLimit; + + InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; } + + boolean tooManyFailures(int nodeIndex) { + if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) { + nodeFailures.set(nodeIndex, 0); + return true; + } else { + return false; + } + } + + void addFailure(Integer calculatedDistributor) { + while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0); + nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1); + } + } + /** Message context class. Contains data we want to inspect about a request at reply time. */ + private static class MessageContext { + final Integer calculatedDistributor; + final ClusterState usedState; + + MessageContext(ClusterState usedState) { + this(usedState, null); + } + MessageContext(ClusterState usedState, Integer calculatedDistributor) { + this.calculatedDistributor = calculatedDistributor; + this.usedState = usedState; + } + + public String toString() { + return "Context(Distributor " + calculatedDistributor + + ", state version " + usedState.getVersion() + ")"; + } + } + + private final HostFetcher hostFetcher; + private final Distribution distribution; + private final InstabilityChecker persistentFailureChecker; + private final AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null); + private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0); + private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection + + DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) { + try { + hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes()); + distribution = params.createDistribution(policy); + persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit()); + maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState(); + } catch (Throwable e) { + destroy(); + throw e; + } + } + + public void destroy() { + if (hostFetcher != null) { + hostFetcher.close(); + } + if (distribution != null) { + distribution.close(); + } + } + + String getTargetSpec(RoutingContext context, BucketId bucketId) { + String sendRandomReason = null; + ClusterState cachedClusterState = safeCachedClusterState.get(); + + if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node. + try{ + Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates); + // If we have had too many failures towards existing node, reset failure count and send to random + if (persistentFailureChecker.tooManyFailures(target)) { + sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state."; + target = null; + } + // If we have found a target, and the target exists in slobrok, send to it. + if (target != null) { + context.setContext(new MessageContext(cachedClusterState, target)); + String targetSpec = hostFetcher.getTargetSpec(target, context); + if (targetSpec != null) { + if (context.shouldTrace(1)) { + context.trace(1, "Using distributor " + target + " for " + + bucketId + " as our state version is " + cachedClusterState.getVersion()); + } + return targetSpec; + } else { + sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random."; + log.log(Level.FINE, "Target distributor is not in slobrok"); + } + } else { + context.setContext(new MessageContext(cachedClusterState)); + } + } catch (Distribution.TooFewBucketBitsInUseException e) { + Reply reply = new WrongDistributionReply(cachedClusterState.toString(true)); + reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION, + "Too few distribution bits used for given cluster state")); + context.setReply(reply); + return null; + } catch (Distribution.NoDistributorsAvailableException e) { + log.log(Level.FINE, "No distributors available; clearing cluster state"); + safeCachedClusterState.set(null); + sendRandomReason = "No distributors available. Sending to random distributor."; + context.setContext(createRandomDistributorTargetContext()); + } + } else { + context.setContext(createRandomDistributorTargetContext()); + sendRandomReason = "No cluster state cached. Sending to random distributor."; + } + if (context.shouldTrace(1)) { + context.trace(1, sendRandomReason != null ? sendRandomReason : "Sending to random distributor for unknown reason"); + } + return hostFetcher.getRandomTargetSpec(context); + } + + private static MessageContext createRandomDistributorTargetContext() { + return new MessageContext(null); + } + + private static Optional<ClusterState> clusterStateFromReply(final WrongDistributionReply reply) { + try { + return Optional.of(new ClusterState(reply.getSystemState())); + } catch (Exception e) { + reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState()); + return Optional.empty(); + } + } + + void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) { + final MessageContext context = (MessageContext) routingContext.getContext(); + final Optional<ClusterState> replyState = clusterStateFromReply(reply); + if (!replyState.isPresent()) { + return; + } + final ClusterState newState = replyState.get(); + resetCachedStateIfClusterStateVersionLikelyRolledBack(newState); + markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState); + + if (context.calculatedDistributor == null) { + traceReplyFromRandomDistributor(reply, newState); + } else { + traceReplyFromSpecificDistributor(reply, context, newState); + } + updateCachedRoutingStateFromWrongDistribution(context, newState); + } + + private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) { + ClusterState cachedClusterState = safeCachedClusterState.get(); + if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) { + safeCachedClusterState.set(newState); + if (newState.getClusterState().equals(State.UP)) { + hostFetcher.updateValidTargets(newState); + } + } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) { + safeCachedClusterState.set(null); + } else if (context.calculatedDistributor != null) { + persistentFailureChecker.addFailure(context.calculatedDistributor); + } + } + + private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) { + if (context.usedState == null) { + String msg = "Used state must be set as distributor is calculated. Bug."; + reply.getTrace().trace(1, msg); + log.log(Level.SEVERE, msg); + } else if (newState.getVersion() == context.usedState.getVersion()) { + String msg = "Message sent to distributor " + context.calculatedDistributor + + " retrieved cluster state version " + newState.getVersion() + + " which was the state we used to calculate distributor as target last time."; + reply.getTrace().trace(1, msg); + // Client load can be rejected towards distributors even with a matching cluster state version. + // This usually happens during a node fail-over transition, where the target distributor will + // reject an operation bound to a particular bucket if it does not own the bucket in _both_ + // the current and the next (transition target) state. Since it can happen during normal operation + // and will happen per client operation, we keep this as debug level to prevent spamming the logs. + log.log(Level.FINE, msg); + } else if (newState.getVersion() > context.usedState.getVersion()) { + if (reply.getTrace().shouldTrace(1)) { + reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + + " updated cluster state from version " + context.usedState.getVersion() + + " to " + newState.getVersion()); + } + } else { + if (reply.getTrace().shouldTrace(1)) { + reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + + " returned older cluster state version " + newState.getVersion()); + } + } + } + + private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) { + ClusterState cachedClusterState = safeCachedClusterState.get(); + if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) { + if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) { + oldClusterVersionGottenCount.set(0); + safeCachedClusterState.set(null); + } + } + } + + private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) { + if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) { + if (reply.getRetryDelay() <= 0.0) { + reply.setRetryDelay(-1); + } + } else { + if (reply.getRetryDelay() <= 0.0) { + reply.setRetryDelay(0); + } + } + } + + private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) { + if (!reply.getTrace().shouldTrace(1)) { + return; + } + ClusterState cachedClusterState = safeCachedClusterState.get(); + if (cachedClusterState == null) { + reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion()); + } else if (newState.getVersion() == cachedClusterState.getVersion()) { + reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct."); + } else if (newState.getVersion() > cachedClusterState.getVersion()) { + reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion()); + } else { + reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion()); + } + } + + void handleErrorReply(Reply reply, Object untypedContext) { + MessageContext messageContext = (MessageContext) untypedContext; + if (messageContext.calculatedDistributor != null) { + persistentFailureChecker.addFailure(messageContext.calculatedDistributor); + if (reply.getTrace().shouldTrace(1)) { + reply.getTrace().trace(1, "Failed with " + messageContext.toString()); } - }; + } } } + private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator(); + private final DistributorSelectionLogic distributorSelectionLogic; + private final Parameters parameters; + + /** Constructor used in production. */ + public ContentPolicy(String param) { + this(parse(param)); + } + public ContentPolicy(Map<String, String> params) { - super(new ContentParameters(params)); + this(new Parameters(params)); } - public ContentPolicy(String parameters) { - this(parse(parameters)); + /** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */ + public ContentPolicy(Parameters p) { + super(); + parameters = p; + distributorSelectionLogic = new DistributorSelectionLogic(parameters, this); } + @Override + public void select(RoutingContext context) { + if (context.shouldTrace(1)) { + context.trace(1, "Selecting route"); + } + + BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context); + if (context.hasReply()) return; + + String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId); + if (context.hasReply()) return; + if (targetSpec != null) { + Route route = new Route(context.getRoute()); + route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec))); + context.addChild(route); + } else { + context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE, + "Could not resolve any distributors to send to in cluster " + parameters.clusterName); + } + } + + @Override + public void merge(RoutingContext context) { + RoutingNodeIterator it = context.getChildIterator(); + Reply reply = (it.hasReply()) ? it.removeReply() : context.getReply(); + if (reply == null) { + reply = new EmptyReply(); + reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE, + "No reply in any children, nor in the routing context: " + context)); + } + + if (reply instanceof WrongDistributionReply) { + distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context); + } else if (reply.hasErrors()) { + distributorSelectionLogic.handleErrorReply(reply, context.getContext()); + } else if (reply instanceof WriteDocumentReply) { + if (context.shouldTrace(9)) { + context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp()); + } + } + context.setReply(reply); + } + + @Override + public void destroy() { + distributorSelectionLogic.destroy(); + } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java index ca32c5722e6..f5b4920fa3f 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java @@ -263,7 +263,7 @@ public class DocumentProtocol implements Protocol { putRoutingPolicyFactory("MessageType", new RoutingPolicyFactories.MessageTypePolicyFactory(cfg)); putRoutingPolicyFactory("RoundRobin", new RoutingPolicyFactories.RoundRobinPolicyFactory()); putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory()); - putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.StoragePolicyFactory()); + putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.ContentPolicyFactory()); putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory()); // Prepare version specifications to use when adding routable factories. diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java index 6954d8f3a1d..7b44a1a4f0d 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java @@ -16,15 +16,6 @@ public abstract class RoutingPolicyFactories { } } - static class StoragePolicyFactory implements RoutingPolicyFactory { - public DocumentProtocolRoutingPolicy createPolicy(String param) { - return new StoragePolicy(param); - } - - public void destroy() { - } - } - static class ContentPolicyFactory implements RoutingPolicyFactory { public DocumentProtocolRoutingPolicy createPolicy(String param) { return new ContentPolicy(param); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java deleted file mode 100644 index b74f7431531..00000000000 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java +++ /dev/null @@ -1,615 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol; - -import com.yahoo.concurrent.CopyOnWriteHashMap; -import com.yahoo.document.BucketId; -import com.yahoo.document.BucketIdFactory; -import com.yahoo.jrt.slobrok.api.IMirror; -import com.yahoo.jrt.slobrok.api.Mirror; -import java.util.logging.Level; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.routing.Hop; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.routing.RoutingContext; -import com.yahoo.messagebus.routing.RoutingNodeIterator; -import com.yahoo.messagebus.routing.VerbatimDirective; -import com.yahoo.vdslib.distribution.Distribution; -import com.yahoo.vdslib.state.ClusterState; -import com.yahoo.vdslib.state.Node; -import com.yahoo.vdslib.state.NodeType; -import com.yahoo.vdslib.state.State; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Logger; - -/** - * Routing policy to determine which distributor in a storage cluster to send data to. - * Using different key=value parameters separated by semicolon (";"), the user can control which cluster to send to. - * - * cluster=[clusterName] (Mandatory, determines the cluster name) - * config=[config] (Optional, a comma separated list of config servers to use. Used to talk to clusters not defined in this vespa application) - * clusterconfigid=[id] (Optional, use given config id for distribution instead of default) - * - * @author Haakon Humberset - */ -public class StoragePolicy extends SlobrokPolicy { - - private static final Logger log = Logger.getLogger(StoragePolicy.class.getName()); - public static final String owningBucketStates = "uim"; - private static final String upStates = "ui"; - - /** This class merely generates slobrok a host pattern for a given distributor. */ - public static class SlobrokHostPatternGenerator { - private final String base; - private final String all; - SlobrokHostPatternGenerator(String clusterName) { - base = "storage/cluster." + clusterName + "/distributor/"; - all = base + "*/default"; - - } - - /** - * Find host pattern of the hosts that are valid targets for this request. - * @param distributor Set to -1 if any distributor is valid target. - */ - public String getDistributorHostPattern(Integer distributor) { - return (distributor == null) ? all : (base + distributor + "/default"); - } - } - - /** Helper class to match a host pattern with node to use. */ - public abstract static class HostFetcher { - - private static class Targets { - private final List<Integer> list; - private final int total; - Targets() { - this(Collections.emptyList(), 1); - } - Targets(List<Integer> list, int total) { - this.list = list; - this.total = total; - } - } - - private final int requiredUpPercentageToSendToKnownGoodNodes; - private final AtomicReference<Targets> validTargets = new AtomicReference<>(new Targets()); - protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy. - - protected HostFetcher(int percent) { - requiredUpPercentageToSendToKnownGoodNodes = percent; - } - - void updateValidTargets(ClusterState state) { - List<Integer> validRandomTargets = new ArrayList<>(); - for (int i=0; i<state.getNodeCount(NodeType.DISTRIBUTOR); ++i) { - if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i); - } - validTargets.set(new Targets(new CopyOnWriteArrayList<>(validRandomTargets), state.getNodeCount(NodeType.DISTRIBUTOR))); - } - public abstract String getTargetSpec(Integer distributor, RoutingContext context); - String getRandomTargetSpec(RoutingContext context) { - Targets targets = validTargets.get(); - // Try to use list of random targets, if at least X % of the nodes are up - while ((targets.total != 0) && - (100 * targets.list.size() / targets.total >= requiredUpPercentageToSendToKnownGoodNodes)) - { - int randIndex = randomizer.nextInt(targets.list.size()); - String targetSpec = getTargetSpec(targets.list.get(randIndex), context); - if (targetSpec != null) { - context.trace(3, "Sending to random node seen up in cluster state"); - return targetSpec; - } - targets.list.remove(randIndex); - } - context.trace(3, "Too few nodes seen up in state. Sending totally random."); - return getTargetSpec(null, context); - } - public void close() {} - } - - /** Host fetcher using a slobrok mirror to find the hosts. */ - public static class SlobrokHostFetcher extends HostFetcher { - private final SlobrokHostPatternGenerator patternGenerator; - private final SlobrokPolicy policy; - - SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { - super(percent); - this.patternGenerator = patternGenerator; - this.policy = policy; - } - - private List<Mirror.Entry> getEntries(String hostPattern, RoutingContext context) { - return policy.lookup(context, hostPattern); - } - - private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; } - - public IMirror getMirror(RoutingContext context) { return context.getMirror(); } - - @Override - public String getTargetSpec(Integer distributor, RoutingContext context) { - List<Mirror.Entry> arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context); - if (arr.isEmpty()) return null; - if (distributor != null) { - if (arr.size() == 1) { - return convertSlobrokNameToSessionName(arr.get(0).getSpecString()); - } else { - log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor."); - } - } else { - return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString()); - } - return null; - } - } - - static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher { - - /** - * Distributor index to resolved RPC spec cache for a single given Slobrok - * update generation. Uses a thread safe COW map which will grow until stable. - */ - private static class GenerationCache { - private final int generation; - private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>(); - - GenerationCache(int generation) { - this.generation = generation; - } - - public int generation() { return this.generation; } - - public String get(Integer index) { - return targets.get(index); - } - public void put(Integer index, String target) { - targets.put(index, target); - } - } - - private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null); - - TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { - super(patternGenerator, policy, percent); - } - - @Override - public String getTargetSpec(Integer distributor, RoutingContext context) { - GenerationCache cache = generationCache.get(); - int currentGeneration = getMirror(context).updates(); - // The below code might race with other threads during a generation change. That is OK, as the cache - // is thread safe and will quickly converge to a stable state for the new generation. - if (cache == null || currentGeneration != cache.generation()) { - cache = new GenerationCache(currentGeneration); - generationCache.set(cache); - } - if (distributor != null) { - return cachingGetTargetSpec(distributor, context, cache); - } - // Wildcard lookup case. Must not be cached. - return super.getTargetSpec(null, context); - } - - private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) { - String cachedTarget = cache.get(distributor); - if (cachedTarget != null) { - return cachedTarget; - } - // Mirror _may_ be at a higher version if we race with generation read, but that is OK since - // we'll either way get the most up-to-date mapping and the cache will be invalidated on the - // next invocation. - String resolvedTarget = super.getTargetSpec(distributor, context); - cache.put(distributor, resolvedTarget); - return resolvedTarget; - } - - } - - /** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */ - public static class Parameters { - protected final String clusterName; - protected final String distributionConfigId; - protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator; - - public Parameters(Map<String, String> params) { - clusterName = params.get("cluster"); - distributionConfigId = params.get("clusterconfigid"); - slobrokHostPatternGenerator = createPatternGenerator(); - if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set"); - } - - String getDistributionConfigId() { - return (distributionConfigId == null ? "storage/cluster." + clusterName : distributionConfigId); - } - public String getClusterName() { - return clusterName; - } - public SlobrokHostPatternGenerator createPatternGenerator() { - return new SlobrokHostPatternGenerator(getClusterName()); - } - public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { - return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent); - } - public Distribution createDistribution(SlobrokPolicy policy) { - return new Distribution(getDistributionConfigId()); - } - - /** - * When we have gotten this amount of failures from a node (Any kind of failures). We try to send to a random other node, just to see if the - * failure was related to node being bad. (Hard to detect from failure) - */ - int getAttemptRandomOnFailuresLimit() { return 5; } - - /** - * If we receive more than this number of wrong distribution replies with old cluster states, we throw the current cached state and takes the - * old one. This guards us against version resets. - */ - int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; } - - /** - * When getting new cluster states we update good nodes. If we have more than this percentage of up nodes, we send to up nodes instead of totally random. - * (To avoid hitting trashing bad nodes still in slobrok) - */ - int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; } - } - - /** Helper class to get the bucket identifier of a message. */ - public static class BucketIdCalculator { - private static final BucketIdFactory factory = new BucketIdFactory(); - - private BucketId getBucketId(Message msg) { - switch (msg.getType()) { - case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId()); - case DocumentProtocol.MESSAGE_GETDOCUMENT: return factory.getBucketId(((GetDocumentMessage)msg).getDocumentId()); - case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: return factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId()); - case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: return factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId()); - case DocumentProtocol.MESSAGE_GETBUCKETLIST: return ((GetBucketListMessage)msg).getBucketId(); - case DocumentProtocol.MESSAGE_STATBUCKET: return ((StatBucketMessage)msg).getBucketId(); - case DocumentProtocol.MESSAGE_CREATEVISITOR: return ((CreateVisitorMessage)msg).getBuckets().get(0); - case DocumentProtocol.MESSAGE_REMOVELOCATION: return ((RemoveLocationMessage)msg).getBucketId(); - default: - log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported."); - return null; - } - } - - BucketId handleBucketIdCalculation(RoutingContext context) { - BucketId id = getBucketId(context.getMessage()); - if (id == null || id.getRawId() == 0) { - Reply reply = new EmptyReply(); - reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message.")); - context.setReply(reply); - } - return id; - } - } - - /** Class handling the logic of picking a distributor */ - public static class DistributorSelectionLogic { - /** Class that tracks a failure of a given type per node. */ - static class InstabilityChecker { - private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>(); - private final int failureLimit; - - InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; } - - boolean tooManyFailures(int nodeIndex) { - if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) { - nodeFailures.set(nodeIndex, 0); - return true; - } else { - return false; - } - } - - void addFailure(Integer calculatedDistributor) { - while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0); - nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1); - } - } - /** Message context class. Contains data we want to inspect about a request at reply time. */ - private static class MessageContext { - final Integer calculatedDistributor; - final ClusterState usedState; - - MessageContext() { - this(null, null); - } - MessageContext(ClusterState usedState) { - this(usedState, null); - } - MessageContext(ClusterState usedState, Integer calculatedDistributor) { - this.calculatedDistributor = calculatedDistributor; - this.usedState = usedState; - } - - public String toString() { - return "Context(Distributor " + calculatedDistributor + - ", state version " + usedState.getVersion() + ")"; - } - } - - private final HostFetcher hostFetcher; - private final Distribution distribution; - private final InstabilityChecker persistentFailureChecker; - private final AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null); - private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0); - private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection - - DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) { - try { - hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes()); - distribution = params.createDistribution(policy); - persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit()); - maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState(); - } catch (Throwable e) { - destroy(); - throw e; - } - } - - public void destroy() { - if (hostFetcher != null) { - hostFetcher.close(); - } - if (distribution != null) { - distribution.close(); - } - } - - String getTargetSpec(RoutingContext context, BucketId bucketId) { - String sendRandomReason = null; - ClusterState cachedClusterState = safeCachedClusterState.get(); - - if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node. - try{ - Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates); - // If we have had too many failures towards existing node, reset failure count and send to random - if (persistentFailureChecker.tooManyFailures(target)) { - sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state."; - target = null; - } - // If we have found a target, and the target exists in slobrok, send to it. - if (target != null) { - context.setContext(new MessageContext(cachedClusterState, target)); - String targetSpec = hostFetcher.getTargetSpec(target, context); - if (targetSpec != null) { - if (context.shouldTrace(1)) { - context.trace(1, "Using distributor " + target + " for " + - bucketId + " as our state version is " + cachedClusterState.getVersion()); - } - return targetSpec; - } else { - sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random."; - log.log(Level.FINE, "Target distributor is not in slobrok"); - } - } else { - context.setContext(new MessageContext(cachedClusterState)); - } - } catch (Distribution.TooFewBucketBitsInUseException e) { - Reply reply = new WrongDistributionReply(cachedClusterState.toString(true)); - reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION, - "Too few distribution bits used for given cluster state")); - context.setReply(reply); - return null; - } catch (Distribution.NoDistributorsAvailableException e) { - log.log(Level.FINE, "No distributors available; clearing cluster state"); - safeCachedClusterState.set(null); - sendRandomReason = "No distributors available. Sending to random distributor."; - context.setContext(createRandomDistributorTargetContext()); - } - } else { - context.setContext(createRandomDistributorTargetContext()); - sendRandomReason = "No cluster state cached. Sending to random distributor."; - } - if (context.shouldTrace(1)) { - context.trace(1, sendRandomReason != null ? sendRandomReason : "Sending to random distributor for unknown reason"); - } - return hostFetcher.getRandomTargetSpec(context); - } - - private static MessageContext createRandomDistributorTargetContext() { - return new MessageContext(null); - } - - private static Optional<ClusterState> clusterStateFromReply(final WrongDistributionReply reply) { - try { - return Optional.of(new ClusterState(reply.getSystemState())); - } catch (Exception e) { - reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState()); - return Optional.empty(); - } - } - - void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) { - final MessageContext context = (MessageContext) routingContext.getContext(); - final Optional<ClusterState> replyState = clusterStateFromReply(reply); - if (!replyState.isPresent()) { - return; - } - final ClusterState newState = replyState.get(); - resetCachedStateIfClusterStateVersionLikelyRolledBack(newState); - markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState); - - if (context.calculatedDistributor == null) { - traceReplyFromRandomDistributor(reply, newState); - } else { - traceReplyFromSpecificDistributor(reply, context, newState); - } - updateCachedRoutingStateFromWrongDistribution(context, newState); - } - - private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) { - ClusterState cachedClusterState = safeCachedClusterState.get(); - if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) { - safeCachedClusterState.set(newState); - if (newState.getClusterState().equals(State.UP)) { - hostFetcher.updateValidTargets(newState); - } - } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) { - safeCachedClusterState.set(null); - } else if (context.calculatedDistributor != null) { - persistentFailureChecker.addFailure(context.calculatedDistributor); - } - } - - private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) { - if (context.usedState == null) { - String msg = "Used state must be set as distributor is calculated. Bug."; - reply.getTrace().trace(1, msg); - log.log(Level.SEVERE, msg); - } else if (newState.getVersion() == context.usedState.getVersion()) { - String msg = "Message sent to distributor " + context.calculatedDistributor + - " retrieved cluster state version " + newState.getVersion() + - " which was the state we used to calculate distributor as target last time."; - reply.getTrace().trace(1, msg); - // Client load can be rejected towards distributors even with a matching cluster state version. - // This usually happens during a node fail-over transition, where the target distributor will - // reject an operation bound to a particular bucket if it does not own the bucket in _both_ - // the current and the next (transition target) state. Since it can happen during normal operation - // and will happen per client operation, we keep this as debug level to prevent spamming the logs. - log.log(Level.FINE, msg); - } else if (newState.getVersion() > context.usedState.getVersion()) { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + - " updated cluster state from version " + context.usedState.getVersion() + - " to " + newState.getVersion()); - } - } else { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + - " returned older cluster state version " + newState.getVersion()); - } - } - } - - private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) { - ClusterState cachedClusterState = safeCachedClusterState.get(); - if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) { - if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) { - oldClusterVersionGottenCount.set(0); - safeCachedClusterState.set(null); - } - } - } - - private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) { - if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) { - if (reply.getRetryDelay() <= 0.0) { - reply.setRetryDelay(-1); - } - } else { - if (reply.getRetryDelay() <= 0.0) { - reply.setRetryDelay(0); - } - } - } - - private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) { - if (!reply.getTrace().shouldTrace(1)) { - return; - } - ClusterState cachedClusterState = safeCachedClusterState.get(); - if (cachedClusterState == null) { - reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion()); - } else if (newState.getVersion() == cachedClusterState.getVersion()) { - reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct."); - } else if (newState.getVersion() > cachedClusterState.getVersion()) { - reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion()); - } else { - reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion()); - } - } - - void handleErrorReply(Reply reply, Object untypedContext) { - MessageContext messageContext = (MessageContext) untypedContext; - if (messageContext.calculatedDistributor != null) { - persistentFailureChecker.addFailure(messageContext.calculatedDistributor); - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Failed with " + messageContext.toString()); - } - } - } - } - - private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator(); - private final DistributorSelectionLogic distributorSelectionLogic; - private final Parameters parameters; - - /** Constructor used in production. */ - public StoragePolicy(String param) { - this(parse(param)); - } - - public StoragePolicy(Map<String, String> params) { - this(new Parameters(params)); - } - - /** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */ - public StoragePolicy(Parameters p) { - super(); - parameters = p; - distributorSelectionLogic = new DistributorSelectionLogic(parameters, this); - } - - @Override - public void select(RoutingContext context) { - if (context.shouldTrace(1)) { - context.trace(1, "Selecting route"); - } - - BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context); - if (context.hasReply()) return; - - String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId); - if (context.hasReply()) return; - if (targetSpec != null) { - Route route = new Route(context.getRoute()); - route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec))); - context.addChild(route); - } else { - context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE, - "Could not resolve any distributors to send to in cluster " + parameters.clusterName); - } - } - - @Override - public void merge(RoutingContext context) { - RoutingNodeIterator it = context.getChildIterator(); - Reply reply = (it.hasReply()) ? it.removeReply() : context.getReply(); - if (reply == null) { - reply = new EmptyReply(); - reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE, - "No reply in any children, nor in the routing context: " + context)); - } - - if (reply instanceof WrongDistributionReply) { - distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context); - } else if (reply.hasErrors()) { - distributorSelectionLogic.handleErrorReply(reply, context.getContext()); - } else if (reply instanceof WriteDocumentReply) { - if (context.shouldTrace(9)) { - context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp()); - } - } - context.setReply(reply); - } - - @Override - public void destroy() { - distributorSelectionLogic.destroy(); - } -} diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java index cdc5878321a..52afcdfd77c 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java @@ -49,8 +49,8 @@ public class TargetCachingSlobrokHostFetcherTest { static class Fixture { SlobrokPolicy mockSlobrokPolicy = mock(SlobrokPolicy.class); IMirror mockMirror = mock(IMirror.class); - StoragePolicy.SlobrokHostPatternGenerator patternGenerator = new StoragePolicy.SlobrokHostPatternGenerator("foo"); - StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60); + ContentPolicy.SlobrokHostPatternGenerator patternGenerator = new ContentPolicy.SlobrokHostPatternGenerator("foo"); + ContentPolicy.TargetCachingSlobrokHostFetcher hostFetcher = new ContentPolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60); RoutingContext routingContext = mock(RoutingContext.class); Fixture() { diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java index 8a6df061430..018697c0719 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java @@ -15,7 +15,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -public class BasicTests extends StoragePolicyTestEnvironment { +public class BasicTests extends ContentPolicyTestEnvironment { /** Test that we can send a message through the policy. */ @Test diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java index b0cea8ee819..f324245b612 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTest.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java @@ -4,7 +4,7 @@ package com.yahoo.documentapi.messagebus.protocol.test.storagepolicy; import org.junit.Ignore; import org.junit.Test; -public class StoragePolicyTest extends Simulator { +public class ContentPolicyTest extends Simulator { /** * Verify that a resent message with failures doesn't ruin overall performance. (By dumping the cached state too often * so other requests are sent to wrong target) diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java index 00a045367bb..479e0b0f422 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java @@ -10,7 +10,7 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolRoutingPolicy; import com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RoutingPolicyFactory; -import com.yahoo.documentapi.messagebus.protocol.StoragePolicy; +import com.yahoo.documentapi.messagebus.protocol.ContentPolicy; import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply; import com.yahoo.documentapi.messagebus.protocol.test.PolicyTestFrame; import com.yahoo.messagebus.EmptyReply; @@ -36,7 +36,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -public abstract class StoragePolicyTestEnvironment { +public abstract class ContentPolicyTestEnvironment { protected StoragePolicyTestFactory policyFactory; protected PolicyTestFrame frame; @@ -102,7 +102,7 @@ public abstract class StoragePolicyTestEnvironment { assertTrue(nodes.remove(second)); } - public static class TestHostFetcher extends StoragePolicy.HostFetcher { + public static class TestHostFetcher extends ContentPolicy.HostFetcher { private final String clusterName; private RandomGen randomizer = new RandomGen(1234); private final Set<Integer> nodes; @@ -143,7 +143,7 @@ public abstract class StoragePolicyTestEnvironment { } } - public static class TestParameters extends StoragePolicy.Parameters { + public static class TestParameters extends ContentPolicy.Parameters { private final TestHostFetcher hostFetcher; private final Distribution distribution; @@ -154,7 +154,7 @@ public abstract class StoragePolicyTestEnvironment { } @Override - public StoragePolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; } + public ContentPolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; } @Override public Distribution createDistribution(SlobrokPolicy policy) { return distribution; } @@ -171,7 +171,7 @@ public abstract class StoragePolicyTestEnvironment { public DocumentProtocolRoutingPolicy createPolicy(String parameters) { parameterInstances.addLast(new TestParameters(parameters, nodes)); ((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom); - return new StoragePolicy(parameterInstances.getLast()); + return new ContentPolicy(parameterInstances.getLast()); } public void avoidPickingAtRandom(Integer distributor) { avoidPickingAtRandom = distributor; diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java index 68405b002c8..d23dd9ea998 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java @@ -5,7 +5,7 @@ import com.yahoo.document.BucketId; import com.yahoo.document.BucketIdFactory; import com.yahoo.document.DocumentId; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.documentapi.messagebus.protocol.StoragePolicy; +import com.yahoo.documentapi.messagebus.protocol.ContentPolicy; import com.yahoo.messagebus.routing.RoutingNode; import com.yahoo.vdslib.distribution.RandomGen; import com.yahoo.vdslib.state.ClusterState; @@ -21,7 +21,7 @@ import java.util.regex.Pattern; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public abstract class Simulator extends StoragePolicyTestEnvironment { +public abstract class Simulator extends ContentPolicyTestEnvironment { enum FailureType { TRANSIENT_ERROR, @@ -175,7 +175,7 @@ public abstract class Simulator extends StoragePolicyTestEnvironment { for (int i=0; i<params.getParallellRequests(); ++i) { RoutingNode target = targets[i]; int index = getAddress(target).getSecond(); - if (!params.getCurrentClusterState(null).getNodeState(new Node(NodeType.DISTRIBUTOR, index)).getState().oneOf(StoragePolicy.owningBucketStates)) { + if (!params.getCurrentClusterState(null).getNodeState(new Node(NodeType.DISTRIBUTOR, index)).getState().oneOf(ContentPolicy.owningBucketStates)) { ++downnode[half]; } BadNode badNode = params.getBadNodes().get(index); diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp index 0c659f589d6..02bd6b297d0 100644 --- a/documentapi/src/tests/policies/policies_test.cpp +++ b/documentapi/src/tests/policies/policies_test.cpp @@ -4,13 +4,13 @@ #include <vespa/documentapi/documentapi.h> #include <vespa/documentapi/messagebus/policies/andpolicy.h> +#include <vespa/documentapi/messagebus/policies/contentpolicy.h> #include <vespa/documentapi/messagebus/policies/documentrouteselectorpolicy.h> #include <vespa/documentapi/messagebus/policies/errorpolicy.h> #include <vespa/documentapi/messagebus/policies/externpolicy.h> #include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> #include <vespa/documentapi/messagebus/policies/localservicepolicy.h> #include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h> -#include <vespa/documentapi/messagebus/policies/storagepolicy.h> #include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h> #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/routing/routingnode.h> @@ -51,7 +51,7 @@ private: private: bool trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string> &expected); void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries = -1); - StoragePolicy &setupStoragePolicy(TestFrame &frame, const string ¶m, + ContentPolicy &setupContentPolicy(TestFrame &frame, const string ¶m, const string &pattern = "", int32_t numEntries = -1); bool isErrorPolicy(const string &name, const string ¶m); void assertMirrorReady(const IMirrorAPI &mirror); @@ -83,10 +83,10 @@ public: void requireThatExternPolicyWithUnknownPatternSelectsNone(); void requireThatExternPolicySelectsFromExternSlobrok(); void requireThatExternPolicyMergesOneReplyAsProtocol(); - void requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy(); - void requireThatStoragePolicyIsRandomWithoutState(); - void requireThatStoragePolicyIsTargetedWithState(); - void requireThatStoragePolicyCombinesSystemAndSlobrokState(); + void requireThatContentPolicyWithIllegalParamIsAnErrorPolicy(); + void requireThatContentPolicyIsRandomWithoutState(); + void requireThatContentPolicyIsTargetedWithState(); + void requireThatContentPolicyCombinesSystemAndSlobrokState(); }; TEST_APPHOOK(Test); @@ -128,10 +128,10 @@ Test::Main() { requireThatExternPolicySelectsFromExternSlobrok(); TEST_FLUSH(); requireThatExternPolicyMergesOneReplyAsProtocol(); TEST_FLUSH(); - requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy(); TEST_FLUSH(); - requireThatStoragePolicyIsRandomWithoutState(); TEST_FLUSH(); - requireThatStoragePolicyIsTargetedWithState(); TEST_FLUSH(); - requireThatStoragePolicyCombinesSystemAndSlobrokState(); TEST_FLUSH(); + requireThatContentPolicyWithIllegalParamIsAnErrorPolicy(); TEST_FLUSH(); + requireThatContentPolicyIsRandomWithoutState(); TEST_FLUSH(); + requireThatContentPolicyIsTargetedWithState(); TEST_FLUSH(); + requireThatContentPolicyCombinesSystemAndSlobrokState(); TEST_FLUSH(); TEST_DONE(); } @@ -782,15 +782,15 @@ void Test::testLoadBalancer() { } void -Test::requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy() +Test::requireThatContentPolicyWithIllegalParamIsAnErrorPolicy() { - EXPECT_TRUE(isErrorPolicy("Storage", "")); - EXPECT_TRUE(isErrorPolicy("Storage", "config=foo;slobroks=foo")); - EXPECT_TRUE(isErrorPolicy("Storage", "slobroks=foo")); + EXPECT_TRUE(isErrorPolicy("Content", "")); + EXPECT_TRUE(isErrorPolicy("Content", "config=foo;slobroks=foo")); + EXPECT_TRUE(isErrorPolicy("Content", "slobroks=foo")); } void -Test::requireThatStoragePolicyIsRandomWithoutState() +Test::requireThatContentPolicyIsRandomWithoutState() { TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -808,7 +808,7 @@ Test::requireThatStoragePolicyIsRandomWithoutState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - StoragePolicy &policy = setupStoragePolicy( + ContentPolicy &policy = setupContentPolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); ASSERT_TRUE(policy.getSystemState() == nullptr); @@ -826,15 +826,15 @@ Test::requireThatStoragePolicyIsRandomWithoutState() } } -StoragePolicy & -Test::setupStoragePolicy(TestFrame &frame, const string ¶m, +ContentPolicy & +Test::setupContentPolicy(TestFrame &frame, const string ¶m, const string &pattern, int32_t numEntries) { - frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Storage:%s]", param.c_str()))); + frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Content:%s]", param.c_str()))); mbus::MessageBus &mbus = frame.getMessageBus(); const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test"); const mbus::PolicyDirective dir = static_cast<mbus::PolicyDirective&>(*hop->getDirective(0)); - StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME, + ContentPolicy &policy = static_cast<ContentPolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME, dir.getName(), dir.getParam())); policy.initSynchronous(); assertMirrorReady(*policy.getMirror()); @@ -845,7 +845,7 @@ Test::setupStoragePolicy(TestFrame &frame, const string ¶m, } void -Test::requireThatStoragePolicyIsTargetedWithState() +Test::requireThatContentPolicyIsTargetedWithState() { TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -863,7 +863,7 @@ Test::requireThatStoragePolicyIsTargetedWithState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - StoragePolicy &policy = setupStoragePolicy( + ContentPolicy &policy = setupContentPolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); ASSERT_TRUE(policy.getSystemState() == nullptr); @@ -888,7 +888,7 @@ Test::requireThatStoragePolicyIsTargetedWithState() } void -Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() +Test::requireThatContentPolicyCombinesSystemAndSlobrokState() { TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -902,7 +902,7 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - StoragePolicy &policy = setupStoragePolicy( + ContentPolicy &policy = setupContentPolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 1); ASSERT_TRUE(policy.getSystemState() == nullptr); diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp index 560f2f28f0e..0f86eb38aca 100644 --- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp @@ -32,14 +32,14 @@ DocumentProtocol::DocumentProtocol(std::shared_ptr<const DocumentTypeRepo> repo, // When adding factories to this list, please KEEP THEM ORDERED alphabetically like they are now. putRoutingPolicyFactory("AND", std::make_shared<RoutingPolicyFactories::AndPolicyFactory>()); putRoutingPolicyFactory("Content", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>()); - putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>()); + putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>()); // TODO Vespa 8: remove putRoutingPolicyFactory("DocumentRouteSelector", std::make_shared<RoutingPolicyFactories::DocumentRouteSelectorPolicyFactory>(*_repo, cfg)); putRoutingPolicyFactory("Extern", std::make_shared<RoutingPolicyFactories::ExternPolicyFactory>()); + putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>()); putRoutingPolicyFactory("LocalService", std::make_shared<RoutingPolicyFactories::LocalServicePolicyFactory>()); + putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>()); putRoutingPolicyFactory("RoundRobin", std::make_shared<RoutingPolicyFactories::RoundRobinPolicyFactory>()); - putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::StoragePolicyFactory>()); putRoutingPolicyFactory("SubsetService", std::make_shared<RoutingPolicyFactories::SubsetServicePolicyFactory>()); - putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>()); // Prepare version specifications to use when adding routable factories. vespalib::VersionSpecification version6(6, 221); diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt index 26d51e702e9..83e1df02a24 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt +++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_library(documentapi_documentapipolicies OBJECT SOURCES andpolicy.cpp externslobrokpolicy.cpp - storagepolicy.cpp contentpolicy.cpp messagetypepolicy.cpp documentrouteselectorpolicy.cpp diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp index aea393a60af..7150794653f 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp @@ -1,12 +1,79 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "contentpolicy.h" +#include <vespa/document/base/documentid.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/error.h> +#include <vespa/documentapi/documentapi.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/config-stor-distribution.h> +#include <vespa/config/subscription/configuri.h> +#include <cassert> + +#include <vespa/log/log.h> +LOG_SETUP(".contentpolicy"); + +using vespalib::make_string; namespace documentapi { ContentPolicy::ContentPolicy(const string& param) - : StoragePolicy(param) -{ } + : ExternSlobrokPolicy(parse(param)), + _bucketIdFactory() +{ + std::map<string, string> params(parse(param)); + + if (params.find("cluster") != params.end()) { + _clusterName = params.find("cluster")->second; + } else { + _error = "Required parameter clustername not set"; + } + + if (params.find("clusterconfigid") != params.end()) { + _clusterConfigId = params.find("clusterconfigid")->second; + } +} + +namespace { + class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig> + { + public: + CallBack(ContentPolicy & policy) : _policy(policy) { } + void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override { + _policy.configure(std::move(config)); + } + private: + ContentPolicy & _policy; + }; +} +string ContentPolicy::init() +{ + string error = ExternSlobrokPolicy::init(); + if (!error.empty()) { + return error; + } + + if (_clusterConfigId.empty()) { + _clusterConfigId = createConfigId(_clusterName); + } + + using storage::lib::Distribution; + config::ConfigUri uri(_clusterConfigId); + if (!_configSources.empty()) { + _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources))); + } else { + _configFetcher.reset(new config::ConfigFetcher(uri.getContext())); + } + _callBack = std::make_unique<CallBack>(*this); + _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get())); + _configFetcher->start(); + return ""; +} + +ContentPolicy::~ContentPolicy() = default; string ContentPolicy::createConfigId(const string & clusterName) const @@ -14,4 +81,177 @@ ContentPolicy::createConfigId(const string & clusterName) const return clusterName; } +string +ContentPolicy::createPattern(const string & clusterName, int distributor) const +{ + vespalib::asciistream ost; + + ost << "storage/cluster." << clusterName << "/distributor/"; + + if (distributor == -1) { + ost << '*'; + } else { + ost << distributor; + } + ost << "/default"; + return ost.str(); +} + +void +ContentPolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) +{ + try { + _nextDistribution = std::make_unique<storage::lib::Distribution>(*config); + } catch (const std::exception& e) { + LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); + throw e; + } +} + +void +ContentPolicy::doSelect(mbus::RoutingContext &context) +{ + const mbus::Message &msg = context.getMessage(); + + int distributor = -1; + + if (_state.get()) { + document::BucketId id; + switch(msg.getType()) { + case DocumentProtocol::MESSAGE_PUTDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId()); + break; + + case DocumentProtocol::MESSAGE_GETDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId()); + break; + + case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId()); + break; + + case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId()); + break; + + case DocumentProtocol::MESSAGE_STATBUCKET: + id = static_cast<const StatBucketMessage&>(msg).getBucketId(); + break; + + case DocumentProtocol::MESSAGE_GETBUCKETLIST: + id = static_cast<const GetBucketListMessage&>(msg).getBucketId(); + break; + + case DocumentProtocol::MESSAGE_CREATEVISITOR: + id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0]; + break; + + case DocumentProtocol::MESSAGE_REMOVELOCATION: + id = static_cast<const RemoveLocationMessage&>(msg).getBucketId(); + break; + + default: + LOG(error, "Message type '%d' not supported.", msg.getType()); + return; + } + + // _P_A_R_A_N_O_I_A_ + if (id.getRawId() == 0) { + mbus::Reply::UP reply(new mbus::EmptyReply()); + reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR, + "No bucket id available in message.")); + context.setReply(std::move(reply)); + return; + } + + // Pick a distributor using ideal state algorithm + try { + // Update distribution here, to make it not take lock in average case + if (_nextDistribution) { + _distribution = std::move(_nextDistribution); + _nextDistribution.reset(); + } + assert(_distribution.get()); + distributor = _distribution->getIdealDistributorNode(*_state, id); + } catch (storage::lib::TooFewBucketBitsInUseException& e) { + auto reply = std::make_unique<WrongDistributionReply>(_state->toString()); + reply->addError(mbus::Error( + DocumentProtocol::ERROR_WRONG_DISTRIBUTION, + "Too few distribution bits used for given cluster state")); + context.setReply(std::move(reply)); + return; + } catch (storage::lib::NoDistributorsAvailableException& e) { + // No distributors available in current cluster state. Remove + // cluster state we cannot use and send to random target + _state.reset(); + distributor = -1; + } + } + + mbus::Hop hop = getRecipient(context, distributor); + + if (distributor != -1 && !hop.hasDirectives()) { + hop = getRecipient(context, -1); + } + + if (hop.hasDirectives()) { + mbus::Route route = context.getRoute(); + route.setHop(0, hop); + context.addChild(route); + } else { + context.setError( + mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, + make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str())); + } +} + +mbus::Hop +ContentPolicy::getRecipient(mbus::RoutingContext& context, int distributor) +{ + slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor)); + + if (!entries.empty()) { + return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); + } + + return mbus::Hop(); +} + +void +ContentPolicy::merge(mbus::RoutingContext &context) +{ + mbus::RoutingNodeIterator it = context.getChildIterator(); + mbus::Reply::UP reply = it.removeReply(); + + if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { + updateStateFromReply(static_cast<WrongDistributionReply&>(*reply)); + } else if (reply->hasErrors()) { + _state.reset(); + } + + context.setReply(std::move(reply)); +} + +void +ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr) +{ + std::unique_ptr<storage::lib::ClusterState> newState( + new storage::lib::ClusterState(wdr.getSystemState())); + if (!_state || newState->getVersion() >= _state->getVersion()) { + if (_state) { + wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", + _state->getVersion(), newState->getVersion())); + } else { + wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion())); + } + + _state = std::move(newState); + } else { + wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, " + "while old state had version %d. New states should not have a lower version than the old.", + newState->getVersion(), _state->getVersion())); + _state.reset(); + } +} + } // documentapi diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h index 4b2f356c740..e29fbb75524 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h @@ -1,17 +1,62 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "storagepolicy.h" +#include "externslobrokpolicy.h" +#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/messagebus/routing/hop.h> +#include <vespa/config/helper/ifetchercallback.h> +#include <vespa/config/helper/configfetcher.h> + +namespace config { + class ICallback; + class ConfigFetcher; +} + +namespace storage { +namespace lib { + class Distribution; + class ClusterState; +} +} namespace documentapi { -class ContentPolicy : public StoragePolicy +class ContentPolicy : public ExternSlobrokPolicy { +private: + document::BucketIdFactory _bucketIdFactory; + std::unique_ptr<storage::lib::ClusterState> _state; + string _clusterName; + string _clusterConfigId; + std::unique_ptr<config::ICallback> _callBack; + std::unique_ptr<config::ConfigFetcher> _configFetcher; + std::unique_ptr<storage::lib::Distribution> _distribution; + std::unique_ptr<storage::lib::Distribution> _nextDistribution; + + mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor); + public: ContentPolicy(const string& param); + ~ContentPolicy(); + void doSelect(mbus::RoutingContext &context) override; + void merge(mbus::RoutingContext &context) override; + + void updateStateFromReply(WrongDistributionReply& reply); + + /** + * @return a pointer to the system state registered with this policy. If + * we haven't received a system state yet, returns NULL. + */ + const storage::lib::ClusterState* getSystemState() const { return _state.get(); } + + virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config); + string init() override; + private: - string createConfigId(const string & clusterName) const override; + string createConfigId(const string & clusterName) const; + string createPattern(const string & clusterName, int distributor) const; }; } - diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp deleted file mode 100644 index 3fc1df0352a..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "storagepolicy.h" -#include <vespa/document/base/documentid.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/error.h> -#include <vespa/documentapi/documentapi.h> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/config-stor-distribution.h> -#include <vespa/config/subscription/configuri.h> -#include <cassert> - -#include <vespa/log/log.h> -LOG_SETUP(".storagepolicy"); - -using vespalib::make_string; - -namespace documentapi { - -StoragePolicy::StoragePolicy(const string& param) - : ExternSlobrokPolicy(parse(param)), - _bucketIdFactory() -{ - std::map<string, string> params(parse(param)); - - if (params.find("cluster") != params.end()) { - _clusterName = params.find("cluster")->second; - } else { - _error = "Required parameter clustername not set"; - } - - if (params.find("clusterconfigid") != params.end()) { - _clusterConfigId = params.find("clusterconfigid")->second; - } -} - -namespace { - class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig> - { - public: - CallBack(StoragePolicy & policy) : _policy(policy) { } - void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override { - _policy.configure(std::move(config)); - } - private: - StoragePolicy & _policy; - }; -} -string StoragePolicy::init() -{ - string error = ExternSlobrokPolicy::init(); - if (!error.empty()) { - return error; - } - - if (_clusterConfigId.empty()) { - _clusterConfigId = createConfigId(_clusterName); - } - - using storage::lib::Distribution; - config::ConfigUri uri(_clusterConfigId); - if (!_configSources.empty()) { - _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources))); - } else { - _configFetcher.reset(new config::ConfigFetcher(uri.getContext())); - } - _callBack = std::make_unique<CallBack>(*this); - _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get())); - _configFetcher->start(); - return ""; -} - -StoragePolicy::~StoragePolicy() = default; - -string -StoragePolicy::createConfigId(const string & clusterName) const -{ - return "storage/cluster." + clusterName; -} - -string -StoragePolicy::createPattern(const string & clusterName, int distributor) const -{ - vespalib::asciistream ost; - - ost << "storage/cluster." << clusterName << "/distributor/"; - - if (distributor == -1) { - ost << '*'; - } else { - ost << distributor; - } - ost << "/default"; - return ost.str(); -} - -void -StoragePolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) -{ - try { - _nextDistribution = std::make_unique<storage::lib::Distribution>(*config); - } catch (const std::exception& e) { - LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); - throw e; - } -} - -void -StoragePolicy::doSelect(mbus::RoutingContext &context) -{ - const mbus::Message &msg = context.getMessage(); - - int distributor = -1; - - if (_state.get()) { - document::BucketId id; - switch(msg.getType()) { - case DocumentProtocol::MESSAGE_PUTDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId()); - break; - - case DocumentProtocol::MESSAGE_GETDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId()); - break; - - case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId()); - break; - - case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId()); - break; - - case DocumentProtocol::MESSAGE_STATBUCKET: - id = static_cast<const StatBucketMessage&>(msg).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_GETBUCKETLIST: - id = static_cast<const GetBucketListMessage&>(msg).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_CREATEVISITOR: - id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0]; - break; - - case DocumentProtocol::MESSAGE_REMOVELOCATION: - id = static_cast<const RemoveLocationMessage&>(msg).getBucketId(); - break; - - default: - LOG(error, "Message type '%d' not supported.", msg.getType()); - return; - } - - // _P_A_R_A_N_O_I_A_ - if (id.getRawId() == 0) { - mbus::Reply::UP reply(new mbus::EmptyReply()); - reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR, - "No bucket id available in message.")); - context.setReply(std::move(reply)); - return; - } - - // Pick a distributor using ideal state algorithm - try { - // Update distribution here, to make it not take lock in average case - if (_nextDistribution) { - _distribution = std::move(_nextDistribution); - _nextDistribution.reset(); - } - assert(_distribution.get()); - distributor = _distribution->getIdealDistributorNode(*_state, id); - } catch (storage::lib::TooFewBucketBitsInUseException& e) { - auto reply = std::make_unique<WrongDistributionReply>(_state->toString()); - reply->addError(mbus::Error( - DocumentProtocol::ERROR_WRONG_DISTRIBUTION, - "Too few distribution bits used for given cluster state")); - context.setReply(std::move(reply)); - return; - } catch (storage::lib::NoDistributorsAvailableException& e) { - // No distributors available in current cluster state. Remove - // cluster state we cannot use and send to random target - _state.reset(); - distributor = -1; - } - } - - mbus::Hop hop = getRecipient(context, distributor); - - if (distributor != -1 && !hop.hasDirectives()) { - hop = getRecipient(context, -1); - } - - if (hop.hasDirectives()) { - mbus::Route route = context.getRoute(); - route.setHop(0, hop); - context.addChild(route); - } else { - context.setError( - mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, - make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str())); - } -} - -mbus::Hop -StoragePolicy::getRecipient(mbus::RoutingContext& context, int distributor) -{ - slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor)); - - if (!entries.empty()) { - return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); - } - - return mbus::Hop(); -} - -void -StoragePolicy::merge(mbus::RoutingContext &context) -{ - mbus::RoutingNodeIterator it = context.getChildIterator(); - mbus::Reply::UP reply = it.removeReply(); - - if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { - updateStateFromReply(static_cast<WrongDistributionReply&>(*reply)); - } else if (reply->hasErrors()) { - _state.reset(); - } - - context.setReply(std::move(reply)); -} - -void -StoragePolicy::updateStateFromReply(WrongDistributionReply& wdr) -{ - std::unique_ptr<storage::lib::ClusterState> newState( - new storage::lib::ClusterState(wdr.getSystemState())); - if (!_state || newState->getVersion() >= _state->getVersion()) { - if (_state) { - wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", - _state->getVersion(), newState->getVersion())); - } else { - wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion())); - } - - _state = std::move(newState); - } else { - wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, " - "while old state had version %d. New states should not have a lower version than the old.", - newState->getVersion(), _state->getVersion())); - _state.reset(); - } -} - -} // documentapi diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h deleted file mode 100644 index 5cd2efcbbd3..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "externslobrokpolicy.h" -#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> -#include <vespa/vdslib/distribution/distribution.h> -#include <vespa/document/bucket/bucketidfactory.h> -#include <vespa/messagebus/routing/hop.h> -#include <vespa/config/helper/ifetchercallback.h> -#include <vespa/config/helper/configfetcher.h> - -namespace config { - class ICallback; - class ConfigFetcher; -} - -namespace storage { -namespace lib { - class Distribution; - class ClusterState; -} -} - -namespace documentapi { - -class StoragePolicy : public ExternSlobrokPolicy -{ -private: - document::BucketIdFactory _bucketIdFactory; - std::unique_ptr<storage::lib::ClusterState> _state; - string _clusterName; - string _clusterConfigId; - std::unique_ptr<config::ICallback> _callBack; - std::unique_ptr<config::ConfigFetcher> _configFetcher; - std::unique_ptr<storage::lib::Distribution> _distribution; - std::unique_ptr<storage::lib::Distribution> _nextDistribution; - - mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor); - -public: - StoragePolicy(const string& param); - ~StoragePolicy(); - void doSelect(mbus::RoutingContext &context) override; - void merge(mbus::RoutingContext &context) override; - - void updateStateFromReply(WrongDistributionReply& reply); - - /** - * @return a pointer to the system state registered with this policy. If - * we haven't received a system state yet, returns NULL. - */ - const storage::lib::ClusterState* getSystemState() const { return _state.get(); } - - virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config); - string init() override; - -private: - virtual string createConfigId(const string & clusterName) const; - string createPattern(const string & clusterName, int distributor) const; -}; - -} - diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp index 2c244c63046..f945fe8cd02 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp @@ -1,16 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "routingpolicyfactories.h" #include <vespa/documentapi/messagebus/policies/andpolicy.h> +#include <vespa/documentapi/messagebus/policies/contentpolicy.h> #include <vespa/documentapi/messagebus/policies/documentrouteselectorpolicy.h> #include <vespa/documentapi/messagebus/policies/errorpolicy.h> #include <vespa/documentapi/messagebus/policies/externpolicy.h> +#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> #include <vespa/documentapi/messagebus/policies/localservicepolicy.h> +#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h> #include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h> #include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h> -#include <vespa/documentapi/messagebus/policies/storagepolicy.h> -#include <vespa/documentapi/messagebus/policies/contentpolicy.h> -#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h> -#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> using namespace documentapi; @@ -21,17 +20,6 @@ RoutingPolicyFactories::AndPolicyFactory::createPolicy(const string ¶m) cons } mbus::IRoutingPolicy::UP -RoutingPolicyFactories::StoragePolicyFactory::createPolicy(const string ¶m) const -{ - mbus::IRoutingPolicy::UP ret(new StoragePolicy(param)); - string error = static_cast<StoragePolicy&>(*ret).getError(); - if (!error.empty()) { - ret.reset(new ErrorPolicy(error)); - } - return ret; -} - -mbus::IRoutingPolicy::UP RoutingPolicyFactories::MessageTypePolicyFactory::createPolicy(const string ¶m) const { return mbus::IRoutingPolicy::UP(new MessageTypePolicy(param)); diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h index e2bf5119c58..533ad93e644 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h @@ -16,10 +16,6 @@ public: public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; }; - class StoragePolicyFactory : public IRoutingPolicyFactory { - public: - mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; - }; class MessageTypePolicyFactory : public IRoutingPolicyFactory { public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java index 1f5d60a630a..a4b5422d502 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java @@ -57,7 +57,7 @@ public final class DestinationSession implements MessageHandler { } /** - * Conveniece method for acknowledging a message back to the sender. + * Convenience method for acknowledging a message for its sender. * * This is equivalent to: * <pre> @@ -69,7 +69,7 @@ public final class DestinationSession implements MessageHandler { * Messages should be acknowledged when * <ul> * <li>this destination has safely and permanently applied the message, or - * <li>an intermediate determines that the purpose of the message is fullfilled without forwarding the message + * <li>an intermediate determines that the purpose of the message is fulfilled without forwarding the message. * </ul> * * @param msg The message to acknowledge back to the sender. @@ -82,8 +82,8 @@ public final class DestinationSession implements MessageHandler { } /** - * Sends a reply to a message. The reply will propagate back to the original sender, prefering the same route as it - * used to reach the detination. + * Sends a reply to a message. The reply will propagate back to the original sender, preferring the same route as it + * used to reach the destination. * * @param reply The reply, created from the message this is a reply to. */ diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Error.java b/messagebus/src/main/java/com/yahoo/messagebus/Error.java index 4aa85c1ea87..475dea3d7ac 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Error.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Error.java @@ -78,9 +78,9 @@ public final class Error { @Override public String toString() { String name = ErrorCode.getName(code); - return "[" + - (name != null ? name : code) + " @ " + - (service != null ? service : "localhost") + - "]: " + message; + return "[" + + name + " @ " + + (service != null ? service : "localhost") + + "]: " + message; } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java index 88a6d5376b1..f3c6422fbfd 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java @@ -97,36 +97,25 @@ public class LocalNetwork implements Network { msg.setRetryEnabled(envelope.msg.getRetryEnabled()); msg.setRetry(envelope.msg.getRetry()); msg.setTimeRemaining(envelope.msg.getTimeRemainingNow()); - msg.pushHandler(new ReplyHandler() { - - @Override - public void handleReply(Reply reply) { - new ReplyEnvelope(LocalNetwork.this, envelope, reply).send(); - } - }); - owner.deliverMessage(msg, LocalServiceAddress.class.cast(envelope.recipient.getServiceAddress()) - .getSessionName()); + msg.pushHandler(reply -> new ReplyEnvelope(LocalNetwork.this, envelope, reply).send()); + owner.deliverMessage(msg, ((LocalServiceAddress) envelope.recipient.getServiceAddress()).getSessionName()); } }); } private void receiveLater(ReplyEnvelope envelope) { byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply); - executor.execute(new Runnable() { - - @Override - public void run() { - Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); - reply.setRetryDelay(envelope.reply.getRetryDelay()); - reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode())); - for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) { - Error error = envelope.reply.getError(i); - reply.addError(new Error(error.getCode(), - error.getMessage(), - error.getService() != null ? error.getService() : envelope.sender.hostId)); - } - owner.deliverReply(reply, envelope.parent.recipient); + executor.execute(() -> { + Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); + reply.setRetryDelay(envelope.reply.getRetryDelay()); + reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode())); + for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) { + Error error = envelope.reply.getError(i); + reply.addError(new Error(error.getCode(), + error.getMessage(), + error.getService() != null ? error.getService() : envelope.sender.hostId)); } + owner.deliverReply(reply, envelope.parent.recipient); }); } @@ -137,23 +126,16 @@ public class LocalNetwork implements Network { return owner.getProtocol(protocolName).encode(Vtag.currentVersion, toEncode); } - @SuppressWarnings("unchecked") private <T extends Routable> T decode(Utf8String protocolName, byte[] toDecode, Class<T> clazz) { - if (toDecode.length == 0) { - return clazz.cast(new EmptyReply()); - } - return clazz.cast(owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode)); + return clazz.cast(toDecode.length == 0 ? new EmptyReply() + : owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode)); } @Override - public void sync() { - - } + public void sync() { } @Override - public void shutdown() { - - } + public void shutdown() { } @Override public String getConnectionSpec() { @@ -178,7 +160,7 @@ public class LocalNetwork implements Network { } void send() { - LocalServiceAddress.class.cast(recipient.getServiceAddress()).getNetwork().receiveLater(this); + ((LocalServiceAddress) recipient.getServiceAddress()).getNetwork().receiveLater(this); } } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 5a370209f1c..2988ce07ae6 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -377,10 +377,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { .map(parameters::withRoute) .orElse(parameters); break; - case CONDITION: - parameters = getProperty(request, ROUTE).map(parameters::withRoute) - .orElse(parameters); - break; case FIELD_SET: parameters = getProperty(request, FIELD_SET).map(parameters::withFieldSet) .orElse(parameters); @@ -702,7 +698,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.reader = reader; } - /** Write is complete when we have stored the buffer — call completion handler. */ + /** Write is complete when we have stored the buffer — call completion handler. */ @Override public void write(ByteBuffer buf, CompletionHandler handler) { try { @@ -948,18 +944,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { static class StorageCluster { private final String name; - private final String configId; private final Map<String, String> documentBuckets; - StorageCluster(String name, String configId, Map<String, String> documentBuckets) { + StorageCluster(String name, Map<String, String> documentBuckets) { this.name = requireNonNull(name); - this.configId = requireNonNull(configId); this.documentBuckets = Map.copyOf(documentBuckets); } String name() { return name; } - String configId() { return configId; } - String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; } + String route() { return name() + "-direct"; } Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); } } @@ -968,7 +961,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return clusters.storage().stream() .collect(toUnmodifiableMap(storage -> storage.name(), storage -> new StorageCluster(storage.name(), - storage.configid(), buckets.cluster(storage.name()) .documentType().entrySet().stream() .collect(toMap(entry -> entry.getKey(), diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 826821981ba..2b47f9c92c2 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -113,7 +113,6 @@ public class DocumentV1ApiTest { } final Map<String, StorageCluster> clusters = Map.of("content", new StorageCluster("content", - "config-id", Map.of("music", "default"))); ManualClock clock; MockDocumentAccess access; @@ -139,7 +138,7 @@ public class DocumentV1ApiTest { public void testResolveCluster() { assertEquals("content", DocumentV1ApiHandler.resolveCluster(Optional.empty(), clusters).name()); - assertEquals("[Storage:cluster=content;clusterconfigid=config-id]", + assertEquals("content-direct", DocumentV1ApiHandler.resolveCluster(Optional.of("content"), clusters).route()); try { DocumentV1ApiHandler.resolveCluster(Optional.empty(), Map.of()); @@ -157,8 +156,8 @@ public class DocumentV1ApiTest { } try { Map<String, StorageCluster> twoClusters = new TreeMap<>(); - twoClusters.put("one", new StorageCluster("one", "one-config", Map.of())); - twoClusters.put("two", new StorageCluster("two", "two-config", Map.of())); + twoClusters.put("one", new StorageCluster("one", Map.of())); + twoClusters.put("two", new StorageCluster("two", Map.of())); DocumentV1ApiHandler.resolveCluster(Optional.empty(), twoClusters); fail("More than one cluster and no document type should fail"); } @@ -193,7 +192,7 @@ public class DocumentV1ApiTest { // GET at root is a visit. Numeric parameters have an upper bound. access.expect(parameters -> { - assertEquals("[Storage:cluster=content;clusterconfigid=config-id]", parameters.getRoute().toString()); + assertEquals("content-direct", parameters.getRoute().toString()); assertEquals("default", parameters.getBucketSpace()); assertEquals(1024, parameters.getMaxTotalHits()); assertEquals(100, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount()); @@ -275,7 +274,7 @@ public class DocumentV1ApiTest { // GET with full document ID is a document get operation which returns 404 when no document is found access.session.expect((id, parameters) -> { assertEquals(doc1.getId(), id); - assertEquals(parameters().withRoute("[Storage:cluster=content;clusterconfigid=config-id]").withFieldSet("go"), parameters); + assertEquals(parameters().withRoute("content-direct").withFieldSet("go"), parameters); parameters.responseHandler().get().handleResponse(new DocumentResponse(0, null)); return new Result(Result.ResultType.SUCCESS, null); }); diff --git a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java index 95bd9cf1cb5..f3b8c189fc5 100644 --- a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java +++ b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java @@ -2,14 +2,8 @@ package com.yahoo.vespaclient; public class ClusterDef { - public ClusterDef(String name, String configId) { - this.name = name; - this.configId = configId; - } - - String name; - String configId; - + private final String name; + public ClusterDef(String name) { this.name = name; } public String getName() { return name; } - public String getConfigId() { return configId; } + public String getRoute() { return name + "-direct"; } } diff --git a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java index 4525ffcae39..2c3a0b72ed5 100644 --- a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java +++ b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java @@ -4,40 +4,38 @@ package com.yahoo.vespaclient; import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.config.subscription.ConfigGetter; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; /** A list of content clusters, either obtained from a list, a given config or by self-subscribing */ public class ClusterList { - List<ClusterDef> contentClusters = new ArrayList<>(); + private final List<ClusterDef> contentClusters; public ClusterList() { - this(new ArrayList<>()); + this(List.of()); } public ClusterList(List<ClusterDef> contentClusters) { - this.contentClusters = contentClusters; + this.contentClusters = List.copyOf(contentClusters); } public ClusterList(String configId) { - configure(new ConfigGetter<>(ClusterListConfig.class).getConfig(configId)); + this(new ConfigGetter<>(ClusterListConfig.class).getConfig(configId)); } public ClusterList(ClusterListConfig config) { - configure(config); + this(parse(config)); } - private void configure(ClusterListConfig config) { - contentClusters.clear(); // TODO: Create a new - for (int i = 0; i < config.storage().size(); i++) - contentClusters.add(new ClusterDef(config.storage(i).name(), config.storage(i).configid())); + public List<ClusterDef> getStorageClusters() { + return contentClusters; } - /** Returns a reference to the mutable list */ - public List<ClusterDef> getStorageClusters() { - return contentClusters; // TODO: Use immutable list + private static List<ClusterDef> parse(ClusterListConfig config) { + return config.storage().stream() + .map(storage -> new ClusterDef(storage.name())) + .collect(Collectors.toUnmodifiableList()); } } diff --git a/vespaclient-java/src/main/java/com/yahoo/vespaget/DocumentRetriever.java b/vespaclient-java/src/main/java/com/yahoo/vespaget/DocumentRetriever.java index 6c8296d7979..ebed1685a5f 100644 --- a/vespaclient-java/src/main/java/com/yahoo/vespaget/DocumentRetriever.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespaget/DocumentRetriever.java @@ -116,7 +116,7 @@ public class DocumentRetriever { "The Vespa cluster contains the content clusters %s, not %s. Please select a valid vespa cluster.", names, clusterName)); } - return String.format("[Storage:cluster=%s;clusterconfigid=%s]", clusterDef.getName(), clusterDef.getConfigId()); + return clusterDef.getRoute(); } private LoadType resolveLoadType(String loadTypeName) throws DocumentRetrieverException { diff --git a/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java b/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java index 88eed9dfc59..b1f91e44e5c 100644 --- a/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java @@ -600,7 +600,7 @@ public class VdsVisit { names + ". Please use the -c option to select one of them as a target for visiting."); } - return "[Storage:cluster=" + found.getName() + ";clusterconfigid=" + found.getConfigId() + "]"; + return found.getRoute(); } protected static void verbosePrintParameters(VdsVisitParameters vdsParams, PrintStream out) { diff --git a/vespaclient-java/src/test/java/com/yahoo/vespaget/DocumentRetrieverTest.java b/vespaclient-java/src/test/java/com/yahoo/vespaget/DocumentRetrieverTest.java index d8b5c267bf2..d6bda4b8bc1 100644 --- a/vespaclient-java/src/test/java/com/yahoo/vespaget/DocumentRetrieverTest.java +++ b/vespaclient-java/src/test/java/com/yahoo/vespaget/DocumentRetrieverTest.java @@ -234,14 +234,14 @@ public class DocumentRetrieverTest { @Test public void testClusterLookup() throws DocumentRetrieverException { - final String cluster = "storage", configId = "content/cluster.foo/storage", - expectedRoute = "[Storage:cluster=storage;clusterconfigid=content/cluster.foo/storage]"; + final String cluster = "storage", + expectedRoute = "storage-direct"; ClientParameters params = createParameters() .setCluster(cluster) .build(); - ClusterList clusterList = new ClusterList(Collections.singletonList(new ClusterDef(cluster, configId))); + ClusterList clusterList = new ClusterList(Collections.singletonList(new ClusterDef(cluster))); DocumentRetriever documentRetriever = createDocumentRetriever(params, clusterList); documentRetriever.retrieveDocuments(); @@ -258,7 +258,7 @@ public class DocumentRetrieverTest { .setCluster("invalidclustername") .build(); - ClusterList clusterList = new ClusterList(Collections.singletonList(new ClusterDef("storage", "content/cluster.foo/storage"))); + ClusterList clusterList = new ClusterList(Collections.singletonList(new ClusterDef("storage"))); DocumentRetriever documentRetriever = createDocumentRetriever(params, clusterList); documentRetriever.retrieveDocuments(); diff --git a/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java b/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java index 4c8fbb1beee..7828fbb249d 100644 --- a/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java +++ b/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java @@ -228,17 +228,17 @@ public class VdsVisitTestCase { @Test public void testAutoSelectClusterRoute() throws Exception { List<ClusterDef> clusterDefs = new ArrayList<>(); - clusterDefs.add(new ClusterDef("storage", "content/cluster.foo/storage")); + clusterDefs.add(new ClusterDef("storage")); ClusterList clusterList = new ClusterList(clusterDefs); String route = VdsVisit.resolveClusterRoute(clusterList, null); - assertEquals("[Storage:cluster=storage;clusterconfigid=content/cluster.foo/storage]", route); + assertEquals("storage-direct", route); } @Test public void testBadClusterName() throws Exception { List<ClusterDef> clusterDefs = new ArrayList<>(); - clusterDefs.add(new ClusterDef("storage", "content/cluster.foo/storage")); + clusterDefs.add(new ClusterDef("storage")); ClusterList clusterList = new ClusterList(clusterDefs); try { VdsVisit.resolveClusterRoute(clusterList, "borkbork"); @@ -252,8 +252,8 @@ public class VdsVisitTestCase { @Test public void testRequireClusterOptionIfMultipleClusters() { List<ClusterDef> clusterDefs = new ArrayList<>(); - clusterDefs.add(new ClusterDef("storage", "content/cluster.foo/storage")); - clusterDefs.add(new ClusterDef("storage2", "content/cluster.bar/storage")); + clusterDefs.add(new ClusterDef("storage")); + clusterDefs.add(new ClusterDef("storage2")); ClusterList clusterList = new ClusterList(clusterDefs); try { VdsVisit.resolveClusterRoute(clusterList, null); @@ -265,12 +265,12 @@ public class VdsVisitTestCase { @Test public void testExplicitClusterOptionWithMultipleClusters() { List<ClusterDef> clusterDefs = new ArrayList<>(); - clusterDefs.add(new ClusterDef("storage", "content/cluster.foo/storage")); - clusterDefs.add(new ClusterDef("storage2", "content/cluster.bar/storage")); + clusterDefs.add(new ClusterDef("storage")); + clusterDefs.add(new ClusterDef("storage2")); ClusterList clusterList = new ClusterList(clusterDefs); String route = VdsVisit.resolveClusterRoute(clusterList, "storage2"); - assertEquals("[Storage:cluster=storage2;clusterconfigid=content/cluster.bar/storage]", route); + assertEquals("storage2-direct", route); } @Test |