summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java8
-rw-r--r--container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java9
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusParams.java17
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java22
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java35
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentRouteSelectorPolicy.java22
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/MessageTypePolicy.java11
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java34
-rw-r--r--vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java139
-rw-r--r--vdslib/src/test/java/com/yahoo/vdslib/distribution/CrossPlatformTestFactory.java5
10 files changed, 223 insertions, 79 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java b/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java
index b79bfd41dc7..e8962aa83c5 100644
--- a/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java
+++ b/container-core/src/main/java/com/yahoo/container/core/documentapi/DocumentAccessProvider.java
@@ -7,6 +7,8 @@ import com.yahoo.component.AbstractComponent;
import com.yahoo.container.di.componentgraph.Provider;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.messagebus.MessagebusConfig;
+import com.yahoo.messagebus.documentapi.DocumentProtocolPoliciesConfig;
+import com.yahoo.vespa.config.content.DistributionConfig;
import com.yahoo.vespa.config.content.LoadTypeConfig;
/**
@@ -20,8 +22,10 @@ public class DocumentAccessProvider extends AbstractComponent implements Provide
@Inject
public DocumentAccessProvider(DocumentmanagerConfig documentmanagerConfig, LoadTypeConfig loadTypeConfig,
- SlobroksConfig slobroksConfig, MessagebusConfig messagebusConfig) {
- this.access = new VespaDocumentAccess(documentmanagerConfig, loadTypeConfig, slobroksConfig, messagebusConfig);
+ SlobroksConfig slobroksConfig, MessagebusConfig messagebusConfig,
+ DocumentProtocolPoliciesConfig policiesConfig, DistributionConfig distributionConfig) {
+ this.access = new VespaDocumentAccess(documentmanagerConfig, loadTypeConfig, slobroksConfig, messagebusConfig,
+ policiesConfig, distributionConfig);
}
@Override
diff --git a/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java b/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java
index 2918ffb2c80..6a106552a12 100644
--- a/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java
+++ b/container-core/src/main/java/com/yahoo/container/core/documentapi/VespaDocumentAccess.java
@@ -20,6 +20,8 @@ import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
import com.yahoo.documentapi.messagebus.MessageBusParams;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
import com.yahoo.messagebus.MessagebusConfig;
+import com.yahoo.messagebus.documentapi.DocumentProtocolPoliciesConfig;
+import com.yahoo.vespa.config.content.DistributionConfig;
import com.yahoo.vespa.config.content.LoadTypeConfig;
/**
@@ -39,9 +41,12 @@ public class VespaDocumentAccess extends DocumentAccess {
VespaDocumentAccess(DocumentmanagerConfig documentmanagerConfig,
LoadTypeConfig loadTypeConfig,
SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig) {
+ MessagebusConfig messagebusConfig,
+ DocumentProtocolPoliciesConfig policiesConfig,
+ DistributionConfig distributionConfig) {
super(new DocumentAccessParams().setDocumentmanagerConfig(documentmanagerConfig));
- this.parameters = new MessageBusParams(new LoadTypeSet(loadTypeConfig));
+ this.parameters = new MessageBusParams(new LoadTypeSet(loadTypeConfig))
+ .setDocumentProtocolPoliciesConfig(policiesConfig, distributionConfig);
this.parameters.setDocumentmanagerConfig(documentmanagerConfig);
this.parameters.getRPCNetworkParams().setSlobroksConfig(slobroksConfig);
this.parameters.getMessageBusParams().setMessageBusConfig(messagebusConfig);
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusParams.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusParams.java
index a838f3b8723..824b1144a67 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusParams.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusParams.java
@@ -3,8 +3,15 @@ package com.yahoo.documentapi.messagebus;
import com.yahoo.documentapi.DocumentAccessParams;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.documentapi.DocumentProtocolPoliciesConfig;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.vespa.config.content.DistributionConfig;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
/**
* @author Einar M R Rosenvinge
@@ -13,6 +20,8 @@ public class MessageBusParams extends DocumentAccessParams {
private String routingConfigId = null;
private String protocolConfigId = null;
+ private DocumentProtocolPoliciesConfig policiesConfig = null;
+ private DistributionConfig distributionConfig = null;
private String route = "route:default";
private String routeForGet = "route:default-get";
private int traceLevel = 0;
@@ -79,6 +88,14 @@ public class MessageBusParams extends DocumentAccessParams {
return this;
}
+ /** Sets the config used by the {@link DocumentProtocol} policies. */
+ public MessageBusParams setDocumentProtocolPoliciesConfig(DocumentProtocolPoliciesConfig policiesConfig,
+ DistributionConfig distributionConfig) {
+ this.policiesConfig = requireNonNull(policiesConfig);
+ this.distributionConfig = requireNonNull(distributionConfig);
+ return this;
+ }
+
/**
* Sets the name of the route to send appropriate requests to. This is a convenience method for prefixing a route
* with "route:", and using {@link #setRoute} instead.
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
index fdb59941a31..f8e6989bbfa 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
@@ -6,7 +6,6 @@ import com.yahoo.document.BucketId;
import com.yahoo.document.BucketIdFactory;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
-import java.util.logging.Level;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.ErrorCode;
@@ -22,6 +21,7 @@ import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
+import com.yahoo.vespa.config.content.DistributionConfig;
import java.util.ArrayList;
import java.util.Collections;
@@ -32,6 +32,7 @@ import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -224,12 +225,20 @@ public class ContentPolicy extends SlobrokPolicy {
protected final String clusterName;
protected final String distributionConfigId;
+ protected final DistributionConfig distributionConfig;
protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator;
public Parameters(Map<String, String> params) {
+ this(params, null);
+ }
+
+ private Parameters(Map<String, String> params, DistributionConfig config) {
clusterName = params.get("cluster");
if (clusterName == null)
throw new IllegalArgumentException("Required parameter 'cluster', the name of the content cluster, not set");
+ distributionConfig = config;
+ if (distributionConfig != null && distributionConfig.cluster(clusterName) == null)
+ throw new IllegalArgumentException("Distribution config for cluster '" + clusterName + "' not found");
distributionConfigId = params.get("clusterconfigid"); // TODO jonmv: remove
slobrokHostPatternGenerator = createPatternGenerator();
}
@@ -247,7 +256,8 @@ public class ContentPolicy extends SlobrokPolicy {
return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent);
}
public Distribution createDistribution(SlobrokPolicy policy) {
- return new Distribution(getDistributionConfigId());
+ return distributionConfig == null ? new Distribution(getDistributionConfigId())
+ : new Distribution(distributionConfig.cluster(clusterName));
}
/**
@@ -550,12 +560,8 @@ public class ContentPolicy extends SlobrokPolicy {
private final Parameters parameters;
/** Constructor used in production. */
- public ContentPolicy(String param) {
- this(parse(param));
- }
-
- public ContentPolicy(Map<String, String> params) {
- this(new Parameters(params));
+ public ContentPolicy(String param, DistributionConfig config) {
+ this(new Parameters(parse(param), config));
}
/** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
index 2680ed011af..9a889f72115 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
@@ -11,10 +11,12 @@ import com.yahoo.messagebus.ErrorCode;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.Routable;
+import com.yahoo.messagebus.documentapi.DocumentProtocolPoliciesConfig;
import com.yahoo.messagebus.routing.RoutingContext;
import com.yahoo.messagebus.routing.RoutingNodeIterator;
import com.yahoo.messagebus.routing.RoutingPolicy;
import com.yahoo.text.Utf8String;
+import com.yahoo.vespa.config.content.DistributionConfig;
import java.util.Collections;
import java.util.HashSet;
@@ -24,6 +26,8 @@ import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
+import static java.util.Objects.requireNonNull;
+
/**
* Implements the message bus protocol that is used by all components of Vespa.
*
@@ -243,27 +247,34 @@ public class DocumentProtocol implements Protocol {
this(docMan, configId, new LoadTypeSet());
}
+ public DocumentProtocol(DocumentTypeManager documentTypeManager, LoadTypeSet loadTypes,
+ DocumentProtocolPoliciesConfig policiesConfig, DistributionConfig distributionConfig) {
+ this(requireNonNull(documentTypeManager), null, requireNonNull(loadTypes),
+ requireNonNull(policiesConfig), requireNonNull(distributionConfig));
+ }
+
public DocumentProtocol(DocumentTypeManager docMan, String configId, LoadTypeSet set) {
- // Prepare config string for routing policy factories.
- String cfg = (configId == null ? "client" : configId);
- if (docMan != null) {
+ this(docMan, configId == null ? "client" : configId, set, null, null);
+ }
+
+ private DocumentProtocol(DocumentTypeManager docMan, String configId, LoadTypeSet set,
+ DocumentProtocolPoliciesConfig policiesConfig, DistributionConfig distributionConfig) {
+ if (docMan != null)
this.docMan = docMan;
- } else {
- this.docMan = new DocumentTypeManager();
- DocumentTypeManagerConfigurer.configure(this.docMan, cfg);
- }
- routableRepository = new RoutableRepository(set);
+ else
+ DocumentTypeManagerConfigurer.configure(this.docMan = new DocumentTypeManager(), configId);
+ this.routableRepository = new RoutableRepository(set);
// When adding factories to this list, please KEEP THEM ORDERED alphabetically like they are now.
putRoutingPolicyFactory("AND", new RoutingPolicyFactories.AndPolicyFactory());
- putRoutingPolicyFactory("Content", new RoutingPolicyFactories.ContentPolicyFactory());
- putRoutingPolicyFactory("DocumentRouteSelector", new RoutingPolicyFactories.DocumentRouteSelectorPolicyFactory(cfg));
+ putRoutingPolicyFactory("Content", new RoutingPolicyFactories.ContentPolicyFactory(distributionConfig));
+ putRoutingPolicyFactory("DocumentRouteSelector", new RoutingPolicyFactories.DocumentRouteSelectorPolicyFactory(configId, policiesConfig));
putRoutingPolicyFactory("Extern", new RoutingPolicyFactories.ExternPolicyFactory());
putRoutingPolicyFactory("LocalService", new RoutingPolicyFactories.LocalServicePolicyFactory());
- putRoutingPolicyFactory("MessageType", new RoutingPolicyFactories.MessageTypePolicyFactory(cfg));
+ putRoutingPolicyFactory("MessageType", new RoutingPolicyFactories.MessageTypePolicyFactory(configId, policiesConfig));
putRoutingPolicyFactory("RoundRobin", new RoutingPolicyFactories.RoundRobinPolicyFactory());
putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory());
- putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.ContentPolicyFactory());
+ putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.ContentPolicyFactory(distributionConfig));
putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory());
// Prepare version specifications to use when adding routable factories.
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentRouteSelectorPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentRouteSelectorPolicy.java
index 8fbd1548f68..6151daf043f 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentRouteSelectorPolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentRouteSelectorPolicy.java
@@ -5,13 +5,15 @@ import com.yahoo.config.subscription.ConfigSubscriber;
import com.yahoo.document.DocumentGet;
import com.yahoo.document.select.DocumentSelector;
import com.yahoo.document.select.Result;
-import java.util.logging.Level;
+import com.yahoo.document.select.parser.ParseException;
import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.documentapi.DocumentProtocolPoliciesConfig;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingContext;
import java.util.HashMap;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -30,6 +32,24 @@ public class DocumentRouteSelectorPolicy
private String error = "Not configured.";
private ConfigSubscriber subscriber;
+ /** This policy is constructed with the proper config at its time of creation. */
+ public DocumentRouteSelectorPolicy(DocumentProtocolPoliciesConfig config) {
+ Map<String, DocumentSelector> selectors = new HashMap<>();
+ config.cluster().forEach((name, cluster) -> {
+ try {
+ selectors.put(name, new DocumentSelector(cluster.selector()));
+ }
+ catch (ParseException e) {
+ throw new IllegalArgumentException("Error parsing selector '" +
+ cluster.selector() +
+ "' for route '" + name +"'",
+ e);
+ }
+ });
+ this.config = Map.copyOf(selectors);
+ this.error = null;
+ }
+
/**
* This policy is constructed with a configuration identifier that can be subscribed to for the document selector
* config. If the string is either null or empty it will default to the proper one.
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/MessageTypePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/MessageTypePolicy.java
index 4226c1e6cac..026a26cfc0c 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/MessageTypePolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/MessageTypePolicy.java
@@ -2,13 +2,17 @@
package com.yahoo.documentapi.messagebus.protocol;
import com.yahoo.config.subscription.ConfigSubscriber;
+import com.yahoo.messagebus.documentapi.DocumentProtocolPoliciesConfig;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingContext;
import com.yahoo.vespa.config.content.MessagetyperouteselectorpolicyConfig;
+
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
/**
* @author baldersheim
*/
@@ -18,6 +22,13 @@ public class MessageTypePolicy implements DocumentProtocolRoutingPolicy, ConfigS
private ConfigSubscriber subscriber;
private volatile Route defaultRoute;
+ MessageTypePolicy(DocumentProtocolPoliciesConfig.Cluster config) {
+ configRef.set(config.route().stream()
+ .collect(toUnmodifiableMap(route -> route.messageType(),
+ route -> Route.parse(route.name()))));
+ defaultRoute = Route.parse(config.defaultRoute());
+ }
+
MessageTypePolicy(String configId) {
subscriber = new ConfigSubscriber();
subscriber.subscribe(this, MessagetyperouteselectorpolicyConfig.class, configId);
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
index a09faea73d2..8535fa610dd 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
@@ -1,6 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;
+import com.yahoo.messagebus.documentapi.DocumentProtocolPoliciesConfig;
+import com.yahoo.vespa.config.content.DistributionConfig;
+
/**
* @author Simon Thoresen Hult
* @author jonmv
@@ -16,37 +19,54 @@ class RoutingPolicyFactories {
}
static class ContentPolicyFactory implements RoutingPolicyFactory {
+ private final DistributionConfig distributionConfig;
+ public ContentPolicyFactory(DistributionConfig config) { this.distributionConfig = config; }
public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new ContentPolicy(param);
+ return new ContentPolicy(param, distributionConfig);
}
}
static class MessageTypePolicyFactory implements RoutingPolicyFactory {
private final String configId;
+ private final DocumentProtocolPoliciesConfig config;
- public MessageTypePolicyFactory(String configId) {
+ public MessageTypePolicyFactory(String configId, DocumentProtocolPoliciesConfig config) {
this.configId = configId;
+ this.config = config;
}
public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new MessageTypePolicy((param == null || param.isEmpty()) ? configId : param);
- }
+ if (config != null) {
+ if (config.cluster(param) == null)
+ return new ErrorPolicy("No message type config for cluster '" + param + "'");
- public void destroy() {
+ return new MessageTypePolicy(config.cluster(param));
+ }
+ return new MessageTypePolicy(param == null || param.isEmpty() ? configId : param);
}
}
static class DocumentRouteSelectorPolicyFactory implements RoutingPolicyFactory {
private final String configId;
+ private final DocumentProtocolPoliciesConfig config;
- public DocumentRouteSelectorPolicyFactory(String configId) {
+ public DocumentRouteSelectorPolicyFactory(String configId, DocumentProtocolPoliciesConfig config) {
this.configId = configId;
+ this.config = config;
}
public DocumentProtocolRoutingPolicy createPolicy(String param) {
- DocumentRouteSelectorPolicy ret = new DocumentRouteSelectorPolicy((param == null || param.isEmpty()) ?
+ if (config != null) {
+ try {
+ return new DocumentRouteSelectorPolicy(config);
+ }
+ catch (IllegalArgumentException e) {
+ return new ErrorPolicy(e.getMessage());
+ }
+ }
+ DocumentRouteSelectorPolicy ret = new DocumentRouteSelectorPolicy(param == null || param.isEmpty() ?
configId : param);
String error = ret.getError();
if (error != null) {
diff --git a/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java b/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java
index 16f18cf8f8b..26e8f074b49 100644
--- a/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java
+++ b/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java
@@ -9,6 +9,7 @@ import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
+import com.yahoo.vespa.config.content.DistributionConfig;
import com.yahoo.vespa.config.content.StorDistributionConfig;
import com.yahoo.document.BucketId;
@@ -52,61 +53,103 @@ public class Distribution {
return config.getAcquire().redundancy;
}
- private ConfigSubscriber.SingleSubscriber<StorDistributionConfig> configSubscriber = new ConfigSubscriber.SingleSubscriber<>() {
- private int[] getGroupPath(String path) {
- if (path.equals("invalid")) { return new int[0]; }
- StringTokenizer st = new StringTokenizer(path, ".");
- int[] p = new int[st.countTokens()];
- for (int i=0; i<p.length; ++i) {
- p[i] = Integer.valueOf(st.nextToken());
- }
- return p;
+ private static int[] getGroupPath(String path) {
+ if (path.equals("invalid")) { return new int[0]; }
+ StringTokenizer st = new StringTokenizer(path, ".");
+ int[] p = new int[st.countTokens()];
+ for (int i=0; i<p.length; ++i) {
+ p[i] = Integer.valueOf(st.nextToken());
}
+ return p;
+ }
- @Override
- public void configure(StorDistributionConfig config) {
- try{
- Group root = null;
- for (int i=0; i<config.group().size(); ++i) {
- StorDistributionConfig.Group cg = config.group(i);
- int[] path = new int[0];
- if (root != null) {
- path = getGroupPath(cg.index());
- }
- boolean isLeafGroup = (cg.nodes().size() > 0);
- Group group;
- int index = (path.length == 0 ? 0 : path[path.length - 1]);
- if (isLeafGroup) {
- group = new Group(index, cg.name());
- List<ConfiguredNode> nodes = new ArrayList<>();
- for (StorDistributionConfig.Group.Nodes node : cg.nodes()) {
- nodes.add(new ConfiguredNode(node.index(), node.retired()));
- }
- group.setNodes(nodes);
- } else {
- group = new Group(index, cg.name(), new Group.Distribution(cg.partitions(), config.redundancy()));
+ // NOTE: keep in sync with the below
+ private ConfigSubscriber.SingleSubscriber<StorDistributionConfig> configSubscriber = config -> {
+ try {
+ Group root = null;
+ for (int i=0; i<config.group().size(); ++i) {
+ StorDistributionConfig.Group cg = config.group(i);
+ int[] path = new int[0];
+ if (root != null) {
+ path = getGroupPath(cg.index());
+ }
+ boolean isLeafGroup = (cg.nodes().size() > 0);
+ Group group;
+ int index = (path.length == 0 ? 0 : path[path.length - 1]);
+ if (isLeafGroup) {
+ group = new Group(index, cg.name());
+ List<ConfiguredNode> nodes = new ArrayList<>();
+ for (StorDistributionConfig.Group.Nodes node : cg.nodes()) {
+ nodes.add(new ConfiguredNode(node.index(), node.retired()));
}
- group.setCapacity(cg.capacity());
- if (path.length == 0) {
- root = group;
- } else {
- Group parent = root;
- for (int j=0; j<path.length - 1; ++j) {
- parent = parent.getSubgroups().get(path[j]);
- }
- parent.addSubGroup(group);
+ group.setNodes(nodes);
+ } else {
+ group = new Group(index, cg.name(), new Group.Distribution(cg.partitions(), config.redundancy()));
+ }
+ group.setCapacity(cg.capacity());
+ if (path.length == 0) {
+ root = group;
+ } else {
+ Group parent = root;
+ for (int j=0; j<path.length - 1; ++j) {
+ parent = parent.getSubgroups().get(path[j]);
}
+ parent.addSubGroup(group);
}
- if (root == null)
- throw new IllegalStateException("Config does not specify a root group");
- root.calculateDistributionHashValues();
- Distribution.this.config.setRelease(new Config(root, config.redundancy(), config.distributor_auto_ownership_transfer_on_whole_group_down()));
- } catch (ParseException e) {
- throw new IllegalStateException("Failed to parse config", e);
}
+ if (root == null)
+ throw new IllegalStateException("Config does not specify a root group");
+ root.calculateDistributionHashValues();
+ Distribution.this.config.setRelease(new Config(root, config.redundancy(), config.distributor_auto_ownership_transfer_on_whole_group_down()));
+ } catch (ParseException e) {
+ throw new IllegalStateException("Failed to parse config", e);
}
};
+ // TODO jonmv: De-dupe with this.configSubscriber once common config is used
+ private void configure(DistributionConfig.Cluster config) {
+ try {
+ Group root = null;
+ for (int i=0; i<config.group().size(); ++i) {
+ DistributionConfig.Cluster.Group cg = config.group(i);
+ int[] path = new int[0];
+ if (root != null) {
+ path = getGroupPath(cg.index());
+ }
+ boolean isLeafGroup = (cg.nodes().size() > 0);
+ Group group;
+ int index = (path.length == 0 ? 0 : path[path.length - 1]);
+ if (isLeafGroup) {
+ group = new Group(index, cg.name());
+ List<ConfiguredNode> nodes = new ArrayList<>();
+ for (DistributionConfig.Cluster.Group.Nodes node : cg.nodes()) {
+ nodes.add(new ConfiguredNode(node.index(), node.retired()));
+ }
+ group.setNodes(nodes);
+ } else {
+ group = new Group(index, cg.name(), new Group.Distribution(cg.partitions(), config.redundancy()));
+ }
+ group.setCapacity(cg.capacity());
+ if (path.length == 0) {
+ root = group;
+ } else {
+ Group parent = root;
+ for (int j=0; j<path.length - 1; ++j) {
+ parent = parent.getSubgroups().get(path[j]);
+ }
+ parent.addSubGroup(group);
+ }
+ }
+ if (root == null)
+ throw new IllegalStateException("Config does not specify a root group");
+ root.calculateDistributionHashValues();
+ Distribution.this.config.setRelease(new Config(root, config.redundancy(), true));
+ } catch (ParseException e) {
+ throw new IllegalStateException("Failed to parse config", e);
+ }
+ }
+
+
public Distribution(String configId) {
int mask = 0;
for (int i=0; i<=64; ++i) {
@@ -131,6 +174,10 @@ public class Distribution {
configSubscriber.configure(config);
}
+ public Distribution(DistributionConfig.Cluster config) {
+ configure(config);
+ }
+
public void close() {
if (configSub!=null) {
configSub.close();
diff --git a/vdslib/src/test/java/com/yahoo/vdslib/distribution/CrossPlatformTestFactory.java b/vdslib/src/test/java/com/yahoo/vdslib/distribution/CrossPlatformTestFactory.java
index 94f7d7a8c94..7dadd9560b5 100644
--- a/vdslib/src/test/java/com/yahoo/vdslib/distribution/CrossPlatformTestFactory.java
+++ b/vdslib/src/test/java/com/yahoo/vdslib/distribution/CrossPlatformTestFactory.java
@@ -1,7 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vdslib.distribution;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
/**
* Helper class to implement unit tests that should produce the same result in different implementations.