diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-26 15:09:24 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-26 15:09:24 +0200 |
commit | afba2a1b55183e7fc89da97a2f8535365021e723 (patch) | |
tree | 9095d4fbef86d5b8f332aacfd89682838449bc61 | |
parent | 04bed07efb4b196047681f91f79bd531966cd37c (diff) |
Get rid of very complicated and inherently thread unsafe code as external slobrok/configserver support is long gone.
12 files changed, 134 insertions, 338 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AsyncInitializationPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AsyncInitializationPolicy.java deleted file mode 100644 index b88c8cc3b00..00000000000 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AsyncInitializationPolicy.java +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol; - -import com.yahoo.log.LogLevel; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.routing.RoutingContext; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.logging.Logger; - -/** - * Abstract class for routing policies that need asynchronous initialization. - * This is recommended if the routing policy needs configuration, or synchronization with - * other sources. If this policy is not used in those cases, the messagebus thread might hang - * waiting for the other sources, causing messages to other routes to be blocked. - */ -public abstract class AsyncInitializationPolicy implements DocumentProtocolRoutingPolicy, Runnable { - - protected enum InitState { - NOT_STARTED, - RUNNING, - DONE - } - - private static final Logger log = Logger.getLogger(AsyncInitializationPolicy.class.getName()); - - private InitState initState; - private ScheduledThreadPoolExecutor executor; - private Exception initException; - private boolean syncInit = true; - - public static Map<String, String> parse(String param) { - Map<String, String> map = new TreeMap<>(); - - if (param != null) { - String[] p = param.split(";"); - for (String s : p) { - String[] keyValue = s.split("="); - - if (keyValue.length == 1) { - map.put(keyValue[0], "true"); - } else if (keyValue.length == 2) { - map.put(keyValue[0], keyValue[1]); - } - } - } - - return map; - } - - AsyncInitializationPolicy() { - initState = InitState.NOT_STARTED; - } - - synchronized void needAsynchronousInitialization() { - syncInit = false; - } - - public abstract void init(); - - public abstract void doSelect(RoutingContext routingContext); - - private synchronized void checkStartInit() { - if (initState == InitState.NOT_STARTED) { - if (syncInit) { - init(); - initState = InitState.DONE; - } else { - executor = new ScheduledThreadPoolExecutor(1); - executor.execute(this); - initState = InitState.RUNNING; - } - } - } - - @Override - public void select(RoutingContext routingContext) { - synchronized (this) { - if (initException != null) { - Reply reply = new EmptyReply(); - reply.addError(new com.yahoo.messagebus.Error(ErrorCode.POLICY_ERROR, "Policy threw exception during init:" + exceptionMessageWithTrace(initException))); - routingContext.setReply(reply); - return; - } - - checkStartInit(); - - if (initState == InitState.RUNNING) { - Reply reply = new EmptyReply(); - reply.addError(new com.yahoo.messagebus.Error(ErrorCode.SESSION_BUSY, "Policy is waiting to be initialized.")); - routingContext.setReply(reply); - return; - } - } - - doSelect(routingContext); - } - - public void run() { - try { - init(); - } catch (Exception e) { - log.log(LogLevel.WARNING,"Init threw exception",e); - initException = e; - } - - synchronized (this) { - initState = InitState.DONE; - this.notifyAll(); - } - } - - @Override - public synchronized void destroy() { - if (executor != null) { - executor.shutdownNow(); - } - } - - - private static String exceptionMessageWithTrace(Exception e) { - StringWriter sw = new StringWriter(); - try (PrintWriter pw = new PrintWriter(sw)) { - e.printStackTrace(pw); - pw.flush(); - } - return sw.toString(); - - } - - - -} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java index 3ece2217601..d6e20b9d57f 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java @@ -33,7 +33,7 @@ public class ContentPolicy extends StoragePolicy { } public ContentPolicy(Map<String, String> params) { - super(new ContentParameters(params), params); + super(new ContentParameters(params)); } public ContentPolicy(String parameters) { diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocolRoutingPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocolRoutingPolicy.java index 77bc904ab12..450f53c4440 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocolRoutingPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocolRoutingPolicy.java @@ -3,7 +3,6 @@ package com.yahoo.documentapi.messagebus.protocol; import com.yahoo.messagebus.routing.RoutingPolicy; - /** * @author thomasg */ diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternalSlobrokPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternalSlobrokPolicy.java deleted file mode 100644 index e791cdcf008..00000000000 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternalSlobrokPolicy.java +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol; - -import com.yahoo.config.subscription.ConfigSourceSet; -import com.yahoo.config.subscription.ConfigSubscriber; -import com.yahoo.jrt.Supervisor; -import com.yahoo.jrt.Transport; -import com.yahoo.jrt.slobrok.api.IMirror; -import com.yahoo.jrt.slobrok.api.Mirror; -import com.yahoo.jrt.slobrok.api.SlobrokList; -import com.yahoo.messagebus.routing.RoutingContext; -import com.yahoo.cloud.config.SlobroksConfig; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Abstract class for policies that allow you to specify which slobrok to use for the - * routing. - */ -public abstract class ExternalSlobrokPolicy extends AsyncInitializationPolicy implements ConfigSubscriber.SingleSubscriber<SlobroksConfig> { - String error; - private Supervisor orb = null; - private final AtomicReference<Mirror> safeMirror = new AtomicReference<>(null); - private SlobrokList slobroks = null; - private boolean firstTry = true; - private ConfigSubscriber subscriber; - String[] configSources = null; - private final static String slobrokConfigId = "admin/slobrok.0"; - - - ExternalSlobrokPolicy(Map<String, String> param) { - super(); - - String conf = param.get("config"); - if (conf != null) { - configSources = conf.split(","); - } - - String slbrk = param.get("slobroks"); - if (slbrk != null) { - slobroks = new SlobrokList(); - slobroks.setup(slbrk.split(",")); - } - - if (slobroks != null || configSources != null) { - needAsynchronousInitialization(); - } - } - - @Override - public synchronized void init() { - if (slobroks != null) { - orb = new Supervisor(new Transport()); - safeMirror.set(new Mirror(orb, slobroks)); - } - - if (configSources != null) { - if (safeMirror.get() == null) { - orb = new Supervisor(new Transport()); - subscriber = subscribe(slobrokConfigId, new ConfigSourceSet(configSources)); - } - } - } - - private ConfigSubscriber subscribe(String configId, final ConfigSourceSet configSourceSet) { - ConfigSubscriber subscriber = new ConfigSubscriber(configSourceSet); - subscriber.subscribe(this, SlobroksConfig.class, configId); - return subscriber; - } - - public IMirror getMirror() { - return safeMirror.get(); - } - - public List<Mirror.Entry> lookup(RoutingContext context, String pattern) { - IMirror mirror1 = getMirror(); - if (mirror1 == null) { - mirror1 = context.getMirror(); - } - - List<Mirror.Entry> arr = mirror1.lookup(pattern); - - if (arr.isEmpty()) { - synchronized(this) { - if (firstTry) { - try { - int count = 0; - while (arr.isEmpty() && count < 100) { - Thread.sleep(50); - arr = mirror1.lookup(pattern); - count++; - } - } catch (InterruptedException e) { - } - firstTry = true; - } - - } - } - - return arr; - } - - @Override - public synchronized void configure(SlobroksConfig config) { - String[] slist = new String[config.slobrok().size()]; - - for(int i = 0; i < config.slobrok().size(); i++) { - slist[i] = config.slobrok(i).connectionspec(); - } - if (slobroks == null) { - slobroks = new SlobrokList(); - } - slobroks.setup(slist); - if (safeMirror.get() == null) { - safeMirror.set(new Mirror(orb, slobroks)); - } - - } - - @Override - public synchronized void destroy() { - if (subscriber!=null) subscriber.close(); - } - -} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java index a2875f14ab5..054b120cf81 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi.messagebus.protocol; +import com.google.common.util.concurrent.AtomicDouble; import com.yahoo.jrt.slobrok.api.Mirror; import java.util.List; @@ -28,10 +29,10 @@ public class LoadBalancer { } /** Statistics on each node we are load balancing over. Populated lazily. */ - private List<NodeMetrics> nodeWeights = new CopyOnWriteArrayList<>(); + private final List<NodeMetrics> nodeWeights = new CopyOnWriteArrayList<>(); - private String cluster; - private double position = 0.0; + private final String cluster; + private final AtomicDouble safePosition = new AtomicDouble(0.0); public LoadBalancer(String cluster) { this.cluster = cluster; @@ -67,6 +68,7 @@ public class LoadBalancer { double weightSum = 0.0; Node selectedNode = null; + double position = safePosition.get(); for (Mirror.Entry entry : choices) { NodeMetrics nodeMetrics = getNodeMetrics(entry); @@ -82,6 +84,7 @@ public class LoadBalancer { selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0))); } position += 1.0; + safePosition.set(position); selectedNode.metrics.sent.incrementAndGet(); return selectedNode; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java index 9cf82144e71..3d129684465 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java @@ -23,32 +23,28 @@ import java.util.Map; * * @author <a href="mailto:humbe@yahoo-inc.com">Haakon Humberset</a> */ -public class LoadBalancerPolicy extends ExternalSlobrokPolicy { +public class LoadBalancerPolicy extends SlobrokPolicy { private final String session; private final String pattern; - private LoadBalancer loadBalancer; + private final LoadBalancer loadBalancer; LoadBalancerPolicy(String param) { this(parse(param)); } private LoadBalancerPolicy(Map<String, String> params) { - super(params); + super(); String cluster = params.get("cluster"); session = params.get("session"); if (cluster == null) { - error = "Required parameter pattern not set"; - pattern = null; - return; + throw new IllegalArgumentException("Required parameter 'cluster' not set"); } if (session == null) { - error = "Required parameter session not set"; - pattern = null; - return; + throw new IllegalArgumentException("Required parameter 'session' not set"); } pattern = cluster + "/*/" + session; @@ -56,7 +52,7 @@ public class LoadBalancerPolicy extends ExternalSlobrokPolicy { } @Override - public void doSelect(RoutingContext context) { + public void select(RoutingContext context) { LoadBalancer.Node node = getRecipient(context); if (node != null) { @@ -65,8 +61,7 @@ public class LoadBalancerPolicy extends ExternalSlobrokPolicy { route.setHop(0, Hop.parse(node.entry.getSpec() + "/" + session)); context.addChild(route); } else { - context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE, - "Could not resolve any nodes to send to in pattern " + pattern); + context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE, "Could not resolve any nodes to send to in pattern " + pattern); } } @@ -95,4 +90,9 @@ public class LoadBalancerPolicy extends ExternalSlobrokPolicy { context.setReply(reply); } + + @Override + public void destroy() { + + } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SlobrokPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SlobrokPolicy.java new file mode 100644 index 00000000000..864c45218d3 --- /dev/null +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SlobrokPolicy.java @@ -0,0 +1,63 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.jrt.slobrok.api.IMirror; +import com.yahoo.jrt.slobrok.api.Mirror; +import com.yahoo.messagebus.routing.RoutingContext; + +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * Abstract class for policies that allow you to specify which slobrok to use for the + * routing. + */ +public abstract class SlobrokPolicy implements DocumentProtocolRoutingPolicy { + private boolean firstTry = true; + + protected List<Mirror.Entry> lookup(RoutingContext context, String pattern) { + IMirror mirror1 = context.getMirror(); + + List<Mirror.Entry> arr = mirror1.lookup(pattern); + + if (arr.isEmpty()) { + synchronized(this) { + if (firstTry) { + try { + int count = 0; + while (arr.isEmpty() && count < 100) { + Thread.sleep(50); + arr = mirror1.lookup(pattern); + count++; + } + } catch (InterruptedException e) { + } + firstTry = true; + } + } + } + + return arr; + } + + public static Map<String, String> parse(String param) { + Map<String, String> map = new TreeMap<>(); + + if (param != null) { + String[] p = param.split(";"); + for (String s : p) { + String[] keyValue = s.split("="); + + if (keyValue.length == 1) { + map.put(keyValue[0], "true"); + } else if (keyValue.length == 2) { + map.put(keyValue[0], keyValue[1]); + } + } + } + + return map; + } + +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java index 60589432ea7..918bd193d89 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java @@ -2,7 +2,6 @@ package com.yahoo.documentapi.messagebus.protocol; import com.yahoo.concurrent.CopyOnWriteHashMap; -import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.document.BucketId; import com.yahoo.document.BucketIdFactory; import com.yahoo.jrt.slobrok.api.IMirror; @@ -46,7 +45,7 @@ import java.util.logging.Logger; * * @author Haakon Humberset */ -public class StoragePolicy extends ExternalSlobrokPolicy { +public class StoragePolicy extends SlobrokPolicy { private static final Logger log = Logger.getLogger(StoragePolicy.class.getName()); public static final String owningBucketStates = "uim"; @@ -123,9 +122,9 @@ public class StoragePolicy extends ExternalSlobrokPolicy { /** Host fetcher using a slobrok mirror to find the hosts. */ public static class SlobrokHostFetcher extends HostFetcher { private final SlobrokHostPatternGenerator patternGenerator; - private final ExternalSlobrokPolicy policy; + private final SlobrokPolicy policy; - SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy, int percent) { + SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { super(percent); this.patternGenerator = patternGenerator; this.policy = policy; @@ -182,7 +181,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null); - TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy, int percent) { + TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) { super(patternGenerator, policy, percent); } @@ -240,13 +239,11 @@ public class StoragePolicy extends ExternalSlobrokPolicy { public SlobrokHostPatternGenerator createPatternGenerator() { return new SlobrokHostPatternGenerator(getClusterName()); } - public HostFetcher createHostFetcher(ExternalSlobrokPolicy policy, int percent) { + public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent); } - public Distribution createDistribution(ExternalSlobrokPolicy policy) { - return (policy.configSources != null ? - new Distribution(getDistributionConfigId(), new ConfigSourceSet(policy.configSources)) - : new Distribution(getDistributionConfigId())); + public Distribution createDistribution(SlobrokPolicy policy) { + return new Distribution(getDistributionConfigId()); } /** @@ -351,7 +348,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0); private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection - DistributorSelectionLogic(Parameters params, ExternalSlobrokPolicy policy) { + DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) { this.hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes()); this.distribution = params.createDistribution(policy); persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit()); @@ -534,7 +531,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator(); - private final AtomicReference<DistributorSelectionLogic> distributorSelectionLogic = new AtomicReference<>(); + private final DistributorSelectionLogic distributorSelectionLogic; private final Parameters parameters; /** Constructor used in production. */ @@ -543,23 +540,18 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } public StoragePolicy(Map<String, String> params) { - this(new Parameters(params), params); + this(new Parameters(params)); } /** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */ - public StoragePolicy(Parameters p, Map<String, String> params) { - super(params); + public StoragePolicy(Parameters p) { + super(); parameters = p; + distributorSelectionLogic = new DistributorSelectionLogic(parameters, this); } @Override - public void init() { - super.init(); - this.distributorSelectionLogic.set(new DistributorSelectionLogic(parameters, this)); - } - - @Override - public void doSelect(RoutingContext context) { + public void select(RoutingContext context) { if (context.shouldTrace(1)) { context.trace(1, "Selecting route"); } @@ -567,7 +559,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context); if (context.hasReply()) return; - String targetSpec = distributorSelectionLogic.get().getTargetSpec(context, bucketId); + String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId); if (context.hasReply()) return; if (targetSpec != null) { Route route = new Route(context.getRoute()); @@ -590,9 +582,9 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } if (reply instanceof WrongDistributionReply) { - distributorSelectionLogic.get().handleWrongDistribution((WrongDistributionReply) reply, context); + distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context); } else if (reply.hasErrors()) { - distributorSelectionLogic.get().handleErrorReply(reply, context.getContext()); + distributorSelectionLogic.handleErrorReply(reply, context.getContext()); } else if (reply instanceof WriteDocumentReply) { if (context.shouldTrace(9)) { context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp()); @@ -603,6 +595,6 @@ public class StoragePolicy extends ExternalSlobrokPolicy { @Override public void destroy() { - distributorSelectionLogic.get().destroy(); + distributorSelectionLogic.destroy(); } } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java index 20b3c2fb6b4..404a3660208 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java @@ -47,7 +47,7 @@ public class TargetCachingSlobrokHostFetcherTest { } static class Fixture { - ExternalSlobrokPolicy mockSlobrokPolicy = mock(ExternalSlobrokPolicy.class); + SlobrokPolicy mockSlobrokPolicy = mock(SlobrokPolicy.class); IMirror mockMirror = mock(IMirror.class); StoragePolicy.SlobrokHostPatternGenerator patternGenerator = new StoragePolicy.SlobrokHostPatternGenerator("foo"); StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60); diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java index fd9d3f78ca8..fd5f43c23c1 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java @@ -59,9 +59,11 @@ import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** * @author Simon Thoresen Hult @@ -101,7 +103,7 @@ public class PolicyTestCase { policy = new DocumentProtocol(manager).createPolicy("SubsetService", null); assertTrue(policy instanceof SubsetServicePolicy); - policy = new DocumentProtocol(manager).createPolicy("LoadBalancer", null); + policy = new DocumentProtocol(manager).createPolicy("LoadBalancer", "cluster=docproc/cluster.default;session=chain.default"); assertTrue(policy instanceof LoadBalancerPolicy); } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java index 304630861cf..6a3e6a172c3 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java @@ -5,10 +5,9 @@ import com.yahoo.collections.Pair; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.DocumentTypeManagerConfigurer; -import com.yahoo.documentapi.messagebus.protocol.AsyncInitializationPolicy; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolRoutingPolicy; -import com.yahoo.documentapi.messagebus.protocol.ExternalSlobrokPolicy; +import com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RoutingPolicyFactory; import com.yahoo.documentapi.messagebus.protocol.StoragePolicy; @@ -149,16 +148,16 @@ public abstract class StoragePolicyTestEnvironment { private final Distribution distribution; public TestParameters(String parameters, Set<Integer> nodes) { - super(AsyncInitializationPolicy.parse(parameters)); + super(SlobrokPolicy.parse(parameters)); hostFetcher = new TestHostFetcher(getClusterName(), nodes); distribution = new Distribution(Distribution.getDefaultDistributionConfig(2, 10)); } @Override - public StoragePolicy.HostFetcher createHostFetcher(ExternalSlobrokPolicy policy, int percent) { return hostFetcher; } + public StoragePolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; } @Override - public Distribution createDistribution(ExternalSlobrokPolicy policy) { return distribution; } + public Distribution createDistribution(SlobrokPolicy policy) { return distribution; } } public static class StoragePolicyTestFactory implements RoutingPolicyFactory { @@ -172,7 +171,7 @@ public abstract class StoragePolicyTestEnvironment { public DocumentProtocolRoutingPolicy createPolicy(String parameters) { parameterInstances.addLast(new TestParameters(parameters, nodes)); ((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom); - return new StoragePolicy(parameterInstances.getLast(), AsyncInitializationPolicy.parse(parameters)); + return new StoragePolicy(parameterInstances.getLast()); } public void avoidPickingAtRandom(Integer distributor) { avoidPickingAtRandom = distributor; diff --git a/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java b/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java index 7d6d75e0ee2..0063011e41c 100644 --- a/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java +++ b/vdslib/src/main/java/com/yahoo/vdslib/distribution/Distribution.java @@ -3,13 +3,27 @@ package com.yahoo.vdslib.distribution; import com.yahoo.collections.BobHash; import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.DiskState; +import com.yahoo.vdslib.state.Node; +import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vdslib.state.State; import com.yahoo.vespa.config.content.StorDistributionConfig; -import com.yahoo.config.subscription.ConfigSourceSet; -import com.yahoo.vdslib.state.*; import com.yahoo.document.BucketId; -import java.util.*; import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; +import java.util.TreeMap; +import java.util.TreeSet; public class Distribution { @@ -27,7 +41,7 @@ public class Distribution { return redundancy; } - private ConfigSubscriber.SingleSubscriber<StorDistributionConfig> configSubscriber = new ConfigSubscriber.SingleSubscriber<StorDistributionConfig>() { + private ConfigSubscriber.SingleSubscriber<StorDistributionConfig> configSubscriber = new ConfigSubscriber.SingleSubscriber<>() { private int[] getGroupPath(String path) { if (path.equals("invalid")) { return new int[0]; } StringTokenizer st = new StringTokenizer(path, "."); @@ -65,7 +79,6 @@ public class Distribution { if (path.length == 0) { root = group; } else { - assert(root != null); Group parent = root; for (int j=0; j<path.length - 1; ++j) { parent = parent.getSubgroups().get(path[j]); @@ -90,19 +103,12 @@ public class Distribution { }; public Distribution(String configId) { - this(configId, null); - } - public Distribution(String configId, ConfigSourceSet configSources) { int mask = 0; for (int i=0; i<=64; ++i) { distributionBitMasks[i] = mask; mask = (mask << 1) | 1; } - if (configSources==null) { - configSub = new ConfigSubscriber(); - } else { - configSub = new ConfigSubscriber(configSources); - } + configSub = new ConfigSubscriber(); configSub.subscribe(configSubscriber, StorDistributionConfig.class, configId); } @@ -213,7 +219,7 @@ public class Distribution { return group.compareTo(o.group); } } - public void getIdealGroups(BucketId bucketId, ClusterState clusterState, Group parent, + private void getIdealGroups(BucketId bucketId, ClusterState clusterState, Group parent, int redundancy, List<ResultGroup> results) { if (parent.isLeafGroup()) { results.add(new ResultGroup(parent, redundancy)); @@ -311,7 +317,7 @@ public class Distribution { return idealDisk; } - public List<Integer> getIdealStorageNodes(ClusterState clusterState, BucketId bucket, + List<Integer> getIdealStorageNodes(ClusterState clusterState, BucketId bucket, String upStates) throws TooFewBucketBitsInUseException { List<Integer> resultNodes = new ArrayList<>(); @@ -399,12 +405,12 @@ public class Distribution { } public static class TooFewBucketBitsInUseException extends Exception { - public TooFewBucketBitsInUseException(String message) { + TooFewBucketBitsInUseException(String message) { super(message); } } public static class NoDistributorsAvailableException extends Exception { - public NoDistributorsAvailableException(String message) { + NoDistributorsAvailableException(String message) { super(message); } } @@ -504,7 +510,7 @@ public class Distribution { public static String getSimpleGroupConfig(int redundancy, int nodeCount) { return getSimpleGroupConfig(redundancy, nodeCount, StorDistributionConfig.Disk_distribution.Enum.MODULO_BID); } - public static String getSimpleGroupConfig(int redundancy, int nodeCount, StorDistributionConfig.Disk_distribution.Enum diskDistribution) { + private static String getSimpleGroupConfig(int redundancy, int nodeCount, StorDistributionConfig.Disk_distribution.Enum diskDistribution) { StringBuilder sb = new StringBuilder(); sb.append("raw:redundancy ").append(redundancy).append("\n").append("group[4]\n"); |