summaryrefslogtreecommitdiffstats
path: root/zookeeper-server/zookeeper-server-common
diff options
context:
space:
mode:
author Harald Musum <musum@verizonmedia.com> 2020-12-07 18:47:02 +0100
committer Harald Musum <musum@verizonmedia.com> 2020-12-07 18:47:02 +0100
commit 6e44ab449d2bb686127192c71e4dbe87091be6ee (patch)
tree 47ac8cccd56ceb0eab04433b418e420f62175a8e /zookeeper-server/zookeeper-server-common
parent b0f74766d1b2af781d93da7d2ad89c89c8e9825b (diff)
Move more code to zookeeper-server-common
Diffstat (limited to 'zookeeper-server/zookeeper-server-common')
-rw-r--r-- zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java 153
-rw-r--r-- zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java 62
-rw-r--r-- zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java 194
3 files changed, 409 insertions, 0 deletions
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
new file mode 100644
index 00000000000..3466b0a8d22
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
@@ -0,0 +1,153 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.zookeeper;
+
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.ZookeeperServerConfig;
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.net.HostName;
+import com.yahoo.yolean.Exceptions;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ * Starts zookeeper server and supports reconfiguring zookeeper cluster. Created as a component
+ * without any config injected, to make sure that it is not recreated when config changes.
+ *
+ * <p>The first call to {@code startOrReconfigure} creates the {@link ZooKeeperRunner} and records the
+ * given config as the active one. Later calls trigger a reconfiguration only when the new config has
+ * dynamic reconfiguration enabled and differs from the active config.</p>
+ *
+ * <p>This class does not guard its mutable state ({@code zooKeeperRunner}, {@code activeConfig})
+ * with any lock of its own.</p>
+ *
+ * @author hmusum
+ */
+public class Reconfigurer extends AbstractComponent {
+
+    private static final Logger log = Logger.getLogger(Reconfigurer.class.getName());
+
+    // How long to wait before triggering reconfig. This is multiplied by the node ID
+    private static final Duration reconfigInterval = Duration.ofSeconds(5);
+
+    // Total timeout for a reconfiguration
+    private static final Duration reconfigTimeout = Duration.ofSeconds(30);
+
+    // How long to wait between each retry
+    private static final Duration retryWait = Duration.ofSeconds(1);
+
+    private ZooKeeperRunner zooKeeperRunner;
+    private ZookeeperServerConfig activeConfig;
+
+    protected final ZkAdmin zkAdmin;
+
+    /**
+     * Creates a reconfigurer which has not yet started any server.
+     *
+     * @param zkAdmin the admin client used to issue reconfiguration requests
+     */
+    @Inject
+    public Reconfigurer(ZkAdmin zkAdmin) {
+        this.zkAdmin = zkAdmin;
+        log.log(Level.FINE, "Created ZooKeeperReconfigurer");
+    }
+
+    /**
+     * Starts the server if it is not already started, then reconfigures the cluster if needed,
+     * sleeping on the calling thread between steps.
+     *
+     * @param newConfig the config to start with, or to reconfigure to
+     * @param server    the server handed to the {@link ZooKeeperRunner} on first start
+     */
+    void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server) {
+        startOrReconfigure(newConfig, Reconfigurer::defaultSleeper, server);
+    }
+
+    /**
+     * Same as {@link #startOrReconfigure(ZookeeperServerConfig, VespaZooKeeperServer)}, but with
+     * the waiting strategy supplied by the caller (lets tests avoid real sleeps).
+     *
+     * @param newConfig the config to start with, or to reconfigure to
+     * @param sleeper   invoked with each duration this method wants to wait
+     * @param server    the server handed to the {@link ZooKeeperRunner} on first start
+     */
+    void startOrReconfigure(ZookeeperServerConfig newConfig, Consumer<Duration> sleeper, VespaZooKeeperServer server) {
+        if (zooKeeperRunner == null) {
+            zooKeeperRunner = startServer(newConfig, server);
+        }
+
+        if (shouldReconfigure(newConfig)) {
+            reconfigure(newConfig, sleeper);
+        }
+    }
+
+    /** Returns the config the server was started with or last successfully reconfigured to, or null if not started */
+    ZookeeperServerConfig activeConfig() {
+        return activeConfig;
+    }
+
+    /** Shuts down the runner, if one has been created */
+    void shutdown() {
+        if (zooKeeperRunner != null) {
+            zooKeeperRunner.shutdown();
+        }
+    }
+
+    /** Returns whether the given config requires (and allows) a reconfiguration of the running cluster */
+    private boolean shouldReconfigure(ZookeeperServerConfig newConfig) {
+        if (!newConfig.dynamicReconfiguration()) return false;
+        if (activeConfig == null) return false;
+        return !newConfig.equals(activeConfig());
+    }
+
+    /** Creates the runner for the given config and records that config as the active one */
+    private ZooKeeperRunner startServer(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) {
+        ZooKeeperRunner runner = new ZooKeeperRunner(zookeeperServerConfig, server);
+        activeConfig = zookeeperServerConfig;
+        return runner;
+    }
+
+    /**
+     * Asks ZooKeeper to move from the active config to the given one, retrying until it succeeds,
+     * the reconfiguration timeout expires or the calling thread is interrupted. The active config
+     * is only replaced on success; otherwise a warning is logged and the old config stays active.
+     */
+    private void reconfigure(ZookeeperServerConfig newConfig, Consumer<Duration> sleeper) {
+        Instant reconfigTriggered = Instant.now();
+        // Leaving servers are given by id only, joining servers by their full server specification
+        String leavingServers = joinOrNull(difference(serverIds(activeConfig), serverIds(newConfig)));
+        String joiningServers = joinOrNull(difference(servers(newConfig), servers(activeConfig)));
+        Duration waitPeriod = reconfigWaitPeriod();
+        log.log(Level.INFO, "Will reconfigure ZooKeeper cluster in " + waitPeriod +
+                            ". Joining servers: " + joiningServers + ", leaving servers: " + leavingServers);
+        sleeper.accept(waitPeriod);
+        String connectionSpec = localConnectionSpec(activeConfig);
+        Instant end = Instant.now().plus(reconfigTimeout);
+        int attempts = 0;
+        // Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed.
+        // Stop when interrupted: sleeping would return immediately and we would retry in a tight loop
+        while (Instant.now().isBefore(end) && !Thread.currentThread().isInterrupted()) {
+            attempts++;
+            try {
+                Instant reconfigStarted = Instant.now();
+                zkAdmin.reconfigure(connectionSpec, joiningServers, leavingServers);
+                Instant reconfigEnded = Instant.now();
+                log.log(Level.INFO, "Reconfiguration completed in " +
+                                    Duration.between(reconfigTriggered, reconfigEnded) +
+                                    ", after " + attempts + " attempt(s). ZooKeeper reconfig call took " +
+                                    Duration.between(reconfigStarted, reconfigEnded));
+                activeConfig = newConfig;
+                return;
+            } catch (ReconfigException e) {
+                log.log(Level.INFO, "Reconfiguration failed. Retrying in " + retryWait + ": " +
+                                    Exceptions.toMessageString(e));
+                sleeper.accept(retryWait);
+            }
+        }
+        log.log(Level.WARNING, "Reconfiguration not completed after " + attempts + " attempt(s) in " +
+                               Duration.between(reconfigTriggered, Instant.now()) +
+                               ". Keeping the currently active configuration");
+    }
+
+    /** Returns how long this node should wait before reconfiguring the cluster */
+    private Duration reconfigWaitPeriod() {
+        if (activeConfig == null) return Duration.ZERO;
+        return reconfigInterval.multipliedBy(activeConfig.myid());
+    }
+
+    /** Returns the connection spec (host:port) of the server on this host */
+    private static String localConnectionSpec(ZookeeperServerConfig config) {
+        return HostName.getLocalhost() + ":" + config.clientPort();
+    }
+
+    /** Returns the ids of all servers in the given config, as strings */
+    private static List<String> serverIds(ZookeeperServerConfig config) {
+        return config.server().stream()
+                     .map(ZookeeperServerConfig.Server::id)
+                     .map(String::valueOf)
+                     .collect(Collectors.toList());
+    }
+
+    /** Returns one server specification string per server in the given config */
+    private static List<String> servers(ZookeeperServerConfig config) {
+        // See https://zookeeper.apache.org/doc/r3.5.8/zookeeperReconfig.html#sc_reconfig_clientport for format
+        return config.server().stream()
+                     .map(server -> server.id() + "=" + server.hostname() + ":" + server.quorumPort() + ":" +
+                                    server.electionPort() + ";" + config.clientPort())
+                     .collect(Collectors.toList());
+    }
+
+    /** Returns a new list with the elements of list1 which are not in list2 */
+    private static <T> List<T> difference(List<T> list1, List<T> list2) {
+        List<T> copy = new ArrayList<>(list1);
+        copy.removeAll(list2);
+        return copy;
+    }
+
+    /** Returns the given values joined by comma, or null if there are none (null is passed on to ZkAdmin as "no servers") */
+    private static String joinOrNull(List<String> values) {
+        return values.isEmpty() ? null : String.join(",", values);
+    }
+
+    /** Sleeps on the calling thread; if interrupted, returns early with the interrupt status restored */
+    private static void defaultSleeper(Duration duration) {
+        try {
+            Thread.sleep(duration.toMillis());
+        } catch (InterruptedException e) {
+            log.log(Level.WARNING, "Interrupted while waiting " + duration);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java
new file mode 100644
index 00000000000..492cfffa1ad
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java
@@ -0,0 +1,62 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.zookeeper;
+
+import com.yahoo.cloud.config.ZookeeperServerConfig;
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.security.tls.TransportSecurityUtils;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static com.yahoo.vespa.defaults.Defaults.getDefaults;
+import static com.yahoo.vespa.zookeeper.Configurator.zookeeperServerHostnames;
+
+/**
+ * Writes zookeeper config and starts zookeeper server.
+ *
+ * <p>The constructor writes the config and submits this runner to a single-threaded executor,
+ * so the server is started from {@link #run()} on the executor's thread.</p>
+ *
+ * @author Harald Musum
+ */
+public class ZooKeeperRunner implements Runnable {
+
+    private static final Logger log = Logger.getLogger(ZooKeeperRunner.class.getName());
+
+    private final ExecutorService executorService;
+    private final ZookeeperServerConfig zookeeperServerConfig;
+    private final VespaZooKeeperServer server;
+
+    /**
+     * Writes the given config to disk and schedules the given server to be started.
+     *
+     * @param zookeeperServerConfig the config to write, and to read the config file location from
+     * @param server                the server to start
+     */
+    public ZooKeeperRunner(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) {
+        this.zookeeperServerConfig = zookeeperServerConfig;
+        this.server = server;
+        new Configurator(zookeeperServerConfig).writeConfigToDisk(TransportSecurityUtils.getOptions());
+        executorService = Executors.newSingleThreadExecutor(new DaemonThreadFactory("zookeeper server"));
+        // Submitted last, after every field read by run() has been assigned
+        executorService.submit(this);
+    }
+
+    /**
+     * Shuts down the executor, waiting up to 10 seconds for it to terminate before forcing
+     * shutdown. If the calling thread is interrupted while waiting, the interrupt status is restored.
+     */
+    void shutdown() {
+        executorService.shutdown();
+        try {
+            if (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
+                log.log(Level.WARNING, "Failed to shut down within timeout");
+            }
+        } catch (InterruptedException e) {
+            log.log(Level.INFO, "Interrupted waiting for executor to complete", e);
+            Thread.currentThread().interrupt();
+        }
+        if (!executorService.isTerminated()) {
+            executorService.shutdownNow();
+        }
+    }
+
+    /** Starts the server with the config file named by the config, resolved under Vespa home */
+    @Override
+    public void run() {
+        Path path = Paths.get(getDefaults().underVespaHome(zookeeperServerConfig.zooKeeperConfigFile()));
+        log.log(Level.INFO, "Starting ZooKeeper server with config file " + path.toFile().getAbsolutePath() +
+                            ". Trying to establish ZooKeeper quorum (members: " + zookeeperServerHostnames(zookeeperServerConfig) + ")");
+        try {
+            server.start(path);
+        } catch (RuntimeException e) {
+            // The Future returned by submit() is discarded, so without this the failure would never be reported
+            log.log(Level.SEVERE, "ZooKeeper server failed", e);
+            throw e;
+        }
+    }
+
+}
diff --git a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
new file mode 100644
index 00000000000..a93cc40d322
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
@@ -0,0 +1,194 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.zookeeper;
+
+import com.yahoo.cloud.config.ZookeeperServerConfig;
+import com.yahoo.net.HostName;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests dynamic reconfiguration of zookeeper cluster.
+ *
+ * <p>Uses a {@link Reconfigurer} subclass which never sleeps and a recording {@link ZkAdmin},
+ * so the tests only verify which reconfiguration requests are issued.</p>
+ *
+ * @author hmusum
+ */
+public class ReconfigurerTest {
+
+    private File cfgFile;
+    private File idFile;
+    private TestableReconfigurer reconfigurer;
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    @Before
+    public void setup() throws IOException {
+        cfgFile = folder.newFile();
+        idFile = folder.newFile("myid");
+        reconfigurer = new TestableReconfigurer(new TestableZkAdmin());
+    }
+
+    @Test
+    public void testReconfigure() {
+        ZookeeperServerConfig initialConfig = createConfig(3, true);
+        reconfigurer.startOrReconfigure(initialConfig);
+        assertSame(initialConfig, reconfigurer.activeConfig());
+
+        // Cluster grows
+        ZookeeperServerConfig nextConfig = createConfig(5, true);
+        reconfigurer.startOrReconfigure(nextConfig);
+        assertEquals("node1:2181", reconfigurer.connectionSpec());
+        assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers());
+        assertNull("No servers are leaving", reconfigurer.leavingServers());
+        assertEquals(1, reconfigurer.reconfigurations());
+        assertSame(nextConfig, reconfigurer.activeConfig());
+
+        // No reconfiguration happens with same config
+        reconfigurer.startOrReconfigure(nextConfig);
+        assertEquals(1, reconfigurer.reconfigurations());
+        assertSame(nextConfig, reconfigurer.activeConfig());
+
+        // Cluster shrinks
+        nextConfig = createConfig(3, true);
+        reconfigurer.startOrReconfigure(nextConfig);
+        assertEquals(2, reconfigurer.reconfigurations());
+        assertEquals("node1:2181", reconfigurer.connectionSpec());
+        assertNull("No servers are joining", reconfigurer.joiningServers());
+        assertEquals("3,4", reconfigurer.leavingServers());
+        assertSame(nextConfig, reconfigurer.activeConfig());
+    }
+
+    @Test
+    public void testReconfigureFailsWithReconfigInProgressThenSucceeds() {
+        reconfigurer = new TestableReconfigurer(new TemporarilyFailZkAdmin());
+        ZookeeperServerConfig initialConfig = createConfig(3, true);
+        reconfigurer.startOrReconfigure(initialConfig);
+        assertSame(initialConfig, reconfigurer.activeConfig());
+
+        ZookeeperServerConfig nextConfig = createConfig(5, true);
+        reconfigurer.startOrReconfigure(nextConfig);
+        assertEquals("node1:2181", reconfigurer.connectionSpec());
+        assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers());
+        assertNull("No servers are leaving", reconfigurer.leavingServers());
+        // Only the successful attempt is counted, the failed ones never reach TestableZkAdmin
+        assertEquals(1, reconfigurer.reconfigurations());
+        assertSame(nextConfig, reconfigurer.activeConfig());
+    }
+
+    @Test
+    public void testDynamicReconfigurationDisabled() {
+        ZookeeperServerConfig initialConfig = createConfig(3, false);
+        reconfigurer.startOrReconfigure(initialConfig);
+        assertSame(initialConfig, reconfigurer.activeConfig());
+
+        ZookeeperServerConfig nextConfig = createConfig(5, false);
+        reconfigurer.startOrReconfigure(nextConfig);
+        assertSame(initialConfig, reconfigurer.activeConfig());
+        assertEquals(0, reconfigurer.reconfigurations());
+    }
+
+    @After
+    public void stopReconfigurer() {
+        reconfigurer.shutdown();
+    }
+
+    /** Creates a config for this node (myid 0) with servers "node0" .. "node(numberOfServers - 1)", having ids equal to their index */
+    private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration) {
+        ZookeeperServerConfig.Builder builder = new ZookeeperServerConfig.Builder();
+        builder.zooKeeperConfigFile(cfgFile.getAbsolutePath());
+        builder.myidFile(idFile.getAbsolutePath());
+        IntStream.range(0, numberOfServers).forEach(i -> builder.server(newServer(i, "node" + i)));
+        builder.myid(0);
+        builder.dynamicReconfiguration(dynamicReconfiguration);
+        return builder.build();
+    }
+
+    private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName) {
+        ZookeeperServerConfig.Server.Builder builder = new ZookeeperServerConfig.Server.Builder();
+        builder.id(id);
+        builder.hostname(hostName);
+        return builder;
+    }
+
+    /** A reconfigurer which never sleeps, acts as its own (no-op) server and exposes what was sent to its ZkAdmin */
+    private static class TestableReconfigurer extends Reconfigurer implements VespaZooKeeperServer {
+
+        private final TestableZkAdmin zkReconfigurer;
+
+        TestableReconfigurer(TestableZkAdmin zkReconfigurer) {
+            super(zkReconfigurer);
+            this.zkReconfigurer = zkReconfigurer;
+            HostName.setHostNameForTestingOnly("node1");
+        }
+
+        void startOrReconfigure(ZookeeperServerConfig newConfig) {
+            startOrReconfigure(newConfig, this);
+        }
+
+        // Ignores the given server and uses this instance, with a sleeper which does nothing
+        @Override
+        void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server) {
+            super.startOrReconfigure(newConfig, l -> {}, this);
+        }
+
+        String connectionSpec() {
+            return zkReconfigurer.connectionSpec;
+        }
+
+        String joiningServers() {
+            return zkReconfigurer.joiningServers;
+        }
+
+        String leavingServers() {
+            return zkReconfigurer.leavingServers;
+        }
+
+        int reconfigurations() {
+            return zkReconfigurer.reconfigurations;
+        }
+
+        @Override
+        public void start(Path path) { }
+
+    }
+
+    /** Records the arguments of the last reconfigure call and counts the calls */
+    private static class TestableZkAdmin implements ZkAdmin {
+
+        String connectionSpec;
+        String joiningServers;
+        String leavingServers;
+        int reconfigurations = 0;
+
+        @Override
+        public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException {
+            this.connectionSpec = connectionSpec;
+            this.joiningServers = joiningServers;
+            this.leavingServers = leavingServers;
+            this.reconfigurations++;
+        }
+
+    }
+
+    // Fails the first 2 attempts with ReconfigException, then succeeds from the 3rd attempt on
+    private static class TemporarilyFailZkAdmin extends TestableZkAdmin {
+
+        private int attempts = 0;
+
+        @Override
+        public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException {
+            if (++attempts < 3)
+                throw new ReconfigException("Reconfig failed");
+            else
+                super.reconfigure(connectionSpec, joiningServers, leavingServers);
+        }
+
+    }
+
+
+}