From 6e44ab449d2bb686127192c71e4dbe87091be6ee Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Mon, 7 Dec 2020 18:47:02 +0100 Subject: Move more code to zookeeper-server-common --- .../com/yahoo/vespa/zookeeper/Reconfigurer.java | 153 ++++++++++++++++ .../com/yahoo/vespa/zookeeper/ZooKeeperRunner.java | 62 +++++++ .../yahoo/vespa/zookeeper/ReconfigurerTest.java | 194 +++++++++++++++++++++ 3 files changed, 409 insertions(+) create mode 100644 zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java create mode 100644 zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java create mode 100644 zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java (limited to 'zookeeper-server/zookeeper-server-common') diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java new file mode 100644 index 00000000000..3466b0a8d22 --- /dev/null +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java @@ -0,0 +1,153 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeeper; + +import com.google.inject.Inject; +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.component.AbstractComponent; +import com.yahoo.net.HostName; +import com.yahoo.yolean.Exceptions; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Starts zookeeper server and supports reconfiguring zookeeper cluster. Created as a component + * without any config injected, to make sure that it is not recreated when config changes. + * + * @author hmusum + */ +public class Reconfigurer extends AbstractComponent { + + private static final Logger log = java.util.logging.Logger.getLogger(Reconfigurer.class.getName()); + + // How long to wait before triggering reconfig. This is multiplied by the node ID + private static final Duration reconfigInterval = Duration.ofSeconds(5); + + // Total timeout for a reconfiguration + private static final Duration reconfigTimeout = Duration.ofSeconds(30); + + // How long to wait between each retry + private static final Duration retryWait = Duration.ofSeconds(1); + + private ZooKeeperRunner zooKeeperRunner; + private ZookeeperServerConfig activeConfig; + + protected final ZkAdmin zkAdmin; + + @Inject + public Reconfigurer(ZkAdmin zkAdmin) { + this.zkAdmin = zkAdmin; + log.log(Level.FINE, "Created ZooKeeperReconfigurer"); + } + + void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server) { + startOrReconfigure(newConfig, Reconfigurer::defaultSleeper, server); + } + + void startOrReconfigure(ZookeeperServerConfig newConfig, Consumer sleeper, VespaZooKeeperServer server) { + if (zooKeeperRunner == null) + zooKeeperRunner = startServer(newConfig, server); + + if (shouldReconfigure(newConfig)) + reconfigure(newConfig, sleeper); + } + + ZookeeperServerConfig activeConfig() { + return activeConfig; + } + + void shutdown() { + if (zooKeeperRunner != null) { + zooKeeperRunner.shutdown(); + } + } + + private boolean shouldReconfigure(ZookeeperServerConfig newConfig) { + if (!newConfig.dynamicReconfiguration()) return false; + if (activeConfig == null) return false; + return !newConfig.equals(activeConfig()); + } + + private ZooKeeperRunner startServer(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) { + ZooKeeperRunner runner = new ZooKeeperRunner(zookeeperServerConfig, server); + activeConfig = zookeeperServerConfig; + return runner; + } + + private void reconfigure(ZookeeperServerConfig newConfig, Consumer sleeper) { + Instant reconfigTriggered = Instant.now(); + String leavingServers = String.join(",", difference(serverIds(activeConfig), serverIds(newConfig))); + String joiningServers = String.join(",", difference(servers(newConfig), servers(activeConfig))); + leavingServers = leavingServers.isEmpty() ? null : leavingServers; + joiningServers = joiningServers.isEmpty() ? null : joiningServers; + log.log(Level.INFO, "Will reconfigure ZooKeeper cluster in " + reconfigWaitPeriod() + + ". Joining servers: " + joiningServers + ", leaving servers: " + leavingServers); + sleeper.accept(reconfigWaitPeriod()); + String connectionSpec = localConnectionSpec(activeConfig); + Instant end = Instant.now().plus(reconfigTimeout); + // Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed + for (int attempts = 1; Instant.now().isBefore(end); attempts++) { + try { + Instant reconfigStarted = Instant.now(); + zkAdmin.reconfigure(connectionSpec, joiningServers, leavingServers); + Instant reconfigEnded = Instant.now(); + log.log(Level.INFO, "Reconfiguration completed in " + + Duration.between(reconfigTriggered, reconfigEnded) + + ", after " + attempts + " attempt(s). ZooKeeper reconfig call took " + + Duration.between(reconfigStarted, reconfigEnded)); + activeConfig = newConfig; + return; + } catch (ReconfigException e) { + log.log(Level.INFO, "Reconfiguration failed. Retrying in " + retryWait + ": " + + Exceptions.toMessageString(e)); + sleeper.accept(retryWait); + } + } + } + + /** Returns how long this node should wait before reconfiguring the cluster */ + private Duration reconfigWaitPeriod() { + if (activeConfig == null) return Duration.ZERO; + return reconfigInterval.multipliedBy(activeConfig.myid()); + } + + private static String localConnectionSpec(ZookeeperServerConfig config) { + return HostName.getLocalhost() + ":" + config.clientPort(); + } + + private static List serverIds(ZookeeperServerConfig config) { + return config.server().stream() + .map(ZookeeperServerConfig.Server::id) + .map(String::valueOf) + .collect(Collectors.toList()); + } + + private static List servers(ZookeeperServerConfig config) { + // See https://zookeeper.apache.org/doc/r3.5.8/zookeeperReconfig.html#sc_reconfig_clientport for format + return config.server().stream() + .map(server -> server.id() + "=" + server.hostname() + ":" + server.quorumPort() + ":" + + server.electionPort() + ";" + config.clientPort()) + .collect(Collectors.toList()); + } + + private static List difference(List list1, List list2) { + List copy = new ArrayList<>(list1); + copy.removeAll(list2); + return copy; + } + + private static void defaultSleeper(Duration duration) { + try { + Thread.sleep(duration.toMillis()); + } catch (InterruptedException interruptedException) { + interruptedException.printStackTrace(); + } + } + +} diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java new file mode 100644 index 00000000000..492cfffa1ad --- /dev/null +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java @@ -0,0 +1,62 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeeper; + +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.security.tls.TransportSecurityUtils; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static com.yahoo.vespa.defaults.Defaults.getDefaults; +import static com.yahoo.vespa.zookeeper.Configurator.zookeeperServerHostnames; + +/** + * Writes zookeeper config and starts zookeeper server. + * + * @author Harald Musum + */ +public class ZooKeeperRunner implements Runnable { + + private static final Logger log = java.util.logging.Logger.getLogger(ZooKeeperRunner.class.getName()); + + private final ExecutorService executorService; + private final ZookeeperServerConfig zookeeperServerConfig; + private final VespaZooKeeperServer server; + + public ZooKeeperRunner(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) { + this.zookeeperServerConfig = zookeeperServerConfig; + this.server = server; + new Configurator(zookeeperServerConfig).writeConfigToDisk(TransportSecurityUtils.getOptions()); + executorService = Executors.newSingleThreadExecutor(new DaemonThreadFactory("zookeeper server")); + executorService.submit(this); + } + + void shutdown() { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) { + log.log(Level.WARNING, "Failed to shut down within timeout"); + } + } catch (InterruptedException e) { + log.log(Level.INFO, "Interrupted waiting for executor to complete", e); + } + if ( ! executorService.isTerminated()) { + executorService.shutdownNow(); + } + } + + @Override + public void run() { + Path path = Paths.get(getDefaults().underVespaHome(zookeeperServerConfig.zooKeeperConfigFile())); + log.log(Level.INFO, "Starting ZooKeeper server with config file " + path.toFile().getAbsolutePath() + + ". Trying to establish ZooKeeper quorum (members: " + zookeeperServerHostnames(zookeeperServerConfig) + ")"); + server.start(path); + } + +} diff --git a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java new file mode 100644 index 00000000000..a93cc40d322 --- /dev/null +++ b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java @@ -0,0 +1,194 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeeper; + +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.net.HostName; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests dynamic reconfiguration of zookeeper cluster. + * + * @author hmusum + */ +public class ReconfigurerTest { + + private File cfgFile; + private File idFile; + private TestableReconfigurer reconfigurer; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Before + public void setup() throws IOException { + cfgFile = folder.newFile(); + idFile = folder.newFile("myid"); + reconfigurer = new TestableReconfigurer(new TestableZkAdmin()); + } + + @Test + public void testReconfigure() { + ZookeeperServerConfig initialConfig = createConfig(3, true); + reconfigurer.startOrReconfigure(initialConfig); + assertSame(initialConfig, reconfigurer.activeConfig()); + + // Cluster grows + ZookeeperServerConfig nextConfig = createConfig(5, true); + reconfigurer.startOrReconfigure(nextConfig); + assertEquals("node1:2181", reconfigurer.connectionSpec()); + assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers()); + assertNull("No servers are leaving", reconfigurer.leavingServers()); + assertEquals(1, reconfigurer.reconfigurations()); + assertSame(nextConfig, reconfigurer.activeConfig()); + + // No reconfiguration happens with same config + reconfigurer.startOrReconfigure(nextConfig); + assertEquals(1, reconfigurer.reconfigurations()); + assertSame(nextConfig, reconfigurer.activeConfig()); + + // Cluster shrinks + nextConfig = createConfig(3, true); + reconfigurer.startOrReconfigure(nextConfig); + assertEquals(2, reconfigurer.reconfigurations()); + assertEquals("node1:2181", reconfigurer.connectionSpec()); + assertNull("No servers are joining", reconfigurer.joiningServers()); + assertEquals("3,4", reconfigurer.leavingServers()); + assertSame(nextConfig, reconfigurer.activeConfig()); + } + + @Test + public void testReconfigureFailsWithReconfigInProgressThenSucceeds() { + reconfigurer = new TestableReconfigurer(new TemporarilyFailZkAdmin()); + ZookeeperServerConfig initialConfig = createConfig(3, true); + reconfigurer.startOrReconfigure(initialConfig); + assertSame(initialConfig, reconfigurer.activeConfig()); + + ZookeeperServerConfig nextConfig = createConfig(5, true); + reconfigurer.startOrReconfigure(nextConfig); + assertEquals("node1:2181", reconfigurer.connectionSpec()); + assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers()); + assertNull("No servers are leaving", reconfigurer.leavingServers()); + assertEquals(1, reconfigurer.reconfigurations()); + assertSame(nextConfig, reconfigurer.activeConfig()); + } + + @Test + public void testDynamicReconfigurationDisabled() { + ZookeeperServerConfig initialConfig = createConfig(3, false); + reconfigurer.startOrReconfigure(initialConfig); + assertSame(initialConfig, reconfigurer.activeConfig()); + + ZookeeperServerConfig nextConfig = createConfig(5, false); + reconfigurer.startOrReconfigure(nextConfig); + assertSame(initialConfig, reconfigurer.activeConfig()); + assertEquals(0, reconfigurer.reconfigurations()); + } + + @After + public void stopReconfigurer() { + reconfigurer.shutdown(); + } + + private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration) { + ZookeeperServerConfig.Builder builder = new ZookeeperServerConfig.Builder(); + builder.zooKeeperConfigFile(cfgFile.getAbsolutePath()); + builder.myidFile(idFile.getAbsolutePath()); + IntStream.range(0, numberOfServers).forEach(i -> builder.server(newServer(i, "node" + i))); + builder.myid(0); + builder.dynamicReconfiguration(dynamicReconfiguration); + return builder.build(); + } + + private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName) { + ZookeeperServerConfig.Server.Builder builder = new ZookeeperServerConfig.Server.Builder(); + builder.id(id); + builder.hostname(hostName); + return builder; + } + + private static class TestableReconfigurer extends Reconfigurer implements VespaZooKeeperServer{ + + private final TestableZkAdmin zkReconfigurer; + + TestableReconfigurer(TestableZkAdmin zkReconfigurer) { + super(zkReconfigurer); + this.zkReconfigurer = zkReconfigurer; + HostName.setHostNameForTestingOnly("node1"); + } + + void startOrReconfigure(ZookeeperServerConfig newConfig) { + startOrReconfigure(newConfig, this); + } + + @Override + void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server) { + super.startOrReconfigure(newConfig, l -> {}, this); + } + + String connectionSpec() { + return zkReconfigurer.connectionSpec; + } + + String joiningServers() { + return zkReconfigurer.joiningServers; + } + + String leavingServers() { + return zkReconfigurer.leavingServers; + } + + int reconfigurations() { + return zkReconfigurer.reconfigurations; + } + + @Override + public void start(Path path) { } + + } + + private static class TestableZkAdmin implements ZkAdmin { + + String connectionSpec; + String joiningServers; + String leavingServers; + int reconfigurations = 0; + + @Override + public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException { + this.connectionSpec = connectionSpec; + this.joiningServers = joiningServers; + this.leavingServers = leavingServers; + this.reconfigurations++; + } + + } + + // Fails 3 times with KeeperException.ReconfigInProgress(), then succeeds + private static class TemporarilyFailZkAdmin extends TestableZkAdmin { + + private int attempts = 0; + + public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException { + if (++attempts < 3) + throw new ReconfigException("Reconfig failed"); + else + super.reconfigure(connectionSpec, joiningServers, leavingServers); + } + + } + + +} -- cgit v1.2.3