summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-26 15:09:24 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-26 15:09:24 +0200
commitafba2a1b55183e7fc89da97a2f8535365021e723 (patch)
tree9095d4fbef86d5b8f332aacfd89682838449bc61 /documentapi
parent04bed07efb4b196047681f91f79bd531966cd37c (diff)
Get rid of very complicated and inherently thread unsafe code as external slobrok/configserver support is long gone.
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AsyncInitializationPolicy.java139
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java2
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocolRoutingPolicy.java1
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternalSlobrokPolicy.java129
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java9
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java24
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SlobrokPolicy.java63
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java44
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java2
-rwxr-xr-xdocumentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java6
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java11
11 files changed, 110 insertions, 320 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AsyncInitializationPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AsyncInitializationPolicy.java
deleted file mode 100644
index b88c8cc3b00..00000000000
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/AsyncInitializationPolicy.java
+++ /dev/null
@@ -1,139 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.routing.RoutingContext;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.logging.Logger;
-
-/**
- * Abstract class for routing policies that need asynchronous initialization.
- * This is recommended if the routing policy needs configuration, or synchronization with
- * other sources. If this policy is not used in those cases, the messagebus thread might hang
- * waiting for the other sources, causing messages to other routes to be blocked.
- */
-public abstract class AsyncInitializationPolicy implements DocumentProtocolRoutingPolicy, Runnable {
-
- protected enum InitState {
- NOT_STARTED,
- RUNNING,
- DONE
- }
-
- private static final Logger log = Logger.getLogger(AsyncInitializationPolicy.class.getName());
-
- private InitState initState;
- private ScheduledThreadPoolExecutor executor;
- private Exception initException;
- private boolean syncInit = true;
-
- public static Map<String, String> parse(String param) {
- Map<String, String> map = new TreeMap<>();
-
- if (param != null) {
- String[] p = param.split(";");
- for (String s : p) {
- String[] keyValue = s.split("=");
-
- if (keyValue.length == 1) {
- map.put(keyValue[0], "true");
- } else if (keyValue.length == 2) {
- map.put(keyValue[0], keyValue[1]);
- }
- }
- }
-
- return map;
- }
-
- AsyncInitializationPolicy() {
- initState = InitState.NOT_STARTED;
- }
-
- synchronized void needAsynchronousInitialization() {
- syncInit = false;
- }
-
- public abstract void init();
-
- public abstract void doSelect(RoutingContext routingContext);
-
- private synchronized void checkStartInit() {
- if (initState == InitState.NOT_STARTED) {
- if (syncInit) {
- init();
- initState = InitState.DONE;
- } else {
- executor = new ScheduledThreadPoolExecutor(1);
- executor.execute(this);
- initState = InitState.RUNNING;
- }
- }
- }
-
- @Override
- public void select(RoutingContext routingContext) {
- synchronized (this) {
- if (initException != null) {
- Reply reply = new EmptyReply();
- reply.addError(new com.yahoo.messagebus.Error(ErrorCode.POLICY_ERROR, "Policy threw exception during init:" + exceptionMessageWithTrace(initException)));
- routingContext.setReply(reply);
- return;
- }
-
- checkStartInit();
-
- if (initState == InitState.RUNNING) {
- Reply reply = new EmptyReply();
- reply.addError(new com.yahoo.messagebus.Error(ErrorCode.SESSION_BUSY, "Policy is waiting to be initialized."));
- routingContext.setReply(reply);
- return;
- }
- }
-
- doSelect(routingContext);
- }
-
- public void run() {
- try {
- init();
- } catch (Exception e) {
- log.log(LogLevel.WARNING,"Init threw exception",e);
- initException = e;
- }
-
- synchronized (this) {
- initState = InitState.DONE;
- this.notifyAll();
- }
- }
-
- @Override
- public synchronized void destroy() {
- if (executor != null) {
- executor.shutdownNow();
- }
- }
-
-
- private static String exceptionMessageWithTrace(Exception e) {
- StringWriter sw = new StringWriter();
- try (PrintWriter pw = new PrintWriter(sw)) {
- e.printStackTrace(pw);
- pw.flush();
- }
- return sw.toString();
-
- }
-
-
-
-}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
index 3ece2217601..d6e20b9d57f 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
@@ -33,7 +33,7 @@ public class ContentPolicy extends StoragePolicy {
}
public ContentPolicy(Map<String, String> params) {
- super(new ContentParameters(params), params);
+ super(new ContentParameters(params));
}
public ContentPolicy(String parameters) {
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocolRoutingPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocolRoutingPolicy.java
index 77bc904ab12..450f53c4440 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocolRoutingPolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocolRoutingPolicy.java
@@ -3,7 +3,6 @@ package com.yahoo.documentapi.messagebus.protocol;
import com.yahoo.messagebus.routing.RoutingPolicy;
-
/**
* @author thomasg
*/
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternalSlobrokPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternalSlobrokPolicy.java
deleted file mode 100644
index e791cdcf008..00000000000
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ExternalSlobrokPolicy.java
+++ /dev/null
@@ -1,129 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.config.subscription.ConfigSourceSet;
-import com.yahoo.config.subscription.ConfigSubscriber;
-import com.yahoo.jrt.Supervisor;
-import com.yahoo.jrt.Transport;
-import com.yahoo.jrt.slobrok.api.IMirror;
-import com.yahoo.jrt.slobrok.api.Mirror;
-import com.yahoo.jrt.slobrok.api.SlobrokList;
-import com.yahoo.messagebus.routing.RoutingContext;
-import com.yahoo.cloud.config.SlobroksConfig;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Abstract class for policies that allow you to specify which slobrok to use for the
- * routing.
- */
-public abstract class ExternalSlobrokPolicy extends AsyncInitializationPolicy implements ConfigSubscriber.SingleSubscriber<SlobroksConfig> {
- String error;
- private Supervisor orb = null;
- private final AtomicReference<Mirror> safeMirror = new AtomicReference<>(null);
- private SlobrokList slobroks = null;
- private boolean firstTry = true;
- private ConfigSubscriber subscriber;
- String[] configSources = null;
- private final static String slobrokConfigId = "admin/slobrok.0";
-
-
- ExternalSlobrokPolicy(Map<String, String> param) {
- super();
-
- String conf = param.get("config");
- if (conf != null) {
- configSources = conf.split(",");
- }
-
- String slbrk = param.get("slobroks");
- if (slbrk != null) {
- slobroks = new SlobrokList();
- slobroks.setup(slbrk.split(","));
- }
-
- if (slobroks != null || configSources != null) {
- needAsynchronousInitialization();
- }
- }
-
- @Override
- public synchronized void init() {
- if (slobroks != null) {
- orb = new Supervisor(new Transport());
- safeMirror.set(new Mirror(orb, slobroks));
- }
-
- if (configSources != null) {
- if (safeMirror.get() == null) {
- orb = new Supervisor(new Transport());
- subscriber = subscribe(slobrokConfigId, new ConfigSourceSet(configSources));
- }
- }
- }
-
- private ConfigSubscriber subscribe(String configId, final ConfigSourceSet configSourceSet) {
- ConfigSubscriber subscriber = new ConfigSubscriber(configSourceSet);
- subscriber.subscribe(this, SlobroksConfig.class, configId);
- return subscriber;
- }
-
- public IMirror getMirror() {
- return safeMirror.get();
- }
-
- public List<Mirror.Entry> lookup(RoutingContext context, String pattern) {
- IMirror mirror1 = getMirror();
- if (mirror1 == null) {
- mirror1 = context.getMirror();
- }
-
- List<Mirror.Entry> arr = mirror1.lookup(pattern);
-
- if (arr.isEmpty()) {
- synchronized(this) {
- if (firstTry) {
- try {
- int count = 0;
- while (arr.isEmpty() && count < 100) {
- Thread.sleep(50);
- arr = mirror1.lookup(pattern);
- count++;
- }
- } catch (InterruptedException e) {
- }
- firstTry = true;
- }
-
- }
- }
-
- return arr;
- }
-
- @Override
- public synchronized void configure(SlobroksConfig config) {
- String[] slist = new String[config.slobrok().size()];
-
- for(int i = 0; i < config.slobrok().size(); i++) {
- slist[i] = config.slobrok(i).connectionspec();
- }
- if (slobroks == null) {
- slobroks = new SlobrokList();
- }
- slobroks.setup(slist);
- if (safeMirror.get() == null) {
- safeMirror.set(new Mirror(orb, slobroks));
- }
-
- }
-
- @Override
- public synchronized void destroy() {
- if (subscriber!=null) subscriber.close();
- }
-
-}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
index a2875f14ab5..054b120cf81 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancer.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;
+import com.google.common.util.concurrent.AtomicDouble;
import com.yahoo.jrt.slobrok.api.Mirror;
import java.util.List;
@@ -28,10 +29,10 @@ public class LoadBalancer {
}
/** Statistics on each node we are load balancing over. Populated lazily. */
- private List<NodeMetrics> nodeWeights = new CopyOnWriteArrayList<>();
+ private final List<NodeMetrics> nodeWeights = new CopyOnWriteArrayList<>();
- private String cluster;
- private double position = 0.0;
+ private final String cluster;
+ private final AtomicDouble safePosition = new AtomicDouble(0.0);
public LoadBalancer(String cluster) {
this.cluster = cluster;
@@ -67,6 +68,7 @@ public class LoadBalancer {
double weightSum = 0.0;
Node selectedNode = null;
+ double position = safePosition.get();
for (Mirror.Entry entry : choices) {
NodeMetrics nodeMetrics = getNodeMetrics(entry);
@@ -82,6 +84,7 @@ public class LoadBalancer {
selectedNode = new Node(choices.get(0), getNodeMetrics(choices.get(0)));
}
position += 1.0;
+ safePosition.set(position);
selectedNode.metrics.sent.incrementAndGet();
return selectedNode;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java
index 9cf82144e71..3d129684465 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/LoadBalancerPolicy.java
@@ -23,32 +23,28 @@ import java.util.Map;
*
* @author <a href="mailto:humbe@yahoo-inc.com">Haakon Humberset</a>
*/
-public class LoadBalancerPolicy extends ExternalSlobrokPolicy {
+public class LoadBalancerPolicy extends SlobrokPolicy {
private final String session;
private final String pattern;
- private LoadBalancer loadBalancer;
+ private final LoadBalancer loadBalancer;
LoadBalancerPolicy(String param) {
this(parse(param));
}
private LoadBalancerPolicy(Map<String, String> params) {
- super(params);
+ super();
String cluster = params.get("cluster");
session = params.get("session");
if (cluster == null) {
- error = "Required parameter pattern not set";
- pattern = null;
- return;
+ throw new IllegalArgumentException("Required parameter 'cluster' not set");
}
if (session == null) {
- error = "Required parameter session not set";
- pattern = null;
- return;
+ throw new IllegalArgumentException("Required parameter 'session' not set");
}
pattern = cluster + "/*/" + session;
@@ -56,7 +52,7 @@ public class LoadBalancerPolicy extends ExternalSlobrokPolicy {
}
@Override
- public void doSelect(RoutingContext context) {
+ public void select(RoutingContext context) {
LoadBalancer.Node node = getRecipient(context);
if (node != null) {
@@ -65,8 +61,7 @@ public class LoadBalancerPolicy extends ExternalSlobrokPolicy {
route.setHop(0, Hop.parse(node.entry.getSpec() + "/" + session));
context.addChild(route);
} else {
- context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE,
- "Could not resolve any nodes to send to in pattern " + pattern);
+ context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE, "Could not resolve any nodes to send to in pattern " + pattern);
}
}
@@ -95,4 +90,9 @@ public class LoadBalancerPolicy extends ExternalSlobrokPolicy {
context.setReply(reply);
}
+
+ @Override
+ public void destroy() {
+
+ }
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SlobrokPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SlobrokPolicy.java
new file mode 100644
index 00000000000..864c45218d3
--- /dev/null
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SlobrokPolicy.java
@@ -0,0 +1,63 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi.messagebus.protocol;
+
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.jrt.slobrok.api.Mirror;
+import com.yahoo.messagebus.routing.RoutingContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Abstract class for policies that allow you to specify which slobrok to use for the
+ * routing.
+ */
+public abstract class SlobrokPolicy implements DocumentProtocolRoutingPolicy {
+ private boolean firstTry = true;
+
+ protected List<Mirror.Entry> lookup(RoutingContext context, String pattern) {
+ IMirror mirror1 = context.getMirror();
+
+ List<Mirror.Entry> arr = mirror1.lookup(pattern);
+
+ if (arr.isEmpty()) {
+ synchronized(this) {
+ if (firstTry) {
+ try {
+ int count = 0;
+ while (arr.isEmpty() && count < 100) {
+ Thread.sleep(50);
+ arr = mirror1.lookup(pattern);
+ count++;
+ }
+ } catch (InterruptedException e) {
+ }
+ firstTry = true;
+ }
+ }
+ }
+
+ return arr;
+ }
+
+ public static Map<String, String> parse(String param) {
+ Map<String, String> map = new TreeMap<>();
+
+ if (param != null) {
+ String[] p = param.split(";");
+ for (String s : p) {
+ String[] keyValue = s.split("=");
+
+ if (keyValue.length == 1) {
+ map.put(keyValue[0], "true");
+ } else if (keyValue.length == 2) {
+ map.put(keyValue[0], keyValue[1]);
+ }
+ }
+ }
+
+ return map;
+ }
+
+}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
index 60589432ea7..918bd193d89 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
@@ -2,7 +2,6 @@
package com.yahoo.documentapi.messagebus.protocol;
import com.yahoo.concurrent.CopyOnWriteHashMap;
-import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.document.BucketId;
import com.yahoo.document.BucketIdFactory;
import com.yahoo.jrt.slobrok.api.IMirror;
@@ -46,7 +45,7 @@ import java.util.logging.Logger;
*
* @author Haakon Humberset
*/
-public class StoragePolicy extends ExternalSlobrokPolicy {
+public class StoragePolicy extends SlobrokPolicy {
private static final Logger log = Logger.getLogger(StoragePolicy.class.getName());
public static final String owningBucketStates = "uim";
@@ -123,9 +122,9 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
/** Host fetcher using a slobrok mirror to find the hosts. */
public static class SlobrokHostFetcher extends HostFetcher {
private final SlobrokHostPatternGenerator patternGenerator;
- private final ExternalSlobrokPolicy policy;
+ private final SlobrokPolicy policy;
- SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy, int percent) {
+ SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
super(percent);
this.patternGenerator = patternGenerator;
this.policy = policy;
@@ -182,7 +181,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null);
- TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy, int percent) {
+ TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
super(patternGenerator, policy, percent);
}
@@ -240,13 +239,11 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
public SlobrokHostPatternGenerator createPatternGenerator() {
return new SlobrokHostPatternGenerator(getClusterName());
}
- public HostFetcher createHostFetcher(ExternalSlobrokPolicy policy, int percent) {
+ public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) {
return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent);
}
- public Distribution createDistribution(ExternalSlobrokPolicy policy) {
- return (policy.configSources != null ?
- new Distribution(getDistributionConfigId(), new ConfigSourceSet(policy.configSources))
- : new Distribution(getDistributionConfigId()));
+ public Distribution createDistribution(SlobrokPolicy policy) {
+ return new Distribution(getDistributionConfigId());
}
/**
@@ -351,7 +348,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0);
private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection
- DistributorSelectionLogic(Parameters params, ExternalSlobrokPolicy policy) {
+ DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) {
this.hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes());
this.distribution = params.createDistribution(policy);
persistentFailureChecker = new InstabilityChecker(params.getAttemptRandomOnFailuresLimit());
@@ -534,7 +531,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator();
- private final AtomicReference<DistributorSelectionLogic> distributorSelectionLogic = new AtomicReference<>();
+ private final DistributorSelectionLogic distributorSelectionLogic;
private final Parameters parameters;
/** Constructor used in production. */
@@ -543,23 +540,18 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
public StoragePolicy(Map<String, String> params) {
- this(new Parameters(params), params);
+ this(new Parameters(params));
}
/** Constructor specifying a bit more in detail, so we can override what needs to be overridden in tests */
- public StoragePolicy(Parameters p, Map<String, String> params) {
- super(params);
+ public StoragePolicy(Parameters p) {
+ super();
parameters = p;
+ distributorSelectionLogic = new DistributorSelectionLogic(parameters, this);
}
@Override
- public void init() {
- super.init();
- this.distributorSelectionLogic.set(new DistributorSelectionLogic(parameters, this));
- }
-
- @Override
- public void doSelect(RoutingContext context) {
+ public void select(RoutingContext context) {
if (context.shouldTrace(1)) {
context.trace(1, "Selecting route");
}
@@ -567,7 +559,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context);
if (context.hasReply()) return;
- String targetSpec = distributorSelectionLogic.get().getTargetSpec(context, bucketId);
+ String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId);
if (context.hasReply()) return;
if (targetSpec != null) {
Route route = new Route(context.getRoute());
@@ -590,9 +582,9 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
if (reply instanceof WrongDistributionReply) {
- distributorSelectionLogic.get().handleWrongDistribution((WrongDistributionReply) reply, context);
+ distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context);
} else if (reply.hasErrors()) {
- distributorSelectionLogic.get().handleErrorReply(reply, context.getContext());
+ distributorSelectionLogic.handleErrorReply(reply, context.getContext());
} else if (reply instanceof WriteDocumentReply) {
if (context.shouldTrace(9)) {
context.trace(9, "Modification timestamp: " + ((WriteDocumentReply)reply).getHighestModificationTimestamp());
@@ -603,6 +595,6 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
@Override
public void destroy() {
- distributorSelectionLogic.get().destroy();
+ distributorSelectionLogic.destroy();
}
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
index 20b3c2fb6b4..404a3660208 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
@@ -47,7 +47,7 @@ public class TargetCachingSlobrokHostFetcherTest {
}
static class Fixture {
- ExternalSlobrokPolicy mockSlobrokPolicy = mock(ExternalSlobrokPolicy.class);
+ SlobrokPolicy mockSlobrokPolicy = mock(SlobrokPolicy.class);
IMirror mockMirror = mock(IMirror.class);
StoragePolicy.SlobrokHostPatternGenerator patternGenerator = new StoragePolicy.SlobrokHostPatternGenerator("foo");
StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy, 60);
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java
index fd9d3f78ca8..fd5f43c23c1 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java
@@ -59,9 +59,11 @@ import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
/**
* @author Simon Thoresen Hult
@@ -101,7 +103,7 @@ public class PolicyTestCase {
policy = new DocumentProtocol(manager).createPolicy("SubsetService", null);
assertTrue(policy instanceof SubsetServicePolicy);
- policy = new DocumentProtocol(manager).createPolicy("LoadBalancer", null);
+ policy = new DocumentProtocol(manager).createPolicy("LoadBalancer", "cluster=docproc/cluster.default;session=chain.default");
assertTrue(policy instanceof LoadBalancerPolicy);
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java
index 304630861cf..6a3e6a172c3 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/storagepolicy/StoragePolicyTestEnvironment.java
@@ -5,10 +5,9 @@ import com.yahoo.collections.Pair;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentTypeManagerConfigurer;
-import com.yahoo.documentapi.messagebus.protocol.AsyncInitializationPolicy;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolRoutingPolicy;
-import com.yahoo.documentapi.messagebus.protocol.ExternalSlobrokPolicy;
+import com.yahoo.documentapi.messagebus.protocol.SlobrokPolicy;
import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RoutingPolicyFactory;
import com.yahoo.documentapi.messagebus.protocol.StoragePolicy;
@@ -149,16 +148,16 @@ public abstract class StoragePolicyTestEnvironment {
private final Distribution distribution;
public TestParameters(String parameters, Set<Integer> nodes) {
- super(AsyncInitializationPolicy.parse(parameters));
+ super(SlobrokPolicy.parse(parameters));
hostFetcher = new TestHostFetcher(getClusterName(), nodes);
distribution = new Distribution(Distribution.getDefaultDistributionConfig(2, 10));
}
@Override
- public StoragePolicy.HostFetcher createHostFetcher(ExternalSlobrokPolicy policy, int percent) { return hostFetcher; }
+ public StoragePolicy.HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) { return hostFetcher; }
@Override
- public Distribution createDistribution(ExternalSlobrokPolicy policy) { return distribution; }
+ public Distribution createDistribution(SlobrokPolicy policy) { return distribution; }
}
public static class StoragePolicyTestFactory implements RoutingPolicyFactory {
@@ -172,7 +171,7 @@ public abstract class StoragePolicyTestEnvironment {
public DocumentProtocolRoutingPolicy createPolicy(String parameters) {
parameterInstances.addLast(new TestParameters(parameters, nodes));
((TestHostFetcher) parameterInstances.getLast().createHostFetcher(null, 60)).setAvoidPickingAtRandom(avoidPickingAtRandom);
- return new StoragePolicy(parameterInstances.getLast(), AsyncInitializationPolicy.parse(parameters));
+ return new StoragePolicy(parameterInstances.getLast());
}
public void avoidPickingAtRandom(Integer distributor) {
avoidPickingAtRandom = distributor;