diff options
47 files changed, 1217 insertions, 1307 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index fd887a4196b..3b7c9160b53 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -217,12 +217,10 @@ public class Reindexer { static class Cluster { private final String name; - private final String configId; private final Map<DocumentType, String> documentBuckets; - Cluster(String name, String configId, Map<DocumentType, String> documentBuckets) { + Cluster(String name, Map<DocumentType, String> documentBuckets) { this.name = requireNonNull(name); - this.configId = requireNonNull(configId); this.documentBuckets = Map.copyOf(documentBuckets); } @@ -231,7 +229,7 @@ public class Reindexer { } String route() { - return "[Storage:cluster=" + name + ";clusterconfigid=" + configId + "]"; + return "[Content:cluster=" + name + "]"; } String bucketSpaceOf(DocumentType documentType) { @@ -244,20 +242,18 @@ public class Reindexer { if (o == null || getClass() != o.getClass()) return false; Cluster cluster = (Cluster) o; return name.equals(cluster.name) && - configId.equals(cluster.configId) && documentBuckets.equals(cluster.documentBuckets); } @Override public int hashCode() { - return Objects.hash(name, configId, documentBuckets); + return Objects.hash(name, documentBuckets); } @Override public String toString() { return "Cluster{" + "name='" + name + '\'' + - ", configId='" + configId + '\'' + ", documentBuckets=" + documentBuckets + '}'; } diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 8668ed037ef..e62314429fb 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -141,7 +141,6 @@ public class ReindexingMaintainer extends AbstractComponent { return clusters.storage().stream() .filter(storage -> storage.name().equals(name)) .map(storage -> new Cluster(name, - storage.configid(), bucketSpaces.cluster(name) .documentType().entrySet().stream() .collect(toMap(entry -> manager.getDocumentType(entry.getKey()), diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 9a88d8aad1f..01586e06015 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -45,7 +45,7 @@ class ReindexerTest { final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); final DocumentTypeManager manager = new DocumentTypeManager(musicConfig); final DocumentType music = manager.getDocumentType("music"); - final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default")); + final Cluster cluster = new Cluster("cluster", Map.of(music, "default")); final MockMetric metric = new MockMetric(); final ManualClock clock = new ManualClock(Instant.EPOCH); @@ -59,7 +59,7 @@ class ReindexerTest { @Test void throwsWhenUnknownBuckets() { assertThrows(NullPointerException.class, - () -> new Reindexer(new Cluster("cluster", "id", Map.of()), Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2)); + () -> new Reindexer(new Cluster("cluster", Map.of()), Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock, 0.2)); } @Test @@ -84,7 +84,7 @@ class ReindexerTest { assertEquals("music:[document]", parameters.getFieldSet()); assertSame(token, parameters.getResumeToken()); assertEquals("default", parameters.getBucketSpace()); - assertEquals("[Storage:cluster=cluster;clusterconfigid=id]", parameters.getRoute().toString()); + assertEquals("[Content:cluster=cluster]", parameters.getRoute().toString()); assertEquals("cluster", parameters.getRemoteDataHandler()); assertEquals("music", parameters.getDocumentSelection()); assertEquals(DocumentProtocol.Priority.NORMAL_3, parameters.getPriority()); diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java index afa68debadb..f69c5d28a01 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java @@ -43,7 +43,7 @@ class ReindexingMaintainerTest { .build(), manager)); - assertEquals(new Cluster("cluster", "configId", Map.of(manager.getDocumentType("music"), "default")), + assertEquals(new Cluster("cluster", Map.of(manager.getDocumentType("music"), "default")), parseCluster("cluster", new ClusterListConfig.Builder() .storage(new ClusterListConfig.Storage.Builder() diff --git a/config-model/src/main/java/com/yahoo/vespa/model/utils/FileSender.java b/config-model/src/main/java/com/yahoo/vespa/model/utils/FileSender.java index 6e6a4d5ac8f..7cc9fa7ae02 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/utils/FileSender.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/utils/FileSender.java @@ -3,15 +3,19 @@ package com.yahoo.vespa.model.utils; import com.yahoo.config.FileReference; import com.yahoo.config.application.api.DeployLogger; -import com.yahoo.config.model.producer.UserConfigRepo; -import java.util.logging.Level; -import com.yahoo.vespa.config.*; import com.yahoo.config.model.producer.AbstractConfigProducer; +import com.yahoo.config.model.producer.UserConfigRepo; +import com.yahoo.vespa.config.ConfigDefinition; import com.yahoo.vespa.config.ConfigDefinition.DefaultValued; +import com.yahoo.vespa.config.ConfigDefinitionKey; +import com.yahoo.vespa.config.ConfigPayloadBuilder; import com.yahoo.vespa.model.AbstractService; import java.io.Serializable; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; /** * Utility methods for sending files to a collection of nodes. @@ -84,7 +88,7 @@ public class FileSender implements Serializable { ConfigDefinition configDefinition = builder.getConfigDefinition(); if (configDefinition == null) { // TODO: throw new IllegalArgumentException("Not able to find config definition for " + builder); - logger.log(Level.WARNING, "Not able to find config definition for " + key + ". Will not send files for this config"); + logger.log(Level.FINE, "Not able to find config definition for " + key + ". Will not send files for this config"); return; } // Inspect fields at this level diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index a70f59bb9fb..39d6898215c 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -1519,30 +1519,92 @@ ], "fields": [] }, - "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$ContentParameters": { - "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters", + "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$BucketIdCalculator": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>()" + ], + "fields": [] + }, + "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$DistributorSelectionLogic": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void destroy()" + ], + "fields": [] + }, + "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public", + "abstract" + ], + "methods": [ + "protected void <init>(int)", + "public abstract java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)", + "public void close()" + ], + "fields": [ + "protected final java.util.Random randomizer" + ] + }, + "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$Parameters": { + "superClass": "java.lang.Object", "interfaces": [], "attributes": [ "public" ], "methods": [ "public void <init>(java.util.Map)", - "public java.lang.String getDistributionConfigId()", - "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator createPatternGenerator()" + "public java.lang.String getClusterName()", + "public com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostPatternGenerator createPatternGenerator()", + "public com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher createHostFetcher(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy, int)", + "public com.yahoo.vdslib.distribution.Distribution createDistribution(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy)" + ], + "fields": [ + "protected final java.lang.String clusterName", + "protected final java.lang.String distributionConfigId", + "protected final com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostPatternGenerator slobrokHostPatternGenerator" + ] + }, + "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$SlobrokHostFetcher": { + "superClass": "com.yahoo.documentapi.messagebus.protocol.ContentPolicy$HostFetcher", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public com.yahoo.jrt.slobrok.api.IMirror getMirror(com.yahoo.messagebus.routing.RoutingContext)", + "public java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)" ], "fields": [] }, "com.yahoo.documentapi.messagebus.protocol.ContentPolicy": { - "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy", + "superClass": "com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy", "interfaces": [], "attributes": [ "public" ], "methods": [ + "public void <init>(java.lang.String)", "public void <init>(java.util.Map)", - "public void <init>(java.lang.String)" + "public void <init>(com.yahoo.documentapi.messagebus.protocol.ContentPolicy$Parameters)", + "public void select(com.yahoo.messagebus.routing.RoutingContext)", + "public void merge(com.yahoo.messagebus.routing.RoutingContext)", + "public void destroy()" ], - "fields": [] + "fields": [ + "public static final java.lang.String owningBucketStates" + ] }, "com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage": { "superClass": "com.yahoo.documentapi.messagebus.protocol.DocumentMessage", @@ -2979,104 +3041,6 @@ ], "fields": [] }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$BucketIdCalculator": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void <init>()" - ], - "fields": [] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$DistributorSelectionLogic": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void destroy()" - ], - "fields": [] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public", - "abstract" - ], - "methods": [ - "protected void <init>(int)", - "public abstract java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)", - "public void close()" - ], - "fields": [ - "protected final java.util.Random randomizer" - ] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void <init>(java.util.Map)", - "public java.lang.String getClusterName()", - "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator createPatternGenerator()", - "public com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher createHostFetcher(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy, int)", - "public com.yahoo.vdslib.distribution.Distribution createDistribution(com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy)" - ], - "fields": [ - "protected final java.lang.String clusterName", - "protected final java.lang.String distributionConfigId", - "protected final com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator slobrokHostPatternGenerator" - ] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostFetcher": { - "superClass": "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$HostFetcher", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public com.yahoo.jrt.slobrok.api.IMirror getMirror(com.yahoo.messagebus.routing.RoutingContext)", - "public java.lang.String getTargetSpec(java.lang.Integer, com.yahoo.messagebus.routing.RoutingContext)" - ], - "fields": [] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy$SlobrokHostPatternGenerator": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public java.lang.String getDistributorHostPattern(java.lang.Integer)" - ], - "fields": [] - }, - "com.yahoo.documentapi.messagebus.protocol.StoragePolicy": { - "superClass": "com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy", - "interfaces": [], - "attributes": [ - "public" - ], - "methods": [ - "public void <init>(java.lang.String)", - "public void <init>(java.util.Map)", - "public void <init>(com.yahoo.documentapi.messagebus.protocol.StoragePolicy$Parameters)", - "public void select(com.yahoo.messagebus.routing.RoutingContext)", - "public void merge(com.yahoo.messagebus.routing.RoutingContext)", - "public void destroy()" - ], - "fields": [ - "public static final java.lang.String owningBucketStates" - ] - }, "com.yahoo.documentapi.messagebus.protocol.SubsetServicePolicy": { "superClass": "java.lang.Object", "interfaces": [ diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 257d491ea93..982a1c50b85 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -917,7 +917,7 @@ public class MessageBusVisitorSession implements VisitorSession { } } if (isErrorOfType(reply, DocumentProtocol.ERROR_WRONG_DISTRIBUTION)) { - handleWrongDistributionReply((WrongDistributionReply)reply); + handleWrongDistributionReply((WrongDistributionReply) reply); } else { if (shouldReportError(reply)) { reportVisitorError(message); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java index d6e20b9d57f..2d78497456d 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java @@ -1,43 +1,613 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi.messagebus.protocol; +import com.yahoo.concurrent.CopyOnWriteHashMap; +import com.yahoo.document.BucketId; +import com.yahoo.document.BucketIdFactory; +import com.yahoo.jrt.slobrok.api.IMirror; +import com.yahoo.jrt.slobrok.api.Mirror; +import java.util.logging.Level; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.routing.Hop; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingContext; +import com.yahoo.messagebus.routing.RoutingNodeIterator; +import com.yahoo.messagebus.routing.VerbatimDirective; +import com.yahoo.vdslib.distribution.Distribution; +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.Node; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vdslib.state.State; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; /** - * Policy to talk to content clusters. + * Routing policy to determine which distributor in a content cluster to send data to. + * Using different key=value parameters separated by semicolon (";"), the user can control which cluster to send to. + * + * cluster=[clusterName] (Mandatory, determines the cluster name) + * config=[config] (Optional, a comma separated list of config servers to use. Used to talk to clusters not defined in this vespa application) + * clusterconfigid=[id] (Optional, use given config id for distribution instead of default) + * + * @author Haakon Humberset */ -public class ContentPolicy extends StoragePolicy { +public class ContentPolicy extends SlobrokPolicy { + + private static final Logger log = Logger.getLogger(ContentPolicy.class.getName()); + public static final String owningBucketStates = "uim"; + private static final String upStates = "ui"; + + /** This class merely generates a slobrok host pattern for a given distributor. */ + static class SlobrokHostPatternGenerator { + + private final String base; + + SlobrokHostPatternGenerator(String clusterName) { + this.base = "storage/cluster." + clusterName + "/distributor/"; + } + + /** + * Find host pattern of the hosts that are valid targets for this request. + * + * @param distributor Set to null if any distributor is valid target. + */ + String getDistributorHostPattern(Integer distributor) { + return base + (distributor == null ? "*" : distributor) + "/default"; + } + + } + + /** Helper class to match a host pattern with node to use. */ + public abstract static class HostFetcher { + + private static class Targets { + private final List<Integer> list; + private final int total; + Targets() { + this(Collections.emptyList(), 1); + } + Targets(List<Integer> list, int total) { + this.list = list; + this.total = total; + } + } + + private final int requiredUpPercentageToSendToKnownGoodNodes; + private final AtomicReference<Targets> validTargets = new AtomicReference<>(new Targets()); + protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy. + + protected HostFetcher(int percent) { + requiredUpPercentageToSendToKnownGoodNodes = percent; + } + + void updateValidTargets(ClusterState state) { + List<Integer> validRandomTargets = new ArrayList<>(); + for (int i=0; i<state.getNodeCount(NodeType.DISTRIBUTOR); ++i) { + if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i); + } + validTargets.set(new Targets(new CopyOnWriteArrayList<>(validRandomTargets), state.getNodeCount(NodeType.DISTRIBUTOR))); + } + public abstract String getTargetSpec(Integer distributor, RoutingContext context); + String getRandomTargetSpec(RoutingContext context) { + Targets targets = validTargets.get(); + // Try to use list of random targets, if at least X % of the nodes are up + while ((targets.total != 0) && + (100 * targets.list.size() / targets.total >= requiredUpPercentageToSendToKnownGoodNodes)) + { + int randIndex = randomizer.nextInt(targets.list.size()); + String targetSpec = getTargetSpec(targets.list.get(randIndex), context); + if (targetSpec != null) { + context.trace(3, "Sending to random node seen up in cluster state"); + return targetSpec; + } + targets.list.remove(randIndex); + } + context.trace(3, "Too few nodes seen up in state. Sending totally random."); + return getTargetSpec(null, context); + } + public void close() {} + } + + /** Host fetcher using a slobrok mirror to find the hosts. */ + public static class SlobrokHostFetcher extends HostFetcher { + private final SlobrokHostPatternGenerator patternGenerator; + private final SlobrokPolicy policy; - public static class ContentParameters extends Parameters { + SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { + super(percent); + this.patternGenerator = patternGenerator; + this.policy = policy; + } - public ContentParameters(Map<String, String> parameters) { - super(parameters); + private List<Mirror.Entry> getEntries(String hostPattern, RoutingContext context) { + return policy.lookup(context, hostPattern); } + private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; } + + public IMirror getMirror(RoutingContext context) { return context.getMirror(); } + @Override - public String getDistributionConfigId() { - if (distributionConfigId != null) { - return distributionConfigId; + public String getTargetSpec(Integer distributor, RoutingContext context) { + List<Mirror.Entry> arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context); + if (arr.isEmpty()) return null; + if (distributor != null) { + if (arr.size() == 1) { + return convertSlobrokNameToSessionName(arr.get(0).getSpecString()); + } else { + log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor."); + } + } else { + return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString()); } - return clusterName; + return null; + } + } + + static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher { + + /** + * Distributor index to resolved RPC spec cache for a single given Slobrok + * update generation. Uses a thread safe COW map which will grow until stable. + */ + private static class GenerationCache { + private final int generation; + private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>(); + + GenerationCache(int generation) { + this.generation = generation; + } + + public int generation() { return this.generation; } + + public String get(Integer index) { + return targets.get(index); + } + public void put(Integer index, String target) { + targets.put(index, target); + } + } + + private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null); + + TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { + super(patternGenerator, policy, percent); } @Override + public String getTargetSpec(Integer distributor, RoutingContext context) { + GenerationCache cache = generationCache.get(); + int currentGeneration = getMirror(context).updates(); + // The below code might race with other threads during a generation change. That is OK, as the cache + // is thread safe and will quickly converge to a stable state for the new generation. + if (cache == null || currentGeneration != cache.generation()) { + cache = new GenerationCache(currentGeneration); + generationCache.set(cache); + } + if (distributor != null) { + return cachingGetTargetSpec(distributor, context, cache); + } + // Wildcard lookup case. Must not be cached. + return super.getTargetSpec(null, context); + } + + private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) { + String cachedTarget = cache.get(distributor); + if (cachedTarget != null) { + return cachedTarget; + } + // Mirror _may_ be at a higher version if we race with generation read, but that is OK since + // we'll either way get the most up-to-date mapping and the cache will be invalidated on the + // next invocation. + String resolvedTarget = super.getTargetSpec(distributor, context); + cache.put(distributor, resolvedTarget); + return resolvedTarget; + } + + } + + /** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */ + public static class Parameters { + protected final String clusterName; + protected final String distributionConfigId; + protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator; + + public Parameters(Map<String, String> params) { + clusterName = params.get("cluster"); + distributionConfigId = params.get("clusterconfigid"); // TODO jonmv: remove + slobrokHostPatternGenerator = createPatternGenerator(); + if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set"); + } + + String getDistributionConfigId() { + return distributionConfigId == null ? clusterName : distributionConfigId; + } + public String getClusterName() { + return clusterName; + } public SlobrokHostPatternGenerator createPatternGenerator() { - return new SlobrokHostPatternGenerator(getClusterName()) { - public String getDistributorHostPattern(Integer distributor) { - return "storage/cluster." + getClusterName() + "/distributor/" + (distributor == null ? "*" : distributor) + "/default"; + return new SlobrokHostPatternGenerator(getClusterName()); + } + public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { + return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent); + } + public Distribution createDistribution(SlobrokPolicy policy) { + return new Distribution(getDistributionConfigId()); + } + + /** + * When we have gotten this amount of failures from a node (Any kind of failures). We try to send to a random other node, just to see if the + * failure was related to node being bad. (Hard to detect from failure) + */ + int getAttemptRandomOnFailuresLimit() { return 5; } + + /** + * If we receive more than this number of wrong distribution replies with old cluster states, we throw the current cached state and takes the + * old one. This guards us against version resets. + */ + int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; } + + /** + * When getting new cluster states we update good nodes. If we have more than this percentage of up nodes, we send to up nodes instead of totally random. + * (To avoid hitting trashing bad nodes still in slobrok) + */ + int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; } + } + + /** Helper class to get the bucket identifier of a message. */ + public static class BucketIdCalculator { + private static final BucketIdFactory factory = new BucketIdFactory(); + + private BucketId getBucketId(Message msg) { + switch (msg.getType()) { + case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId()); + case DocumentProtocol.MESSAGE_GETDOCUMENT: return factory.getBucketId(((GetDocumentMessage)msg).getDocumentId()); + case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: return factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId()); + case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: return factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId()); + case DocumentProtocol.MESSAGE_GETBUCKETLIST: return ((GetBucketListMessage)msg).getBucketId(); + case DocumentProtocol.MESSAGE_STATBUCKET: return ((StatBucketMessage)msg).getBucketId(); + case DocumentProtocol.MESSAGE_CREATEVISITOR: return ((CreateVisitorMessage)msg).getBuckets().get(0); + case DocumentProtocol.MESSAGE_REMOVELOCATION: return ((RemoveLocationMessage)msg).getBucketId(); + default: + log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported."); + return null; + } + } + + BucketId handleBucketIdCalculation(RoutingContext context) { + BucketId id = getBucketId(context.getMessage()); + if (id == null || id.getRawId() == 0) { + Reply reply = new EmptyReply(); + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message.")); + context.setReply(reply); + } + return id; + } + } + + /** Class handling the logic of picking a distributor */ + public static class DistributorSelectionLogic { + /** Class that tracks a failure of a given type per node. */ + static class InstabilityChecker { + private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>(); + private final int failureLimit; + + InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; } + + boolean tooManyFailures(int nodeIndex) { + if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) { + nodeFailures.set(nodeIndex, 0); + return true; + } else { + return false; + } + } + + void addFailure(Integer calculatedDistributor) { + while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0); + nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1); + } + } + /** Message context class. Contains data we want to inspect about a request at reply time. */ + private static class MessageContext { + final Integer calculatedDistributor; + final ClusterState usedState; + + MessageContext(ClusterState usedState) { + this(usedState, null); + } + MessageContext(ClusterState usedState, Integer calculatedDistributor) { + this.calculatedDistributor = calculatedDistributor; + this.usedState = usedState; + } + + public String toString() { + return "Context(Distributor " + calculatedDistributor + + ", state version " + usedState.getVersion() + ")"; + } + } + + private final HostFetcher hostFetcher; + private final Distribution distribution; + private final InstabilityChecker persistentFailureChecker; + private final AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null); + private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0); + private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection + + DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) { + try { + hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes()); + distribution = params.createDistribution(policy); + persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit()); + maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState(); + } catch (Throwable e) { + destroy(); + throw e; + } + } + + public void destroy() { + if (hostFetcher != null) { + hostFetcher.close(); + } + if (distribution != null) { + distribution.close(); + } + } + + String getTargetSpec(RoutingContext context, BucketId bucketId) { + String sendRandomReason = null; + ClusterState cachedClusterState = safeCachedClusterState.get(); + + if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node. + try{ + Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates); + // If we have had too many failures towards existing node, reset failure count and send to random + if (persistentFailureChecker.tooManyFailures(target)) { + sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state."; + target = null; + } + // If we have found a target, and the target exists in slobrok, send to it. + if (target != null) { + context.setContext(new MessageContext(cachedClusterState, target)); + String targetSpec = hostFetcher.getTargetSpec(target, context); + if (targetSpec != null) { + if (context.shouldTrace(1)) { + context.trace(1, "Using distributor " + target + " for " + + bucketId + " as our state version is " + cachedClusterState.getVersion()); + } + return targetSpec; + } else { + sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random."; + log.log(Level.FINE, "Target distributor is not in slobrok"); + } + } else { + context.setContext(new MessageContext(cachedClusterState)); + } + } catch (Distribution.TooFewBucketBitsInUseException e) { + Reply reply = new WrongDistributionReply(cachedClusterState.toString(true)); + reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION, + "Too few distribution bits used for given cluster state")); + context.setReply(reply); + return null; + } catch (Distribution.NoDistributorsAvailableException e) { + log.log(Level.FINE, "No distributors available; clearing cluster state"); + safeCachedClusterState.set(null); + sendRandomReason = "No distributors available. Sending to random distributor."; + context.setContext(createRandomDistributorTargetContext()); + } + } else { + context.setContext(createRandomDistributorTargetContext()); + sendRandomReason = "No cluster state cached. Sending to random distributor."; + } + if (context.shouldTrace(1)) { + context.trace(1, sendRandomReason != null ? sendRandomReason : "Sending to random distributor for unknown reason"); + } + return hostFetcher.getRandomTargetSpec(context); + } + + private static MessageContext createRandomDistributorTargetContext() { + return new MessageContext(null); + } + + private static Optional<ClusterState> clusterStateFromReply(final WrongDistributionReply reply) { + try { + return Optional.of(new ClusterState(reply.getSystemState())); + } catch (Exception e) { + reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState()); + return Optional.empty(); + } + } + + void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) { + final MessageContext context = (MessageContext) routingContext.getContext(); + final Optional<ClusterState> replyState = clusterStateFromReply(reply); + if (!replyState.isPresent()) { + return; + } + final ClusterState newState = replyState.get(); + resetCachedStateIfClusterStateVersionLikelyRolledBack(newState); + markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState); + + if (context.calculatedDistributor == null) { + traceReplyFromRandomDistributor(reply, newState); + } else { + traceReplyFromSpecificDistributor(reply, context, newState); + } + updateCachedRoutingStateFromWrongDistribution(context, newState); + } + + private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) { + ClusterState cachedClusterState = safeCachedClusterState.get(); + if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) { + safeCachedClusterState.set(newState); + if (newState.getClusterState().equals(State.UP)) { + hostFetcher.updateValidTargets(newState); + } + } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) { + safeCachedClusterState.set(null); + } else if (context.calculatedDistributor != null) { + persistentFailureChecker.addFailure(context.calculatedDistributor); + } + } + + private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) { + if (context.usedState == null) { + String msg = "Used state must be set as distributor is calculated. Bug."; + reply.getTrace().trace(1, msg); + log.log(Level.SEVERE, msg); + } else if (newState.getVersion() == context.usedState.getVersion()) { + String msg = "Message sent to distributor " + context.calculatedDistributor + + " retrieved cluster state version " + newState.getVersion() + + " which was the state we used to calculate distributor as target last time."; + reply.getTrace().trace(1, msg); + // Client load can be rejected towards distributors even with a matching cluster state version. + // This usually happens during a node fail-over transition, where the target distributor will + // reject an operation bound to a particular bucket if it does not own the bucket in _both_ + // the current and the next (transition target) state. Since it can happen during normal operation + // and will happen per client operation, we keep this as debug level to prevent spamming the logs. + log.log(Level.FINE, msg); + } else if (newState.getVersion() > context.usedState.getVersion()) { + if (reply.getTrace().shouldTrace(1)) { + reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + + " updated cluster state from version " + context.usedState.getVersion() + + " to " + newState.getVersion()); + } + } else { + if (reply.getTrace().shouldTrace(1)) { + reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + + " returned older cluster state version " + newState.getVersion()); + } + } + } + + private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) { + ClusterState cachedClusterState = safeCachedClusterState.get(); + if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) { + if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) { + oldClusterVersionGottenCount.set(0); + safeCachedClusterState.set(null); + } + } + } + + private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) { + if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) { + if (reply.getRetryDelay() <= 0.0) { + reply.setRetryDelay(-1); + } + } else { + if (reply.getRetryDelay() <= 0.0) { + reply.setRetryDelay(0); + } + } + } + + private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) { + if (!reply.getTrace().shouldTrace(1)) { + return; + } + ClusterState cachedClusterState = safeCachedClusterState.get(); + if (cachedClusterState == null) { + reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion()); + } else if (newState.getVersion() == cachedClusterState.getVersion()) { + reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct."); + } else if (newState.getVersion() > cachedClusterState.getVersion()) { + reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion()); + } else { + reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion()); + } + } + + void handleErrorReply(Reply reply, Object untypedContext) { + MessageContext messageContext = (MessageContext) untypedContext; + if (messageContext.calculatedDistributor != null) { + persistentFailureChecker.addFailure(messageContext.calculatedDistributor); + if (reply.getTrace().shouldTrace(1)) { + reply.getTrace().trace(1, "Failed with " + messageContext.toString()); } - }; + } } } + private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator(); + private final DistributorSelectionLogic distributorSelectionLogic; + private final Parameters parameters; + + /** Constructor used in production. */ + public ContentPolicy(String param) { + this(parse(param)); + } + public ContentPolicy(Map<String, String> params) { - super(new ContentParameters(params)); + this(new Parameters(params)); } - public ContentPolicy(String parameters) { - this(parse(parameters)); + /** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */ + public ContentPolicy(Parameters p) { + super(); + parameters = p; + distributorSelectionLogic = new DistributorSelectionLogic(parameters, this); } + @Override + public void select(RoutingContext context) { + if (context.shouldTrace(1)) { + context.trace(1, "Selecting route"); + } + + BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context); + if (context.hasReply()) return; + + String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId); + if (context.hasReply()) return; + if (targetSpec != null) { + Route route = new Route(context.getRoute()); + route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec))); + context.addChild(route); + } else { + context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE, + "Could not resolve any distributors to send to in cluster " + parameters.clusterName); + } + } + + @Override + public void merge(RoutingContext context) { + RoutingNodeIterator it = context.getChildIterator(); + Reply reply = (it.hasReply()) ? it.removeReply() : context.getReply(); + if (reply == null) { + reply = new EmptyReply(); + reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE, + "No reply in any children, nor in the routing context: " + context)); + } + + if (reply instanceof WrongDistributionReply) { + distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context); + } else if (reply.hasErrors()) { + distributorSelectionLogic.handleErrorReply(reply, context.getContext()); + } else if (reply instanceof WriteDocumentReply) { + if (context.shouldTrace(9)) { + context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp()); + } + } + context.setReply(reply); + } + + @Override + public void destroy() { + distributorSelectionLogic.destroy(); + } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java index ca32c5722e6..f5b4920fa3f 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java @@ -263,7 +263,7 @@ public class DocumentProtocol implements Protocol { putRoutingPolicyFactory("MessageType", new RoutingPolicyFactories.MessageTypePolicyFactory(cfg)); putRoutingPolicyFactory("RoundRobin", new RoutingPolicyFactories.RoundRobinPolicyFactory()); putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory()); - putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.StoragePolicyFactory()); + putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.ContentPolicyFactory()); putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory()); // Prepare version specifications to use when adding routable factories. diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java index 6954d8f3a1d..7b44a1a4f0d 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java @@ -16,15 +16,6 @@ public abstract class RoutingPolicyFactories { } } - static class StoragePolicyFactory implements RoutingPolicyFactory { - public DocumentProtocolRoutingPolicy createPolicy(String param) { - return new StoragePolicy(param); - } - - public void destroy() { - } - } - static class ContentPolicyFactory implements RoutingPolicyFactory { public DocumentProtocolRoutingPolicy createPolicy(String param) { return new ContentPolicy(param); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java deleted file mode 100644 index b74f7431531..00000000000 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java +++ /dev/null @@ -1,615 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol; - -import com.yahoo.concurrent.CopyOnWriteHashMap; -import com.yahoo.document.BucketId; -import com.yahoo.document.BucketIdFactory; -import com.yahoo.jrt.slobrok.api.IMirror; -import com.yahoo.jrt.slobrok.api.Mirror; -import java.util.logging.Level; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.routing.Hop; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.routing.RoutingContext; -import com.yahoo.messagebus.routing.RoutingNodeIterator; -import com.yahoo.messagebus.routing.VerbatimDirective; -import com.yahoo.vdslib.distribution.Distribution; -import com.yahoo.vdslib.state.ClusterState; -import com.yahoo.vdslib.state.Node; -import com.yahoo.vdslib.state.NodeType; -import com.yahoo.vdslib.state.State; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Logger; - -/** - * Routing policy to determine which distributor in a storage cluster to send data to. - * Using different key=value parameters separated by semicolon (";"), the user can control which cluster to send to. - * - * cluster=[clusterName] (Mandatory, determines the cluster name) - * config=[config] (Optional, a comma separated list of config servers to use. Used to talk to clusters not defined in this vespa application) - * clusterconfigid=[id] (Optional, use given config id for distribution instead of default) - * - * @author Haakon Humberset - */ -public class StoragePolicy extends SlobrokPolicy { - - private static final Logger log = Logger.getLogger(StoragePolicy.class.getName()); - public static final String owningBucketStates = "uim"; - private static final String upStates = "ui"; - - /** This class merely generates slobrok a host pattern for a given distributor. */ - public static class SlobrokHostPatternGenerator { - private final String base; - private final String all; - SlobrokHostPatternGenerator(String clusterName) { - base = "storage/cluster." + clusterName + "/distributor/"; - all = base + "*/default"; - - } - - /** - * Find host pattern of the hosts that are valid targets for this request. - * @param distributor Set to -1 if any distributor is valid target. - */ - public String getDistributorHostPattern(Integer distributor) { - return (distributor == null) ? all : (base + distributor + "/default"); - } - } - - /** Helper class to match a host pattern with node to use. */ - public abstract static class HostFetcher { - - private static class Targets { - private final List<Integer> list; - private final int total; - Targets() { - this(Collections.emptyList(), 1); - } - Targets(List<Integer> list, int total) { - this.list = list; - this.total = total; - } - } - - private final int requiredUpPercentageToSendToKnownGoodNodes; - private final AtomicReference<Targets> validTargets = new AtomicReference<>(new Targets()); - protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy. - - protected HostFetcher(int percent) { - requiredUpPercentageToSendToKnownGoodNodes = percent; - } - - void updateValidTargets(ClusterState state) { - List<Integer> validRandomTargets = new ArrayList<>(); - for (int i=0; i<state.getNodeCount(NodeType.DISTRIBUTOR); ++i) { - if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i); - } - validTargets.set(new Targets(new CopyOnWriteArrayList<>(validRandomTargets), state.getNodeCount(NodeType.DISTRIBUTOR))); - } - public abstract String getTargetSpec(Integer distributor, RoutingContext context); - String getRandomTargetSpec(RoutingContext context) { - Targets targets = validTargets.get(); - // Try to use list of random targets, if at least X % of the nodes are up - while ((targets.total != 0) && - (100 * targets.list.size() / targets.total >= requiredUpPercentageToSendToKnownGoodNodes)) - { - int randIndex = randomizer.nextInt(targets.list.size()); - String targetSpec = getTargetSpec(targets.list.get(randIndex), context); - if (targetSpec != null) { - context.trace(3, "Sending to random node seen up in cluster state"); - return targetSpec; - } - targets.list.remove(randIndex); - } - context.trace(3, "Too few nodes seen up in state. Sending totally random."); - return getTargetSpec(null, context); - } - public void close() {} - } - - /** Host fetcher using a slobrok mirror to find the hosts. */ - public static class SlobrokHostFetcher extends HostFetcher { - private final SlobrokHostPatternGenerator patternGenerator; - private final SlobrokPolicy policy; - - SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { - super(percent); - this.patternGenerator = patternGenerator; - this.policy = policy; - } - - private List<Mirror.Entry> getEntries(String hostPattern, RoutingContext context) { - return policy.lookup(context, hostPattern); - } - - private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; } - - public IMirror getMirror(RoutingContext context) { return context.getMirror(); } - - @Override - public String getTargetSpec(Integer distributor, RoutingContext context) { - List<Mirror.Entry> arr = getEntries(patternGenerator.getDistributorHostPattern(distributor), context); - if (arr.isEmpty()) return null; - if (distributor != null) { - if (arr.size() == 1) { - return convertSlobrokNameToSessionName(arr.get(0).getSpecString()); - } else { - log.log(Level.WARNING, "Got " + arr.size() + " matches for a distributor."); - } - } else { - return convertSlobrokNameToSessionName(arr.get(randomizer.nextInt(arr.size())).getSpecString()); - } - return null; - } - } - - static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher { - - /** - * Distributor index to resolved RPC spec cache for a single given Slobrok - * update generation. Uses a thread safe COW map which will grow until stable. - */ - private static class GenerationCache { - private final int generation; - private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>(); - - GenerationCache(int generation) { - this.generation = generation; - } - - public int generation() { return this.generation; } - - public String get(Integer index) { - return targets.get(index); - } - public void put(Integer index, String target) { - targets.put(index, target); - } - } - - private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null); - - TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { - super(patternGenerator, policy, percent); - } - - @Override - public String getTargetSpec(Integer distributor, RoutingContext context) { - GenerationCache cache = generationCache.get(); - int currentGeneration = getMirror(context).updates(); - // The below code might race with other threads during a generation change. That is OK, as the cache - // is thread safe and will quickly converge to a stable state for the new generation. - if (cache == null || currentGeneration != cache.generation()) { - cache = new GenerationCache(currentGeneration); - generationCache.set(cache); - } - if (distributor != null) { - return cachingGetTargetSpec(distributor, context, cache); - } - // Wildcard lookup case. Must not be cached. - return super.getTargetSpec(null, context); - } - - private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) { - String cachedTarget = cache.get(distributor); - if (cachedTarget != null) { - return cachedTarget; - } - // Mirror _may_ be at a higher version if we race with generation read, but that is OK since - // we'll either way get the most up-to-date mapping and the cache will be invalidated on the - // next invocation. - String resolvedTarget = super.getTargetSpec(distributor, context); - cache.put(distributor, resolvedTarget); - return resolvedTarget; - } - - } - - /** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */ - public static class Parameters { - protected final String clusterName; - protected final String distributionConfigId; - protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator; - - public Parameters(Map<String, String> params) { - clusterName = params.get("cluster"); - distributionConfigId = params.get("clusterconfigid"); - slobrokHostPatternGenerator = createPatternGenerator(); - if (clusterName == null) throw new IllegalArgumentException("Required parameter cluster with clustername not set"); - } - - String getDistributionConfigId() { - return (distributionConfigId == null ? "storage/cluster." + clusterName : distributionConfigId); - } - public String getClusterName() { - return clusterName; - } - public SlobrokHostPatternGenerator createPatternGenerator() { - return new SlobrokHostPatternGenerator(getClusterName()); - } - public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { - return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent); - } - public Distribution createDistribution(SlobrokPolicy policy) { - return new Distribution(getDistributionConfigId()); - } - - /** - * When we have gotten this amount of failures from a node (Any kind of failures). We try to send to a random other node, just to see if the - * failure was related to node being bad. (Hard to detect from failure) - */ - int getAttemptRandomOnFailuresLimit() { return 5; } - - /** - * If we receive more than this number of wrong distribution replies with old cluster states, we throw the current cached state and takes the - * old one. This guards us against version resets. - */ - int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; } - - /** - * When getting new cluster states we update good nodes. If we have more than this percentage of up nodes, we send to up nodes instead of totally random. - * (To avoid hitting trashing bad nodes still in slobrok) - */ - int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; } - } - - /** Helper class to get the bucket identifier of a message. */ - public static class BucketIdCalculator { - private static final BucketIdFactory factory = new BucketIdFactory(); - - private BucketId getBucketId(Message msg) { - switch (msg.getType()) { - case DocumentProtocol.MESSAGE_PUTDOCUMENT: return factory.getBucketId(((PutDocumentMessage)msg).getDocumentPut().getDocument().getId()); - case DocumentProtocol.MESSAGE_GETDOCUMENT: return factory.getBucketId(((GetDocumentMessage)msg).getDocumentId()); - case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: return factory.getBucketId(((RemoveDocumentMessage)msg).getDocumentId()); - case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: return factory.getBucketId(((UpdateDocumentMessage)msg).getDocumentUpdate().getId()); - case DocumentProtocol.MESSAGE_GETBUCKETLIST: return ((GetBucketListMessage)msg).getBucketId(); - case DocumentProtocol.MESSAGE_STATBUCKET: return ((StatBucketMessage)msg).getBucketId(); - case DocumentProtocol.MESSAGE_CREATEVISITOR: return ((CreateVisitorMessage)msg).getBuckets().get(0); - case DocumentProtocol.MESSAGE_REMOVELOCATION: return ((RemoveLocationMessage)msg).getBucketId(); - default: - log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported."); - return null; - } - } - - BucketId handleBucketIdCalculation(RoutingContext context) { - BucketId id = getBucketId(context.getMessage()); - if (id == null || id.getRawId() == 0) { - Reply reply = new EmptyReply(); - reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message.")); - context.setReply(reply); - } - return id; - } - } - - /** Class handling the logic of picking a distributor */ - public static class DistributorSelectionLogic { - /** Class that tracks a failure of a given type per node. */ - static class InstabilityChecker { - private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>(); - private final int failureLimit; - - InstabilityChecker(int failureLimit) { this.failureLimit = failureLimit; } - - boolean tooManyFailures(int nodeIndex) { - if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) { - nodeFailures.set(nodeIndex, 0); - return true; - } else { - return false; - } - } - - void addFailure(Integer calculatedDistributor) { - while (nodeFailures.size() <= calculatedDistributor) nodeFailures.add(0); - nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1); - } - } - /** Message context class. Contains data we want to inspect about a request at reply time. */ - private static class MessageContext { - final Integer calculatedDistributor; - final ClusterState usedState; - - MessageContext() { - this(null, null); - } - MessageContext(ClusterState usedState) { - this(usedState, null); - } - MessageContext(ClusterState usedState, Integer calculatedDistributor) { - this.calculatedDistributor = calculatedDistributor; - this.usedState = usedState; - } - - public String toString() { - return "Context(Distributor " + calculatedDistributor + - ", state version " + usedState.getVersion() + ")"; - } - } - - private final HostFetcher hostFetcher; - private final Distribution distribution; - private final InstabilityChecker persistentFailureChecker; - private final AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null); - private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0); - private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection - - DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) { - try { - hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes()); - distribution = params.createDistribution(policy); - persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit()); - maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState(); - } catch (Throwable e) { - destroy(); - throw e; - } - } - - public void destroy() { - if (hostFetcher != null) { - hostFetcher.close(); - } - if (distribution != null) { - distribution.close(); - } - } - - String getTargetSpec(RoutingContext context, BucketId bucketId) { - String sendRandomReason = null; - ClusterState cachedClusterState = safeCachedClusterState.get(); - - if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node. - try{ - Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates); - // If we have had too many failures towards existing node, reset failure count and send to random - if (persistentFailureChecker.tooManyFailures(target)) { - sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state."; - target = null; - } - // If we have found a target, and the target exists in slobrok, send to it. - if (target != null) { - context.setContext(new MessageContext(cachedClusterState, target)); - String targetSpec = hostFetcher.getTargetSpec(target, context); - if (targetSpec != null) { - if (context.shouldTrace(1)) { - context.trace(1, "Using distributor " + target + " for " + - bucketId + " as our state version is " + cachedClusterState.getVersion()); - } - return targetSpec; - } else { - sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random."; - log.log(Level.FINE, "Target distributor is not in slobrok"); - } - } else { - context.setContext(new MessageContext(cachedClusterState)); - } - } catch (Distribution.TooFewBucketBitsInUseException e) { - Reply reply = new WrongDistributionReply(cachedClusterState.toString(true)); - reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION, - "Too few distribution bits used for given cluster state")); - context.setReply(reply); - return null; - } catch (Distribution.NoDistributorsAvailableException e) { - log.log(Level.FINE, "No distributors available; clearing cluster state"); - safeCachedClusterState.set(null); - sendRandomReason = "No distributors available. Sending to random distributor."; - context.setContext(createRandomDistributorTargetContext()); - } - } else { - context.setContext(createRandomDistributorTargetContext()); - sendRandomReason = "No cluster state cached. Sending to random distributor."; - } - if (context.shouldTrace(1)) { - context.trace(1, sendRandomReason != null ? sendRandomReason : "Sending to random distributor for unknown reason"); - } - return hostFetcher.getRandomTargetSpec(context); - } - - private static MessageContext createRandomDistributorTargetContext() { - return new MessageContext(null); - } - - private static Optional<ClusterState> clusterStateFromReply(final WrongDistributionReply reply) { - try { - return Optional.of(new ClusterState(reply.getSystemState())); - } catch (Exception e) { - reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState()); - return Optional.empty(); - } - } - - void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) { - final MessageContext context = (MessageContext) routingContext.getContext(); - final Optional<ClusterState> replyState = clusterStateFromReply(reply); - if (!replyState.isPresent()) { - return; - } - final ClusterState newState = replyState.get(); - resetCachedStateIfClusterStateVersionLikelyRolledBack(newState); - markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState); - - if (context.calculatedDistributor == null) { - traceReplyFromRandomDistributor(reply, newState); - } else { - traceReplyFromSpecificDistributor(reply, context, newState); - } - updateCachedRoutingStateFromWrongDistribution(context, newState); - } - - private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) { - ClusterState cachedClusterState = safeCachedClusterState.get(); - if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) { - safeCachedClusterState.set(newState); - if (newState.getClusterState().equals(State.UP)) { - hostFetcher.updateValidTargets(newState); - } - } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) { - safeCachedClusterState.set(null); - } else if (context.calculatedDistributor != null) { - persistentFailureChecker.addFailure(context.calculatedDistributor); - } - } - - private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) { - if (context.usedState == null) { - String msg = "Used state must be set as distributor is calculated. Bug."; - reply.getTrace().trace(1, msg); - log.log(Level.SEVERE, msg); - } else if (newState.getVersion() == context.usedState.getVersion()) { - String msg = "Message sent to distributor " + context.calculatedDistributor + - " retrieved cluster state version " + newState.getVersion() + - " which was the state we used to calculate distributor as target last time."; - reply.getTrace().trace(1, msg); - // Client load can be rejected towards distributors even with a matching cluster state version. - // This usually happens during a node fail-over transition, where the target distributor will - // reject an operation bound to a particular bucket if it does not own the bucket in _both_ - // the current and the next (transition target) state. Since it can happen during normal operation - // and will happen per client operation, we keep this as debug level to prevent spamming the logs. - log.log(Level.FINE, msg); - } else if (newState.getVersion() > context.usedState.getVersion()) { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + - " updated cluster state from version " + context.usedState.getVersion() + - " to " + newState.getVersion()); - } - } else { - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor + - " returned older cluster state version " + newState.getVersion()); - } - } - } - - private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) { - ClusterState cachedClusterState = safeCachedClusterState.get(); - if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) { - if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) { - oldClusterVersionGottenCount.set(0); - safeCachedClusterState.set(null); - } - } - } - - private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) { - if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) { - if (reply.getRetryDelay() <= 0.0) { - reply.setRetryDelay(-1); - } - } else { - if (reply.getRetryDelay() <= 0.0) { - reply.setRetryDelay(0); - } - } - } - - private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) { - if (!reply.getTrace().shouldTrace(1)) { - return; - } - ClusterState cachedClusterState = safeCachedClusterState.get(); - if (cachedClusterState == null) { - reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion()); - } else if (newState.getVersion() == cachedClusterState.getVersion()) { - reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct."); - } else if (newState.getVersion() > cachedClusterState.getVersion()) { - reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion()); - } else { - reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion()); - } - } - - void handleErrorReply(Reply reply, Object untypedContext) { - MessageContext messageContext = (MessageContext) untypedContext; - if (messageContext.calculatedDistributor != null) { - persistentFailureChecker.addFailure(messageContext.calculatedDistributor); - if (reply.getTrace().shouldTrace(1)) { - reply.getTrace().trace(1, "Failed with " + messageContext.toString()); - } - } - } - } - - private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator(); - private final DistributorSelectionLogic distributorSelectionLogic; - private final Parameters parameters; - - /** Constructor used in production. */ - public StoragePolicy(String param) { - this(parse(param)); - } - - public StoragePolicy(Map<String, String> params) { - this(new Parameters(params)); - } - - /** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */ - public StoragePolicy(Parameters p) { - super(); - parameters = p; - distributorSelectionLogic = new DistributorSelectionLogic(parameters, this); - } - - @Override - public void select(RoutingContext context) { - if (context.shouldTrace(1)) { - context.trace(1, "Selecting route"); - } - - BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context); - if (context.hasReply()) return; - - String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId); - if (context.hasReply()) return; - if (targetSpec != null) { - Route route = new Route(context.getRoute()); - route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec))); - context.addChild(route); - } else { - context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE, - "Could not resolve any distributors to send to in cluster " + parameters.clusterName); - } - } - - @Override - public void merge(RoutingContext context) { - RoutingNodeIterator it = context.getChildIterator(); - Reply reply = (it.hasReply()) ? it.removeReply() : context.getReply(); - if (reply == null) { - reply = new EmptyReply(); - reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE, - "No reply in any children, nor in the routing context: " + context)); - } - - if (reply instanceof WrongDistributionReply) { - distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context); - } else if (reply.hasErrors()) { - distributorSelectionLogic.handleErrorReply(reply, context.getContext()); - } else if (reply instanceof WriteDocumentReply) { - if (context.shouldTrace(9)) { - context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp()); - } - } - context.setReply(reply); - } - - @Override - public void destroy() { - distributorSelectionLogic.destroy(); - } -} diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java index cdc5878321a..52afcdfd77c 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java @@ -49,8 +49,8 @@ public class TargetCachingSlobrokHostFetcherTest { static class Fixture { SlobrokPolicy mockSlobrokPolicy = mock(SlobrokPolicy.class); IMirror mockMirror = mock(IMirror.class); - StoragePolicy.SlobrokHostPatternGenerator patternGenerator = new StoragePolicy.SlobrokHostPatternGenerator("foo"); - StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60); + ContentPolicy.SlobrokHostPatternGenerator patternGenerator = new ContentPolicy.SlobrokHostPatternGenerator("foo"); + ContentPolicy.TargetCachingSlobrokHostFetcher hostFetcher = new ContentPolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60); RoutingContext routingContext = mock(RoutingContext.class); Fixture() { diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java index 8a6df061430..018697c0719 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/BasicTests.java @@ -15,7 +15,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -public class BasicTests extends StoragePolicyTestEnvironment { +public class BasicTests extends ContentPolicyTestEnvironment { /** Test that we can send a message through the policy. */ @Test diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java index b0cea8ee819..f324245b612 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTest.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTest.java @@ -4,7 +4,7 @@ package com.yahoo.documentapi.messagebus.protocol.test.storagepolicy; import org.junit.Ignore; import org.junit.Test; -public class StoragePolicyTest extends Simulator { +public class ContentPolicyTest extends Simulator { /** * Verify that a resent message with failures doesn't ruin overall performance. (By dumping the cached state too often * so other requests are sent to wrong target) diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java index 00a045367bb..479e0b0f422 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/ContentPolicyTestEnvironment.java @@ -10,7 +10,7 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolRoutingPolicy; import com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RoutingPolicyFactory; -import com.yahoo.documentapi.messagebus.protocol.StoragePolicy; +import com.yahoo.documentapi.messagebus.protocol.ContentPolicy; import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply; import com.yahoo.documentapi.messagebus.protocol.test.PolicyTestFrame; import com.yahoo.messagebus.EmptyReply; @@ -36,7 +36,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -public abstract class StoragePolicyTestEnvironment { +public abstract class ContentPolicyTestEnvironment { protected StoragePolicyTestFactory policyFactory; protected PolicyTestFrame frame; @@ -102,7 +102,7 @@ public abstract class StoragePolicyTestEnvironment { assertTrue(nodes.remove(second)); } - public static class TestHostFetcher extends StoragePolicy.HostFetcher { + public static class TestHostFetcher extends ContentPolicy.HostFetcher { private final String clusterName; private RandomGen randomizer = new RandomGen(1234); private final Set<Integer> nodes; @@ -143,7 +143,7 @@ public abstract class StoragePolicyTestEnvironment { } } - public static class TestParameters extends StoragePolicy.Parameters { + public static class TestParameters extends ContentPolicy.Parameters { private final TestHostFetcher hostFetcher; private final Distribution distribution; @@ -154,7 +154,7 @@ public abstract class StoragePolicyTestEnvironment { } @Override - public StoragePolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; } + public ContentPolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; } @Override public Distribution createDistribution(SlobrokPolicy policy) { return distribution; } @@ -171,7 +171,7 @@ public abstract class StoragePolicyTestEnvironment { public DocumentProtocolRoutingPolicy createPolicy(String parameters) { parameterInstances.addLast(new TestParameters(parameters, nodes)); ((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom); - return new StoragePolicy(parameterInstances.getLast()); + return new ContentPolicy(parameterInstances.getLast()); } public void avoidPickingAtRandom(Integer distributor) { avoidPickingAtRandom = distributor; diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java index 68405b002c8..d23dd9ea998 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/Simulator.java @@ -5,7 +5,7 @@ import com.yahoo.document.BucketId; import com.yahoo.document.BucketIdFactory; import com.yahoo.document.DocumentId; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.documentapi.messagebus.protocol.StoragePolicy; +import com.yahoo.documentapi.messagebus.protocol.ContentPolicy; import com.yahoo.messagebus.routing.RoutingNode; import com.yahoo.vdslib.distribution.RandomGen; import com.yahoo.vdslib.state.ClusterState; @@ -21,7 +21,7 @@ import java.util.regex.Pattern; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public abstract class Simulator extends StoragePolicyTestEnvironment { +public abstract class Simulator extends ContentPolicyTestEnvironment { enum FailureType { TRANSIENT_ERROR, @@ -175,7 +175,7 @@ public abstract class Simulator extends StoragePolicyTestEnvironment { for (int i=0; i<params.getParallellRequests(); ++i) { RoutingNode target = targets[i]; int index = getAddress(target).getSecond(); - if (!params.getCurrentClusterState(null).getNodeState(new Node(NodeType.DISTRIBUTOR, index)).getState().oneOf(StoragePolicy.owningBucketStates)) { + if (!params.getCurrentClusterState(null).getNodeState(new Node(NodeType.DISTRIBUTOR, index)).getState().oneOf(ContentPolicy.owningBucketStates)) { ++downnode[half]; } BadNode badNode = params.getBadNodes().get(index); diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp index 0c659f589d6..02bd6b297d0 100644 --- a/documentapi/src/tests/policies/policies_test.cpp +++ b/documentapi/src/tests/policies/policies_test.cpp @@ -4,13 +4,13 @@ #include <vespa/documentapi/documentapi.h> #include <vespa/documentapi/messagebus/policies/andpolicy.h> +#include <vespa/documentapi/messagebus/policies/contentpolicy.h> #include <vespa/documentapi/messagebus/policies/documentrouteselectorpolicy.h> #include <vespa/documentapi/messagebus/policies/errorpolicy.h> #include <vespa/documentapi/messagebus/policies/externpolicy.h> #include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> #include <vespa/documentapi/messagebus/policies/localservicepolicy.h> #include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h> -#include <vespa/documentapi/messagebus/policies/storagepolicy.h> #include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h> #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/routing/routingnode.h> @@ -51,7 +51,7 @@ private: private: bool trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string> &expected); void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries = -1); - StoragePolicy &setupStoragePolicy(TestFrame &frame, const string ¶m, + ContentPolicy &setupContentPolicy(TestFrame &frame, const string ¶m, const string &pattern = "", int32_t numEntries = -1); bool isErrorPolicy(const string &name, const string ¶m); void assertMirrorReady(const IMirrorAPI &mirror); @@ -83,10 +83,10 @@ public: void requireThatExternPolicyWithUnknownPatternSelectsNone(); void requireThatExternPolicySelectsFromExternSlobrok(); void requireThatExternPolicyMergesOneReplyAsProtocol(); - void requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy(); - void requireThatStoragePolicyIsRandomWithoutState(); - void requireThatStoragePolicyIsTargetedWithState(); - void requireThatStoragePolicyCombinesSystemAndSlobrokState(); + void requireThatContentPolicyWithIllegalParamIsAnErrorPolicy(); + void requireThatContentPolicyIsRandomWithoutState(); + void requireThatContentPolicyIsTargetedWithState(); + void requireThatContentPolicyCombinesSystemAndSlobrokState(); }; TEST_APPHOOK(Test); @@ -128,10 +128,10 @@ Test::Main() { requireThatExternPolicySelectsFromExternSlobrok(); TEST_FLUSH(); requireThatExternPolicyMergesOneReplyAsProtocol(); TEST_FLUSH(); - requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy(); TEST_FLUSH(); - requireThatStoragePolicyIsRandomWithoutState(); TEST_FLUSH(); - requireThatStoragePolicyIsTargetedWithState(); TEST_FLUSH(); - requireThatStoragePolicyCombinesSystemAndSlobrokState(); TEST_FLUSH(); + requireThatContentPolicyWithIllegalParamIsAnErrorPolicy(); TEST_FLUSH(); + requireThatContentPolicyIsRandomWithoutState(); TEST_FLUSH(); + requireThatContentPolicyIsTargetedWithState(); TEST_FLUSH(); + requireThatContentPolicyCombinesSystemAndSlobrokState(); TEST_FLUSH(); TEST_DONE(); } @@ -782,15 +782,15 @@ void Test::testLoadBalancer() { } void -Test::requireThatStoragePolicyWithIllegalParamIsAnErrorPolicy() +Test::requireThatContentPolicyWithIllegalParamIsAnErrorPolicy() { - EXPECT_TRUE(isErrorPolicy("Storage", "")); - EXPECT_TRUE(isErrorPolicy("Storage", "config=foo;slobroks=foo")); - EXPECT_TRUE(isErrorPolicy("Storage", "slobroks=foo")); + EXPECT_TRUE(isErrorPolicy("Content", "")); + EXPECT_TRUE(isErrorPolicy("Content", "config=foo;slobroks=foo")); + EXPECT_TRUE(isErrorPolicy("Content", "slobroks=foo")); } void -Test::requireThatStoragePolicyIsRandomWithoutState() +Test::requireThatContentPolicyIsRandomWithoutState() { TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -808,7 +808,7 @@ Test::requireThatStoragePolicyIsRandomWithoutState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - StoragePolicy &policy = setupStoragePolicy( + ContentPolicy &policy = setupContentPolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); ASSERT_TRUE(policy.getSystemState() == nullptr); @@ -826,15 +826,15 @@ Test::requireThatStoragePolicyIsRandomWithoutState() } } -StoragePolicy & -Test::setupStoragePolicy(TestFrame &frame, const string ¶m, +ContentPolicy & +Test::setupContentPolicy(TestFrame &frame, const string ¶m, const string &pattern, int32_t numEntries) { - frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Storage:%s]", param.c_str()))); + frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Content:%s]", param.c_str()))); mbus::MessageBus &mbus = frame.getMessageBus(); const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test"); const mbus::PolicyDirective dir = static_cast<mbus::PolicyDirective&>(*hop->getDirective(0)); - StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME, + ContentPolicy &policy = static_cast<ContentPolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME, dir.getName(), dir.getParam())); policy.initSynchronous(); assertMirrorReady(*policy.getMirror()); @@ -845,7 +845,7 @@ Test::setupStoragePolicy(TestFrame &frame, const string ¶m, } void -Test::requireThatStoragePolicyIsTargetedWithState() +Test::requireThatContentPolicyIsTargetedWithState() { TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -863,7 +863,7 @@ Test::requireThatStoragePolicyIsTargetedWithState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - StoragePolicy &policy = setupStoragePolicy( + ContentPolicy &policy = setupContentPolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); ASSERT_TRUE(policy.getSystemState() == nullptr); @@ -888,7 +888,7 @@ Test::requireThatStoragePolicyIsTargetedWithState() } void -Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() +Test::requireThatContentPolicyCombinesSystemAndSlobrokState() { TestFrame frame(_repo); frame.setMessage(newPutDocumentMessage("id:ns:testdoc::")); @@ -902,7 +902,7 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() string param = vespalib::make_string( "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); - StoragePolicy &policy = setupStoragePolicy( + ContentPolicy &policy = setupContentPolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 1); ASSERT_TRUE(policy.getSystemState() == nullptr); diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp index 560f2f28f0e..0f86eb38aca 100644 --- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp @@ -32,14 +32,14 @@ DocumentProtocol::DocumentProtocol(std::shared_ptr<const DocumentTypeRepo> repo, // When adding factories to this list, please KEEP THEM ORDERED alphabetically like they are now. putRoutingPolicyFactory("AND", std::make_shared<RoutingPolicyFactories::AndPolicyFactory>()); putRoutingPolicyFactory("Content", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>()); - putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>()); + putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::ContentPolicyFactory>()); // TODO Vespa 8: remove putRoutingPolicyFactory("DocumentRouteSelector", std::make_shared<RoutingPolicyFactories::DocumentRouteSelectorPolicyFactory>(*_repo, cfg)); putRoutingPolicyFactory("Extern", std::make_shared<RoutingPolicyFactories::ExternPolicyFactory>()); + putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>()); putRoutingPolicyFactory("LocalService", std::make_shared<RoutingPolicyFactories::LocalServicePolicyFactory>()); + putRoutingPolicyFactory("MessageType", std::make_shared<RoutingPolicyFactories::MessageTypePolicyFactory>()); putRoutingPolicyFactory("RoundRobin", std::make_shared<RoutingPolicyFactories::RoundRobinPolicyFactory>()); - putRoutingPolicyFactory("Storage", std::make_shared<RoutingPolicyFactories::StoragePolicyFactory>()); putRoutingPolicyFactory("SubsetService", std::make_shared<RoutingPolicyFactories::SubsetServicePolicyFactory>()); - putRoutingPolicyFactory("LoadBalancer", std::make_shared<RoutingPolicyFactories::LoadBalancerPolicyFactory>()); // Prepare version specifications to use when adding routable factories. vespalib::VersionSpecification version6(6, 221); diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt index 26d51e702e9..83e1df02a24 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt +++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_library(documentapi_documentapipolicies OBJECT SOURCES andpolicy.cpp externslobrokpolicy.cpp - storagepolicy.cpp contentpolicy.cpp messagetypepolicy.cpp documentrouteselectorpolicy.cpp diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp index aea393a60af..7150794653f 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp @@ -1,12 +1,79 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "contentpolicy.h" +#include <vespa/document/base/documentid.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/error.h> +#include <vespa/documentapi/documentapi.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/config-stor-distribution.h> +#include <vespa/config/subscription/configuri.h> +#include <cassert> + +#include <vespa/log/log.h> +LOG_SETUP(".contentpolicy"); + +using vespalib::make_string; namespace documentapi { ContentPolicy::ContentPolicy(const string& param) - : StoragePolicy(param) -{ } + : ExternSlobrokPolicy(parse(param)), + _bucketIdFactory() +{ + std::map<string, string> params(parse(param)); + + if (params.find("cluster") != params.end()) { + _clusterName = params.find("cluster")->second; + } else { + _error = "Required parameter clustername not set"; + } + + if (params.find("clusterconfigid") != params.end()) { + _clusterConfigId = params.find("clusterconfigid")->second; + } +} + +namespace { + class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig> + { + public: + CallBack(ContentPolicy & policy) : _policy(policy) { } + void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override { + _policy.configure(std::move(config)); + } + private: + ContentPolicy & _policy; + }; +} +string ContentPolicy::init() +{ + string error = ExternSlobrokPolicy::init(); + if (!error.empty()) { + return error; + } + + if (_clusterConfigId.empty()) { + _clusterConfigId = createConfigId(_clusterName); + } + + using storage::lib::Distribution; + config::ConfigUri uri(_clusterConfigId); + if (!_configSources.empty()) { + _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources))); + } else { + _configFetcher.reset(new config::ConfigFetcher(uri.getContext())); + } + _callBack = std::make_unique<CallBack>(*this); + _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get())); + _configFetcher->start(); + return ""; +} + +ContentPolicy::~ContentPolicy() = default; string ContentPolicy::createConfigId(const string & clusterName) const @@ -14,4 +81,177 @@ ContentPolicy::createConfigId(const string & clusterName) const return clusterName; } +string +ContentPolicy::createPattern(const string & clusterName, int distributor) const +{ + vespalib::asciistream ost; + + ost << "storage/cluster." << clusterName << "/distributor/"; + + if (distributor == -1) { + ost << '*'; + } else { + ost << distributor; + } + ost << "/default"; + return ost.str(); +} + +void +ContentPolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) +{ + try { + _nextDistribution = std::make_unique<storage::lib::Distribution>(*config); + } catch (const std::exception& e) { + LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); + throw e; + } +} + +void +ContentPolicy::doSelect(mbus::RoutingContext &context) +{ + const mbus::Message &msg = context.getMessage(); + + int distributor = -1; + + if (_state.get()) { + document::BucketId id; + switch(msg.getType()) { + case DocumentProtocol::MESSAGE_PUTDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId()); + break; + + case DocumentProtocol::MESSAGE_GETDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId()); + break; + + case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId()); + break; + + case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: + id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId()); + break; + + case DocumentProtocol::MESSAGE_STATBUCKET: + id = static_cast<const StatBucketMessage&>(msg).getBucketId(); + break; + + case DocumentProtocol::MESSAGE_GETBUCKETLIST: + id = static_cast<const GetBucketListMessage&>(msg).getBucketId(); + break; + + case DocumentProtocol::MESSAGE_CREATEVISITOR: + id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0]; + break; + + case DocumentProtocol::MESSAGE_REMOVELOCATION: + id = static_cast<const RemoveLocationMessage&>(msg).getBucketId(); + break; + + default: + LOG(error, "Message type '%d' not supported.", msg.getType()); + return; + } + + // _P_A_R_A_N_O_I_A_ + if (id.getRawId() == 0) { + mbus::Reply::UP reply(new mbus::EmptyReply()); + reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR, + "No bucket id available in message.")); + context.setReply(std::move(reply)); + return; + } + + // Pick a distributor using ideal state algorithm + try { + // Update distribution here, to make it not take lock in average case + if (_nextDistribution) { + _distribution = std::move(_nextDistribution); + _nextDistribution.reset(); + } + assert(_distribution.get()); + distributor = _distribution->getIdealDistributorNode(*_state, id); + } catch (storage::lib::TooFewBucketBitsInUseException& e) { + auto reply = std::make_unique<WrongDistributionReply>(_state->toString()); + reply->addError(mbus::Error( + DocumentProtocol::ERROR_WRONG_DISTRIBUTION, + "Too few distribution bits used for given cluster state")); + context.setReply(std::move(reply)); + return; + } catch (storage::lib::NoDistributorsAvailableException& e) { + // No distributors available in current cluster state. Remove + // cluster state we cannot use and send to random target + _state.reset(); + distributor = -1; + } + } + + mbus::Hop hop = getRecipient(context, distributor); + + if (distributor != -1 && !hop.hasDirectives()) { + hop = getRecipient(context, -1); + } + + if (hop.hasDirectives()) { + mbus::Route route = context.getRoute(); + route.setHop(0, hop); + context.addChild(route); + } else { + context.setError( + mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, + make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str())); + } +} + +mbus::Hop +ContentPolicy::getRecipient(mbus::RoutingContext& context, int distributor) +{ + slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor)); + + if (!entries.empty()) { + return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); + } + + return mbus::Hop(); +} + +void +ContentPolicy::merge(mbus::RoutingContext &context) +{ + mbus::RoutingNodeIterator it = context.getChildIterator(); + mbus::Reply::UP reply = it.removeReply(); + + if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { + updateStateFromReply(static_cast<WrongDistributionReply&>(*reply)); + } else if (reply->hasErrors()) { + _state.reset(); + } + + context.setReply(std::move(reply)); +} + +void +ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr) +{ + std::unique_ptr<storage::lib::ClusterState> newState( + new storage::lib::ClusterState(wdr.getSystemState())); + if (!_state || newState->getVersion() >= _state->getVersion()) { + if (_state) { + wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", + _state->getVersion(), newState->getVersion())); + } else { + wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion())); + } + + _state = std::move(newState); + } else { + wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, " + "while old state had version %d. New states should not have a lower version than the old.", + newState->getVersion(), _state->getVersion())); + _state.reset(); + } +} + } // documentapi diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h index 4b2f356c740..e29fbb75524 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h @@ -1,17 +1,62 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "storagepolicy.h" +#include "externslobrokpolicy.h" +#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/document/bucket/bucketidfactory.h> +#include <vespa/messagebus/routing/hop.h> +#include <vespa/config/helper/ifetchercallback.h> +#include <vespa/config/helper/configfetcher.h> + +namespace config { + class ICallback; + class ConfigFetcher; +} + +namespace storage { +namespace lib { + class Distribution; + class ClusterState; +} +} namespace documentapi { -class ContentPolicy : public StoragePolicy +class ContentPolicy : public ExternSlobrokPolicy { +private: + document::BucketIdFactory _bucketIdFactory; + std::unique_ptr<storage::lib::ClusterState> _state; + string _clusterName; + string _clusterConfigId; + std::unique_ptr<config::ICallback> _callBack; + std::unique_ptr<config::ConfigFetcher> _configFetcher; + std::unique_ptr<storage::lib::Distribution> _distribution; + std::unique_ptr<storage::lib::Distribution> _nextDistribution; + + mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor); + public: ContentPolicy(const string& param); + ~ContentPolicy(); + void doSelect(mbus::RoutingContext &context) override; + void merge(mbus::RoutingContext &context) override; + + void updateStateFromReply(WrongDistributionReply& reply); + + /** + * @return a pointer to the system state registered with this policy. If + * we haven't received a system state yet, returns NULL. + */ + const storage::lib::ClusterState* getSystemState() const { return _state.get(); } + + virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config); + string init() override; + private: - string createConfigId(const string & clusterName) const override; + string createConfigId(const string & clusterName) const; + string createPattern(const string & clusterName, int distributor) const; }; } - diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp deleted file mode 100644 index 3fc1df0352a..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "storagepolicy.h" -#include <vespa/document/base/documentid.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/error.h> -#include <vespa/documentapi/documentapi.h> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/config-stor-distribution.h> -#include <vespa/config/subscription/configuri.h> -#include <cassert> - -#include <vespa/log/log.h> -LOG_SETUP(".storagepolicy"); - -using vespalib::make_string; - -namespace documentapi { - -StoragePolicy::StoragePolicy(const string& param) - : ExternSlobrokPolicy(parse(param)), - _bucketIdFactory() -{ - std::map<string, string> params(parse(param)); - - if (params.find("cluster") != params.end()) { - _clusterName = params.find("cluster")->second; - } else { - _error = "Required parameter clustername not set"; - } - - if (params.find("clusterconfigid") != params.end()) { - _clusterConfigId = params.find("clusterconfigid")->second; - } -} - -namespace { - class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig> - { - public: - CallBack(StoragePolicy & policy) : _policy(policy) { } - void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override { - _policy.configure(std::move(config)); - } - private: - StoragePolicy & _policy; - }; -} -string StoragePolicy::init() -{ - string error = ExternSlobrokPolicy::init(); - if (!error.empty()) { - return error; - } - - if (_clusterConfigId.empty()) { - _clusterConfigId = createConfigId(_clusterName); - } - - using storage::lib::Distribution; - config::ConfigUri uri(_clusterConfigId); - if (!_configSources.empty()) { - _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources))); - } else { - _configFetcher.reset(new config::ConfigFetcher(uri.getContext())); - } - _callBack = std::make_unique<CallBack>(*this); - _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get())); - _configFetcher->start(); - return ""; -} - -StoragePolicy::~StoragePolicy() = default; - -string -StoragePolicy::createConfigId(const string & clusterName) const -{ - return "storage/cluster." + clusterName; -} - -string -StoragePolicy::createPattern(const string & clusterName, int distributor) const -{ - vespalib::asciistream ost; - - ost << "storage/cluster." << clusterName << "/distributor/"; - - if (distributor == -1) { - ost << '*'; - } else { - ost << distributor; - } - ost << "/default"; - return ost.str(); -} - -void -StoragePolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) -{ - try { - _nextDistribution = std::make_unique<storage::lib::Distribution>(*config); - } catch (const std::exception& e) { - LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); - throw e; - } -} - -void -StoragePolicy::doSelect(mbus::RoutingContext &context) -{ - const mbus::Message &msg = context.getMessage(); - - int distributor = -1; - - if (_state.get()) { - document::BucketId id; - switch(msg.getType()) { - case DocumentProtocol::MESSAGE_PUTDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const PutDocumentMessage&>(msg).getDocument().getId()); - break; - - case DocumentProtocol::MESSAGE_GETDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const GetDocumentMessage&>(msg).getDocumentId()); - break; - - case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const RemoveDocumentMessage&>(msg).getDocumentId()); - break; - - case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: - id = _bucketIdFactory.getBucketId(static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId()); - break; - - case DocumentProtocol::MESSAGE_STATBUCKET: - id = static_cast<const StatBucketMessage&>(msg).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_GETBUCKETLIST: - id = static_cast<const GetBucketListMessage&>(msg).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_CREATEVISITOR: - id = static_cast<const CreateVisitorMessage&>(msg).getBuckets()[0]; - break; - - case DocumentProtocol::MESSAGE_REMOVELOCATION: - id = static_cast<const RemoveLocationMessage&>(msg).getBucketId(); - break; - - default: - LOG(error, "Message type '%d' not supported.", msg.getType()); - return; - } - - // _P_A_R_A_N_O_I_A_ - if (id.getRawId() == 0) { - mbus::Reply::UP reply(new mbus::EmptyReply()); - reply->addError(mbus::Error(mbus::ErrorCode::APP_FATAL_ERROR, - "No bucket id available in message.")); - context.setReply(std::move(reply)); - return; - } - - // Pick a distributor using ideal state algorithm - try { - // Update distribution here, to make it not take lock in average case - if (_nextDistribution) { - _distribution = std::move(_nextDistribution); - _nextDistribution.reset(); - } - assert(_distribution.get()); - distributor = _distribution->getIdealDistributorNode(*_state, id); - } catch (storage::lib::TooFewBucketBitsInUseException& e) { - auto reply = std::make_unique<WrongDistributionReply>(_state->toString()); - reply->addError(mbus::Error( - DocumentProtocol::ERROR_WRONG_DISTRIBUTION, - "Too few distribution bits used for given cluster state")); - context.setReply(std::move(reply)); - return; - } catch (storage::lib::NoDistributorsAvailableException& e) { - // No distributors available in current cluster state. Remove - // cluster state we cannot use and send to random target - _state.reset(); - distributor = -1; - } - } - - mbus::Hop hop = getRecipient(context, distributor); - - if (distributor != -1 && !hop.hasDirectives()) { - hop = getRecipient(context, -1); - } - - if (hop.hasDirectives()) { - mbus::Route route = context.getRoute(); - route.setHop(0, hop); - context.addChild(route); - } else { - context.setError( - mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, - make_string("Could not resolve a distributor to send to in cluster %s", _clusterName.c_str())); - } -} - -mbus::Hop -StoragePolicy::getRecipient(mbus::RoutingContext& context, int distributor) -{ - slobrok::api::IMirrorAPI::SpecList entries = lookup(context, createPattern(_clusterName, distributor)); - - if (!entries.empty()) { - return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); - } - - return mbus::Hop(); -} - -void -StoragePolicy::merge(mbus::RoutingContext &context) -{ - mbus::RoutingNodeIterator it = context.getChildIterator(); - mbus::Reply::UP reply = it.removeReply(); - - if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { - updateStateFromReply(static_cast<WrongDistributionReply&>(*reply)); - } else if (reply->hasErrors()) { - _state.reset(); - } - - context.setReply(std::move(reply)); -} - -void -StoragePolicy::updateStateFromReply(WrongDistributionReply& wdr) -{ - std::unique_ptr<storage::lib::ClusterState> newState( - new storage::lib::ClusterState(wdr.getSystemState())); - if (!_state || newState->getVersion() >= _state->getVersion()) { - if (_state) { - wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", - _state->getVersion(), newState->getVersion())); - } else { - wdr.getTrace().trace(1, make_string("System state set to version %u", newState->getVersion())); - } - - _state = std::move(newState); - } else { - wdr.getTrace().trace(1, make_string("System state cleared because system state returned had version %d, " - "while old state had version %d. New states should not have a lower version than the old.", - newState->getVersion(), _state->getVersion())); - _state.reset(); - } -} - -} // documentapi diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h deleted file mode 100644 index 5cd2efcbbd3..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.h +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "externslobrokpolicy.h" -#include <vespa/documentapi/messagebus/messages/wrongdistributionreply.h> -#include <vespa/vdslib/distribution/distribution.h> -#include <vespa/document/bucket/bucketidfactory.h> -#include <vespa/messagebus/routing/hop.h> -#include <vespa/config/helper/ifetchercallback.h> -#include <vespa/config/helper/configfetcher.h> - -namespace config { - class ICallback; - class ConfigFetcher; -} - -namespace storage { -namespace lib { - class Distribution; - class ClusterState; -} -} - -namespace documentapi { - -class StoragePolicy : public ExternSlobrokPolicy -{ -private: - document::BucketIdFactory _bucketIdFactory; - std::unique_ptr<storage::lib::ClusterState> _state; - string _clusterName; - string _clusterConfigId; - std::unique_ptr<config::ICallback> _callBack; - std::unique_ptr<config::ConfigFetcher> _configFetcher; - std::unique_ptr<storage::lib::Distribution> _distribution; - std::unique_ptr<storage::lib::Distribution> _nextDistribution; - - mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor); - -public: - StoragePolicy(const string& param); - ~StoragePolicy(); - void doSelect(mbus::RoutingContext &context) override; - void merge(mbus::RoutingContext &context) override; - - void updateStateFromReply(WrongDistributionReply& reply); - - /** - * @return a pointer to the system state registered with this policy. If - * we haven't received a system state yet, returns NULL. - */ - const storage::lib::ClusterState* getSystemState() const { return _state.get(); } - - virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config); - string init() override; - -private: - virtual string createConfigId(const string & clusterName) const; - string createPattern(const string & clusterName, int distributor) const; -}; - -} - diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp index 2c244c63046..f945fe8cd02 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp @@ -1,16 +1,15 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "routingpolicyfactories.h" #include <vespa/documentapi/messagebus/policies/andpolicy.h> +#include <vespa/documentapi/messagebus/policies/contentpolicy.h> #include <vespa/documentapi/messagebus/policies/documentrouteselectorpolicy.h> #include <vespa/documentapi/messagebus/policies/errorpolicy.h> #include <vespa/documentapi/messagebus/policies/externpolicy.h> +#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> #include <vespa/documentapi/messagebus/policies/localservicepolicy.h> +#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h> #include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h> #include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h> -#include <vespa/documentapi/messagebus/policies/storagepolicy.h> -#include <vespa/documentapi/messagebus/policies/contentpolicy.h> -#include <vespa/documentapi/messagebus/policies/messagetypepolicy.h> -#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> using namespace documentapi; @@ -21,17 +20,6 @@ RoutingPolicyFactories::AndPolicyFactory::createPolicy(const string ¶m) cons } mbus::IRoutingPolicy::UP -RoutingPolicyFactories::StoragePolicyFactory::createPolicy(const string ¶m) const -{ - mbus::IRoutingPolicy::UP ret(new StoragePolicy(param)); - string error = static_cast<StoragePolicy&>(*ret).getError(); - if (!error.empty()) { - ret.reset(new ErrorPolicy(error)); - } - return ret; -} - -mbus::IRoutingPolicy::UP RoutingPolicyFactories::MessageTypePolicyFactory::createPolicy(const string ¶m) const { return mbus::IRoutingPolicy::UP(new MessageTypePolicy(param)); diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h index e2bf5119c58..533ad93e644 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h @@ -16,10 +16,6 @@ public: public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; }; - class StoragePolicyFactory : public IRoutingPolicyFactory { - public: - mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; - }; class MessageTypePolicyFactory : public IRoutingPolicyFactory { public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java index 1f5d60a630a..a4b5422d502 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java @@ -57,7 +57,7 @@ public final class DestinationSession implements MessageHandler { } /** - * Conveniece method for acknowledging a message back to the sender. + * Convenience method for acknowledging a message for its sender. * * This is equivalent to: * <pre> @@ -69,7 +69,7 @@ public final class DestinationSession implements MessageHandler { * Messages should be acknowledged when * <ul> * <li>this destination has safely and permanently applied the message, or - * <li>an intermediate determines that the purpose of the message is fullfilled without forwarding the message + * <li>an intermediate determines that the purpose of the message is fulfilled without forwarding the message. * </ul> * * @param msg The message to acknowledge back to the sender. @@ -82,8 +82,8 @@ public final class DestinationSession implements MessageHandler { } /** - * Sends a reply to a message. The reply will propagate back to the original sender, prefering the same route as it - * used to reach the detination. + * Sends a reply to a message. The reply will propagate back to the original sender, preferring the same route as it + * used to reach the destination. * * @param reply The reply, created from the message this is a reply to. */ diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Error.java b/messagebus/src/main/java/com/yahoo/messagebus/Error.java index 4aa85c1ea87..475dea3d7ac 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Error.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Error.java @@ -78,9 +78,9 @@ public final class Error { @Override public String toString() { String name = ErrorCode.getName(code); - return "[" + - (name != null ? name : code) + " @ " + - (service != null ? service : "localhost") + - "]: " + message; + return "[" + + name + " @ " + + (service != null ? service : "localhost") + + "]: " + message; } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java index 88a6d5376b1..f3c6422fbfd 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java @@ -97,36 +97,25 @@ public class LocalNetwork implements Network { msg.setRetryEnabled(envelope.msg.getRetryEnabled()); msg.setRetry(envelope.msg.getRetry()); msg.setTimeRemaining(envelope.msg.getTimeRemainingNow()); - msg.pushHandler(new ReplyHandler() { - - @Override - public void handleReply(Reply reply) { - new ReplyEnvelope(LocalNetwork.this, envelope, reply).send(); - } - }); - owner.deliverMessage(msg, LocalServiceAddress.class.cast(envelope.recipient.getServiceAddress()) - .getSessionName()); + msg.pushHandler(reply -> new ReplyEnvelope(LocalNetwork.this, envelope, reply).send()); + owner.deliverMessage(msg, ((LocalServiceAddress) envelope.recipient.getServiceAddress()).getSessionName()); } }); } private void receiveLater(ReplyEnvelope envelope) { byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply); - executor.execute(new Runnable() { - - @Override - public void run() { - Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); - reply.setRetryDelay(envelope.reply.getRetryDelay()); - reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode())); - for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) { - Error error = envelope.reply.getError(i); - reply.addError(new Error(error.getCode(), - error.getMessage(), - error.getService() != null ? error.getService() : envelope.sender.hostId)); - } - owner.deliverReply(reply, envelope.parent.recipient); + executor.execute(() -> { + Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); + reply.setRetryDelay(envelope.reply.getRetryDelay()); + reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode())); + for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) { + Error error = envelope.reply.getError(i); + reply.addError(new Error(error.getCode(), + error.getMessage(), + error.getService() != null ? error.getService() : envelope.sender.hostId)); } + owner.deliverReply(reply, envelope.parent.recipient); }); } @@ -137,23 +126,16 @@ public class LocalNetwork implements Network { return owner.getProtocol(protocolName).encode(Vtag.currentVersion, toEncode); } - @SuppressWarnings("unchecked") private <T extends Routable> T decode(Utf8String protocolName, byte[] toDecode, Class<T> clazz) { - if (toDecode.length == 0) { - return clazz.cast(new EmptyReply()); - } - return clazz.cast(owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode)); + return clazz.cast(toDecode.length == 0 ? new EmptyReply() + : owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode)); } @Override - public void sync() { - - } + public void sync() { } @Override - public void shutdown() { - - } + public void shutdown() { } @Override public String getConnectionSpec() { @@ -178,7 +160,7 @@ public class LocalNetwork implements Network { } void send() { - LocalServiceAddress.class.cast(recipient.getServiceAddress()).getNetwork().receiveLater(this); + ((LocalServiceAddress) recipient.getServiceAddress()).getNetwork().receiveLater(this); } } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/SwitchRebalancer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/SwitchRebalancer.java index 8c9e54a2ae4..ee02beb168f 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/SwitchRebalancer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/maintenance/SwitchRebalancer.java @@ -13,6 +13,7 @@ import com.yahoo.vespa.hosted.provision.node.Agent; import java.time.Duration; import java.util.HashSet; +import java.util.Optional; import java.util.Set; /** @@ -46,8 +47,8 @@ public class SwitchRebalancer extends NodeMover<Move> { protected Move suggestedMove(Node node, Node fromHost, Node toHost, NodeList allNodes) { NodeList clusterNodes = clusterOf(node, allNodes); NodeList clusterHosts = allNodes.parentsOf(clusterNodes); - if (isBalanced(clusterNodes, clusterHosts)) return Move.empty(); - if (switchInUse(toHost, clusterHosts)) return Move.empty(); + if (onExclusiveSwitch(node, clusterHosts)) return Move.empty(); + if (!increasesExclusiveSwitches(clusterNodes, clusterHosts, toHost)) return Move.empty(); return new Move(node, fromHost, toHost); } @@ -65,29 +66,31 @@ public class SwitchRebalancer extends NodeMover<Move> { .cluster(cluster); } - /** Returns whether switch of host is already in use by given cluster */ - private boolean switchInUse(Node host, NodeList clusterHosts) { - if (host.switchHostname().isEmpty()) return false; - for (var clusterHost : clusterHosts) { - if (clusterHost.switchHostname().isEmpty()) continue; - if (clusterHost.switchHostname().get().equals(host.switchHostname().get())) return true; - } - return false; + /** Returns whether allocatedNode is on an exclusive switch */ + private boolean onExclusiveSwitch(Node allocatedNode, NodeList clusterHosts) { + Optional<String> allocatedSwitch = clusterHosts.parentOf(allocatedNode).flatMap(Node::switchHostname); + if (allocatedSwitch.isEmpty()) return true; + return clusterHosts.stream() + .flatMap(host -> host.switchHostname().stream()) + .filter(switchHostname -> switchHostname.equals(allocatedSwitch.get())) + .count() == 1; } - /** Returns whether given cluster nodes are balanced optimally on exclusive switches */ - private boolean isBalanced(NodeList clusterNodes, NodeList clusterHosts) { - Set<String> switches = new HashSet<>(); - int exclusiveSwitches = 0; + /** Returns whether allocating a node on toHost would increase the number of exclusive switches */ + private boolean increasesExclusiveSwitches(NodeList clusterNodes, NodeList clusterHosts, Node toHost) { + if (toHost.switchHostname().isEmpty()) return false; + Set<String> activeSwitches = new HashSet<>(); + int unknownSwitches = 0; for (var host : clusterHosts) { if (host.switchHostname().isEmpty()) { - exclusiveSwitches++; // Unknown switch counts as exclusive + unknownSwitches++; } else { - switches.add(host.switchHostname().get()); + activeSwitches.add(host.switchHostname().get()); } } - exclusiveSwitches += switches.size(); - return clusterNodes.size() <= exclusiveSwitches; + int exclusiveSwitches = unknownSwitches + activeSwitches.size(); + return clusterNodes.size() > exclusiveSwitches && + !activeSwitches.contains(toHost.switchHostname().get()); } } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/SwitchRebalancerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/SwitchRebalancerTest.java index 3662aee474d..a44f566d380 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/SwitchRebalancerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/maintenance/SwitchRebalancerTest.java @@ -21,11 +21,15 @@ import com.yahoo.vespa.hosted.provision.testutils.MockDeployer.ClusterContext; import org.junit.Test; import java.time.Duration; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; /** * @author mpolden @@ -78,14 +82,15 @@ public class SwitchRebalancerTest { rebalancer.maintain(); NodeList allNodes = tester.nodeRepository().list(); NodeList clusterNodes = allNodes.owner(app).cluster(cluster).state(Node.State.active); - assertEquals("Node is retired in " + cluster, 1, clusterNodes.retired().size()); + NodeList retired = clusterNodes.retired(); + assertEquals("Node is retired in " + cluster, 1, retired.size()); assertEquals("Cluster " + cluster + " allocates nodes on distinct switches", 2, tester.switchesOf(clusterNodes, allNodes).size()); // Retired node becomes inactive and makes zone stable try (var lock = tester.provisioner().lock(app)) { NestedTransaction removeTransaction = new NestedTransaction(); - tester.nodeRepository().deactivate(clusterNodes.retired().asList(), new ApplicationTransaction(lock, removeTransaction)); + tester.nodeRepository().deactivate(retired.asList(), new ApplicationTransaction(lock, removeTransaction)); removeTransaction.commit(); } } @@ -95,6 +100,57 @@ public class SwitchRebalancerTest { assertNoMoves(rebalancer, tester); } + @Test + public void rebalance_does_not_move_node_already_on_exclusive_switch() { + ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); + ClusterSpec spec = ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("c1")).vespaVersion("1").build(); + Capacity capacity = Capacity.from(new ClusterResources(4, 1, new NodeResources(4, 8, 50, 1))); + MockDeployer deployer = deployer(tester, capacity, spec); + SwitchRebalancer rebalancer = new SwitchRebalancer(tester.nodeRepository(), Duration.ofDays(1), new TestMetric(), deployer); + + // Provision initial hosts on two switches + NodeResources hostResources = new NodeResources(8, 16, 500, 10); + List<Node> hosts0 = tester.makeReadyNodes(4, hostResources, NodeType.host, 5); + hosts0.sort(Comparator.comparing(Node::hostname)); + tester.activateTenantHosts(); + String switch0 = "switch0"; + String switch1 = "switch1"; + tester.patchNode(hosts0.get(0), (host) -> host.withSwitchHostname(switch0)); + tester.patchNodes(hosts0.subList(1, hosts0.size()), (host) -> host.withSwitchHostname(switch1)); + + // Deploy application + deployer.deployFromLocalActive(app).get().activate(); + tester.assertSwitches(Set.of(switch0, switch1), app, spec.id()); + List<Node> nodesOnExclusiveSwitch = tester.activeNodesOn(switch0, app, spec.id()); + assertEquals(1, nodesOnExclusiveSwitch.size()); + assertEquals(3, tester.activeNodesOn(switch1, app, spec.id()).size()); + + // Another host becomes available on a new host + List<Node> hosts2 = tester.makeReadyNodes(1, hostResources, NodeType.host, 5); + tester.activateTenantHosts(); + String switch2 = "switch2"; + tester.patchNodes(hosts2, (host) -> host.withSwitchHostname(switch2)); + + // Rebalance + tester.clock().advance(SwitchRebalancer.waitTimeAfterPreviousDeployment); + rebalancer.maintain(); + NodeList activeNodes = tester.nodeRepository().list().owner(app).cluster(spec.id()).state(Node.State.active); + NodeList retired = activeNodes.retired(); + assertEquals("Node is retired", 1, retired.size()); + assertFalse("Retired node was not on exclusive switch", nodesOnExclusiveSwitch.contains(retired.first().get())); + tester.assertSwitches(Set.of(switch0, switch1, switch2), app, spec.id()); + // Retired node becomes inactive and makes zone stable + try (var lock = tester.provisioner().lock(app)) { + NestedTransaction removeTransaction = new NestedTransaction(); + tester.nodeRepository().deactivate(retired.asList(), new ApplicationTransaction(lock, removeTransaction)); + removeTransaction.commit(); + } + + // Next iteration does nothing + tester.clock().advance(SwitchRebalancer.waitTimeAfterPreviousDeployment); + assertNoMoves(rebalancer, tester); + } + private void assertNoMoves(SwitchRebalancer rebalancer, ProvisioningTester tester) { NodeList nodes0 = tester.nodeRepository().list(Node.State.active).owner(app); rebalancer.maintain(); @@ -103,13 +159,17 @@ public class SwitchRebalancerTest { assertEquals("No nodes are retired", List.of(), nodes1.retired().asList()); } - private static MockDeployer deployer(ProvisioningTester tester, ClusterSpec.Id cluster1, ClusterSpec.Id cluster2) { - NodeResources resources = new NodeResources(2, 4, 50, 1); - Capacity capacity = Capacity.from(new ClusterResources(2, 1, resources)); - ClusterSpec spec1 = ClusterSpec.request(ClusterSpec.Type.container, cluster1).vespaVersion("1").build(); - ClusterSpec spec2 = ClusterSpec.request(ClusterSpec.Type.content, cluster2).vespaVersion("1").build(); - List<ClusterContext> clusterContexts = List.of(new ClusterContext(app, spec1, capacity), - new ClusterContext(app, spec2, capacity)); + private static MockDeployer deployer(ProvisioningTester tester, ClusterSpec.Id containerCluster, ClusterSpec.Id contentCluster) { + return deployer(tester, + Capacity.from(new ClusterResources(2, 1, new NodeResources(4, 8, 50, 1))), + ClusterSpec.request(ClusterSpec.Type.container, containerCluster).vespaVersion("1").build(), + ClusterSpec.request(ClusterSpec.Type.content, contentCluster).vespaVersion("1").build()); + } + + private static MockDeployer deployer(ProvisioningTester tester, Capacity capacity, ClusterSpec first, ClusterSpec... rest) { + List<ClusterContext> clusterContexts = Stream.concat(Stream.of(first), Stream.of(rest)) + .map(spec -> new ClusterContext(app, spec, capacity)) + .collect(Collectors.toList()); ApplicationContext context = new ApplicationContext(app, clusterContexts); return new MockDeployer(tester.provisioner(), tester.clock(), Map.of(app, context)); } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningCompleteHostCalculatorTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningCompleteHostCalculatorTest.java index e5fd00005a4..8b2febf37b1 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningCompleteHostCalculatorTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningCompleteHostCalculatorTest.java @@ -51,7 +51,6 @@ public class DockerProvisioningCompleteHostCalculatorTest { 7, 1, 0.7, 4.6, 14.3, 1.0, app1, cluster1); - System.out.println("---------------------redeploying the same---------------------"); tester.activate(app1, cluster1, Capacity.from(new ClusterResources(7, 1, newMinResources), new ClusterResources(7, 1, newMaxResources))); tester.assertNodes("Redeploying the same ranges does not cause changes", @@ -88,12 +87,6 @@ public class DockerProvisioningCompleteHostCalculatorTest { } NodeResources realResourcesOf(NodeResources advertisedResources) { - var r = advertisedResources.withMemoryGb(advertisedResources.memoryGb() - - memoryOverhead(advertisedResourcesOf(hostFlavor).memoryGb(), advertisedResources, false)) - .withDiskGb(advertisedResources.diskGb() - - diskOverhead(advertisedResourcesOf(hostFlavor).diskGb(), advertisedResources, false)); - System.out.println(" real given " + advertisedResources + ": " + r); - System.out.println(" adv. given those: " + realToRequest(r, false)); return advertisedResources.withMemoryGb(advertisedResources.memoryGb() - memoryOverhead(advertisedResourcesOf(hostFlavor).memoryGb(), advertisedResources, false)) .withDiskGb(advertisedResources.diskGb() - diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java index b2529963a9f..91c9f7e50ac 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTester.java @@ -574,6 +574,16 @@ public class ProvisioningTester { assertEquals(expectedSwitches, switchesOf(activeNodes, allNodes)); } + public List<Node> activeNodesOn(String switchHostname, ApplicationId application, ClusterSpec.Id cluster) { + NodeList allNodes = nodeRepository.list(); + NodeList activeNodes = allNodes.state(Node.State.active).owner(application).cluster(cluster); + return activeNodes.stream().filter(node -> { + Optional<String> allocatedSwitchHostname = allNodes.parentOf(node).flatMap(Node::switchHostname); + return allocatedSwitchHostname.isPresent() && + allocatedSwitchHostname.get().equals(switchHostname); + }).collect(Collectors.toList()); + } + public Set<String> switchesOf(NodeList applicationNodes, NodeList allNodes) { assertTrue("All application nodes are children", applicationNodes.stream().allMatch(node -> node.parentHostname().isPresent())); Set<String> switches = new HashSet<>(); diff --git a/screwdriver.yaml b/screwdriver.yaml index 6b3795e6da6..f772c120e96 100644 --- a/screwdriver.yaml +++ b/screwdriver.yaml @@ -15,6 +15,7 @@ jobs: screwdriver.cd/timeout: 600 environment: + USER_SHELL_BIN: bash LOCAL_MVN_REPO: "/tmp/vespa/mvnrepo" VESPA_MAVEN_EXTRA_OPTS: "-Dmaven.repo.local=/tmp/vespa/mvnrepo -Dmaven.javadoc.skip=true -Dmaven.source.skip=true" CCACHE_TMP_DIR: "/tmp/ccache_tmp" diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index 37174903697..fbbc7c366fd 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -23,9 +23,8 @@ DistributorNode::DistributorNode( StorageLink::UP communicationManager, std::unique_ptr<IStorageChainBuilder> storage_chain_builder) : StorageNode(configUri, context, generationFetcher, - std::unique_ptr<HostInfo>(new HostInfo()), - communicationManager.get() == 0 ? NORMAL - : SINGLE_THREADED_TEST_MODE), + std::make_unique<HostInfo>(), + !communicationManager ? NORMAL : SINGLE_THREADED_TEST_MODE), _threadPool(framework::TickingThreadPool::createDefault("distributor")), _context(context), _lastUniqueTimestampRequested(0), @@ -36,16 +35,9 @@ DistributorNode::DistributorNode( if (storage_chain_builder) { set_storage_chain_builder(std::move(storage_chain_builder)); } - try{ + try { initialize(); - } catch (const vespalib::NetworkSetupFailureException & e) { - LOG(warning, "Network failure: '%s'", e.what()); - throw; } catch (const vespalib::Exception & e) { - LOG(error, "Caught exception %s during startup. Calling destruct " - "functions in hopes of dying gracefully.", - e.getMessage().c_str()); - requestShutdown("Failed to initialize: " + e.getMessage()); shutdownDistributor(); throw; } diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 74052932853..79205b8b513 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -369,13 +369,13 @@ StorageNode::removeConfigSubscriptions() void StorageNode::shutdown() { - // Try to shut down in opposite order of initialize. Bear in mind that - // we might be shutting down after init exception causing only parts - // of the server to have initialize + // Try to shut down in opposite order of initialize. Bear in mind that + // we might be shutting down after init exception causing only parts + // of the server to have initialize LOG(debug, "Shutting down storage node of type %s", getNodeType().toString().c_str()); if (!_attemptedStopped) { - LOG(warning, "Storage killed before requestShutdown() was called. No " - "reason has been given for why we're stopping."); + LOG(debug, "Storage killed before requestShutdown() was called. No " + "reason has been given for why we're stopping."); } // Remove the subscription to avoid more callbacks from config removeConfigSubscriptions(); @@ -401,12 +401,12 @@ StorageNode::shutdown() _context.getComponentRegister().getMetricManager().stop(); } - // Delete the status web server before the actual status providers, to - // ensure that web server does not query providers during shutdown + // Delete the status web server before the actual status providers, to + // ensure that web server does not query providers during shutdown _statusWebServer.reset(); - // For this to be safe, noone can touch the state updater after we start - // deleting the storage chain + // For this to be safe, no-one can touch the state updater after we start + // deleting the storage chain LOG(debug, "Removing state updater pointer as we're about to delete it."); if (_chain) { LOG(debug, "Deleting storage chain"); diff --git a/storageserver/src/apps/storaged/storage.cpp b/storageserver/src/apps/storaged/storage.cpp index 428067e3059..8fbeeb836da 100644 --- a/storageserver/src/apps/storaged/storage.cpp +++ b/storageserver/src/apps/storaged/storage.cpp @@ -187,7 +187,7 @@ int StorageApp::Main() // main loop - wait for termination signal while (!_process->getNode().attemptedStopped()) { if (_process->configUpdated()) { - LOG(debug, "Config updated. Progagating config updates"); + LOG(debug, "Config updated. Propagating config updates"); ResumeGuard guard(_process->getNode().pause()); _process->updateConfig(); } @@ -197,7 +197,7 @@ int StorageApp::Main() handleSignals(); } LOG(debug, "Server was attempted stopped, shutting down"); - // Create guard that will forcifully kill storage if destruction takes longer + // Create guard that will forcefully kill storage if destruction takes longer // time than given timeout. vespalib::ShutdownGuard shutdownGuard(getMaxShutDownTime()); LOG(debug, "Attempting proper shutdown"); diff --git a/travis/detect-what-to-build.sh b/travis/detect-what-to-build.sh index 12bf892d419..5fb18093037 100755 --- a/travis/detect-what-to-build.sh +++ b/travis/detect-what-to-build.sh @@ -10,16 +10,17 @@ if [[ $TRAVIS_PULL_REQUEST == false ]]; then return 0 fi -# Future use -#JSON=$(curl -sLf https://api.github.com/repos/$TRAVIS_REPO_SLUG/pulls/$TRAVIS_PULL_REQUEST) -#PR_TITLE=$(jq -re '.title' <<< "$JSON") +JSON=$(curl -sLf https://api.github.com/repos/$TRAVIS_REPO_SLUG/pulls/$TRAVIS_PULL_REQUEST) +PR_TITLE=$(jq -re '.title' <<< "$JSON") JSON=$(curl -sLf https://api.github.com/repos/$TRAVIS_REPO_SLUG/pulls/$TRAVIS_PULL_REQUEST/commits) COMMITS=$(jq -re '.[].sha' <<< "$JSON") FILES=$(for C in $COMMITS; do JSON=$(curl -sLf https://api.github.com/repos/$TRAVIS_REPO_SLUG/commits/$C); jq -re '.files[].filename' <<< "$JSON"; done) -if [[ -z $FILES ]]; then +if [[ $PR_TITLE =~ \[run-systemtest\] ]]; then + SHOULD_BUILD=systemtest +elif [[ -z $FILES ]]; then SHOULD_BUILD=all elif ! grep -v -E "(\.h|\.hh|\.hxx|\.c|\.cpp|\.cxx)$" <<< "$FILES" &> /dev/null; then SHOULD_BUILD=cpp diff --git a/travis/travis-build.sh b/travis/travis-build.sh index 47d1496b0cf..23f4fbb0772 100755 --- a/travis/travis-build.sh +++ b/travis/travis-build.sh @@ -49,4 +49,31 @@ case $SHOULD_BUILD in ;; esac +if [[ $SHOULD_BUILD == systemtest ]]; then + yum -y --setopt=skip_missing_names_on_install=False install \ + zstd \ + devtoolset-9-gcc-c++ \ + devtoolset-9-libatomic-devel \ + devtoolset-9-binutils \ + libxml2-devel \ + rh-ruby25-rubygems-devel \ + rh-ruby25-ruby-devel \ + rh-ruby25 \ + rh-ruby25-rubygem-net-telnet + + source /opt/rh/rh-ruby25/enable + gem install libxml-ruby gnuplot distribution test-unit builder concurrent-ruby + + cd $HOME + git clone https://github.com/vespa-engine/system-test + export SYSTEM_TEST_DIR=$(pwd)/system-test + export RUBYLIB="$SYSTEM_TEST_DIR/lib:$SYSTEM_TEST_DIR/tests" + useradd vespa + export USER=$(whoami) + + $SYSTEM_TEST_DIR/lib/node_server.rb & + NODE_SERVER_PID=$! + sleep 3 + ruby $SYSTEM_TEST_DIR/tests/search/basicsearch/basic_search.rb || (/opt/vespa/bin/vespa-logfmt -N && false) +fi diff --git a/vespa-athenz/CMakeLists.txt b/vespa-athenz/CMakeLists.txt index bb5a1f5b6de..c9c9f83efee 100644 --- a/vespa-athenz/CMakeLists.txt +++ b/vespa-athenz/CMakeLists.txt @@ -1,2 +1,3 @@ # Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. install_fat_java_artifact(vespa-athenz) +install_config_definitions() diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 5a370209f1c..47530e2c6e2 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -377,10 +377,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { .map(parameters::withRoute) .orElse(parameters); break; - case CONDITION: - parameters = getProperty(request, ROUTE).map(parameters::withRoute) - .orElse(parameters); - break; case FIELD_SET: parameters = getProperty(request, FIELD_SET).map(parameters::withFieldSet) .orElse(parameters); @@ -702,7 +698,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.reader = reader; } - /** Write is complete when we have stored the buffer — call completion handler. */ + /** Write is complete when we have stored the buffer — call completion handler. */ @Override public void write(ByteBuffer buf, CompletionHandler handler) { try { @@ -948,18 +944,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { static class StorageCluster { private final String name; - private final String configId; private final Map<String, String> documentBuckets; - StorageCluster(String name, String configId, Map<String, String> documentBuckets) { + StorageCluster(String name, Map<String, String> documentBuckets) { this.name = requireNonNull(name); - this.configId = requireNonNull(configId); this.documentBuckets = Map.copyOf(documentBuckets); } String name() { return name; } - String configId() { return configId; } - String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; } + String route() { return "[Content:cluster=" + name() + "]"; } Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); } } @@ -968,7 +961,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return clusters.storage().stream() .collect(toUnmodifiableMap(storage -> storage.name(), storage -> new StorageCluster(storage.name(), - storage.configid(), buckets.cluster(storage.name()) .documentType().entrySet().stream() .collect(toMap(entry -> entry.getKey(), diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 826821981ba..449daa4970a 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -113,7 +113,6 @@ public class DocumentV1ApiTest { } final Map<String, StorageCluster> clusters = Map.of("content", new StorageCluster("content", - "config-id", Map.of("music", "default"))); ManualClock clock; MockDocumentAccess access; @@ -139,7 +138,7 @@ public class DocumentV1ApiTest { public void testResolveCluster() { assertEquals("content", DocumentV1ApiHandler.resolveCluster(Optional.empty(), clusters).name()); - assertEquals("[Storage:cluster=content;clusterconfigid=config-id]", + assertEquals("[Content:cluster=content]", DocumentV1ApiHandler.resolveCluster(Optional.of("content"), clusters).route()); try { DocumentV1ApiHandler.resolveCluster(Optional.empty(), Map.of()); @@ -157,8 +156,8 @@ public class DocumentV1ApiTest { } try { Map<String, StorageCluster> twoClusters = new TreeMap<>(); - twoClusters.put("one", new StorageCluster("one", "one-config", Map.of())); - twoClusters.put("two", new StorageCluster("two", "two-config", Map.of())); + twoClusters.put("one", new StorageCluster("one", Map.of())); + twoClusters.put("two", new StorageCluster("two", Map.of())); DocumentV1ApiHandler.resolveCluster(Optional.empty(), twoClusters); fail("More than one cluster and no document type should fail"); } @@ -193,7 +192,7 @@ public class DocumentV1ApiTest { // GET at root is a visit. Numeric parameters have an upper bound. access.expect(parameters -> { - assertEquals("[Storage:cluster=content;clusterconfigid=config-id]", parameters.getRoute().toString()); + assertEquals("[Content:cluster=content]", parameters.getRoute().toString()); assertEquals("default", parameters.getBucketSpace()); assertEquals(1024, parameters.getMaxTotalHits()); assertEquals(100, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount()); @@ -275,7 +274,7 @@ public class DocumentV1ApiTest { // GET with full document ID is a document get operation which returns 404 when no document is found access.session.expect((id, parameters) -> { assertEquals(doc1.getId(), id); - assertEquals(parameters().withRoute("[Storage:cluster=content;clusterconfigid=config-id]").withFieldSet("go"), parameters); + assertEquals(parameters().withRoute("[Content:cluster=content]").withFieldSet("go"), parameters); parameters.responseHandler().get().handleResponse(new DocumentResponse(0, null)); return new Result(Result.ResultType.SUCCESS, null); }); diff --git a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java index 95bd9cf1cb5..c2f99689caf 100644 --- a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java +++ b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java @@ -2,14 +2,8 @@ package com.yahoo.vespaclient; public class ClusterDef { - public ClusterDef(String name, String configId) { - this.name = name; - this.configId = configId; - } - - String name; - String configId; - + private final String name; + public ClusterDef(String name) { this.name = name; } public String getName() { return name; } - public String getConfigId() { return configId; } + public String getRoute() { return "[Content:cluster=" + name + "]"; } } diff --git a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java index 4525ffcae39..2c3a0b72ed5 100644 --- a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java +++ b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java @@ -4,40 +4,38 @@ package com.yahoo.vespaclient; import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.config.subscription.ConfigGetter; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; /** A list of content clusters, either obtained from a list, a given config or by self-subscribing */ public class ClusterList { - List<ClusterDef> contentClusters = new ArrayList<>(); + private final List<ClusterDef> contentClusters; public ClusterList() { - this(new ArrayList<>()); + this(List.of()); } public ClusterList(List<ClusterDef> contentClusters) { - this.contentClusters = contentClusters; + this.contentClusters = List.copyOf(contentClusters); } public ClusterList(String configId) { - configure(new ConfigGetter<>(ClusterListConfig.class).getConfig(configId)); + this(new ConfigGetter<>(ClusterListConfig.class).getConfig(configId)); } public ClusterList(ClusterListConfig config) { - configure(config); + this(parse(config)); } - private void configure(ClusterListConfig config) { - contentClusters.clear(); // TODO: Create a new - for (int i = 0; i < config.storage().size(); i++) - contentClusters.add(new ClusterDef(config.storage(i).name(), config.storage(i).configid())); + public List<ClusterDef> getStorageClusters() { + return contentClusters; } - /** Returns a reference to the mutable list */ - public List<ClusterDef> getStorageClusters() { - return contentClusters; // TODO: Use immutable list + private static List<ClusterDef> parse(ClusterListConfig config) { + return config.storage().stream() + .map(storage -> new ClusterDef(storage.name())) + .collect(Collectors.toUnmodifiableList()); } } diff --git a/vespaclient-java/src/main/java/com/yahoo/vespaget/DocumentRetriever.java b/vespaclient-java/src/main/java/com/yahoo/vespaget/DocumentRetriever.java index 6c8296d7979..ebed1685a5f 100644 --- a/vespaclient-java/src/main/java/com/yahoo/vespaget/DocumentRetriever.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespaget/DocumentRetriever.java @@ -116,7 +116,7 @@ public class DocumentRetriever { "The Vespa cluster contains the content clusters %s, not %s. Please select a valid vespa cluster.", names, clusterName)); } - return String.format("[Storage:cluster=%s;clusterconfigid=%s]", clusterDef.getName(), clusterDef.getConfigId()); + return clusterDef.getRoute(); } private LoadType resolveLoadType(String loadTypeName) throws DocumentRetrieverException { diff --git a/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java b/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java index 88eed9dfc59..b1f91e44e5c 100644 --- a/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespavisit/VdsVisit.java @@ -600,7 +600,7 @@ public class VdsVisit { names + ". Please use the -c option to select one of them as a target for visiting."); } - return "[Storage:cluster=" + found.getName() + ";clusterconfigid=" + found.getConfigId() + "]"; + return found.getRoute(); } protected static void verbosePrintParameters(VdsVisitParameters vdsParams, PrintStream out) { diff --git a/vespaclient-java/src/test/java/com/yahoo/vespaget/DocumentRetrieverTest.java b/vespaclient-java/src/test/java/com/yahoo/vespaget/DocumentRetrieverTest.java index d8b5c267bf2..8820658fa02 100644 --- a/vespaclient-java/src/test/java/com/yahoo/vespaget/DocumentRetrieverTest.java +++ b/vespaclient-java/src/test/java/com/yahoo/vespaget/DocumentRetrieverTest.java @@ -234,14 +234,14 @@ public class DocumentRetrieverTest { @Test public void testClusterLookup() throws DocumentRetrieverException { - final String cluster = "storage", configId = "content/cluster.foo/storage", - expectedRoute = "[Storage:cluster=storage;clusterconfigid=content/cluster.foo/storage]"; + final String cluster = "storage", + expectedRoute = "[Content:cluster=storage]"; ClientParameters params = createParameters() .setCluster(cluster) .build(); - ClusterList clusterList = new ClusterList(Collections.singletonList(new ClusterDef(cluster, configId))); + ClusterList clusterList = new ClusterList(Collections.singletonList(new ClusterDef(cluster))); DocumentRetriever documentRetriever = createDocumentRetriever(params, clusterList); documentRetriever.retrieveDocuments(); @@ -258,7 +258,7 @@ public class DocumentRetrieverTest { .setCluster("invalidclustername") .build(); - ClusterList clusterList = new ClusterList(Collections.singletonList(new ClusterDef("storage", "content/cluster.foo/storage"))); + ClusterList clusterList = new ClusterList(Collections.singletonList(new ClusterDef("storage"))); DocumentRetriever documentRetriever = createDocumentRetriever(params, clusterList); documentRetriever.retrieveDocuments(); diff --git a/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java b/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java index 4c8fbb1beee..f4d58a671f1 100644 --- a/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java +++ b/vespaclient-java/src/test/java/com/yahoo/vespavisit/VdsVisitTestCase.java @@ -228,17 +228,17 @@ public class VdsVisitTestCase { @Test public void testAutoSelectClusterRoute() throws Exception { List<ClusterDef> clusterDefs = new ArrayList<>(); - clusterDefs.add(new ClusterDef("storage", "content/cluster.foo/storage")); + clusterDefs.add(new ClusterDef("storage")); ClusterList clusterList = new ClusterList(clusterDefs); String route = VdsVisit.resolveClusterRoute(clusterList, null); - assertEquals("[Storage:cluster=storage;clusterconfigid=content/cluster.foo/storage]", route); + assertEquals("[Content:cluster=storage]", route); } @Test public void testBadClusterName() throws Exception { List<ClusterDef> clusterDefs = new ArrayList<>(); - clusterDefs.add(new ClusterDef("storage", "content/cluster.foo/storage")); + clusterDefs.add(new ClusterDef("storage")); ClusterList clusterList = new ClusterList(clusterDefs); try { VdsVisit.resolveClusterRoute(clusterList, "borkbork"); @@ -252,8 +252,8 @@ public class VdsVisitTestCase { @Test public void testRequireClusterOptionIfMultipleClusters() { List<ClusterDef> clusterDefs = new ArrayList<>(); - clusterDefs.add(new ClusterDef("storage", "content/cluster.foo/storage")); - clusterDefs.add(new ClusterDef("storage2", "content/cluster.bar/storage")); + clusterDefs.add(new ClusterDef("storage")); + clusterDefs.add(new ClusterDef("storage2")); ClusterList clusterList = new ClusterList(clusterDefs); try { VdsVisit.resolveClusterRoute(clusterList, null); @@ -265,12 +265,12 @@ public class VdsVisitTestCase { @Test public void testExplicitClusterOptionWithMultipleClusters() { List<ClusterDef> clusterDefs = new ArrayList<>(); - clusterDefs.add(new ClusterDef("storage", "content/cluster.foo/storage")); - clusterDefs.add(new ClusterDef("storage2", "content/cluster.bar/storage")); + clusterDefs.add(new ClusterDef("storage")); + clusterDefs.add(new ClusterDef("storage2")); ClusterList clusterList = new ClusterList(clusterDefs); String route = VdsVisit.resolveClusterRoute(clusterList, "storage2"); - assertEquals("[Storage:cluster=storage2;clusterconfigid=content/cluster.bar/storage]", route); + assertEquals("[Content:cluster=storage2]", route); } @Test |