-rw-r--r--  config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java | 3
-rw-r--r--  config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java | 1
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/admin/LogserverContainerCluster.java | 2
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java | 6
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java | 2
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerCluster.java | 2
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java | 7
-rwxr-xr-x  config-model/src/main/java/com/yahoo/vespa/model/container/ContainerCluster.java | 4
-rw-r--r--  config-model/src/main/java/com/yahoo/vespa/model/container/DefaultThreadpoolProvider.java | 30
-rw-r--r--  config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java | 10
-rw-r--r--  config-model/src/test/java/com/yahoo/vespa/model/admin/ClusterControllerTestCase.java | 2
-rwxr-xr-x  config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java | 2
-rw-r--r--  configdefinitions/src/vespa/zookeeper-server.def | 3
-rw-r--r--  configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java | 2
-rw-r--r--  container-search/src/main/resources/configdefinitions/search.config.qr-start.def | 2
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java | 75
-rw-r--r--  node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java | 3
-rw-r--r--  node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java | 21
-rw-r--r--  searchcore/src/apps/tests/persistenceconformance_test.cpp | 8
-rw-r--r--  searchlib/src/vespa/searchlib/diskindex/field_merger.h | 1
-rw-r--r--  staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp | 23
-rw-r--r--  staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp | 24
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp | 11
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h | 21
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp | 2
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp | 17
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h | 5
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 96
-rw-r--r--  staging_vespalib/src/vespa/vespalib/util/singleexecutor.h | 20
-rw-r--r--  storage/src/vespa/storage/persistence/asynchandler.cpp | 2
-rw-r--r--  vespalib/src/vespa/vespalib/util/arrayqueue.hpp | 6
-rw-r--r--  zookeeper-server/zookeeper-server-3.6.3/pom.xml | 9
-rw-r--r--  zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java | 8
-rw-r--r--  zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java | 45
-rw-r--r--  zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java | 258
-rw-r--r--  zookeeper-server/zookeeper-server-3.7.0/pom.xml | 9
-rw-r--r--  zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java | 8
-rw-r--r--  zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java | 47
-rw-r--r--  zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java | 258
-rw-r--r--  zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java | 11
-rw-r--r--  zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java | 66
-rw-r--r--  zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java | 2
-rw-r--r--  zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java | 69
43 files changed, 364 insertions, 839 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
index 6108c39f9d3..6b46e0957ff 100644
--- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
+++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java
@@ -82,7 +82,8 @@ public interface ModelContext {
@ModelFeatureFlag(owners = {"baldersheim"}) default boolean skipMbusReplyThread() { throw new UnsupportedOperationException("TODO specify default value"); }
@ModelFeatureFlag(owners = {"baldersheim"}) default boolean useAsyncMessageHandlingOnSchedule() { throw new UnsupportedOperationException("TODO specify default value"); }
@ModelFeatureFlag(owners = {"baldersheim"}) default double feedConcurrency() { throw new UnsupportedOperationException("TODO specify default value"); }
- @ModelFeatureFlag(owners = {"baldersheim"}) default int metricsproxyNumThreads() { throw new UnsupportedOperationException("TODO specify default value"); }
+ @ModelFeatureFlag(owners = {"baldersheim"}, removeAfter = "7.527") default int metricsproxyNumThreads() { return defaultPoolNumThreads(); }
+ @ModelFeatureFlag(owners = {"baldersheim"}) default int defaultPoolNumThreads() { return 2; }
@ModelFeatureFlag(owners = {"baldersheim"}, removeAfter = "7.527") default int largeRankExpressionLimit() { return 8192; }
@ModelFeatureFlag(owners = {"baldersheim"}) default int maxUnCommittedMemory() { return 130000; }
@ModelFeatureFlag(owners = {"baldersheim"}) default int maxConcurrentMergesPerNode() { return 16; }
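The renamed flag keeps the old accessor working for one release cycle by delegating to the new one, so existing overrides keep compiling until the removeAfter version is reached. A minimal sketch of that deprecation pattern, with illustrative names rather than the real ModelContext plumbing:

// Illustrative sketch, not part of this patch: a renamed feature flag that keeps
// the old accessor alive by delegating to the new one until it can be removed.
interface FeatureFlagsSketch {
    default int defaultPoolNumThreads() { return 2; }                  // new flag with its built-in default

    @Deprecated // kept only until the grace period ends (cf. removeAfter = "7.527" above)
    default int metricsproxyNumThreads() { return defaultPoolNumThreads(); }
}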
diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
index c148bb0e6e4..40b88372348 100644
--- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
+++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java
@@ -118,7 +118,6 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea
@Override public double resourceLimitDisk() { return resourceLimitDisk; }
@Override public double resourceLimitMemory() { return resourceLimitMemory; }
@Override public double minNodeRatioPerGroup() { return minNodeRatioPerGroup; }
- @Override public int metricsproxyNumThreads() { return 1; }
@Override public double containerShutdownTimeout() { return containerShutdownTimeout; }
@Override public boolean containerDumpHeapOnShutdownTimeout() { return containerDumpHeapOnShutdownTimeout; }
@Override public int distributorMergeBusyWait() { return distributorMergeBusyWait; }
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/LogserverContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/LogserverContainerCluster.java
index 43f66f2c727..75b13a89e83 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/LogserverContainerCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/LogserverContainerCluster.java
@@ -17,7 +17,7 @@ import java.util.Optional;
public class LogserverContainerCluster extends ContainerCluster<LogserverContainer> {
public LogserverContainerCluster(AbstractConfigProducer<?> parent, String name, DeployState deployState) {
- super(parent, name, name, deployState, true);
+ super(parent, name, name, deployState, true, deployState.featureFlags().defaultPoolNumThreads());
addDefaultHandlersWithVip();
addLogHandler();
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java
index 0b99496a9b4..6e6f027b520 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java
@@ -50,12 +50,14 @@ public class ClusterControllerCluster extends AbstractConfigProducer<ClusterCont
public void getConfig(ZookeeperServerConfig.Builder builder) {
builder.clientPort(ZK_CLIENT_PORT);
builder.juteMaxBuffer(1024 * 1024); // 1 Mb should be more than enough for cluster controller
+ boolean oldQuorumExists = containerCluster.getContainers().stream() // More than half the previous hosts must be present in the new config for quorum to persist.
+ .filter(container -> previousHosts.contains(container.getHostName())) // Set intersection is symmetric.
+ .count() > previousHosts.size() / 2;
for (ClusterControllerContainer container : containerCluster.getContainers()) {
ZookeeperServerConfig.Server.Builder serverBuilder = new ZookeeperServerConfig.Server.Builder();
serverBuilder.hostname(container.getHostName());
serverBuilder.id(container.index());
- serverBuilder.joining( ! previousHosts.isEmpty() && ! previousHosts.contains(container.getHostName()));
- serverBuilder.retired(container.isRetired());
+ serverBuilder.joining(oldQuorumExists && ! previousHosts.contains(container.getHostName()));
builder.server(serverBuilder);
}
}
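The joining flag now depends on whether a majority of the previous hosts survive into the new configuration: only then are genuinely new hosts marked as joining an existing ensemble; otherwise all servers are configured as initial members. A standalone sketch of that majority check, with illustrative names but the same logic as the hunk above:

import java.util.List;
import java.util.Set;

// Illustrative sketch, not part of this patch; mirrors the quorum check introduced above.
final class QuorumCheckSketch {

    /** True if more than half of the previous hosts are still present in the new host list. */
    static boolean oldQuorumExists(Set<String> previousHosts, List<String> newHosts) {
        long survivors = newHosts.stream().filter(previousHosts::contains).count();
        return survivors > previousHosts.size() / 2;
    }

    /** A server is marked joining only when the old quorum persists and the host is genuinely new. */
    static boolean joining(Set<String> previousHosts, List<String> newHosts, String hostname) {
        return oldQuorumExists(previousHosts, newHosts) && ! previousHosts.contains(hostname);
    }
}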
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java
index 728e46f2ff7..a7f3a6224f2 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java
@@ -22,7 +22,7 @@ public class ClusterControllerContainerCluster extends ContainerCluster<ClusterC
public ClusterControllerContainerCluster(
AbstractConfigProducer<?> parent, String subId, String name, DeployState deployState) {
- super(parent, subId, name, deployState, false);
+ super(parent, subId, name, deployState, false, deployState.featureFlags().defaultPoolNumThreads());
addDefaultHandlersWithVip();
this.reindexingContext = createReindexingContext(deployState);
setJvmGCOptions(deployState.getProperties().jvmGCOptions(Optional.of(ClusterSpec.Type.admin)));
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerCluster.java
index dd6f77ed093..a29647b062a 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerCluster.java
@@ -87,7 +87,7 @@ public class MetricsProxyContainerCluster extends ContainerCluster<MetricsProxyC
private final ApplicationId applicationId;
public MetricsProxyContainerCluster(AbstractConfigProducer<?> parent, String name, DeployState deployState) {
- super(parent, name, name, deployState, true);
+ super(parent, name, name, deployState, true, deployState.featureFlags().defaultPoolNumThreads());
this.parent = parent;
applicationId = deployState.getProperties().applicationId();
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java
index 438e143bdfd..89c455269f4 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java
@@ -94,7 +94,7 @@ public final class ApplicationContainerCluster extends ContainerCluster<Applicat
private List<ApplicationClusterEndpoint> endpointList = List.of();
public ApplicationContainerCluster(AbstractConfigProducer<?> parent, String configSubId, String clusterId, DeployState deployState) {
- super(parent, configSubId, clusterId, deployState, true);
+ super(parent, configSubId, clusterId, deployState, true, 10);
this.tlsClientAuthority = deployState.tlsClientAuthority();
previousHosts = deployState.getPreviousModel().stream()
.map(Model::allocatedHosts)
@@ -309,9 +309,8 @@ public final class ApplicationContainerCluster extends ContainerCluster<Applicat
ZookeeperServerConfig.Server.Builder serverBuilder = new ZookeeperServerConfig.Server.Builder();
serverBuilder.hostname(container.getHostName())
.id(container.index())
- .joining( ! previousHosts.isEmpty() &&
- ! previousHosts.contains(container.getHostName()))
- .retired(container.isRetired());
+ .joining(!previousHosts.isEmpty() &&
+ !previousHosts.contains(container.getHostName()));
builder.server(serverBuilder);
builder.dynamicReconfiguration(true);
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ContainerCluster.java
index c73a3b2a676..7010d7b3d4e 100755
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/ContainerCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ContainerCluster.java
@@ -160,7 +160,7 @@ public abstract class ContainerCluster<CONTAINER extends Container>
private boolean deferChangesUntilRestart = false;
- public ContainerCluster(AbstractConfigProducer<?> parent, String configSubId, String clusterId, DeployState deployState, boolean zooKeeperLocalhostAffinity) {
+ public ContainerCluster(AbstractConfigProducer<?> parent, String configSubId, String clusterId, DeployState deployState, boolean zooKeeperLocalhostAffinity, int defaultPoolNumThreads) {
super(parent, configSubId);
this.name = clusterId;
this.isHostedVespa = stateIsHosted(deployState);
@@ -176,7 +176,7 @@ public abstract class ContainerCluster<CONTAINER extends Container>
addComponent(new StatisticsComponent());
addSimpleComponent(AccessLog.class);
- addComponent(new DefaultThreadpoolProvider(this, deployState.featureFlags().metricsproxyNumThreads()));
+ addComponent(new DefaultThreadpoolProvider(this, defaultPoolNumThreads));
addSimpleComponent(com.yahoo.concurrent.classlock.ClassLocking.class);
addSimpleComponent("com.yahoo.container.jdisc.metric.MetricConsumerProviderProvider");
addSimpleComponent("com.yahoo.container.jdisc.metric.MetricProvider");
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/DefaultThreadpoolProvider.java b/config-model/src/main/java/com/yahoo/vespa/model/container/DefaultThreadpoolProvider.java
index e0d4f3c0692..0b37abaded9 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/DefaultThreadpoolProvider.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/DefaultThreadpoolProvider.java
@@ -5,7 +5,6 @@ import com.yahoo.container.bundle.BundleInstantiationSpecification;
import com.yahoo.container.handler.ThreadPoolProvider;
import com.yahoo.container.handler.ThreadpoolConfig;
import com.yahoo.osgi.provider.model.ComponentModel;
-import com.yahoo.vespa.model.admin.metricsproxy.MetricsProxyContainerCluster;
import com.yahoo.vespa.model.container.component.SimpleComponent;
/**
@@ -16,38 +15,27 @@ import com.yahoo.vespa.model.container.component.SimpleComponent;
class DefaultThreadpoolProvider extends SimpleComponent implements ThreadpoolConfig.Producer {
private final ContainerCluster<?> cluster;
- private final int metricsproxyNumThreads;
+ private final int defaultWorkerThreads;
- DefaultThreadpoolProvider(ContainerCluster<?> cluster, int metricsproxyNumThreads) {
+ DefaultThreadpoolProvider(ContainerCluster<?> cluster, int defaultWorkerThreads) {
super(new ComponentModel(
BundleInstantiationSpecification.getFromStrings(
"default-threadpool",
ThreadPoolProvider.class.getName(),
null)));
this.cluster = cluster;
- this.metricsproxyNumThreads = metricsproxyNumThreads;
- }
-
- private int defaultThreadsByClusterType() {
- if (cluster instanceof MetricsProxyContainerCluster) {
- return metricsproxyNumThreads;
- }
- return 10;
+ this.defaultWorkerThreads = defaultWorkerThreads;
}
@Override
public void getConfig(ThreadpoolConfig.Builder builder) {
- if (!(cluster instanceof ApplicationContainerCluster)) {
+ if (cluster instanceof ApplicationContainerCluster) {
+ // Core pool size of 2x cores, max of 100x cores, and a synchronous queue.
+ // This is the default pool used both by federation and generally when you ask for an Executor.
+ builder.corePoolSize(-2).maxthreads(-100).queueSize(0);
+ } else {
// Container clusters such as logserver, metricsproxy and clustercontroller
- int defaultWorkerThreads = defaultThreadsByClusterType();
- builder.maxthreads(defaultWorkerThreads);
- builder.corePoolSize(defaultWorkerThreads);
- builder.queueSize(50);
- return;
+ builder.corePoolSize(defaultWorkerThreads).maxthreads(defaultWorkerThreads).queueSize(50);
}
-
- // Core pool size of 2xcores, and max of 100xcores and using a synchronous Q
- // This is the deafault pool used by both federation and generally when you ask for an Executor.
- builder.corePoolSize(-2).maxthreads(-100).queueSize(0);
}
}
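For application container clusters the default pool is now configured inline with negative values, which the comment above describes as multiples of the core count, together with queueSize(0) for a synchronous hand-off; the other cluster types get a small fixed pool with a bounded queue. A hedged sketch of resolving that convention (the actual resolution presumably happens in ThreadPoolProvider, which consumes this config and is not part of this patch):

// Illustrative sketch, not part of this patch: resolving the "negative means N x cores"
// convention described in the comment above.
final class PoolSizingSketch {
    static int resolve(int configured) {
        int cores = Runtime.getRuntime().availableProcessors();
        return configured < 0 ? -configured * cores : configured;
    }
    // Example: corePoolSize(-2) on an 8-core host resolves to 16 threads and maxthreads(-100) to 800,
    // while logserver/metricsproxy/clustercontroller clusters simply use the fixed defaultWorkerThreads value.
}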
diff --git a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
index ba70b7493a2..10f883bdc75 100644
--- a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
+++ b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java
@@ -2054,7 +2054,7 @@ public class ModelProvisioningTest {
assertTrue("Initial servers are not joining", config.build().server().stream().noneMatch(ZookeeperServerConfig.Server::joining));
}
{
- VespaModel nextModel = tester.createModel(Zone.defaultZone(), servicesXml.apply(3), true, false, false, 0, Optional.of(model), new DeployState.Builder(), "node-1-3-10-04", "node-1-3-10-03");
+ VespaModel nextModel = tester.createModel(Zone.defaultZone(), servicesXml.apply(5), true, false, false, 0, Optional.of(model), new DeployState.Builder());
ApplicationContainerCluster cluster = nextModel.getContainerClusters().get("zk");
ZookeeperServerConfig.Builder config = new ZookeeperServerConfig.Builder();
cluster.getContainers().forEach(c -> c.getConfig(config));
@@ -2067,14 +2067,6 @@ public class ModelProvisioningTest {
4, true),
config.build().server().stream().collect(Collectors.toMap(ZookeeperServerConfig.Server::id,
ZookeeperServerConfig.Server::joining)));
- assertEquals("Retired nodes are retired",
- Map.of(0, false,
- 1, true,
- 2, true,
- 3, false,
- 4, false),
- config.build().server().stream().collect(Collectors.toMap(ZookeeperServerConfig.Server::id,
- ZookeeperServerConfig.Server::retired)));
}
}
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/admin/ClusterControllerTestCase.java b/config-model/src/test/java/com/yahoo/vespa/model/admin/ClusterControllerTestCase.java
index 474013c17fc..41c7f6d72e2 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/admin/ClusterControllerTestCase.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/admin/ClusterControllerTestCase.java
@@ -397,7 +397,7 @@ public class ClusterControllerTestCase extends DomBuilderTest {
assertEquals("-XX:+UseG1GC -XX:MaxTenuringThreshold=15", qrStartConfig.jvm().gcopts());
assertEquals(512, qrStartConfig.jvm().stacksize());
assertEquals(0, qrStartConfig.jvm().directMemorySizeCache());
- assertEquals(75, qrStartConfig.jvm().baseMaxDirectMemorySize());
+ assertEquals(16, qrStartConfig.jvm().baseMaxDirectMemorySize());
assertReindexingConfigPresent(model);
assertReindexingConfiguredOnAdminCluster(model);
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java
index eb1cf668cc9..14a90130a57 100755
--- a/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java
@@ -190,7 +190,7 @@ public class ContainerClusterTest {
root.freezeModelTopology();
ThreadpoolConfig threadpoolConfig = root.getConfig(ThreadpoolConfig.class, "container0/component/default-threadpool");
- assertEquals(10, threadpoolConfig.maxthreads());
+ assertEquals(2, threadpoolConfig.maxthreads());
assertEquals(50, threadpoolConfig.queueSize());
}
diff --git a/configdefinitions/src/vespa/zookeeper-server.def b/configdefinitions/src/vespa/zookeeper-server.def
index d80ccc4d042..04632ffd35f 100644
--- a/configdefinitions/src/vespa/zookeeper-server.def
+++ b/configdefinitions/src/vespa/zookeeper-server.def
@@ -32,13 +32,10 @@ juteMaxBuffer int default=52428800
myid int restart
server[].id int
server[].hostname string
-server[].clientPort int default=2181
server[].quorumPort int default=2182
server[].electionPort int default=2183
# Whether this server is joining an existing cluster
server[].joining bool default=false
-# Whether this server is retired, and about to be removed
-server[].retired bool default=false
# Needed when upgrading from ZooKeeper 3.4 to 3.5, see https://issues.apache.org/jira/browse/ZOOKEEPER-3056,
# and in general where there is a zookeeper ensemble running that has had few transactions.
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
index 5d944df2f30..db9869fa5f2 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java
@@ -273,7 +273,7 @@ public class ModelContextImpl implements ModelContext {
@Override public double resourceLimitDisk() { return resourceLimitDisk; }
@Override public double resourceLimitMemory() { return resourceLimitMemory; }
@Override public double minNodeRatioPerGroup() { return minNodeRatioPerGroup; }
- @Override public int metricsproxyNumThreads() { return metricsproxyNumThreads; }
+ @Override public int defaultPoolNumThreads() { return metricsproxyNumThreads; }
@Override public double containerShutdownTimeout() { return containerShutdownTimeout; }
@Override public boolean containerDumpHeapOnShutdownTimeout() { return containerDumpHeapOnShutdownTimeout; }
@Override public int distributorMergeBusyWait() { return distributorMergeBusyWait; }
diff --git a/container-search/src/main/resources/configdefinitions/search.config.qr-start.def b/container-search/src/main/resources/configdefinitions/search.config.qr-start.def
index e2856e137f0..c58f9944d61 100644
--- a/container-search/src/main/resources/configdefinitions/search.config.qr-start.def
+++ b/container-search/src/main/resources/configdefinitions/search.config.qr-start.def
@@ -24,7 +24,7 @@ jvm.stacksize int default=512 restart
jvm.compressedClassSpaceSize int default=32 restart
## Base value of maximum direct memory size (in megabytes)
-jvm.baseMaxDirectMemorySize int default=75 restart
+jvm.baseMaxDirectMemorySize int default=16 restart
## Amount of direct memory used for caching. (in megabytes)
jvm.directMemorySizeCache int default=0 restart
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java
deleted file mode 100644
index 553f2ffba36..00000000000
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hosted.node.admin.task.util.file;
-
-import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A very simple template engine when there's little complexity and lots of Velocity special characters $ and #,
- * i.e. typically shell script.
- *
- * @author hakonhall
- */
-public class Templar {
- private final String template;
-
- private static final String prefix = "<%=";
- private static final String suffix = "%>";
-
- private final Map<String, String> settings = new HashMap<>();
-
- public static Templar fromUtf8File(Path path) {
- return new Templar(new UnixPath(path).readUtf8File());
- }
-
- public Templar(String template) {
- this.template = template;
- }
-
- public Templar set(String name, String value) {
- settings.put(name, value);
- return this;
- }
-
- public String resolve() {
- StringBuilder text = new StringBuilder(template.length() * 2);
-
- int start= 0;
- int end;
-
- for (; start < template.length(); start = end) {
- int prefixStart = template.indexOf(prefix, start);
-
-
- if (prefixStart == -1) {
- text.append(template, start, template.length());
- break;
- } else {
- text.append(template, start, prefixStart);
- }
-
- int suffixStart = template.indexOf(suffix, prefixStart + prefix.length());
- if (suffixStart == -1) {
- throw new IllegalArgumentException("Prefix at offset " + prefixStart + " is not terminated");
- }
-
- int prefixEnd = prefixStart + prefix.length();
- String name = template.substring(prefixEnd, suffixStart).trim();
- String value = settings.get(name);
- if (value == null) {
- throw new IllegalArgumentException("No value is set for name '" + name + "' at offset " + prefixEnd);
- }
-
- text.append(value);
-
- end = suffixStart + suffix.length();
- }
-
- return text.toString();
- }
-
- public FileWriter getFileWriterTo(Path path) {
- return new FileWriter(path, this::resolve);
- }
-}
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java
index ea55af17a0e..6d80ac2cad9 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java
@@ -26,12 +26,15 @@ import java.util.Optional;
* id: a valid Java identifier
* </pre>
*
+ * <p>Other directive delimiters than "%{" and "}" may be used, see {@link TemplateDescriptor}.</p>
+ *
* <p>Fill the template with variable values ({@link #set(String, String) set()}, set if conditions
* ({@link #set(String, boolean)}), add list elements ({@link #add(String) add()}, etc, and finally
* render it as a String ({@link #render()}).</p>
*
* <p>To reuse a template, create the template and work on snapshots of that ({@link #snapshot()}).</p>
*
+ * @see TemplateDescriptor
* @author hakonhall
*/
public class Template implements Form {
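The javadoc above documents set(String, String) for variables, set(String, boolean) for if-conditions, add(String) for list elements, render(), and snapshot(). A hedged usage sketch follows; the construction call and the exact directive text are assumptions here, not taken from this patch (see Template and TemplateDescriptor for the real syntax):

// Hedged sketch only: the method names come from the javadoc above, but the factory
// method and the %{...} directive form in the template text are assumed, not confirmed.
String templateText = "hostname: %{=hostname}\n";           // directive form assumed
Template template = Template.from(templateText);             // assumed entry point
String rendered = template.snapshot()                        // snapshot keeps the parsed template reusable
                          .set("hostname", "localhost")
                          .render();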
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java
deleted file mode 100644
index ed410ffc1d1..00000000000
--- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.hosted.node.admin.task.util.file;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author hakonhall
- */
-public class TemplarTest {
- @Test
- public void test() {
- Templar templar = new Templar("x y <%= foo %>, some other <%=bar%> text");
- templar.set("foo", "fidelity")
- .set("bar", "halimov")
- .set("not", "used");
-
- assertEquals("x y fidelity, some other halimov text", templar.resolve());
- }
-} \ No newline at end of file
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp
index 214c57557bc..385d0eb363e 100644
--- a/searchcore/src/apps/tests/persistenceconformance_test.cpp
+++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp
@@ -178,6 +178,12 @@ private:
MockSharedThreadingService _shared_service;
storage::spi::dummy::DummyBucketExecutor _bucketExecutor;
+ static std::shared_ptr<ProtonConfig> make_proton_config() {
+ ProtonConfigBuilder proton_config;
+ proton_config.indexing.optimize = ProtonConfigBuilder::Indexing::Optimize::LATENCY;
+ return std::make_shared<ProtonConfig>(proton_config);
+ }
+
public:
DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort);
~DocumentDBFactory() override;
@@ -196,7 +202,7 @@ public:
TuneFileDocumentDB::SP tuneFileDocDB(new TuneFileDocumentDB());
DocumentDBConfigHelper mgr(spec, docType.getName());
auto b = std::make_shared<BootstrapConfig>(1, factory.getTypeCfg(), factory.getTypeRepo(),
- std::make_shared<ProtonConfig>(),
+ make_proton_config(),
std::make_shared<FiledistributorrpcConfig>(),
std::make_shared<BucketspacesConfig>(),
tuneFileDocDB, HwInfo());
diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
index 5017a7d5192..bbb4efc03b1 100644
--- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h
+++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h
@@ -53,7 +53,6 @@ class FieldMerger
std::unique_ptr<FieldWriter> _writer;
State _state;
bool _failed;
- bool _force_small_merge_chunk;
void make_tmp_dirs();
bool clean_tmp_dirs();
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index ef7f8bfb0f6..243935d4013 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -96,6 +96,23 @@ TEST_F("require that task with same component id are serialized", Fixture)
EXPECT_EQUAL(42, tv->_val);
}
+TEST_F("require that task with same component id are serialized when executed with list", Fixture)
+{
+ std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+ EXPECT_EQUAL(0, tv->_val);
+ ISequencedTaskExecutor::ExecutorId executorId = f._threads->getExecutorId(0);
+ ISequencedTaskExecutor::TaskList list;
+ list.template emplace_back(executorId, makeLambdaTask([=]() { usleep(2000); tv->modify(0, 14); }));
+ list.template emplace_back(executorId, makeLambdaTask([=]() { tv->modify(14, 42); }));
+ f._threads->executeTasks(std::move(list));
+ tv->wait(2);
+ EXPECT_EQUAL(0, tv->_fail);
+ EXPECT_EQUAL(42, tv->_val);
+ f._threads->sync_all();
+ EXPECT_EQUAL(0, tv->_fail);
+ EXPECT_EQUAL(42, tv->_val);
+}
+
TEST_F("require that task with different component ids are not serialized", Fixture)
{
int tryCnt = 0;
@@ -136,7 +153,8 @@ TEST_F("require that task with same string component id are serialized", Fixture
namespace {
-int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit)
+int
+detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit)
{
int tryCnt = 0;
for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
@@ -158,7 +176,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t
return tryCnt;
}
-vespalib::string makeAltComponentId(Fixture &f)
+vespalib::string
+makeAltComponentId(Fixture &f)
{
int tryCnt = 0;
char altComponentId[20];
diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
index dd71380f64a..56352ff3c0d 100644
--- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
+++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
@@ -30,6 +30,28 @@ TEST("test that all tasks are executed") {
EXPECT_EQUAL(10000u, counter);
}
+TEST("test that executor can overflow") {
+ constexpr size_t NUM_TASKS = 1000;
+ std::atomic<uint64_t> counter(0);
+ vespalib::Gate gate;
+ SingleExecutor executor(sequenced_executor, 10, false, 1, 1ms);
+ executor.execute(makeLambdaTask([&gate] { gate.await();}));
+
+ for(size_t i(0); i < NUM_TASKS; i++) {
+ executor.execute(makeLambdaTask([&counter, i] {
+ EXPECT_EQUAL(i, counter);
+ counter++;
+ }));
+ }
+ EXPECT_EQUAL(0u, counter);
+ ExecutorStats stats = executor.getStats();
+ EXPECT_EQUAL(NUM_TASKS + 1, stats.acceptedTasks);
+ EXPECT_EQUAL(NUM_TASKS, stats.queueSize.max());
+ gate.countDown();
+ executor.sync();
+ EXPECT_EQUAL(NUM_TASKS, counter);
+}
+
void verifyResizeTaskLimit(bool up) {
std::mutex lock;
std::condition_variable cond;
@@ -38,7 +60,7 @@ void verifyResizeTaskLimit(bool up) {
constexpr uint32_t INITIAL = 20;
const uint32_t INITIAL_2inN = roundUp2inN(INITIAL);
double waterMarkRatio = 0.5;
- SingleExecutor executor(sequenced_executor, INITIAL, INITIAL*waterMarkRatio, 10ms);
+ SingleExecutor executor(sequenced_executor, INITIAL, true, INITIAL*waterMarkRatio, 10ms);
EXPECT_EQUAL(INITIAL_2inN, executor.getTaskLimit());
EXPECT_EQUAL(uint32_t(INITIAL_2inN*waterMarkRatio), executor.get_watermark());
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
index c54f182891c..b31d72da3b1 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
@@ -12,9 +12,16 @@ ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors)
ISequencedTaskExecutor::~ISequencedTaskExecutor() = default;
+void
+ISequencedTaskExecutor::executeTasks(TaskList tasks) {
+ for (auto & task : tasks) {
+ executeTask(task.first, std::move(task.second));
+ }
+}
+
ISequencedTaskExecutor::ExecutorId
-ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) const {
- vespalib::hash<vespalib::stringref> hashfun;
+ISequencedTaskExecutor::getExecutorIdFromName(stringref componentId) const {
+ hash<stringref> hashfun;
return getExecutorId(hashfun(componentId));
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
index 3fe6fb5d678..ff90556e3e4 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -14,7 +14,7 @@ namespace vespalib {
* Interface class to run multiple tasks in parallel, but tasks with same
* id has to be run in sequence.
*/
-class ISequencedTaskExecutor : public vespalib::IWakeup
+class ISequencedTaskExecutor : public IWakeup
{
public:
class ExecutorId {
@@ -28,6 +28,7 @@ public:
private:
uint32_t _id;
};
+ using TaskList = std::vector<std::pair<ExecutorId, Executor::Task::UP>>;
ISequencedTaskExecutor(uint32_t numExecutors);
virtual ~ISequencedTaskExecutor();
@@ -40,7 +41,7 @@ public:
virtual ExecutorId getExecutorId(uint64_t componentId) const = 0;
uint32_t getNumExecutors() const { return _numExecutors; }
- ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const;
+ ExecutorId getExecutorIdFromName(stringref componentId) const;
/**
* Returns an executor id that is NOT equal to the given executor id,
@@ -58,7 +59,13 @@ public:
* @param id which internal executor to use
* @param task unique pointer to the task to be executed
*/
- virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0;
+ virtual void executeTask(ExecutorId id, Executor::Task::UP task) = 0;
+ /**
+ * Schedule a list of tasks to run after all previously scheduled tasks with
+ * same id. Default is to just iterate and execute one by one, but implementations
+ * that can schedule all in one go more efficiently can implement this one.
+ */
+ virtual void executeTasks(TaskList tasks);
/**
* Call this one to ensure you get the attention of the workers.
*/
@@ -74,7 +81,7 @@ public:
*/
template <class FunctionType>
void executeLambda(ExecutorId id, FunctionType &&function) {
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+ executeTask(id, makeLambdaTask(std::forward<FunctionType>(function)));
}
/**
* Wait for all scheduled tasks to complete.
@@ -83,7 +90,7 @@ public:
virtual void setTaskLimit(uint32_t taskLimit) = 0;
- virtual vespalib::ExecutorStats getStats() = 0;
+ virtual ExecutorStats getStats() = 0;
/**
* Wrap lambda function into a task and schedule it to be run.
@@ -96,7 +103,7 @@ public:
template <class FunctionType>
void execute(uint64_t componentId, FunctionType &&function) {
ExecutorId id = getExecutorId(componentId);
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+ executeTask(id, makeLambdaTask(std::forward<FunctionType>(function)));
}
/**
@@ -109,7 +116,7 @@ public:
*/
template <class FunctionType>
void execute(ExecutorId id, FunctionType &&function) {
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+ executeTask(id, makeLambdaTask(std::forward<FunctionType>(function)));
}
private:
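executeTasks(TaskList) is a new batch entry point whose default implementation just iterates and delegates to executeTask for each (executor id, task) pair, preserving the per-id ordering guarantee; implementations that can hand a whole batch over more efficiently can override it, as SequencedTaskExecutorObserver does below. A Java-flavoured sketch of that default shape (the real code is the C++ above):

import java.util.List;
import java.util.Map;

// Illustrative Java rendering of the default executeTasks() added above, not part of this patch.
interface SequencedExecutorSketch {

    void executeTask(int executorId, Runnable task);

    /** Default: iterate and delegate; same per-id ordering guarantee as executeTask(). */
    default void executeTasks(List<Map.Entry<Integer, Runnable>> tasks) {
        for (Map.Entry<Integer, Runnable> entry : tasks) {
            executeTask(entry.getKey(), entry.getValue());
        }
    }
}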
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 76b0235301b..58ae862f7c6 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -67,7 +67,7 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
uint32_t watermark = (kindOfWatermark == 0) ? taskLimit / 10 : kindOfWatermark;
- executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, 100ms));
+ executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, true, watermark, 100ms));
} else {
executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
index 5ae8e96b606..d81b8ec1db6 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
@@ -17,7 +17,7 @@ SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecu
SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() = default;
void
-SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
+SequencedTaskExecutorObserver::executeTask(ExecutorId id, Executor::Task::UP task)
{
++_executeCnt;
{
@@ -28,6 +28,19 @@ SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Ta
}
void
+SequencedTaskExecutorObserver::executeTasks(TaskList tasks)
+{
+ _executeCnt += tasks.size();
+ {
+ std::lock_guard<std::mutex> guard(_mutex);
+ for (const auto & task : tasks) {
+ _executeHistory.emplace_back(task.first.getId());
+ }
+ }
+ _executor.executeTasks(std::move(tasks));
+}
+
+void
SequencedTaskExecutorObserver::sync_all()
{
++_syncCnt;
@@ -45,7 +58,7 @@ void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) {
_executor.setTaskLimit(taskLimit);
}
-vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() {
+ExecutorStats SequencedTaskExecutorObserver::getStats() {
return _executor.getStats();
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
index 7e2bf968952..1d54283c393 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
@@ -24,10 +24,11 @@ public:
~SequencedTaskExecutorObserver() override;
ExecutorId getExecutorId(uint64_t componentId) const override;
- void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
+ void executeTask(ExecutorId id, Executor::Task::UP task) override;
+ void executeTasks(TaskList tasks) override;
void sync_all() override;
void setTaskLimit(uint32_t taskLimit) override;
- vespalib::ExecutorStats getStats() override;
+ ExecutorStats getStats() override;
uint32_t getExecuteCnt() const { return _executeCnt; }
uint32_t getSyncCnt() const { return _syncCnt; }
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index a99bce0a705..21ed90c3d22 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -7,12 +7,12 @@
namespace vespalib {
SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit)
- : SingleExecutor(func, taskLimit, taskLimit/10, 100ms)
+ : SingleExecutor(func, taskLimit, true, taskLimit/10, 100ms)
{ }
-SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime)
- : _watermarkRatio(watermark < taskLimit ? double(watermark) / taskLimit : 1.0),
- _taskLimit(vespalib::roundUp2inN(taskLimit)),
+SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime)
+ : _watermarkRatio(watermark < reservedQueueSize ? double(watermark) / reservedQueueSize : 1.0),
+ _taskLimit(vespalib::roundUp2inN(reservedQueueSize)),
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
_tasks(std::make_unique<Task::UP[]>(_taskLimit)),
@@ -30,9 +30,13 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat
_wp(0),
_watermark(_taskLimit.load()*_watermarkRatio),
_reactionTime(reactionTime),
- _closed(false)
+ _closed(false),
+ _overflow()
{
- assert(taskLimit >= watermark);
+ assert(reservedQueueSize >= watermark);
+ if ( ! isQueueSizeHard) {
+ _overflow = std::make_unique<ArrayQueue<Task::UP>>();
+ }
_thread.start();
}
@@ -62,10 +66,12 @@ SingleExecutor::execute(Task::UP task) {
if (_closed) {
return task;
}
- wait_for_room(guard);
- wp = _wp.load(std::memory_order_relaxed);
- _tasks[index(wp)] = std::move(task);
- _wp.store(wp + 1, std::memory_order_release);
+ task = wait_for_room_or_put_in_overflow_Q(guard, std::move(task));
+ if (task) {
+ wp = move_to_main_q(guard, std::move(task));
+ } else {
+ wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(guard);
+ }
}
if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
_consumerCondition.notify_one();
@@ -73,6 +79,24 @@ SingleExecutor::execute(Task::UP task) {
return task;
}
+uint64_t
+SingleExecutor::numTasks() {
+ if (_overflow) {
+ Lock guard(_mutex);
+ return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard);
+ } else {
+ return num_tasks_in_main_q();
+ }
+}
+
+uint64_t
+SingleExecutor::move_to_main_q(Lock &, Task::UP task) {
+ uint64_t wp = _wp.load(std::memory_order_relaxed);
+ _tasks[index(wp)] = std::move(task);
+ _wp.store(wp + 1, std::memory_order_release);
+ return wp;
+}
+
void
SingleExecutor::setTaskLimit(uint32_t taskLimit) {
_wantedTaskLimit = vespalib::roundUp2inN(taskLimit);
@@ -81,7 +105,7 @@ SingleExecutor::setTaskLimit(uint32_t taskLimit) {
void
SingleExecutor::drain(Lock & lock) {
uint64_t wp = _wp.load(std::memory_order_relaxed);
- while (numTasks() > 0) {
+ while (numTasks(lock) > 0) {
_consumerCondition.notify_one();
sleepProducer(lock, 100us, wp);
}
@@ -97,7 +121,7 @@ SingleExecutor::wakeup() {
SingleExecutor &
SingleExecutor::sync() {
Lock lock(_mutex);
- uint64_t wp = _wp.load(std::memory_order_relaxed);
+ uint64_t wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock);
while (wp > _rp.load(std::memory_order_acquire)) {
_consumerCondition.notify_one();
sleepProducer(lock, 100us, wp);
@@ -119,7 +143,7 @@ SingleExecutor::run() {
_producerCondition.notify_all();
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed);
Lock lock(_mutex);
- if (numTasks() <= 0) {
+ if (numTasks(lock) <= 0) {
steady_time now = steady_clock::now();
_threadIdleTracker.set_idle(now);
_consumerCondition.wait_until(lock, now + _reactionTime);
@@ -134,6 +158,22 @@ void
SingleExecutor::drain_tasks() {
while (numTasks() > 0) {
run_tasks_till(_wp.load(std::memory_order_acquire));
+ move_overflow_to_main_q();
+ }
+}
+
+void
+SingleExecutor::move_overflow_to_main_q()
+{
+ if ( ! _overflow) return;
+ Lock guard(_mutex);
+ move_overflow_to_main_q(guard);
+}
+void
+SingleExecutor::move_overflow_to_main_q(Lock & guard) {
+ while ( !_overflow->empty() && num_tasks_in_main_q() < _taskLimit.load(std::memory_order_relaxed)) {
+ move_to_main_q(guard, std::move(_overflow->front()));
+ _overflow->pop();
}
}
@@ -151,26 +191,42 @@ SingleExecutor::run_tasks_till(uint64_t available) {
}
}
-void
-SingleExecutor::wait_for_room(Lock & lock) {
+Executor::Task::UP
+SingleExecutor::wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task) {
uint64_t wp = _wp.load(std::memory_order_relaxed);
uint64_t taskLimit = _taskLimit.load(std::memory_order_relaxed);
if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) {
- drain(lock);
+ drain(guard);
_tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
_taskLimit = _wantedTaskLimit.load();
_watermark = _taskLimit * _watermarkRatio;
}
- _queueSize.add(numTasks());
- while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- sleepProducer(lock, _reactionTime, wp - get_watermark());
+ uint64_t numTaskInQ = numTasks(guard);
+ _queueSize.add(numTaskInQ);
+ if (numTaskInQ >= _taskLimit.load(std::memory_order_relaxed)) {
+ if (_overflow) {
+ _overflow->push(std::move(task));
+ } else {
+ while (numTasks(guard) >= _taskLimit.load(std::memory_order_relaxed)) {
+ sleepProducer(guard, _reactionTime, wp - get_watermark());
+ }
+ }
+ } else {
+ if (_overflow && !_overflow->empty()) {
+ _overflow->push(std::move(task));
+ }
+ }
+ if (_overflow && !_overflow->empty()) {
+ assert(!task);
+ move_overflow_to_main_q(guard);
}
+ return task;
}
ExecutorStats
SingleExecutor::getStats() {
Lock lock(_mutex);
- uint64_t accepted = _wp.load(std::memory_order_relaxed);
+ uint64_t accepted = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock);
steady_time now = steady_clock::now();
_idleTracker.was_idle(_threadIdleTracker.reset(now));
ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount);
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index e76e3f17a41..4fdc217e701 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <thread>
#include <atomic>
@@ -19,8 +20,8 @@ namespace vespalib {
*/
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
- SingleExecutor(init_fun_t func, uint32_t taskLimit);
- SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime);
+ SingleExecutor(init_fun_t func, uint32_t reservedQueueSize);
+ SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
void setTaskLimit(uint32_t taskLimit) override;
@@ -39,12 +40,22 @@ private:
void drain_tasks();
void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt);
void run_tasks_till(uint64_t available);
- void wait_for_room(Lock & guard);
+ Task::UP wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task);
+ uint64_t move_to_main_q(Lock & guard, Task::UP task);
+ void move_overflow_to_main_q();
+ void move_overflow_to_main_q(Lock & guard);
uint64_t index(uint64_t counter) const {
return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
}
- uint64_t numTasks() const {
+ uint64_t numTasks();
+ uint64_t numTasks(Lock & guard) const {
+ return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard);
+ }
+ uint64_t num_tasks_in_overflow_q(Lock &) const {
+ return _overflow ? _overflow->size() : 0;
+ }
+ uint64_t num_tasks_in_main_q() const {
return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire);
}
const double _watermarkRatio;
@@ -67,6 +78,7 @@ private:
std::atomic<uint32_t> _watermark;
const duration _reactionTime;
bool _closed;
+ std::unique_ptr<ArrayQueue<Task::UP>> _overflow;
};
}
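The new constructor parameter makes the queue limit optionally soft: with isQueueSizeHard set to false, execute() parks tasks that do not fit in the fixed ring buffer in an unbounded ArrayQueue overflow instead of blocking the producer, and the worker moves overflowed tasks back into the main queue as room frees up. A conceptual Java sketch of that queueing policy (not the actual C++ implementation, and without its lock-free ring buffer details):

import java.util.ArrayDeque;
import java.util.concurrent.ArrayBlockingQueue;

// Conceptual sketch only: mirrors the hard/soft queue-limit policy added to SingleExecutor above.
final class OverflowingQueueSketch<T> {

    private final ArrayBlockingQueue<T> main;                  // bounded, like the ring buffer
    private final ArrayDeque<T> overflow = new ArrayDeque<>(); // unbounded, used only with a soft limit
    private final boolean hardLimit;

    OverflowingQueueSketch(int reservedSize, boolean hardLimit) {
        this.main = new ArrayBlockingQueue<>(reservedSize);
        this.hardLimit = hardLimit;
    }

    void put(T task) throws InterruptedException {
        if (hardLimit) {
            main.put(task);                                     // hard limit: block the producer until there is room
            return;
        }
        synchronized (overflow) {
            if (overflow.isEmpty() && main.offer(task)) return; // fast path: fits in the main queue
            overflow.addLast(task);                             // soft limit: never block, park in overflow
        }
    }

    T poll() {
        T task = main.poll();
        synchronized (overflow) {
            // Drain overflow back into the main queue as room frees up, preserving FIFO order.
            while ( ! overflow.isEmpty() && main.offer(overflow.peekFirst())) {
                overflow.pollFirst();
            }
        }
        return task;
    }
}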
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 3d24ee87879..5dafd9c5eda 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -113,7 +113,7 @@ class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor
public:
using DocumentIdsAndTimeStamps = std::vector<std::pair<spi::Timestamp, spi::DocumentId>>;
UnrevertableRemoveEntryProcessor(DocumentIdsAndTimeStamps & to_remove)
- : _to_remove(to_remove)
+ : _to_remove(to_remove)
{}
void process(spi::DocEntry& entry) override {
diff --git a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
index 73e70e7fd89..8f3dd8ab006 100644
--- a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
+++ b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
@@ -2,11 +2,11 @@
#pragma once
-#include <stdint.h>
-#include <stdlib.h>
+#include "traits.h"
+#include <cstdint>
+#include <cstdlib>
#include <cassert>
#include <algorithm>
-#include "traits.h"
namespace vespalib {
diff --git a/zookeeper-server/zookeeper-server-3.6.3/pom.xml b/zookeeper-server/zookeeper-server-3.6.3/pom.xml
index a8ad183de4e..f7e6f512f7c 100644
--- a/zookeeper-server/zookeeper-server-3.6.3/pom.xml
+++ b/zookeeper-server/zookeeper-server-3.6.3/pom.xml
@@ -11,9 +11,6 @@
<artifactId>zookeeper-server-3.6.3</artifactId>
<packaging>container-plugin</packaging>
<version>7-SNAPSHOT</version>
- <properties>
- <zookeeper.version>3.6.3</zookeeper.version>
- </properties>
<dependencies>
<dependency>
<groupId>com.yahoo.vespa</groupId>
@@ -35,7 +32,7 @@
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
- <version>${zookeeper.version}</version>
+ <version>3.6.3</version>
<exclusions>
<!--
Container provides wiring for all common log libraries
@@ -90,6 +87,7 @@
<configuration>
<compilerArgs>
<arg>-Xlint:all</arg>
+ <arg>-Werror</arg>
</compilerArgs>
</configuration>
</plugin>
@@ -99,9 +97,6 @@
<configuration>
<redirectTestOutputToFile>${test.hide}</redirectTestOutputToFile>
<forkMode>once</forkMode>
- <systemPropertyVariables>
- <zk-version>${zookeeper.version}</zk-version>
- </systemPropertyVariables>
</configuration>
</plugin>
<plugin>
diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
index 246911fdfc7..c002ffa72ce 100644
--- a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
@@ -18,21 +18,21 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer {
- private QuorumPeer peer;
+ private final AtomicReference<QuorumPeer> peer = new AtomicReference<>();
@Inject
public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) {
- peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer());
+ reconfigurer.startOrReconfigure(zookeeperServerConfig, this, VespaQuorumPeer::new, peer::set);
}
@Override
public void shutdown() {
- peer.shutdown(Duration.ofMinutes(1));
+ peer.get().shutdown(Duration.ofMinutes(1));
}
@Override
public void start(Path configFilePath) {
- peer.start(configFilePath);
+ peer.get().start(configFilePath);
}
@Override
diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
index f0a95b70e96..27aa18c64c7 100644
--- a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
+++ b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
@@ -2,15 +2,11 @@
package com.yahoo.vespa.zookeeper;
import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
-import org.apache.zookeeper.data.ACL;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -23,28 +19,27 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin {
private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName());
@Override
- public void reconfigure(String connectionSpec, String servers) throws ReconfigException {
- try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(connectionSpec)) {
+ public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException {
+ ZooKeeperAdmin zooKeeperAdmin = null;
+ try {
+ zooKeeperAdmin = createAdmin(connectionSpec);
long fromConfig = -1;
// Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0)
- log.log(Level.INFO, "Applying ZooKeeper config: " + servers);
- byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null);
+ byte[] appliedConfig = zooKeeperAdmin.reconfigure(joiningServers, leavingServers, null, fromConfig, null);
log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
-
- // Verify by issuing a write operation; this is only accepted once new quorum is obtained.
- List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL);
- zooKeeperAdmin.delete(node, -1);
-
- log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
- }
- catch ( KeeperException.ReconfigInProgress
- | KeeperException.ConnectionLossException
- | KeeperException.NewConfigNoQuorum e) {
- throw new ReconfigException(e);
- }
- catch (KeeperException | IOException | InterruptedException e) {
+ } catch (KeeperException e) {
+ if (retryOn(e))
+ throw new ReconfigException(e);
+ else
+ throw new RuntimeException(e);
+ } catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
+ } finally {
+ if (zooKeeperAdmin != null) {
+ try {
+ zooKeeperAdmin.close();
+ } catch (InterruptedException e) { /* ignore */}
+ }
}
}
@@ -53,5 +48,11 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin {
(event) -> log.log(Level.INFO, event.toString()), new ZkClientConfigBuilder().toConfig());
}
+ private static boolean retryOn(KeeperException e) {
+ return e instanceof KeeperException.ReconfigInProgress ||
+ e instanceof KeeperException.ConnectionLossException ||
+ e instanceof KeeperException.NewConfigNoQuorum;
+ }
+
}
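reconfigure() now takes separate joining and leaving server lists and classifies KeeperExceptions: the transient conditions (reconfig in progress, connection loss, no quorum on the new config) are wrapped in ReconfigException, while everything else becomes a RuntimeException. The retry policy itself lives in Reconfigurer, which is not shown here; a hedged sketch of a retry loop built on that contract:

import java.time.Duration;

// Illustrative sketch, not part of this patch; assumes it lives next to ReconfigException
// in com.yahoo.vespa.zookeeper. The real retry/backoff policy is in Reconfigurer and may differ.
final class ReconfigRetrySketch {

    interface Reconfiguration { void run() throws ReconfigException; }   // hypothetical functional shape

    static void runWithRetries(Reconfiguration reconfiguration, int maxAttempts, Duration backoff) throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                reconfiguration.run();
                return;                                                   // new ensemble config applied
            } catch (ReconfigException e) {                               // transient condition: retry after a pause
                if (attempt >= maxAttempts) throw new RuntimeException("ZooKeeper reconfiguration did not complete", e);
                Thread.sleep(backoff.toMillis());
            }
        }
    }
}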
diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java
deleted file mode 100644
index 922c389f94a..00000000000
--- a/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.zookeper;
-
-import com.yahoo.cloud.config.ZookeeperServerConfig;
-import com.yahoo.net.HostName;
-import com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer;
-import com.yahoo.vespa.zookeeper.Reconfigurer;
-import com.yahoo.vespa.zookeeper.VespaZooKeeperAdminImpl;
-import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.admin.ZooKeeperAdmin;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.IntStream;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.toList;
-import static org.junit.Assert.assertEquals;
-
-public class VespaZooKeeperTest {
-
- static final Path tempDirRoot = getTmpDir();
- static final List<Integer> ports = new ArrayList<>();
-
- /**
- * Performs dynamic reconfiguration of ZooKeeper servers.
- *
- * First, a cluster of 3 servers is set up, and some data is written to it.
- * Then, 3 new servers are added, and the first 3 marked for retirement;
- * this should force the quorum to move the 3 new servers, but not disconnect the old ones.
- * Next, the old servers are removed.
- * Then, the cluster is reduced to size 1.
- * Finally, the cluster grows to size 3 again.
- *
- * Throughout all of this, quorum should remain, and the data should remain the same.
- */
- @Test(timeout = 120_000)
- @Ignore // Unstable, some ZK server keeps resetting connections sometimes.
- public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException {
- List<ZooKeeper> keepers = new ArrayList<>();
- for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper());
- for (int i = 0; i < 8; i++) keepers.get(i).run();
-
- // Start the first three servers.
- List<ZookeeperServerConfig> configs = getConfigs(0, 0, 3, 0);
- for (int i = 0; i < 3; i++) keepers.get(i).config = configs.get(i);
- for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Wait for all servers to be up and running.
- for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Write data to verify later.
- String path = writeData(configs.get(0));
-
- // Let three new servers join, causing the three older ones to retire and leave the ensemble.
- configs = getConfigs(0, 3, 3, 3);
- for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i);
- // The existing servers can't reconfigure and leave before the joiners are up.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Wait for new quorum to be established.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Verify written data is preserved.
- verifyData(path, configs.get(3));
-
- // Old servers are removed.
- configs = getConfigs(3, 0, 3, 0);
- for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i);
- // Old servers shut down, while the newer servers remain.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // Ensure old servers shut down properly.
- for (int i = 0; i < 3; i++) keepers.get(i).await();
- // Ensure new servers have reconfigured.
- for (int i = 3; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Verify written data is preserved.
- verifyData(path, configs.get(3));
-
-
- // Cluster shrinks to a single server.
- configs = getConfigs(5, 0, 1, 0);
- for (int i = 3; i < 6; i++) keepers.get(i).config = configs.get(i);
- for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // We let the remaining server reconfigure the others out before they die.
- for (int i = 3; i < 5; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 3; i < 5; i++) keepers.get(i).await();
- verifyData(path, configs.get(5));
-
- // Cluster grows to 3 servers again.
- configs = getConfigs(5, 0, 3, 2);
- for (int i = 5; i < 8; i++) keepers.get(i).config = configs.get(i);
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // Wait for the joiners.
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- verifyData(path, configs.get(7));
-
- // Let the remaining servers terminate.
- for (int i = 5; i < 8; i++) keepers.get(i).config = null;
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 5; i < 8; i++) keepers.get(i).await();
- }
-
- static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException {
- ZooKeeperAdmin admin = createAdmin(config);
- List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL);
- String read = new String(admin.getData(node, false, new Stat()), UTF_8);
- assertEquals("hi", read);
- return node;
- }
-
- static void verifyData(String path, ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException {
- for (int i = 0; i < 10; i++) {
- try {
- assertEquals("hi", new String(createAdmin(config).getData(path, false, new Stat()), UTF_8));
- return;
- }
- catch (KeeperException.ConnectionLossException e) {
- e.printStackTrace();
- Thread.sleep(10 << i);
- }
- }
- }
-
- static ZooKeeperAdmin createAdmin(ZookeeperServerConfig config) throws IOException {
- return new ZooKeeperAdmin(HostName.getLocalhost() + ":" + config.clientPort(),
- 10_000,
- System.err::println,
- new ZkClientConfigBuilder().toConfig());
- }
-
- static class ZooKeeper {
-
- final ExecutorService executor = Executors.newSingleThreadExecutor();
- final Phaser phaser = new Phaser(2);
- final AtomicReference<Future<?>> future = new AtomicReference<>();
- ZookeeperServerConfig config;
-
- void run() {
- future.set(executor.submit(() -> {
- Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl());
- phaser.arriveAndAwaitAdvance();
- while (config != null) {
- new ReconfigurableVespaZooKeeperServer(reconfigurer, config);
- phaser.arriveAndAwaitAdvance(); // server is now up, let test thread sync here
- phaser.arriveAndAwaitAdvance(); // wait before reconfig/teardown to let test thread do stuff
- }
- reconfigurer.deconstruct();
- }));
- }
-
- void await() throws ExecutionException, InterruptedException, TimeoutException {
- future.get().get(30, SECONDS);
- }
- }
-
- static List<ZookeeperServerConfig> getConfigs(int removed, int retired, int active, int joining) {
- return IntStream.rangeClosed(1, removed + retired + active)
- .mapToObj(id -> getConfig(removed, retired, active, joining, id))
- .collect(toList());
- }
-
- // Config for server #id among retired + active servers, of which the last may be joining, and with offset removed.
- static ZookeeperServerConfig getConfig(int removed, int retired, int active, int joining, int id) {
- if (id <= removed)
- return null;
-
- Path tempDir = tempDirRoot.resolve("zookeeper-" + id);
- return new ZookeeperServerConfig.Builder()
- .clientPort(getPorts(id).get(0))
- .dataDir(tempDir.toString())
- .zooKeeperConfigFile(tempDir.resolve("zookeeper.cfg").toString())
- .myid(id)
- .myidFile(tempDir.resolve("myid").toString())
- .dynamicReconfiguration(true)
- .server(IntStream.rangeClosed(removed + 1, removed + retired + active)
- .mapToObj(i -> new ZookeeperServerConfig.Server.Builder()
- .id(i)
- .clientPort(getPorts(i).get(0))
- .electionPort(getPorts(i).get(1))
- .quorumPort(getPorts(i).get(2))
- .hostname("localhost")
- .joining(i - removed > retired + active - joining)
- .retired(i - removed <= retired))
- .collect(toList()))
- .build();
- }
-
- static List<Integer> getPorts(int id) {
- if (ports.size() < id * 3) {
- int previousPort;
- if (ports.isEmpty()) {
- String[] version = System.getProperty("zk-version").split("\\.");
- int versionPortOffset = 0;
- for (String part : version)
- versionPortOffset = 32 * (versionPortOffset + Integer.parseInt(part));
- previousPort = 20000 + versionPortOffset % 30000;
- }
- else
- previousPort = ports.get(ports.size() - 1);
-
- for (int i = 0; i < 3; i++)
- ports.add(previousPort = nextPort(previousPort));
- }
- return ports.subList(id * 3 - 3, id * 3);
- }
-
- static int nextPort(int previousPort) {
- for (int j = 1; j <= 30000; j++) {
- int port = (previousPort + j);
- while (port > 50000)
- port -= 30000;
-
- try (ServerSocket socket = new ServerSocket(port)) {
- return socket.getLocalPort();
- }
- catch (IOException e) {
- System.err.println("Could not bind port " + port + ": " + e);
- }
- }
- throw new RuntimeException("No free ports");
- }
-
- static Path getTmpDir() {
- try {
- Path tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "vespa-zk-test");
- tempDir.toFile().deleteOnExit();
- return tempDir.toAbsolutePath();
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.7.0/pom.xml b/zookeeper-server/zookeeper-server-3.7.0/pom.xml
index 01fd83a496b..ac7db35e6af 100644
--- a/zookeeper-server/zookeeper-server-3.7.0/pom.xml
+++ b/zookeeper-server/zookeeper-server-3.7.0/pom.xml
@@ -11,9 +11,6 @@
<artifactId>zookeeper-server-3.7.0</artifactId>
<packaging>container-plugin</packaging>
<version>7-SNAPSHOT</version>
- <properties>
- <zookeeper.version>3.7.0</zookeeper.version>
- </properties>
<dependencies>
<dependency>
<groupId>com.yahoo.vespa</groupId>
@@ -35,7 +32,7 @@
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
- <version>${zookeeper.version}</version>
+ <version>3.7.0</version>
<exclusions>
<!--
Container provides wiring for all common log libraries
@@ -90,6 +87,7 @@
<configuration>
<compilerArgs>
<arg>-Xlint:all</arg>
+ <arg>-Werror</arg>
</compilerArgs>
</configuration>
</plugin>
@@ -99,9 +97,6 @@
<configuration>
<redirectTestOutputToFile>${test.hide}</redirectTestOutputToFile>
<forkMode>once</forkMode>
- <systemPropertyVariables>
- <zk-version>${zookeeper.version}</zk-version>
- </systemPropertyVariables>
</configuration>
</plugin>
<plugin>
diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
index 246911fdfc7..c002ffa72ce 100644
--- a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
@@ -18,21 +18,21 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer {
- private QuorumPeer peer;
+ private final AtomicReference<QuorumPeer> peer = new AtomicReference<>();
@Inject
public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) {
- peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer());
+ reconfigurer.startOrReconfigure(zookeeperServerConfig, this, VespaQuorumPeer::new, peer::set);
}
@Override
public void shutdown() {
- peer.shutdown(Duration.ofMinutes(1));
+ peer.get().shutdown(Duration.ofMinutes(1));
}
@Override
public void start(Path configFilePath) {
- peer.start(configFilePath);
+ peer.get().start(configFilePath);
}
@Override
diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
index ae7bf8d84f5..27aa18c64c7 100644
--- a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
+++ b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
@@ -2,15 +2,11 @@
package com.yahoo.vespa.zookeeper;
import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
-import org.apache.zookeeper.data.ACL;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -23,28 +19,27 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin {
private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName());
@Override
- public void reconfigure(String connectionSpec, String servers) throws ReconfigException {
- try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(connectionSpec)) {
+ public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException {
+ ZooKeeperAdmin zooKeeperAdmin = null;
+ try {
+ zooKeeperAdmin = createAdmin(connectionSpec);
long fromConfig = -1;
- // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0).
- log.log(Level.INFO, "Applying ZooKeeper config: " + servers);
- byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null);
+ // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0)
+ byte[] appliedConfig = zooKeeperAdmin.reconfigure(joiningServers, leavingServers, null, fromConfig, null);
log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
-
- // Verify by issuing a write operation; this is only accepted once new quorum is obtained.
- List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL);
- zooKeeperAdmin.delete(node, -1);
-
- log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
- }
- catch ( KeeperException.ReconfigInProgress
- | KeeperException.ConnectionLossException
- | KeeperException.NewConfigNoQuorum e) {
- throw new ReconfigException(e);
- }
- catch (KeeperException | IOException | InterruptedException e) {
+ } catch (KeeperException e) {
+ if (retryOn(e))
+ throw new ReconfigException(e);
+ else
+ throw new RuntimeException(e);
+ } catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
+ } finally {
+ if (zooKeeperAdmin != null) {
+ try {
+ zooKeeperAdmin.close();
+ } catch (InterruptedException e) { /* ignore */ }
+ }
}
}
@@ -53,5 +48,11 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin {
(event) -> log.log(Level.INFO, event.toString()), new ZkClientConfigBuilder().toConfig());
}
+ private static boolean retryOn(KeeperException e) {
+ return e instanceof KeeperException.ReconfigInProgress ||
+ e instanceof KeeperException.ConnectionLossException ||
+ e instanceof KeeperException.NewConfigNoQuorum;
+ }
+
}
diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java
deleted file mode 100644
index 922c389f94a..00000000000
--- a/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.zookeper;
-
-import com.yahoo.cloud.config.ZookeeperServerConfig;
-import com.yahoo.net.HostName;
-import com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer;
-import com.yahoo.vespa.zookeeper.Reconfigurer;
-import com.yahoo.vespa.zookeeper.VespaZooKeeperAdminImpl;
-import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.admin.ZooKeeperAdmin;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.IntStream;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.stream.Collectors.toList;
-import static org.junit.Assert.assertEquals;
-
-public class VespaZooKeeperTest {
-
- static final Path tempDirRoot = getTmpDir();
- static final List<Integer> ports = new ArrayList<>();
-
- /**
- * Performs dynamic reconfiguration of ZooKeeper servers.
- *
- * First, a cluster of 3 servers is set up, and some data is written to it.
- * Then, 3 new servers are added, and the first 3 marked for retirement;
- * this should force the quorum to move the 3 new servers, but not disconnect the old ones.
- * Next, the old servers are removed.
- * Then, the cluster is reduced to size 1.
- * Finally, the cluster grows to size 3 again.
- *
- * Throughout all of this, quorum should remain, and the data should remain the same.
- */
- @Test(timeout = 120_000)
- @Ignore // Unstable, some ZK server keeps resetting connections sometimes.
- public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException {
- List<ZooKeeper> keepers = new ArrayList<>();
- for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper());
- for (int i = 0; i < 8; i++) keepers.get(i).run();
-
- // Start the first three servers.
- List<ZookeeperServerConfig> configs = getConfigs(0, 0, 3, 0);
- for (int i = 0; i < 3; i++) keepers.get(i).config = configs.get(i);
- for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Wait for all servers to be up and running.
- for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Write data to verify later.
- String path = writeData(configs.get(0));
-
- // Let three new servers join, causing the three older ones to retire and leave the ensemble.
- configs = getConfigs(0, 3, 3, 3);
- for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i);
- // The existing servers can't reconfigure and leave before the joiners are up.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Wait for new quorum to be established.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Verify written data is preserved.
- verifyData(path, configs.get(3));
-
- // Old servers are removed.
- configs = getConfigs(3, 0, 3, 0);
- for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i);
- // Old servers shut down, while the newer servers remain.
- for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // Ensure old servers shut down properly.
- for (int i = 0; i < 3; i++) keepers.get(i).await();
- // Ensure new servers have reconfigured.
- for (int i = 3; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
-
- // Verify written data is preserved.
- verifyData(path, configs.get(3));
-
-
- // Cluster shrinks to a single server.
- configs = getConfigs(5, 0, 1, 0);
- for (int i = 3; i < 6; i++) keepers.get(i).config = configs.get(i);
- for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // We let the remaining server reconfigure the others out before they die.
- for (int i = 3; i < 5; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 3; i < 5; i++) keepers.get(i).await();
- verifyData(path, configs.get(5));
-
- // Cluster grows to 3 servers again.
- configs = getConfigs(5, 0, 3, 2);
- for (int i = 5; i < 8; i++) keepers.get(i).config = configs.get(i);
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- // Wait for the joiners.
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- verifyData(path, configs.get(7));
-
- // Let the remaining servers terminate.
- for (int i = 5; i < 8; i++) keepers.get(i).config = null;
- for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance();
- for (int i = 5; i < 8; i++) keepers.get(i).await();
- }
-
- static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException {
- ZooKeeperAdmin admin = createAdmin(config);
- List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL);
- String read = new String(admin.getData(node, false, new Stat()), UTF_8);
- assertEquals("hi", read);
- return node;
- }
-
- static void verifyData(String path, ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException {
- for (int i = 0; i < 10; i++) {
- try {
- assertEquals("hi", new String(createAdmin(config).getData(path, false, new Stat()), UTF_8));
- return;
- }
- catch (KeeperException.ConnectionLossException e) {
- e.printStackTrace();
- Thread.sleep(10 << i);
- }
- }
- }
-
- static ZooKeeperAdmin createAdmin(ZookeeperServerConfig config) throws IOException {
- return new ZooKeeperAdmin(HostName.getLocalhost() + ":" + config.clientPort(),
- 10_000,
- System.err::println,
- new ZkClientConfigBuilder().toConfig());
- }
-
- static class ZooKeeper {
-
- final ExecutorService executor = Executors.newSingleThreadExecutor();
- final Phaser phaser = new Phaser(2);
- final AtomicReference<Future<?>> future = new AtomicReference<>();
- ZookeeperServerConfig config;
-
- void run() {
- future.set(executor.submit(() -> {
- Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl());
- phaser.arriveAndAwaitAdvance();
- while (config != null) {
- new ReconfigurableVespaZooKeeperServer(reconfigurer, config);
- phaser.arriveAndAwaitAdvance(); // server is now up, let test thread sync here
- phaser.arriveAndAwaitAdvance(); // wait before reconfig/teardown to let test thread do stuff
- }
- reconfigurer.deconstruct();
- }));
- }
-
- void await() throws ExecutionException, InterruptedException, TimeoutException {
- future.get().get(30, SECONDS);
- }
- }
-
- static List<ZookeeperServerConfig> getConfigs(int removed, int retired, int active, int joining) {
- return IntStream.rangeClosed(1, removed + retired + active)
- .mapToObj(id -> getConfig(removed, retired, active, joining, id))
- .collect(toList());
- }
-
- // Config for server #id among retired + active servers, of which the last may be joining, and with offset removed.
- static ZookeeperServerConfig getConfig(int removed, int retired, int active, int joining, int id) {
- if (id <= removed)
- return null;
-
- Path tempDir = tempDirRoot.resolve("zookeeper-" + id);
- return new ZookeeperServerConfig.Builder()
- .clientPort(getPorts(id).get(0))
- .dataDir(tempDir.toString())
- .zooKeeperConfigFile(tempDir.resolve("zookeeper.cfg").toString())
- .myid(id)
- .myidFile(tempDir.resolve("myid").toString())
- .dynamicReconfiguration(true)
- .server(IntStream.rangeClosed(removed + 1, removed + retired + active)
- .mapToObj(i -> new ZookeeperServerConfig.Server.Builder()
- .id(i)
- .clientPort(getPorts(i).get(0))
- .electionPort(getPorts(i).get(1))
- .quorumPort(getPorts(i).get(2))
- .hostname("localhost")
- .joining(i - removed > retired + active - joining)
- .retired(i - removed <= retired))
- .collect(toList()))
- .build();
- }
-
- static List<Integer> getPorts(int id) {
- if (ports.size() < id * 3) {
- int previousPort;
- if (ports.isEmpty()) {
- String[] version = System.getProperty("zk-version").split("\\.");
- int versionPortOffset = 0;
- for (String part : version)
- versionPortOffset = 32 * (versionPortOffset + Integer.parseInt(part));
- previousPort = 20000 + versionPortOffset % 30000;
- }
- else
- previousPort = ports.get(ports.size() - 1);
-
- for (int i = 0; i < 3; i++)
- ports.add(previousPort = nextPort(previousPort));
- }
- return ports.subList(id * 3 - 3, id * 3);
- }
-
- static int nextPort(int previousPort) {
- for (int j = 1; j <= 30000; j++) {
- int port = (previousPort + j);
- while (port > 50000)
- port -= 30000;
-
- try (ServerSocket socket = new ServerSocket(port)) {
- return socket.getLocalPort();
- }
- catch (IOException e) {
- System.err.println("Could not bind port " + port + ": " + e);
- }
- }
- throw new RuntimeException("No free ports");
- }
-
- static Path getTmpDir() {
- try {
- Path tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "vespa-zk-test");
- tempDir.toFile().deleteOnExit();
- return tempDir.toAbsolutePath();
- }
- catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
index 8b22f658a94..39d0312915f 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
@@ -86,7 +86,7 @@ public class Configurator {
sb.append("reconfigEnabled=true").append("\n");
sb.append("skipACL=yes").append("\n");
ensureThisServerIsRepresented(config.myid(), config.server());
- config.server().forEach(server -> sb.append(serverSpec(server, config.clientPort(), server.joining())).append("\n"));
+ config.server().forEach(server -> addServerToCfg(sb, server, config.clientPort()));
sb.append(new TlsQuorumConfig().createConfig(vespaTlsConfig));
sb.append(new TlsClientServerConfig().createConfig(vespaTlsConfig));
return sb.toString();
@@ -111,8 +111,7 @@ public class Configurator {
}
}
- static String serverSpec(ZookeeperServerConfig.Server server, int clientPort, boolean joining) {
- StringBuilder sb = new StringBuilder();
+ private void addServerToCfg(StringBuilder sb, ZookeeperServerConfig.Server server, int clientPort) {
sb.append("server.")
.append(server.id())
.append("=")
@@ -121,7 +120,7 @@ public class Configurator {
.append(server.quorumPort())
.append(":")
.append(server.electionPort());
- if (joining) {
+ if (server.joining()) {
// Servers that are joining an existing cluster must be marked as observers. Note that this will NOT
// actually make the server an observer, but prevent it from forming an ensemble independently of the
// existing cluster.
@@ -131,8 +130,8 @@ public class Configurator {
.append("observer");
}
sb.append(";")
- .append(server.clientPort());
- return sb.toString();
+ .append(clientPort)
+ .append("\n");
}
static List<String> zookeeperServerHostnames(ZookeeperServerConfig zookeeperServerConfig) {
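
[Editor's note] The refactored addServerToCfg above writes one configuration line per server, marking joining servers as observers so they cannot form an ensemble on their own. A small, self-contained sketch of that line format, with hypothetical id, host and ports (the separators are as read from this hunk, so treat the output as an approximation):

    public class ServerSpecSketch {
        public static void main(String[] args) {
            int id = 3, quorumPort = 2182, electionPort = 2183, clientPort = 2181;
            boolean joining = true;
            StringBuilder sb = new StringBuilder();
            sb.append("server.").append(id).append("=").append("node3")
              .append(":").append(quorumPort)
              .append(":").append(electionPort);
            if (joining) sb.append(":").append("observer"); // joiner must not form its own quorum
            sb.append(";").append(clientPort).append("\n");
            System.out.print(sb); // server.3=node3:2182:2183:observer;2181
        }
    }
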
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
index 604419c063d..d4223e4d815 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
@@ -10,14 +10,14 @@ import com.yahoo.yolean.Exceptions;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
-
-import static com.yahoo.vespa.zookeeper.Configurator.serverSpec;
-import static java.util.stream.Collectors.toList;
+import java.util.stream.Collectors;
/**
* Starts zookeeper server and supports reconfiguring zookeeper cluster. Keep this as a component
@@ -50,22 +50,17 @@ public class Reconfigurer extends AbstractComponent {
this.sleeper = Objects.requireNonNull(sleeper);
}
- @Override
- public void deconstruct() {
- shutdown();
- }
-
- QuorumPeer startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server,
- Supplier<QuorumPeer> quorumPeerCreator) {
+ void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server,
+ Supplier<QuorumPeer> quorumPeerGetter, Consumer<QuorumPeer> quorumPeerSetter) {
if (zooKeeperRunner == null) {
- peer = quorumPeerCreator.get(); // Obtain the peer from the server. This will be shared with later servers.
+ peer = quorumPeerGetter.get(); // Obtain the peer from the server. This will be shared with later servers.
zooKeeperRunner = startServer(newConfig, server);
}
+ quorumPeerSetter.accept(peer);
- if (newConfig.dynamicReconfiguration()) {
+ if (shouldReconfigure(newConfig)) {
reconfigure(newConfig);
}
- return peer;
}
ZookeeperServerConfig activeConfig() {
@@ -78,30 +73,42 @@ public class Reconfigurer extends AbstractComponent {
}
}
+ private boolean shouldReconfigure(ZookeeperServerConfig newConfig) {
+ if (!newConfig.dynamicReconfiguration()) return false;
+ if (activeConfig == null) return false;
+ return !newConfig.equals(activeConfig());
+ }
+
private ZooKeeperRunner startServer(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) {
ZooKeeperRunner runner = new ZooKeeperRunner(zookeeperServerConfig, server);
activeConfig = zookeeperServerConfig;
return runner;
}
- // TODO jonmv: read dynamic file, discard if old quorum impossible (config file + .dynamic.<id>)
- // TODO jonmv: if dynamic file, all unlisted servers are observers; otherwise joiners are observers
- // TODO jonmv: wrap Curator in Provider, for Curator shutdown
private void reconfigure(ZookeeperServerConfig newConfig) {
Instant reconfigTriggered = Instant.now();
- String newServers = String.join(",", servers(newConfig));
- log.log(Level.INFO, "Will reconfigure ZooKeeper cluster." +
+ // No point in trying to reconfigure if there is only one server in the new ensemble;
+ // the others will be shut down or are about to be shut down
+ if (newConfig.server().size() == 1) shutdownAndDie(Duration.ZERO);
+
+ List<String> newServers = difference(servers(newConfig), servers(activeConfig));
+ String leavingServerIds = String.join(",", serverIdsDifference(activeConfig, newConfig));
+ String joiningServersSpec = String.join(",", newServers);
+ leavingServerIds = leavingServerIds.isEmpty() ? null : leavingServerIds;
+ joiningServersSpec = joiningServersSpec.isEmpty() ? null : joiningServersSpec;
+ log.log(Level.INFO, "Will reconfigure ZooKeeper cluster. \nJoining servers: " + joiningServersSpec +
+ "\nleaving servers: " + leavingServerIds +
"\nServers in active config:" + servers(activeConfig) +
"\nServers in new config:" + servers(newConfig));
String connectionSpec = localConnectionSpec(activeConfig);
Instant now = Instant.now();
- Duration reconfigTimeout = reconfigTimeout(newConfig.server().size());
+ Duration reconfigTimeout = reconfigTimeout(newServers.size());
Instant end = now.plus(reconfigTimeout);
// Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed
for (int attempt = 1; now.isBefore(end); attempt++) {
try {
Instant reconfigStarted = Instant.now();
- vespaZooKeeperAdmin.reconfigure(connectionSpec, newServers);
+ vespaZooKeeperAdmin.reconfigure(connectionSpec, joiningServersSpec, leavingServerIds);
Instant reconfigEnded = Instant.now();
log.log(Level.INFO, "Reconfiguration completed in " +
Duration.between(reconfigTriggered, reconfigEnded) +
@@ -140,11 +147,24 @@ public class Reconfigurer extends AbstractComponent {
return HostName.getLocalhost() + ":" + config.clientPort();
}
+ private static List<String> serverIdsDifference(ZookeeperServerConfig oldConfig, ZookeeperServerConfig newConfig) {
+ return difference(servers(oldConfig), servers(newConfig)).stream()
+ .map(server -> server.substring(0, server.indexOf('=')))
+ .collect(Collectors.toList());
+ }
+
private static List<String> servers(ZookeeperServerConfig config) {
+ // See https://zookeeper.apache.org/doc/r3.6.3/zookeeperReconfig.html#sc_reconfig_clientport for format
return config.server().stream()
- .filter(server -> ! server.retired())
- .map(server -> serverSpec(server, config.clientPort(), false))
- .collect(toList());
+ .map(server -> server.id() + "=" + server.hostname() + ":" + server.quorumPort() + ":" +
+ server.electionPort() + ";" + config.clientPort())
+ .collect(Collectors.toList());
+ }
+
+ private static <T> List<T> difference(List<T> list1, List<T> list2) {
+ List<T> copy = new ArrayList<>(list1);
+ copy.removeAll(list2);
+ return copy;
}
}
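
[Editor's note] The reworked reconfigure method derives the joining and leaving sets by a plain list difference over server-spec strings of the form id=host:quorumPort:electionPort;clientPort. A compact sketch of that derivation, using hypothetical specs rather than real config; the difference helper mirrors the one added in this hunk:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    public class JoinLeaveSketch {
        static <T> List<T> difference(List<T> a, List<T> b) {
            List<T> copy = new ArrayList<>(a);
            copy.removeAll(b);
            return copy;
        }

        public static void main(String[] args) {
            List<String> active = List.of("0=node0:2182:2183;2181", "1=node1:2182:2183;2181",
                                          "2=node2:2182:2183;2181");
            List<String> next   = List.of("0=node0:2182:2183;2181", "1=node2:2182:2183;2181",
                                          "2=node3:2182:2183;2181");
            String joining = String.join(",", difference(next, active));
            String leavingIds = difference(active, next).stream()
                                                        .map(s -> s.substring(0, s.indexOf('=')))
                                                        .collect(Collectors.joining(","));
            System.out.println(joining);    // 1=node2:2182:2183;2181,2=node3:2182:2183;2181
            System.out.println(leavingIds); // 1,2
        }
    }

This matches the shuffled-indices case asserted in the ReconfigurerTest changes below.
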
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java
index 59c9628bcab..8809dca0def 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java
@@ -10,7 +10,7 @@ import java.time.Duration;
*/
public interface VespaZooKeeperAdmin {
- void reconfigure(String connectionSpec, String servers) throws ReconfigException;
+ void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException;
/* Timeout for connecting to ZooKeeper */
default Duration sessionTimeout() { return Duration.ofSeconds(30); }
diff --git a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
index 760c326cf5d..1211624e3d6 100644
--- a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
+++ b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
@@ -17,6 +17,7 @@ import java.util.Arrays;
import java.util.concurrent.Phaser;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
/**
@@ -50,26 +51,31 @@ public class ReconfigurerTest {
ZookeeperServerConfig nextConfig = createConfig(5, true);
reconfigurer.startOrReconfigure(nextConfig);
assertEquals("node1:2181", reconfigurer.connectionSpec());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181",
- reconfigurer.servers());
- assertEquals(2, reconfigurer.reconfigurations());
+ assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers());
+ assertNull("No servers are leaving", reconfigurer.leavingServers());
+ assertEquals(1, reconfigurer.reconfigurations());
+ assertSame(nextConfig, reconfigurer.activeConfig());
+
+ // No reconfiguration happens with same config
+ reconfigurer.startOrReconfigure(nextConfig);
+ assertEquals(1, reconfigurer.reconfigurations());
assertSame(nextConfig, reconfigurer.activeConfig());
// Cluster shrinks
nextConfig = createConfig(3, true);
reconfigurer.startOrReconfigure(nextConfig);
- assertEquals(3, reconfigurer.reconfigurations());
+ assertEquals(2, reconfigurer.reconfigurations());
assertEquals("node1:2181", reconfigurer.connectionSpec());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181",
- reconfigurer.servers());
+ assertNull("No servers are joining", reconfigurer.joiningServers());
+ assertEquals("3,4", reconfigurer.leavingServers());
assertSame(nextConfig, reconfigurer.activeConfig());
// Cluster loses node1, but node3 joins. Indices are shuffled.
nextConfig = createConfig(3, true, 1);
reconfigurer.startOrReconfigure(nextConfig);
- assertEquals(4, reconfigurer.reconfigurations());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node2:2182:2183;2181,server.2=node3:2182:2183;2181",
- reconfigurer.servers());
+ assertEquals(3, reconfigurer.reconfigurations());
+ assertEquals("1=node2:2182:2183;2181,2=node3:2182:2183;2181", reconfigurer.joiningServers());
+ assertEquals("1,2", reconfigurer.leavingServers());
assertSame(nextConfig, reconfigurer.activeConfig());
}
@@ -83,9 +89,9 @@ public class ReconfigurerTest {
ZookeeperServerConfig nextConfig = createConfig(5, true);
reconfigurer.startOrReconfigure(nextConfig);
assertEquals("node1:2181", reconfigurer.connectionSpec());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181",
- reconfigurer.servers());
- assertEquals(2, reconfigurer.reconfigurations());
+ assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers());
+ assertNull("No servers are leaving", reconfigurer.leavingServers());
+ assertEquals(1, reconfigurer.reconfigurations());
assertSame(nextConfig, reconfigurer.activeConfig());
}
@@ -106,27 +112,24 @@ public class ReconfigurerTest {
reconfigurer.shutdown();
}
- private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... retiredIndices) {
- Arrays.sort(retiredIndices);
+ private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... skipIndices) {
+ Arrays.sort(skipIndices);
ZookeeperServerConfig.Builder builder = new ZookeeperServerConfig.Builder();
builder.zooKeeperConfigFile(cfgFile.getAbsolutePath());
builder.myidFile(idFile.getAbsolutePath());
for (int i = 0, index = 0; i < numberOfServers; i++, index++) {
- boolean retired = Arrays.binarySearch(retiredIndices, index) >= 0;
- if (retired) i--;
- builder.server(newServer(i, "node" + index, retired));
+ while (Arrays.binarySearch(skipIndices, index) >= 0) index++;
+ builder.server(newServer(i, "node" + index));
}
-
builder.myid(0);
builder.dynamicReconfiguration(dynamicReconfiguration);
return builder.build();
}
- private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName, boolean retired) {
+ private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName) {
ZookeeperServerConfig.Server.Builder builder = new ZookeeperServerConfig.Server.Builder();
builder.id(id);
builder.hostname(hostName);
- builder.retired(retired);
return builder;
}
@@ -139,7 +142,6 @@ public class ReconfigurerTest {
private static class TestableReconfigurer extends Reconfigurer implements VespaZooKeeperServer {
private final TestableVespaZooKeeperAdmin zooKeeperAdmin;
- private final Phaser phaser = new Phaser(2);
private QuorumPeer serverPeer;
TestableReconfigurer(TestableVespaZooKeeperAdmin zooKeeperAdmin) {
@@ -154,16 +156,19 @@ public class ReconfigurerTest {
}
void startOrReconfigure(ZookeeperServerConfig newConfig) {
- serverPeer = startOrReconfigure(newConfig, this, MockQuorumPeer::new);
- phaser.arriveAndDeregister();
+ startOrReconfigure(newConfig, this, MockQuorumPeer::new, peer -> serverPeer = peer);
}
String connectionSpec() {
return zooKeeperAdmin.connectionSpec;
}
- String servers() {
- return zooKeeperAdmin.servers;
+ String joiningServers() {
+ return zooKeeperAdmin.joiningServers;
+ }
+
+ String leavingServers() {
+ return zooKeeperAdmin.leavingServers;
}
int reconfigurations() {
@@ -172,14 +177,10 @@ public class ReconfigurerTest {
@Override
public void shutdown() {
- phaser.arriveAndAwaitAdvance();
serverPeer.shutdown(Duration.ofSeconds(1)); }
@Override
- public void start(Path configFilePath) {
- phaser.arriveAndAwaitAdvance();
- serverPeer.start(configFilePath);
- }
+ public void start(Path configFilePath) { serverPeer.start(configFilePath); }
@Override
public boolean reconfigurable() {
@@ -191,7 +192,8 @@ public class ReconfigurerTest {
private static class TestableVespaZooKeeperAdmin implements VespaZooKeeperAdmin {
String connectionSpec;
- String servers;
+ String joiningServers;
+ String leavingServers;
int reconfigurations = 0;
private int failures = 0;
@@ -203,11 +205,12 @@ public class ReconfigurerTest {
}
@Override
- public void reconfigure(String connectionSpec, String servers) throws ReconfigException {
+ public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException {
if (++attempts < failures)
throw new ReconfigException("Reconfig failed");
this.connectionSpec = connectionSpec;
- this.servers = servers;
+ this.joiningServers = joiningServers;
+ this.leavingServers = leavingServers;
this.reconfigurations++;
}