diff options
43 files changed, 364 insertions, 839 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java index 6108c39f9d3..6b46e0957ff 100644 --- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java @@ -82,7 +82,8 @@ public interface ModelContext { @ModelFeatureFlag(owners = {"baldersheim"}) default boolean skipMbusReplyThread() { throw new UnsupportedOperationException("TODO specify default value"); } @ModelFeatureFlag(owners = {"baldersheim"}) default boolean useAsyncMessageHandlingOnSchedule() { throw new UnsupportedOperationException("TODO specify default value"); } @ModelFeatureFlag(owners = {"baldersheim"}) default double feedConcurrency() { throw new UnsupportedOperationException("TODO specify default value"); } - @ModelFeatureFlag(owners = {"baldersheim"}) default int metricsproxyNumThreads() { throw new UnsupportedOperationException("TODO specify default value"); } + @ModelFeatureFlag(owners = {"baldersheim"}, removeAfter = "7.527") default int metricsproxyNumThreads() { return defaultPoolNumThreads(); } + @ModelFeatureFlag(owners = {"baldersheim"}) default int defaultPoolNumThreads() { return 2; } @ModelFeatureFlag(owners = {"baldersheim"}, removeAfter = "7.527") default int largeRankExpressionLimit() { return 8192; } @ModelFeatureFlag(owners = {"baldersheim"}) default int maxUnCommittedMemory() { return 130000; } @ModelFeatureFlag(owners = {"baldersheim"}) default int maxConcurrentMergesPerNode() { return 16; } diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java index c148bb0e6e4..40b88372348 100644 --- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java +++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java @@ -118,7 +118,6 @@ public class TestProperties implements ModelContext.Properties, ModelContext.Fea @Override public double resourceLimitDisk() { return resourceLimitDisk; } @Override public double resourceLimitMemory() { return resourceLimitMemory; } @Override public double minNodeRatioPerGroup() { return minNodeRatioPerGroup; } - @Override public int metricsproxyNumThreads() { return 1; } @Override public double containerShutdownTimeout() { return containerShutdownTimeout; } @Override public boolean containerDumpHeapOnShutdownTimeout() { return containerDumpHeapOnShutdownTimeout; } @Override public int distributorMergeBusyWait() { return distributorMergeBusyWait; } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/LogserverContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/LogserverContainerCluster.java index 43f66f2c727..75b13a89e83 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/LogserverContainerCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/LogserverContainerCluster.java @@ -17,7 +17,7 @@ import java.util.Optional; public class LogserverContainerCluster extends ContainerCluster<LogserverContainer> { public LogserverContainerCluster(AbstractConfigProducer<?> parent, String name, DeployState deployState) { - super(parent, name, name, deployState, true); + super(parent, name, name, deployState, true, deployState.featureFlags().defaultPoolNumThreads()); addDefaultHandlersWithVip(); addLogHandler(); diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java index 0b99496a9b4..6e6f027b520 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerCluster.java @@ -50,12 +50,14 @@ public class ClusterControllerCluster extends AbstractConfigProducer<ClusterCont public void getConfig(ZookeeperServerConfig.Builder builder) { builder.clientPort(ZK_CLIENT_PORT); builder.juteMaxBuffer(1024 * 1024); // 1 Mb should be more than enough for cluster controller + boolean oldQuorumExists = containerCluster.getContainers().stream() // More than half the previous hosts must be present in the new config for quorum to persist. + .filter(container -> previousHosts.contains(container.getHostName())) // Set intersection is symmetric. + .count() > previousHosts.size() / 2; for (ClusterControllerContainer container : containerCluster.getContainers()) { ZookeeperServerConfig.Server.Builder serverBuilder = new ZookeeperServerConfig.Server.Builder(); serverBuilder.hostname(container.getHostName()); serverBuilder.id(container.index()); - serverBuilder.joining( ! previousHosts.isEmpty() && ! previousHosts.contains(container.getHostName())); - serverBuilder.retired(container.isRetired()); + serverBuilder.joining(oldQuorumExists && ! previousHosts.contains(container.getHostName())); builder.server(serverBuilder); } } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java index 728e46f2ff7..a7f3a6224f2 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/clustercontroller/ClusterControllerContainerCluster.java @@ -22,7 +22,7 @@ public class ClusterControllerContainerCluster extends ContainerCluster<ClusterC public ClusterControllerContainerCluster( AbstractConfigProducer<?> parent, String subId, String name, DeployState deployState) { - super(parent, subId, name, deployState, false); + super(parent, subId, name, deployState, false, deployState.featureFlags().defaultPoolNumThreads()); addDefaultHandlersWithVip(); this.reindexingContext = createReindexingContext(deployState); setJvmGCOptions(deployState.getProperties().jvmGCOptions(Optional.of(ClusterSpec.Type.admin))); diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerCluster.java index dd6f77ed093..a29647b062a 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerCluster.java @@ -87,7 +87,7 @@ public class MetricsProxyContainerCluster extends ContainerCluster<MetricsProxyC private final ApplicationId applicationId; public MetricsProxyContainerCluster(AbstractConfigProducer<?> parent, String name, DeployState deployState) { - super(parent, name, name, deployState, true); + super(parent, name, name, deployState, true, deployState.featureFlags().defaultPoolNumThreads()); this.parent = parent; applicationId = deployState.getProperties().applicationId(); diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java index 438e143bdfd..89c455269f4 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java @@ -94,7 +94,7 @@ public final class ApplicationContainerCluster extends ContainerCluster<Applicat private List<ApplicationClusterEndpoint> endpointList = List.of(); public ApplicationContainerCluster(AbstractConfigProducer<?> parent, String configSubId, String clusterId, DeployState deployState) { - super(parent, configSubId, clusterId, deployState, true); + super(parent, configSubId, clusterId, deployState, true, 10); this.tlsClientAuthority = deployState.tlsClientAuthority(); previousHosts = deployState.getPreviousModel().stream() .map(Model::allocatedHosts) @@ -309,9 +309,8 @@ public final class ApplicationContainerCluster extends ContainerCluster<Applicat ZookeeperServerConfig.Server.Builder serverBuilder = new ZookeeperServerConfig.Server.Builder(); serverBuilder.hostname(container.getHostName()) .id(container.index()) - .joining( ! previousHosts.isEmpty() && - ! previousHosts.contains(container.getHostName())) - .retired(container.isRetired()); + .joining(!previousHosts.isEmpty() && + !previousHosts.contains(container.getHostName())); builder.server(serverBuilder); builder.dynamicReconfiguration(true); } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ContainerCluster.java index c73a3b2a676..7010d7b3d4e 100755 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/ContainerCluster.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ContainerCluster.java @@ -160,7 +160,7 @@ public abstract class ContainerCluster<CONTAINER extends Container> private boolean deferChangesUntilRestart = false; - public ContainerCluster(AbstractConfigProducer<?> parent, String configSubId, String clusterId, DeployState deployState, boolean zooKeeperLocalhostAffinity) { + public ContainerCluster(AbstractConfigProducer<?> parent, String configSubId, String clusterId, DeployState deployState, boolean zooKeeperLocalhostAffinity, int defaultPoolNumThreads) { super(parent, configSubId); this.name = clusterId; this.isHostedVespa = stateIsHosted(deployState); @@ -176,7 +176,7 @@ public abstract class ContainerCluster<CONTAINER extends Container> addComponent(new StatisticsComponent()); addSimpleComponent(AccessLog.class); - addComponent(new DefaultThreadpoolProvider(this, deployState.featureFlags().metricsproxyNumThreads())); + addComponent(new DefaultThreadpoolProvider(this, defaultPoolNumThreads)); addSimpleComponent(com.yahoo.concurrent.classlock.ClassLocking.class); addSimpleComponent("com.yahoo.container.jdisc.metric.MetricConsumerProviderProvider"); addSimpleComponent("com.yahoo.container.jdisc.metric.MetricProvider"); diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/DefaultThreadpoolProvider.java b/config-model/src/main/java/com/yahoo/vespa/model/container/DefaultThreadpoolProvider.java index e0d4f3c0692..0b37abaded9 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/DefaultThreadpoolProvider.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/DefaultThreadpoolProvider.java @@ -5,7 +5,6 @@ import com.yahoo.container.bundle.BundleInstantiationSpecification; import com.yahoo.container.handler.ThreadPoolProvider; import com.yahoo.container.handler.ThreadpoolConfig; import com.yahoo.osgi.provider.model.ComponentModel; -import com.yahoo.vespa.model.admin.metricsproxy.MetricsProxyContainerCluster; import com.yahoo.vespa.model.container.component.SimpleComponent; /** @@ -16,38 +15,27 @@ import com.yahoo.vespa.model.container.component.SimpleComponent; class DefaultThreadpoolProvider extends SimpleComponent implements ThreadpoolConfig.Producer { private final ContainerCluster<?> cluster; - private final int metricsproxyNumThreads; + private final int defaultWorkerThreads; - DefaultThreadpoolProvider(ContainerCluster<?> cluster, int metricsproxyNumThreads) { + DefaultThreadpoolProvider(ContainerCluster<?> cluster, int defaultWorkerThreads) { super(new ComponentModel( BundleInstantiationSpecification.getFromStrings( "default-threadpool", ThreadPoolProvider.class.getName(), null))); this.cluster = cluster; - this.metricsproxyNumThreads = metricsproxyNumThreads; - } - - private int defaultThreadsByClusterType() { - if (cluster instanceof MetricsProxyContainerCluster) { - return metricsproxyNumThreads; - } - return 10; + this.defaultWorkerThreads = defaultWorkerThreads; } @Override public void getConfig(ThreadpoolConfig.Builder builder) { - if (!(cluster instanceof ApplicationContainerCluster)) { + if (cluster instanceof ApplicationContainerCluster) { + // Core pool size of 2xcores, and max of 100xcores and using a synchronous Q + // This is the deafault pool used by both federation and generally when you ask for an Executor. + builder.corePoolSize(-2).maxthreads(-100).queueSize(0); + } else { // Container clusters such as logserver, metricsproxy and clustercontroller - int defaultWorkerThreads = defaultThreadsByClusterType(); - builder.maxthreads(defaultWorkerThreads); - builder.corePoolSize(defaultWorkerThreads); - builder.queueSize(50); - return; + builder.corePoolSize(defaultWorkerThreads).maxthreads(defaultWorkerThreads).queueSize(50); } - - // Core pool size of 2xcores, and max of 100xcores and using a synchronous Q - // This is the deafault pool used by both federation and generally when you ask for an Executor. - builder.corePoolSize(-2).maxthreads(-100).queueSize(0); } } diff --git a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java index ba70b7493a2..10f883bdc75 100644 --- a/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java +++ b/config-model/src/test/java/com/yahoo/config/model/provision/ModelProvisioningTest.java @@ -2054,7 +2054,7 @@ public class ModelProvisioningTest { assertTrue("Initial servers are not joining", config.build().server().stream().noneMatch(ZookeeperServerConfig.Server::joining)); } { - VespaModel nextModel = tester.createModel(Zone.defaultZone(), servicesXml.apply(3), true, false, false, 0, Optional.of(model), new DeployState.Builder(), "node-1-3-10-04", "node-1-3-10-03"); + VespaModel nextModel = tester.createModel(Zone.defaultZone(), servicesXml.apply(5), true, false, false, 0, Optional.of(model), new DeployState.Builder()); ApplicationContainerCluster cluster = nextModel.getContainerClusters().get("zk"); ZookeeperServerConfig.Builder config = new ZookeeperServerConfig.Builder(); cluster.getContainers().forEach(c -> c.getConfig(config)); @@ -2067,14 +2067,6 @@ public class ModelProvisioningTest { 4, true), config.build().server().stream().collect(Collectors.toMap(ZookeeperServerConfig.Server::id, ZookeeperServerConfig.Server::joining))); - assertEquals("Retired nodes are retired", - Map.of(0, false, - 1, true, - 2, true, - 3, false, - 4, false), - config.build().server().stream().collect(Collectors.toMap(ZookeeperServerConfig.Server::id, - ZookeeperServerConfig.Server::retired))); } } diff --git a/config-model/src/test/java/com/yahoo/vespa/model/admin/ClusterControllerTestCase.java b/config-model/src/test/java/com/yahoo/vespa/model/admin/ClusterControllerTestCase.java index 474013c17fc..41c7f6d72e2 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/admin/ClusterControllerTestCase.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/admin/ClusterControllerTestCase.java @@ -397,7 +397,7 @@ public class ClusterControllerTestCase extends DomBuilderTest { assertEquals("-XX:+UseG1GC -XX:MaxTenuringThreshold=15", qrStartConfig.jvm().gcopts()); assertEquals(512, qrStartConfig.jvm().stacksize()); assertEquals(0, qrStartConfig.jvm().directMemorySizeCache()); - assertEquals(75, qrStartConfig.jvm().baseMaxDirectMemorySize()); + assertEquals(16, qrStartConfig.jvm().baseMaxDirectMemorySize()); assertReindexingConfigPresent(model); assertReindexingConfiguredOnAdminCluster(model); diff --git a/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java index eb1cf668cc9..14a90130a57 100755 --- a/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/container/ContainerClusterTest.java @@ -190,7 +190,7 @@ public class ContainerClusterTest { root.freezeModelTopology(); ThreadpoolConfig threadpoolConfig = root.getConfig(ThreadpoolConfig.class, "container0/component/default-threadpool"); - assertEquals(10, threadpoolConfig.maxthreads()); + assertEquals(2, threadpoolConfig.maxthreads()); assertEquals(50, threadpoolConfig.queueSize()); } diff --git a/configdefinitions/src/vespa/zookeeper-server.def b/configdefinitions/src/vespa/zookeeper-server.def index d80ccc4d042..04632ffd35f 100644 --- a/configdefinitions/src/vespa/zookeeper-server.def +++ b/configdefinitions/src/vespa/zookeeper-server.def @@ -32,13 +32,10 @@ juteMaxBuffer int default=52428800 myid int restart server[].id int server[].hostname string -server[].clientPort int default=2181 server[].quorumPort int default=2182 server[].electionPort int default=2183 # Whether this server is joining an existing cluster server[].joining bool default=false -# Whether this server is retired, and about to be removed -server[].retired bool default=false # Needed when upgrading from ZooKeeper 3.4 to 3.5, see https://issues.apache.org/jira/browse/ZOOKEEPER-3056, # and in general where there is a zookeeper ensemble running that has had few transactions. diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java index 5d944df2f30..db9869fa5f2 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java @@ -273,7 +273,7 @@ public class ModelContextImpl implements ModelContext { @Override public double resourceLimitDisk() { return resourceLimitDisk; } @Override public double resourceLimitMemory() { return resourceLimitMemory; } @Override public double minNodeRatioPerGroup() { return minNodeRatioPerGroup; } - @Override public int metricsproxyNumThreads() { return metricsproxyNumThreads; } + @Override public int defaultPoolNumThreads() { return metricsproxyNumThreads; } @Override public double containerShutdownTimeout() { return containerShutdownTimeout; } @Override public boolean containerDumpHeapOnShutdownTimeout() { return containerDumpHeapOnShutdownTimeout; } @Override public int distributorMergeBusyWait() { return distributorMergeBusyWait; } diff --git a/container-search/src/main/resources/configdefinitions/search.config.qr-start.def b/container-search/src/main/resources/configdefinitions/search.config.qr-start.def index e2856e137f0..c58f9944d61 100644 --- a/container-search/src/main/resources/configdefinitions/search.config.qr-start.def +++ b/container-search/src/main/resources/configdefinitions/search.config.qr-start.def @@ -24,7 +24,7 @@ jvm.stacksize int default=512 restart jvm.compressedClassSpaceSize int default=32 restart ## Base value of maximum direct memory size (in megabytes) -jvm.baseMaxDirectMemorySize int default=75 restart +jvm.baseMaxDirectMemorySize int default=16 restart ## Amount of direct memory used for caching. (in megabytes) jvm.directMemorySizeCache int default=0 restart diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java deleted file mode 100644 index 553f2ffba36..00000000000 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/file/Templar.java +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hosted.node.admin.task.util.file; - -import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; - -/** - * A very simple template engine when there's little complexity and lots of Velocity special characters $ and #, - * i.e. typically shell script. - * - * @author hakonhall - */ -public class Templar { - private final String template; - - private static final String prefix = "<%="; - private static final String suffix = "%>"; - - private final Map<String, String> settings = new HashMap<>(); - - public static Templar fromUtf8File(Path path) { - return new Templar(new UnixPath(path).readUtf8File()); - } - - public Templar(String template) { - this.template = template; - } - - public Templar set(String name, String value) { - settings.put(name, value); - return this; - } - - public String resolve() { - StringBuilder text = new StringBuilder(template.length() * 2); - - int start= 0; - int end; - - for (; start < template.length(); start = end) { - int prefixStart = template.indexOf(prefix, start); - - - if (prefixStart == -1) { - text.append(template, start, template.length()); - break; - } else { - text.append(template, start, prefixStart); - } - - int suffixStart = template.indexOf(suffix, prefixStart + prefix.length()); - if (suffixStart == -1) { - throw new IllegalArgumentException("Prefix at offset " + prefixStart + " is not terminated"); - } - - int prefixEnd = prefixStart + prefix.length(); - String name = template.substring(prefixEnd, suffixStart).trim(); - String value = settings.get(name); - if (value == null) { - throw new IllegalArgumentException("No value is set for name '" + name + "' at offset " + prefixEnd); - } - - text.append(value); - - end = suffixStart + suffix.length(); - } - - return text.toString(); - } - - public FileWriter getFileWriterTo(Path path) { - return new FileWriter(path, this::resolve); - } -} diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java index ea55af17a0e..6d80ac2cad9 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/template/Template.java @@ -26,12 +26,15 @@ import java.util.Optional; * id: a valid Java identifier * </pre> * + * <p>Other directive delimiters than "%{" and "}" may be used, see {@link TemplateDescriptor}.</p> + * * <p>Fill the template with variable values ({@link #set(String, String) set()}, set if conditions * ({@link #set(String, boolean)}), add list elements ({@link #add(String) add()}, etc, and finally * render it as a String ({@link #render()}).</p> * * <p>To reuse a template, create the template and work on snapshots of that ({@link #snapshot()}).</p> * + * @see TemplateDescriptor * @author hakonhall */ public class Template implements Form { diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java deleted file mode 100644 index ed410ffc1d1..00000000000 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/file/TemplarTest.java +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.hosted.node.admin.task.util.file; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -/** - * @author hakonhall - */ -public class TemplarTest { - @Test - public void test() { - Templar templar = new Templar("x y <%= foo %>, some other <%=bar%> text"); - templar.set("foo", "fidelity") - .set("bar", "halimov") - .set("not", "used"); - - assertEquals("x y fidelity, some other halimov text", templar.resolve()); - } -}
\ No newline at end of file diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index 214c57557bc..385d0eb363e 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -178,6 +178,12 @@ private: MockSharedThreadingService _shared_service; storage::spi::dummy::DummyBucketExecutor _bucketExecutor; + static std::shared_ptr<ProtonConfig> make_proton_config() { + ProtonConfigBuilder proton_config; + proton_config.indexing.optimize = ProtonConfigBuilder::Indexing::Optimize::LATENCY; + return std::make_shared<ProtonConfig>(proton_config); + } + public: DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort); ~DocumentDBFactory() override; @@ -196,7 +202,7 @@ public: TuneFileDocumentDB::SP tuneFileDocDB(new TuneFileDocumentDB()); DocumentDBConfigHelper mgr(spec, docType.getName()); auto b = std::make_shared<BootstrapConfig>(1, factory.getTypeCfg(), factory.getTypeRepo(), - std::make_shared<ProtonConfig>(), + make_proton_config(), std::make_shared<FiledistributorrpcConfig>(), std::make_shared<BucketspacesConfig>(), tuneFileDocDB, HwInfo()); diff --git a/searchlib/src/vespa/searchlib/diskindex/field_merger.h b/searchlib/src/vespa/searchlib/diskindex/field_merger.h index 5017a7d5192..bbb4efc03b1 100644 --- a/searchlib/src/vespa/searchlib/diskindex/field_merger.h +++ b/searchlib/src/vespa/searchlib/diskindex/field_merger.h @@ -53,7 +53,6 @@ class FieldMerger std::unique_ptr<FieldWriter> _writer; State _state; bool _failed; - bool _force_small_merge_chunk; void make_tmp_dirs(); bool clean_tmp_dirs(); diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index ef7f8bfb0f6..243935d4013 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -96,6 +96,23 @@ TEST_F("require that task with same component id are serialized", Fixture) EXPECT_EQUAL(42, tv->_val); } +TEST_F("require that task with same component id are serialized when executed with list", Fixture) +{ + std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); + EXPECT_EQUAL(0, tv->_val); + ISequencedTaskExecutor::ExecutorId executorId = f._threads->getExecutorId(0); + ISequencedTaskExecutor::TaskList list; + list.template emplace_back(executorId, makeLambdaTask([=]() { usleep(2000); tv->modify(0, 14); })); + list.template emplace_back(executorId, makeLambdaTask([=]() { tv->modify(14, 42); })); + f._threads->executeTasks(std::move(list)); + tv->wait(2); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads->sync_all(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + TEST_F("require that task with different component ids are not serialized", Fixture) { int tryCnt = 0; @@ -136,7 +153,8 @@ TEST_F("require that task with same string component id are serialized", Fixture namespace { -int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) +int +detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) { int tryCnt = 0; for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { @@ -158,7 +176,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t return tryCnt; } -vespalib::string makeAltComponentId(Fixture &f) +vespalib::string +makeAltComponentId(Fixture &f) { int tryCnt = 0; char altComponentId[20]; diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp index dd71380f64a..56352ff3c0d 100644 --- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp +++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp @@ -30,6 +30,28 @@ TEST("test that all tasks are executed") { EXPECT_EQUAL(10000u, counter); } +TEST("test that executor can overflow") { + constexpr size_t NUM_TASKS = 1000; + std::atomic<uint64_t> counter(0); + vespalib::Gate gate; + SingleExecutor executor(sequenced_executor, 10, false, 1, 1ms); + executor.execute(makeLambdaTask([&gate] { gate.await();})); + + for(size_t i(0); i < NUM_TASKS; i++) { + executor.execute(makeLambdaTask([&counter, i] { + EXPECT_EQUAL(i, counter); + counter++; + })); + } + EXPECT_EQUAL(0u, counter); + ExecutorStats stats = executor.getStats(); + EXPECT_EQUAL(NUM_TASKS + 1, stats.acceptedTasks); + EXPECT_EQUAL(NUM_TASKS, stats.queueSize.max()); + gate.countDown(); + executor.sync(); + EXPECT_EQUAL(NUM_TASKS, counter); +} + void verifyResizeTaskLimit(bool up) { std::mutex lock; std::condition_variable cond; @@ -38,7 +60,7 @@ void verifyResizeTaskLimit(bool up) { constexpr uint32_t INITIAL = 20; const uint32_t INITIAL_2inN = roundUp2inN(INITIAL); double waterMarkRatio = 0.5; - SingleExecutor executor(sequenced_executor, INITIAL, INITIAL*waterMarkRatio, 10ms); + SingleExecutor executor(sequenced_executor, INITIAL, true, INITIAL*waterMarkRatio, 10ms); EXPECT_EQUAL(INITIAL_2inN, executor.getTaskLimit()); EXPECT_EQUAL(uint32_t(INITIAL_2inN*waterMarkRatio), executor.get_watermark()); diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp index c54f182891c..b31d72da3b1 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp @@ -12,9 +12,16 @@ ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors) ISequencedTaskExecutor::~ISequencedTaskExecutor() = default; +void +ISequencedTaskExecutor::executeTasks(TaskList tasks) { + for (auto & task : tasks) { + executeTask(task.first, std::move(task.second)); + } +} + ISequencedTaskExecutor::ExecutorId -ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) const { - vespalib::hash<vespalib::stringref> hashfun; +ISequencedTaskExecutor::getExecutorIdFromName(stringref componentId) const { + hash<stringref> hashfun; return getExecutorId(hashfun(componentId)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h index 3fe6fb5d678..ff90556e3e4 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -14,7 +14,7 @@ namespace vespalib { * Interface class to run multiple tasks in parallel, but tasks with same * id has to be run in sequence. */ -class ISequencedTaskExecutor : public vespalib::IWakeup +class ISequencedTaskExecutor : public IWakeup { public: class ExecutorId { @@ -28,6 +28,7 @@ public: private: uint32_t _id; }; + using TaskList = std::vector<std::pair<ExecutorId, Executor::Task::UP>>; ISequencedTaskExecutor(uint32_t numExecutors); virtual ~ISequencedTaskExecutor(); @@ -40,7 +41,7 @@ public: virtual ExecutorId getExecutorId(uint64_t componentId) const = 0; uint32_t getNumExecutors() const { return _numExecutors; } - ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const; + ExecutorId getExecutorIdFromName(stringref componentId) const; /** * Returns an executor id that is NOT equal to the given executor id, @@ -58,7 +59,13 @@ public: * @param id which internal executor to use * @param task unique pointer to the task to be executed */ - virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0; + virtual void executeTask(ExecutorId id, Executor::Task::UP task) = 0; + /** + * Schedule a list of tasks to run after all previously scheduled tasks with + * same id. Default is to just iterate and execute one by one, but implementations + * that can schedule all in one go more efficiently can implement this one. + */ + virtual void executeTasks(TaskList tasks); /** * Call this one to ensure you get the attention of the workers. */ @@ -74,7 +81,7 @@ public: */ template <class FunctionType> void executeLambda(ExecutorId id, FunctionType &&function) { - executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function))); + executeTask(id, makeLambdaTask(std::forward<FunctionType>(function))); } /** * Wait for all scheduled tasks to complete. @@ -83,7 +90,7 @@ public: virtual void setTaskLimit(uint32_t taskLimit) = 0; - virtual vespalib::ExecutorStats getStats() = 0; + virtual ExecutorStats getStats() = 0; /** * Wrap lambda function into a task and schedule it to be run. @@ -96,7 +103,7 @@ public: template <class FunctionType> void execute(uint64_t componentId, FunctionType &&function) { ExecutorId id = getExecutorId(componentId); - executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function))); + executeTask(id, makeLambdaTask(std::forward<FunctionType>(function))); } /** @@ -109,7 +116,7 @@ public: */ template <class FunctionType> void execute(ExecutorId id, FunctionType &&function) { - executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function))); + executeTask(id, makeLambdaTask(std::forward<FunctionType>(function))); } private: diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 76b0235301b..58ae862f7c6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -67,7 +67,7 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3 for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { uint32_t watermark = (kindOfWatermark == 0) ? taskLimit / 10 : kindOfWatermark; - executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, 100ms)); + executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, true, watermark, 100ms)); } else { executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp index 5ae8e96b606..d81b8ec1db6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp @@ -17,7 +17,7 @@ SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecu SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() = default; void -SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) +SequencedTaskExecutorObserver::executeTask(ExecutorId id, Executor::Task::UP task) { ++_executeCnt; { @@ -28,6 +28,19 @@ SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Ta } void +SequencedTaskExecutorObserver::executeTasks(TaskList tasks) +{ + _executeCnt += tasks.size(); + { + std::lock_guard<std::mutex> guard(_mutex); + for (const auto & task : tasks) { + _executeHistory.emplace_back(task.first.getId()); + } + } + _executor.executeTasks(std::move(tasks)); +} + +void SequencedTaskExecutorObserver::sync_all() { ++_syncCnt; @@ -45,7 +58,7 @@ void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) { _executor.setTaskLimit(taskLimit); } -vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() { +ExecutorStats SequencedTaskExecutorObserver::getStats() { return _executor.getStats(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h index 7e2bf968952..1d54283c393 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h @@ -24,10 +24,11 @@ public: ~SequencedTaskExecutorObserver() override; ExecutorId getExecutorId(uint64_t componentId) const override; - void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; + void executeTask(ExecutorId id, Executor::Task::UP task) override; + void executeTasks(TaskList tasks) override; void sync_all() override; void setTaskLimit(uint32_t taskLimit) override; - vespalib::ExecutorStats getStats() override; + ExecutorStats getStats() override; uint32_t getExecuteCnt() const { return _executeCnt; } uint32_t getSyncCnt() const { return _syncCnt; } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index a99bce0a705..21ed90c3d22 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -7,12 +7,12 @@ namespace vespalib { SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit) - : SingleExecutor(func, taskLimit, taskLimit/10, 100ms) + : SingleExecutor(func, taskLimit, true, taskLimit/10, 100ms) { } -SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime) - : _watermarkRatio(watermark < taskLimit ? double(watermark) / taskLimit : 1.0), - _taskLimit(vespalib::roundUp2inN(taskLimit)), +SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime) + : _watermarkRatio(watermark < reservedQueueSize ? double(watermark) / reservedQueueSize : 1.0), + _taskLimit(vespalib::roundUp2inN(reservedQueueSize)), _wantedTaskLimit(_taskLimit.load()), _rp(0), _tasks(std::make_unique<Task::UP[]>(_taskLimit)), @@ -30,9 +30,13 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat _wp(0), _watermark(_taskLimit.load()*_watermarkRatio), _reactionTime(reactionTime), - _closed(false) + _closed(false), + _overflow() { - assert(taskLimit >= watermark); + assert(reservedQueueSize >= watermark); + if ( ! isQueueSizeHard) { + _overflow = std::make_unique<ArrayQueue<Task::UP>>(); + } _thread.start(); } @@ -62,10 +66,12 @@ SingleExecutor::execute(Task::UP task) { if (_closed) { return task; } - wait_for_room(guard); - wp = _wp.load(std::memory_order_relaxed); - _tasks[index(wp)] = std::move(task); - _wp.store(wp + 1, std::memory_order_release); + task = wait_for_room_or_put_in_overflow_Q(guard, std::move(task)); + if (task) { + wp = move_to_main_q(guard, std::move(task)); + } else { + wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(guard); + } } if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { _consumerCondition.notify_one(); @@ -73,6 +79,24 @@ SingleExecutor::execute(Task::UP task) { return task; } +uint64_t +SingleExecutor::numTasks() { + if (_overflow) { + Lock guard(_mutex); + return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard); + } else { + return num_tasks_in_main_q(); + } +} + +uint64_t +SingleExecutor::move_to_main_q(Lock &, Task::UP task) { + uint64_t wp = _wp.load(std::memory_order_relaxed); + _tasks[index(wp)] = std::move(task); + _wp.store(wp + 1, std::memory_order_release); + return wp; +} + void SingleExecutor::setTaskLimit(uint32_t taskLimit) { _wantedTaskLimit = vespalib::roundUp2inN(taskLimit); @@ -81,7 +105,7 @@ SingleExecutor::setTaskLimit(uint32_t taskLimit) { void SingleExecutor::drain(Lock & lock) { uint64_t wp = _wp.load(std::memory_order_relaxed); - while (numTasks() > 0) { + while (numTasks(lock) > 0) { _consumerCondition.notify_one(); sleepProducer(lock, 100us, wp); } @@ -97,7 +121,7 @@ SingleExecutor::wakeup() { SingleExecutor & SingleExecutor::sync() { Lock lock(_mutex); - uint64_t wp = _wp.load(std::memory_order_relaxed); + uint64_t wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock); while (wp > _rp.load(std::memory_order_acquire)) { _consumerCondition.notify_one(); sleepProducer(lock, 100us, wp); @@ -119,7 +143,7 @@ SingleExecutor::run() { _producerCondition.notify_all(); _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed); Lock lock(_mutex); - if (numTasks() <= 0) { + if (numTasks(lock) <= 0) { steady_time now = steady_clock::now(); _threadIdleTracker.set_idle(now); _consumerCondition.wait_until(lock, now + _reactionTime); @@ -134,6 +158,22 @@ void SingleExecutor::drain_tasks() { while (numTasks() > 0) { run_tasks_till(_wp.load(std::memory_order_acquire)); + move_overflow_to_main_q(); + } +} + +void +SingleExecutor::move_overflow_to_main_q() +{ + if ( ! _overflow) return; + Lock guard(_mutex); + move_overflow_to_main_q(guard); +} +void +SingleExecutor::move_overflow_to_main_q(Lock & guard) { + while ( !_overflow->empty() && num_tasks_in_main_q() < _taskLimit.load(std::memory_order_relaxed)) { + move_to_main_q(guard, std::move(_overflow->front())); + _overflow->pop(); } } @@ -151,26 +191,42 @@ SingleExecutor::run_tasks_till(uint64_t available) { } } -void -SingleExecutor::wait_for_room(Lock & lock) { +Executor::Task::UP +SingleExecutor::wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task) { uint64_t wp = _wp.load(std::memory_order_relaxed); uint64_t taskLimit = _taskLimit.load(std::memory_order_relaxed); if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) { - drain(lock); + drain(guard); _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); _taskLimit = _wantedTaskLimit.load(); _watermark = _taskLimit * _watermarkRatio; } - _queueSize.add(numTasks()); - while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(lock, _reactionTime, wp - get_watermark()); + uint64_t numTaskInQ = numTasks(guard); + _queueSize.add(numTaskInQ); + if (numTaskInQ >= _taskLimit.load(std::memory_order_relaxed)) { + if (_overflow) { + _overflow->push(std::move(task)); + } else { + while (numTasks(guard) >= _taskLimit.load(std::memory_order_relaxed)) { + sleepProducer(guard, _reactionTime, wp - get_watermark()); + } + } + } else { + if (_overflow && !_overflow->empty()) { + _overflow->push(std::move(task)); + } + } + if (_overflow && !_overflow->empty()) { + assert(!task); + move_overflow_to_main_q(guard); } + return task; } ExecutorStats SingleExecutor::getStats() { Lock lock(_mutex); - uint64_t accepted = _wp.load(std::memory_order_relaxed); + uint64_t accepted = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock); steady_time now = steady_clock::now(); _idleTracker.was_idle(_threadIdleTracker.reset(now)); ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount); diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index e76e3f17a41..4fdc217e701 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -5,6 +5,7 @@ #include <vespa/vespalib/util/threadexecutor.h> #include <vespa/vespalib/util/thread.h> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/vespalib/util/executor_idle_tracking.h> #include <thread> #include <atomic> @@ -19,8 +20,8 @@ namespace vespalib { */ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: - SingleExecutor(init_fun_t func, uint32_t taskLimit); - SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime); + SingleExecutor(init_fun_t func, uint32_t reservedQueueSize); + SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; @@ -39,12 +40,22 @@ private: void drain_tasks(); void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt); void run_tasks_till(uint64_t available); - void wait_for_room(Lock & guard); + Task::UP wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task); + uint64_t move_to_main_q(Lock & guard, Task::UP task); + void move_overflow_to_main_q(); + void move_overflow_to_main_q(Lock & guard); uint64_t index(uint64_t counter) const { return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); } - uint64_t numTasks() const { + uint64_t numTasks(); + uint64_t numTasks(Lock & guard) const { + return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard); + } + uint64_t num_tasks_in_overflow_q(Lock &) const { + return _overflow ? _overflow->size() : 0; + } + uint64_t num_tasks_in_main_q() const { return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire); } const double _watermarkRatio; @@ -67,6 +78,7 @@ private: std::atomic<uint32_t> _watermark; const duration _reactionTime; bool _closed; + std::unique_ptr<ArrayQueue<Task::UP>> _overflow; }; } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 3d24ee87879..5dafd9c5eda 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -113,7 +113,7 @@ class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor public: using DocumentIdsAndTimeStamps = std::vector<std::pair<spi::Timestamp, spi::DocumentId>>; UnrevertableRemoveEntryProcessor(DocumentIdsAndTimeStamps & to_remove) - : _to_remove(to_remove) + : _to_remove(to_remove) {} void process(spi::DocEntry& entry) override { diff --git a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp index 73e70e7fd89..8f3dd8ab006 100644 --- a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp +++ b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp @@ -2,11 +2,11 @@ #pragma once -#include <stdint.h> -#include <stdlib.h> +#include "traits.h" +#include <cstdint> +#include <cstdlib> #include <cassert> #include <algorithm> -#include "traits.h" namespace vespalib { diff --git a/zookeeper-server/zookeeper-server-3.6.3/pom.xml b/zookeeper-server/zookeeper-server-3.6.3/pom.xml index a8ad183de4e..f7e6f512f7c 100644 --- a/zookeeper-server/zookeeper-server-3.6.3/pom.xml +++ b/zookeeper-server/zookeeper-server-3.6.3/pom.xml @@ -11,9 +11,6 @@ <artifactId>zookeeper-server-3.6.3</artifactId> <packaging>container-plugin</packaging> <version>7-SNAPSHOT</version> - <properties> - <zookeeper.version>3.6.3</zookeeper.version> - </properties> <dependencies> <dependency> <groupId>com.yahoo.vespa</groupId> @@ -35,7 +32,7 @@ <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> - <version>${zookeeper.version}</version> + <version>3.6.3</version> <exclusions> <!-- Container provides wiring for all common log libraries @@ -90,6 +87,7 @@ <configuration> <compilerArgs> <arg>-Xlint:all</arg> + <arg>-Werror</arg> </compilerArgs> </configuration> </plugin> @@ -99,9 +97,6 @@ <configuration> <redirectTestOutputToFile>${test.hide}</redirectTestOutputToFile> <forkMode>once</forkMode> - <systemPropertyVariables> - <zk-version>${zookeeper.version}</zk-version> - </systemPropertyVariables> </configuration> </plugin> <plugin> diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java index 246911fdfc7..c002ffa72ce 100644 --- a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java @@ -18,21 +18,21 @@ import java.util.concurrent.atomic.AtomicReference; */ public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer { - private QuorumPeer peer; + private final AtomicReference<QuorumPeer> peer = new AtomicReference<>(); @Inject public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) { - peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer()); + reconfigurer.startOrReconfigure(zookeeperServerConfig, this, VespaQuorumPeer::new, peer::set); } @Override public void shutdown() { - peer.shutdown(Duration.ofMinutes(1)); + peer.get().shutdown(Duration.ofMinutes(1)); } @Override public void start(Path configFilePath) { - peer.start(configFilePath); + peer.get().start(configFilePath); } @Override diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java index f0a95b70e96..27aa18c64c7 100644 --- a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java +++ b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java @@ -2,15 +2,11 @@ package com.yahoo.vespa.zookeeper; import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.admin.ZooKeeperAdmin; -import org.apache.zookeeper.data.ACL; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -23,28 +19,27 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin { private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName()); @Override - public void reconfigure(String connectionSpec, String servers) throws ReconfigException { - try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(connectionSpec)) { + public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException { + ZooKeeperAdmin zooKeeperAdmin = null; + try { + zooKeeperAdmin = createAdmin(connectionSpec); long fromConfig = -1; // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0) - log.log(Level.INFO, "Applying ZooKeeper config: " + servers); - byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null); + byte[] appliedConfig = zooKeeperAdmin.reconfigure(joiningServers, leavingServers, null, fromConfig, null); log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); - - // Verify by issuing a write operation; this is only accepted once new quorum is obtained. - List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; - String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL); - zooKeeperAdmin.delete(node, -1); - - log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); - } - catch ( KeeperException.ReconfigInProgress - | KeeperException.ConnectionLossException - | KeeperException.NewConfigNoQuorum e) { - throw new ReconfigException(e); - } - catch (KeeperException | IOException | InterruptedException e) { + } catch (KeeperException e) { + if (retryOn(e)) + throw new ReconfigException(e); + else + throw new RuntimeException(e); + } catch (IOException | InterruptedException e) { throw new RuntimeException(e); + } finally { + if (zooKeeperAdmin != null) { + try { + zooKeeperAdmin.close(); + } catch (InterruptedException e) { /* ignore */} + } } } @@ -53,5 +48,11 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin { (event) -> log.log(Level.INFO, event.toString()), new ZkClientConfigBuilder().toConfig()); } + private static boolean retryOn(KeeperException e) { + return e instanceof KeeperException.ReconfigInProgress || + e instanceof KeeperException.ConnectionLossException || + e instanceof KeeperException.NewConfigNoQuorum; + } + } diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java deleted file mode 100644 index 922c389f94a..00000000000 --- a/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java +++ /dev/null @@ -1,258 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.zookeper; - -import com.yahoo.cloud.config.ZookeeperServerConfig; -import com.yahoo.net.HostName; -import com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer; -import com.yahoo.vespa.zookeeper.Reconfigurer; -import com.yahoo.vespa.zookeeper.VespaZooKeeperAdminImpl; -import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.admin.ZooKeeperAdmin; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; -import org.junit.Ignore; -import org.junit.Test; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.ServerSocket; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.IntStream; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.TimeUnit.SECONDS; -import static java.util.stream.Collectors.toList; -import static org.junit.Assert.assertEquals; - -public class VespaZooKeeperTest { - - static final Path tempDirRoot = getTmpDir(); - static final List<Integer> ports = new ArrayList<>(); - - /** - * Performs dynamic reconfiguration of ZooKeeper servers. - * - * First, a cluster of 3 servers is set up, and some data is written to it. - * Then, 3 new servers are added, and the first 3 marked for retirement; - * this should force the quorum to move the 3 new servers, but not disconnect the old ones. - * Next, the old servers are removed. - * Then, the cluster is reduced to size 1. - * Finally, the cluster grows to size 3 again. - * - * Throughout all of this, quorum should remain, and the data should remain the same. - */ - @Test(timeout = 120_000) - @Ignore // Unstable, some ZK server keeps resetting connections sometimes. - public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException { - List<ZooKeeper> keepers = new ArrayList<>(); - for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper()); - for (int i = 0; i < 8; i++) keepers.get(i).run(); - - // Start the first three servers. - List<ZookeeperServerConfig> configs = getConfigs(0, 0, 3, 0); - for (int i = 0; i < 3; i++) keepers.get(i).config = configs.get(i); - for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - - // Wait for all servers to be up and running. - for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - - // Write data to verify later. - String path = writeData(configs.get(0)); - - // Let three new servers join, causing the three older ones to retire and leave the ensemble. - configs = getConfigs(0, 3, 3, 3); - for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); - // The existing servers can't reconfigure and leave before the joiners are up. - for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - - // Wait for new quorum to be established. - for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - - // Verify written data is preserved. - verifyData(path, configs.get(3)); - - // Old servers are removed. - configs = getConfigs(3, 0, 3, 0); - for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); - // Old servers shut down, while the newer servers remain. - for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - // Ensure old servers shut down properly. - for (int i = 0; i < 3; i++) keepers.get(i).await(); - // Ensure new servers have reconfigured. - for (int i = 3; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - - // Verify written data is preserved. - verifyData(path, configs.get(3)); - - - // Cluster shrinks to a single server. - configs = getConfigs(5, 0, 1, 0); - for (int i = 3; i < 6; i++) keepers.get(i).config = configs.get(i); - for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - // We let the remaining server reconfigure the others out before they die. - for (int i = 3; i < 5; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - for (int i = 3; i < 5; i++) keepers.get(i).await(); - verifyData(path, configs.get(5)); - - // Cluster grows to 3 servers again. - configs = getConfigs(5, 0, 3, 2); - for (int i = 5; i < 8; i++) keepers.get(i).config = configs.get(i); - for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - // Wait for the joiners. - for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - verifyData(path, configs.get(7)); - - // Let the remaining servers terminate. - for (int i = 5; i < 8; i++) keepers.get(i).config = null; - for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - for (int i = 5; i < 8; i++) keepers.get(i).await(); - } - - static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { - ZooKeeperAdmin admin = createAdmin(config); - List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; - String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL); - String read = new String(admin.getData(node, false, new Stat()), UTF_8); - assertEquals("hi", read); - return node; - } - - static void verifyData(String path, ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { - for (int i = 0; i < 10; i++) { - try { - assertEquals("hi", new String(createAdmin(config).getData(path, false, new Stat()), UTF_8)); - return; - } - catch (KeeperException.ConnectionLossException e) { - e.printStackTrace(); - Thread.sleep(10 << i); - } - } - } - - static ZooKeeperAdmin createAdmin(ZookeeperServerConfig config) throws IOException { - return new ZooKeeperAdmin(HostName.getLocalhost() + ":" + config.clientPort(), - 10_000, - System.err::println, - new ZkClientConfigBuilder().toConfig()); - } - - static class ZooKeeper { - - final ExecutorService executor = Executors.newSingleThreadExecutor(); - final Phaser phaser = new Phaser(2); - final AtomicReference<Future<?>> future = new AtomicReference<>(); - ZookeeperServerConfig config; - - void run() { - future.set(executor.submit(() -> { - Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl()); - phaser.arriveAndAwaitAdvance(); - while (config != null) { - new ReconfigurableVespaZooKeeperServer(reconfigurer, config); - phaser.arriveAndAwaitAdvance(); // server is now up, let test thread sync here - phaser.arriveAndAwaitAdvance(); // wait before reconfig/teardown to let test thread do stuff - } - reconfigurer.deconstruct(); - })); - } - - void await() throws ExecutionException, InterruptedException, TimeoutException { - future.get().get(30, SECONDS); - } - } - - static List<ZookeeperServerConfig> getConfigs(int removed, int retired, int active, int joining) { - return IntStream.rangeClosed(1, removed + retired + active) - .mapToObj(id -> getConfig(removed, retired, active, joining, id)) - .collect(toList()); - } - - // Config for server #id among retired + active servers, of which the last may be joining, and with offset removed. - static ZookeeperServerConfig getConfig(int removed, int retired, int active, int joining, int id) { - if (id <= removed) - return null; - - Path tempDir = tempDirRoot.resolve("zookeeper-" + id); - return new ZookeeperServerConfig.Builder() - .clientPort(getPorts(id).get(0)) - .dataDir(tempDir.toString()) - .zooKeeperConfigFile(tempDir.resolve("zookeeper.cfg").toString()) - .myid(id) - .myidFile(tempDir.resolve("myid").toString()) - .dynamicReconfiguration(true) - .server(IntStream.rangeClosed(removed + 1, removed + retired + active) - .mapToObj(i -> new ZookeeperServerConfig.Server.Builder() - .id(i) - .clientPort(getPorts(i).get(0)) - .electionPort(getPorts(i).get(1)) - .quorumPort(getPorts(i).get(2)) - .hostname("localhost") - .joining(i - removed > retired + active - joining) - .retired(i - removed <= retired)) - .collect(toList())) - .build(); - } - - static List<Integer> getPorts(int id) { - if (ports.size() < id * 3) { - int previousPort; - if (ports.isEmpty()) { - String[] version = System.getProperty("zk-version").split("\\."); - int versionPortOffset = 0; - for (String part : version) - versionPortOffset = 32 * (versionPortOffset + Integer.parseInt(part)); - previousPort = 20000 + versionPortOffset % 30000; - } - else - previousPort = ports.get(ports.size() - 1); - - for (int i = 0; i < 3; i++) - ports.add(previousPort = nextPort(previousPort)); - } - return ports.subList(id * 3 - 3, id * 3); - } - - static int nextPort(int previousPort) { - for (int j = 1; j <= 30000; j++) { - int port = (previousPort + j); - while (port > 50000) - port -= 30000; - - try (ServerSocket socket = new ServerSocket(port)) { - return socket.getLocalPort(); - } - catch (IOException e) { - System.err.println("Could not bind port " + port + ": " + e); - } - } - throw new RuntimeException("No free ports"); - } - - static Path getTmpDir() { - try { - Path tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "vespa-zk-test"); - tempDir.toFile().deleteOnExit(); - return tempDir.toAbsolutePath(); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - -} diff --git a/zookeeper-server/zookeeper-server-3.7.0/pom.xml b/zookeeper-server/zookeeper-server-3.7.0/pom.xml index 01fd83a496b..ac7db35e6af 100644 --- a/zookeeper-server/zookeeper-server-3.7.0/pom.xml +++ b/zookeeper-server/zookeeper-server-3.7.0/pom.xml @@ -11,9 +11,6 @@ <artifactId>zookeeper-server-3.7.0</artifactId> <packaging>container-plugin</packaging> <version>7-SNAPSHOT</version> - <properties> - <zookeeper.version>3.7.0</zookeeper.version> - </properties> <dependencies> <dependency> <groupId>com.yahoo.vespa</groupId> @@ -35,7 +32,7 @@ <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> - <version>${zookeeper.version}</version> + <version>3.7.0</version> <exclusions> <!-- Container provides wiring for all common log libraries @@ -90,6 +87,7 @@ <configuration> <compilerArgs> <arg>-Xlint:all</arg> + <arg>-Werror</arg> </compilerArgs> </configuration> </plugin> @@ -99,9 +97,6 @@ <configuration> <redirectTestOutputToFile>${test.hide}</redirectTestOutputToFile> <forkMode>once</forkMode> - <systemPropertyVariables> - <zk-version>${zookeeper.version}</zk-version> - </systemPropertyVariables> </configuration> </plugin> <plugin> diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java index 246911fdfc7..c002ffa72ce 100644 --- a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java @@ -18,21 +18,21 @@ import java.util.concurrent.atomic.AtomicReference; */ public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer { - private QuorumPeer peer; + private final AtomicReference<QuorumPeer> peer = new AtomicReference<>(); @Inject public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) { - peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer()); + reconfigurer.startOrReconfigure(zookeeperServerConfig, this, VespaQuorumPeer::new, peer::set); } @Override public void shutdown() { - peer.shutdown(Duration.ofMinutes(1)); + peer.get().shutdown(Duration.ofMinutes(1)); } @Override public void start(Path configFilePath) { - peer.start(configFilePath); + peer.get().start(configFilePath); } @Override diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java index ae7bf8d84f5..27aa18c64c7 100644 --- a/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java +++ b/zookeeper-server/zookeeper-server-3.7.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java @@ -2,15 +2,11 @@ package com.yahoo.vespa.zookeeper; import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.admin.ZooKeeperAdmin; -import org.apache.zookeeper.data.ACL; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -23,28 +19,27 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin { private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName()); @Override - public void reconfigure(String connectionSpec, String servers) throws ReconfigException { - try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(connectionSpec)) { + public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException { + ZooKeeperAdmin zooKeeperAdmin = null; + try { + zooKeeperAdmin = createAdmin(connectionSpec); long fromConfig = -1; - // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0). - log.log(Level.INFO, "Applying ZooKeeper config: " + servers); - byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null); + // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0) + byte[] appliedConfig = zooKeeperAdmin.reconfigure(joiningServers, leavingServers, null, fromConfig, null); log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); - - // Verify by issuing a write operation; this is only accepted once new quorum is obtained. - List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; - String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL); - zooKeeperAdmin.delete(node, -1); - - log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); - } - catch ( KeeperException.ReconfigInProgress - | KeeperException.ConnectionLossException - | KeeperException.NewConfigNoQuorum e) { - throw new ReconfigException(e); - } - catch (KeeperException | IOException | InterruptedException e) { + } catch (KeeperException e) { + if (retryOn(e)) + throw new ReconfigException(e); + else + throw new RuntimeException(e); + } catch (IOException | InterruptedException e) { throw new RuntimeException(e); + } finally { + if (zooKeeperAdmin != null) { + try { + zooKeeperAdmin.close(); + } catch (InterruptedException e) { /* ignore */} + } } } @@ -53,5 +48,11 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin { (event) -> log.log(Level.INFO, event.toString()), new ZkClientConfigBuilder().toConfig()); } + private static boolean retryOn(KeeperException e) { + return e instanceof KeeperException.ReconfigInProgress || + e instanceof KeeperException.ConnectionLossException || + e instanceof KeeperException.NewConfigNoQuorum; + } + } diff --git a/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java deleted file mode 100644 index 922c389f94a..00000000000 --- a/zookeeper-server/zookeeper-server-3.7.0/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java +++ /dev/null @@ -1,258 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.zookeper; - -import com.yahoo.cloud.config.ZookeeperServerConfig; -import com.yahoo.net.HostName; -import com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer; -import com.yahoo.vespa.zookeeper.Reconfigurer; -import com.yahoo.vespa.zookeeper.VespaZooKeeperAdminImpl; -import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.admin.ZooKeeperAdmin; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; -import org.junit.Ignore; -import org.junit.Test; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.ServerSocket; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.IntStream; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.TimeUnit.SECONDS; -import static java.util.stream.Collectors.toList; -import static org.junit.Assert.assertEquals; - -public class VespaZooKeeperTest { - - static final Path tempDirRoot = getTmpDir(); - static final List<Integer> ports = new ArrayList<>(); - - /** - * Performs dynamic reconfiguration of ZooKeeper servers. - * - * First, a cluster of 3 servers is set up, and some data is written to it. - * Then, 3 new servers are added, and the first 3 marked for retirement; - * this should force the quorum to move the 3 new servers, but not disconnect the old ones. - * Next, the old servers are removed. - * Then, the cluster is reduced to size 1. - * Finally, the cluster grows to size 3 again. - * - * Throughout all of this, quorum should remain, and the data should remain the same. - */ - @Test(timeout = 120_000) - @Ignore // Unstable, some ZK server keeps resetting connections sometimes. - public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException { - List<ZooKeeper> keepers = new ArrayList<>(); - for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper()); - for (int i = 0; i < 8; i++) keepers.get(i).run(); - - // Start the first three servers. - List<ZookeeperServerConfig> configs = getConfigs(0, 0, 3, 0); - for (int i = 0; i < 3; i++) keepers.get(i).config = configs.get(i); - for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - - // Wait for all servers to be up and running. - for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - - // Write data to verify later. - String path = writeData(configs.get(0)); - - // Let three new servers join, causing the three older ones to retire and leave the ensemble. - configs = getConfigs(0, 3, 3, 3); - for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); - // The existing servers can't reconfigure and leave before the joiners are up. - for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - - // Wait for new quorum to be established. - for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - - // Verify written data is preserved. - verifyData(path, configs.get(3)); - - // Old servers are removed. - configs = getConfigs(3, 0, 3, 0); - for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); - // Old servers shut down, while the newer servers remain. - for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - // Ensure old servers shut down properly. - for (int i = 0; i < 3; i++) keepers.get(i).await(); - // Ensure new servers have reconfigured. - for (int i = 3; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - - // Verify written data is preserved. - verifyData(path, configs.get(3)); - - - // Cluster shrinks to a single server. - configs = getConfigs(5, 0, 1, 0); - for (int i = 3; i < 6; i++) keepers.get(i).config = configs.get(i); - for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - // We let the remaining server reconfigure the others out before they die. - for (int i = 3; i < 5; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - for (int i = 3; i < 5; i++) keepers.get(i).await(); - verifyData(path, configs.get(5)); - - // Cluster grows to 3 servers again. - configs = getConfigs(5, 0, 3, 2); - for (int i = 5; i < 8; i++) keepers.get(i).config = configs.get(i); - for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - // Wait for the joiners. - for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - verifyData(path, configs.get(7)); - - // Let the remaining servers terminate. - for (int i = 5; i < 8; i++) keepers.get(i).config = null; - for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); - for (int i = 5; i < 8; i++) keepers.get(i).await(); - } - - static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { - ZooKeeperAdmin admin = createAdmin(config); - List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; - String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL); - String read = new String(admin.getData(node, false, new Stat()), UTF_8); - assertEquals("hi", read); - return node; - } - - static void verifyData(String path, ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { - for (int i = 0; i < 10; i++) { - try { - assertEquals("hi", new String(createAdmin(config).getData(path, false, new Stat()), UTF_8)); - return; - } - catch (KeeperException.ConnectionLossException e) { - e.printStackTrace(); - Thread.sleep(10 << i); - } - } - } - - static ZooKeeperAdmin createAdmin(ZookeeperServerConfig config) throws IOException { - return new ZooKeeperAdmin(HostName.getLocalhost() + ":" + config.clientPort(), - 10_000, - System.err::println, - new ZkClientConfigBuilder().toConfig()); - } - - static class ZooKeeper { - - final ExecutorService executor = Executors.newSingleThreadExecutor(); - final Phaser phaser = new Phaser(2); - final AtomicReference<Future<?>> future = new AtomicReference<>(); - ZookeeperServerConfig config; - - void run() { - future.set(executor.submit(() -> { - Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl()); - phaser.arriveAndAwaitAdvance(); - while (config != null) { - new ReconfigurableVespaZooKeeperServer(reconfigurer, config); - phaser.arriveAndAwaitAdvance(); // server is now up, let test thread sync here - phaser.arriveAndAwaitAdvance(); // wait before reconfig/teardown to let test thread do stuff - } - reconfigurer.deconstruct(); - })); - } - - void await() throws ExecutionException, InterruptedException, TimeoutException { - future.get().get(30, SECONDS); - } - } - - static List<ZookeeperServerConfig> getConfigs(int removed, int retired, int active, int joining) { - return IntStream.rangeClosed(1, removed + retired + active) - .mapToObj(id -> getConfig(removed, retired, active, joining, id)) - .collect(toList()); - } - - // Config for server #id among retired + active servers, of which the last may be joining, and with offset removed. - static ZookeeperServerConfig getConfig(int removed, int retired, int active, int joining, int id) { - if (id <= removed) - return null; - - Path tempDir = tempDirRoot.resolve("zookeeper-" + id); - return new ZookeeperServerConfig.Builder() - .clientPort(getPorts(id).get(0)) - .dataDir(tempDir.toString()) - .zooKeeperConfigFile(tempDir.resolve("zookeeper.cfg").toString()) - .myid(id) - .myidFile(tempDir.resolve("myid").toString()) - .dynamicReconfiguration(true) - .server(IntStream.rangeClosed(removed + 1, removed + retired + active) - .mapToObj(i -> new ZookeeperServerConfig.Server.Builder() - .id(i) - .clientPort(getPorts(i).get(0)) - .electionPort(getPorts(i).get(1)) - .quorumPort(getPorts(i).get(2)) - .hostname("localhost") - .joining(i - removed > retired + active - joining) - .retired(i - removed <= retired)) - .collect(toList())) - .build(); - } - - static List<Integer> getPorts(int id) { - if (ports.size() < id * 3) { - int previousPort; - if (ports.isEmpty()) { - String[] version = System.getProperty("zk-version").split("\\."); - int versionPortOffset = 0; - for (String part : version) - versionPortOffset = 32 * (versionPortOffset + Integer.parseInt(part)); - previousPort = 20000 + versionPortOffset % 30000; - } - else - previousPort = ports.get(ports.size() - 1); - - for (int i = 0; i < 3; i++) - ports.add(previousPort = nextPort(previousPort)); - } - return ports.subList(id * 3 - 3, id * 3); - } - - static int nextPort(int previousPort) { - for (int j = 1; j <= 30000; j++) { - int port = (previousPort + j); - while (port > 50000) - port -= 30000; - - try (ServerSocket socket = new ServerSocket(port)) { - return socket.getLocalPort(); - } - catch (IOException e) { - System.err.println("Could not bind port " + port + ": " + e); - } - } - throw new RuntimeException("No free ports"); - } - - static Path getTmpDir() { - try { - Path tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "vespa-zk-test"); - tempDir.toFile().deleteOnExit(); - return tempDir.toAbsolutePath(); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - -} diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java index 8b22f658a94..39d0312915f 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java @@ -86,7 +86,7 @@ public class Configurator { sb.append("reconfigEnabled=true").append("\n"); sb.append("skipACL=yes").append("\n"); ensureThisServerIsRepresented(config.myid(), config.server()); - config.server().forEach(server -> sb.append(serverSpec(server, config.clientPort(), server.joining())).append("\n")); + config.server().forEach(server -> addServerToCfg(sb, server, config.clientPort())); sb.append(new TlsQuorumConfig().createConfig(vespaTlsConfig)); sb.append(new TlsClientServerConfig().createConfig(vespaTlsConfig)); return sb.toString(); @@ -111,8 +111,7 @@ public class Configurator { } } - static String serverSpec(ZookeeperServerConfig.Server server, int clientPort, boolean joining) { - StringBuilder sb = new StringBuilder(); + private void addServerToCfg(StringBuilder sb, ZookeeperServerConfig.Server server, int clientPort) { sb.append("server.") .append(server.id()) .append("=") @@ -121,7 +120,7 @@ public class Configurator { .append(server.quorumPort()) .append(":") .append(server.electionPort()); - if (joining) { + if (server.joining()) { // Servers that are joining an existing cluster must be marked as observers. Note that this will NOT // actually make the server an observer, but prevent it from forming an ensemble independently of the // existing cluster. @@ -131,8 +130,8 @@ public class Configurator { .append("observer"); } sb.append(";") - .append(server.clientPort()); - return sb.toString(); + .append(clientPort) + .append("\n"); } static List<String> zookeeperServerHostnames(ZookeeperServerConfig zookeeperServerConfig) { diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java index 604419c063d..d4223e4d815 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java @@ -10,14 +10,14 @@ import com.yahoo.yolean.Exceptions; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; - -import static com.yahoo.vespa.zookeeper.Configurator.serverSpec; -import static java.util.stream.Collectors.toList; +import java.util.stream.Collectors; /** * Starts zookeeper server and supports reconfiguring zookeeper cluster. Keep this as a component @@ -50,22 +50,17 @@ public class Reconfigurer extends AbstractComponent { this.sleeper = Objects.requireNonNull(sleeper); } - @Override - public void deconstruct() { - shutdown(); - } - - QuorumPeer startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server, - Supplier<QuorumPeer> quorumPeerCreator) { + void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server, + Supplier<QuorumPeer> quorumPeerGetter, Consumer<QuorumPeer> quorumPeerSetter) { if (zooKeeperRunner == null) { - peer = quorumPeerCreator.get(); // Obtain the peer from the server. This will be shared with later servers. + peer = quorumPeerGetter.get(); // Obtain the peer from the server. This will be shared with later servers. zooKeeperRunner = startServer(newConfig, server); } + quorumPeerSetter.accept(peer); - if (newConfig.dynamicReconfiguration()) { + if (shouldReconfigure(newConfig)) { reconfigure(newConfig); } - return peer; } ZookeeperServerConfig activeConfig() { @@ -78,30 +73,42 @@ public class Reconfigurer extends AbstractComponent { } } + private boolean shouldReconfigure(ZookeeperServerConfig newConfig) { + if (!newConfig.dynamicReconfiguration()) return false; + if (activeConfig == null) return false; + return !newConfig.equals(activeConfig()); + } + private ZooKeeperRunner startServer(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) { ZooKeeperRunner runner = new ZooKeeperRunner(zookeeperServerConfig, server); activeConfig = zookeeperServerConfig; return runner; } - // TODO jonmv: read dynamic file, discard if old quorum impossible (config file + .dynamic.<id>) - // TODO jonmv: if dynamic file, all unlisted servers are observers; otherwise joiners are observers - // TODO jonmv: wrap Curator in Provider, for Curator shutdown private void reconfigure(ZookeeperServerConfig newConfig) { Instant reconfigTriggered = Instant.now(); - String newServers = String.join(",", servers(newConfig)); - log.log(Level.INFO, "Will reconfigure ZooKeeper cluster." + + // No point in trying to reconfigure if there is only one server in the new ensemble, + // the others will be shutdown or are about to be shutdown + if (newConfig.server().size() == 1) shutdownAndDie(Duration.ZERO); + + List<String> newServers = difference(servers(newConfig), servers(activeConfig)); + String leavingServerIds = String.join(",", serverIdsDifference(activeConfig, newConfig)); + String joiningServersSpec = String.join(",", newServers); + leavingServerIds = leavingServerIds.isEmpty() ? null : leavingServerIds; + joiningServersSpec = joiningServersSpec.isEmpty() ? null : joiningServersSpec; + log.log(Level.INFO, "Will reconfigure ZooKeeper cluster. \nJoining servers: " + joiningServersSpec + + "\nleaving servers: " + leavingServerIds + "\nServers in active config:" + servers(activeConfig) + "\nServers in new config:" + servers(newConfig)); String connectionSpec = localConnectionSpec(activeConfig); Instant now = Instant.now(); - Duration reconfigTimeout = reconfigTimeout(newConfig.server().size()); + Duration reconfigTimeout = reconfigTimeout(newServers.size()); Instant end = now.plus(reconfigTimeout); // Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed for (int attempt = 1; now.isBefore(end); attempt++) { try { Instant reconfigStarted = Instant.now(); - vespaZooKeeperAdmin.reconfigure(connectionSpec, newServers); + vespaZooKeeperAdmin.reconfigure(connectionSpec, joiningServersSpec, leavingServerIds); Instant reconfigEnded = Instant.now(); log.log(Level.INFO, "Reconfiguration completed in " + Duration.between(reconfigTriggered, reconfigEnded) + @@ -140,11 +147,24 @@ public class Reconfigurer extends AbstractComponent { return HostName.getLocalhost() + ":" + config.clientPort(); } + private static List<String> serverIdsDifference(ZookeeperServerConfig oldConfig, ZookeeperServerConfig newConfig) { + return difference(servers(oldConfig), servers(newConfig)).stream() + .map(server -> server.substring(0, server.indexOf('='))) + .collect(Collectors.toList()); + } + private static List<String> servers(ZookeeperServerConfig config) { + // See https://zookeeper.apache.org/doc/r3.6.3/zookeeperReconfig.html#sc_reconfig_clientport for format return config.server().stream() - .filter(server -> ! server.retired()) - .map(server -> serverSpec(server, config.clientPort(), false)) - .collect(toList()); + .map(server -> server.id() + "=" + server.hostname() + ":" + server.quorumPort() + ":" + + server.electionPort() + ";" + config.clientPort()) + .collect(Collectors.toList()); + } + + private static <T> List<T> difference(List<T> list1, List<T> list2) { + List<T> copy = new ArrayList<>(list1); + copy.removeAll(list2); + return copy; } } diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java index 59c9628bcab..8809dca0def 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java @@ -10,7 +10,7 @@ import java.time.Duration; */ public interface VespaZooKeeperAdmin { - void reconfigure(String connectionSpec, String servers) throws ReconfigException; + void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException; /* Timeout for connecting to ZooKeeper */ default Duration sessionTimeout() { return Duration.ofSeconds(30); } diff --git a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java index 760c326cf5d..1211624e3d6 100644 --- a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java +++ b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java @@ -17,6 +17,7 @@ import java.util.Arrays; import java.util.concurrent.Phaser; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; /** @@ -50,26 +51,31 @@ public class ReconfigurerTest { ZookeeperServerConfig nextConfig = createConfig(5, true); reconfigurer.startOrReconfigure(nextConfig); assertEquals("node1:2181", reconfigurer.connectionSpec()); - assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181", - reconfigurer.servers()); - assertEquals(2, reconfigurer.reconfigurations()); + assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers()); + assertNull("No servers are leaving", reconfigurer.leavingServers()); + assertEquals(1, reconfigurer.reconfigurations()); + assertSame(nextConfig, reconfigurer.activeConfig()); + + // No reconfiguration happens with same config + reconfigurer.startOrReconfigure(nextConfig); + assertEquals(1, reconfigurer.reconfigurations()); assertSame(nextConfig, reconfigurer.activeConfig()); // Cluster shrinks nextConfig = createConfig(3, true); reconfigurer.startOrReconfigure(nextConfig); - assertEquals(3, reconfigurer.reconfigurations()); + assertEquals(2, reconfigurer.reconfigurations()); assertEquals("node1:2181", reconfigurer.connectionSpec()); - assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181", - reconfigurer.servers()); + assertNull("No servers are joining", reconfigurer.joiningServers()); + assertEquals("3,4", reconfigurer.leavingServers()); assertSame(nextConfig, reconfigurer.activeConfig()); // Cluster loses node1, but node3 joins. Indices are shuffled. nextConfig = createConfig(3, true, 1); reconfigurer.startOrReconfigure(nextConfig); - assertEquals(4, reconfigurer.reconfigurations()); - assertEquals("server.0=node0:2182:2183;2181,server.1=node2:2182:2183;2181,server.2=node3:2182:2183;2181", - reconfigurer.servers()); + assertEquals(3, reconfigurer.reconfigurations()); + assertEquals("1=node2:2182:2183;2181,2=node3:2182:2183;2181", reconfigurer.joiningServers()); + assertEquals("1,2", reconfigurer.leavingServers()); assertSame(nextConfig, reconfigurer.activeConfig()); } @@ -83,9 +89,9 @@ public class ReconfigurerTest { ZookeeperServerConfig nextConfig = createConfig(5, true); reconfigurer.startOrReconfigure(nextConfig); assertEquals("node1:2181", reconfigurer.connectionSpec()); - assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181", - reconfigurer.servers()); - assertEquals(2, reconfigurer.reconfigurations()); + assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers()); + assertNull("No servers are leaving", reconfigurer.leavingServers()); + assertEquals(1, reconfigurer.reconfigurations()); assertSame(nextConfig, reconfigurer.activeConfig()); } @@ -106,27 +112,24 @@ public class ReconfigurerTest { reconfigurer.shutdown(); } - private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... retiredIndices) { - Arrays.sort(retiredIndices); + private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... skipIndices) { + Arrays.sort(skipIndices); ZookeeperServerConfig.Builder builder = new ZookeeperServerConfig.Builder(); builder.zooKeeperConfigFile(cfgFile.getAbsolutePath()); builder.myidFile(idFile.getAbsolutePath()); for (int i = 0, index = 0; i < numberOfServers; i++, index++) { - boolean retired = Arrays.binarySearch(retiredIndices, index) >= 0; - if (retired) i--; - builder.server(newServer(i, "node" + index, retired)); + while (Arrays.binarySearch(skipIndices, index) >= 0) index++; + builder.server(newServer(i, "node" + index)); } - builder.myid(0); builder.dynamicReconfiguration(dynamicReconfiguration); return builder.build(); } - private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName, boolean retired) { + private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName) { ZookeeperServerConfig.Server.Builder builder = new ZookeeperServerConfig.Server.Builder(); builder.id(id); builder.hostname(hostName); - builder.retired(retired); return builder; } @@ -139,7 +142,6 @@ public class ReconfigurerTest { private static class TestableReconfigurer extends Reconfigurer implements VespaZooKeeperServer { private final TestableVespaZooKeeperAdmin zooKeeperAdmin; - private final Phaser phaser = new Phaser(2); private QuorumPeer serverPeer; TestableReconfigurer(TestableVespaZooKeeperAdmin zooKeeperAdmin) { @@ -154,16 +156,19 @@ public class ReconfigurerTest { } void startOrReconfigure(ZookeeperServerConfig newConfig) { - serverPeer = startOrReconfigure(newConfig, this, MockQuorumPeer::new); - phaser.arriveAndDeregister(); + startOrReconfigure(newConfig, this, MockQuorumPeer::new, peer -> serverPeer = peer); } String connectionSpec() { return zooKeeperAdmin.connectionSpec; } - String servers() { - return zooKeeperAdmin.servers; + String joiningServers() { + return zooKeeperAdmin.joiningServers; + } + + String leavingServers() { + return zooKeeperAdmin.leavingServers; } int reconfigurations() { @@ -172,14 +177,10 @@ public class ReconfigurerTest { @Override public void shutdown() { - phaser.arriveAndAwaitAdvance(); serverPeer.shutdown(Duration.ofSeconds(1)); } @Override - public void start(Path configFilePath) { - phaser.arriveAndAwaitAdvance(); - serverPeer.start(configFilePath); - } + public void start(Path configFilePath) { serverPeer.start(configFilePath); } @Override public boolean reconfigurable() { @@ -191,7 +192,8 @@ public class ReconfigurerTest { private static class TestableVespaZooKeeperAdmin implements VespaZooKeeperAdmin { String connectionSpec; - String servers; + String joiningServers; + String leavingServers; int reconfigurations = 0; private int failures = 0; @@ -203,11 +205,12 @@ public class ReconfigurerTest { } @Override - public void reconfigure(String connectionSpec, String servers) throws ReconfigException { + public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException { if (++attempts < failures) throw new ReconfigException("Reconfig failed"); this.connectionSpec = connectionSpec; - this.servers = servers; + this.joiningServers = joiningServers; + this.leavingServers = leavingServers; this.reconfigurations++; } |