From fd5a6d5401117b4351e0a4afd588dd4ca63046e5 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Mon, 17 Jan 2022 09:33:31 +0100 Subject: Revert "Revert "Jonmv/reapply zk changes 2"" This reverts commit 810da357155a349884b862de87e18de87ed54b35. --- .../com/yahoo/vespa/zookeeper/Configurator.java | 11 ++-- .../com/yahoo/vespa/zookeeper/Reconfigurer.java | 66 ++++++++------------- .../yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java | 2 +- .../yahoo/vespa/zookeeper/ReconfigurerTest.java | 69 +++++++++++----------- 4 files changed, 63 insertions(+), 85 deletions(-) (limited to 'zookeeper-server/zookeeper-server-common') diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java index 39d0312915f..8b22f658a94 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java @@ -86,7 +86,7 @@ public class Configurator { sb.append("reconfigEnabled=true").append("\n"); sb.append("skipACL=yes").append("\n"); ensureThisServerIsRepresented(config.myid(), config.server()); - config.server().forEach(server -> addServerToCfg(sb, server, config.clientPort())); + config.server().forEach(server -> sb.append(serverSpec(server, config.clientPort(), server.joining())).append("\n")); sb.append(new TlsQuorumConfig().createConfig(vespaTlsConfig)); sb.append(new TlsClientServerConfig().createConfig(vespaTlsConfig)); return sb.toString(); @@ -111,7 +111,8 @@ public class Configurator { } } - private void addServerToCfg(StringBuilder sb, ZookeeperServerConfig.Server server, int clientPort) { + static String serverSpec(ZookeeperServerConfig.Server server, int clientPort, boolean joining) { + StringBuilder sb = new StringBuilder(); sb.append("server.") .append(server.id()) .append("=") @@ -120,7 +121,7 @@ public class Configurator { .append(server.quorumPort()) .append(":") .append(server.electionPort()); - if (server.joining()) { + if (joining) { // Servers that are joining an existing cluster must be marked as observers. Note that this will NOT // actually make the server an observer, but prevent it from forming an ensemble independently of the // existing cluster. @@ -130,8 +131,8 @@ public class Configurator { .append("observer"); } sb.append(";") - .append(clientPort) - .append("\n"); + .append(server.clientPort()); + return sb.toString(); } static List zookeeperServerHostnames(ZookeeperServerConfig zookeeperServerConfig) { diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java index d4223e4d815..604419c063d 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java @@ -10,14 +10,14 @@ import com.yahoo.yolean.Exceptions; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; -import java.util.stream.Collectors; + +import static com.yahoo.vespa.zookeeper.Configurator.serverSpec; +import static java.util.stream.Collectors.toList; /** * Starts zookeeper server and supports reconfiguring zookeeper cluster. Keep this as a component @@ -50,17 +50,22 @@ public class Reconfigurer extends AbstractComponent { this.sleeper = Objects.requireNonNull(sleeper); } - void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server, - Supplier quorumPeerGetter, Consumer quorumPeerSetter) { + @Override + public void deconstruct() { + shutdown(); + } + + QuorumPeer startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server, + Supplier quorumPeerCreator) { if (zooKeeperRunner == null) { - peer = quorumPeerGetter.get(); // Obtain the peer from the server. This will be shared with later servers. + peer = quorumPeerCreator.get(); // Obtain the peer from the server. This will be shared with later servers. zooKeeperRunner = startServer(newConfig, server); } - quorumPeerSetter.accept(peer); - if (shouldReconfigure(newConfig)) { + if (newConfig.dynamicReconfiguration()) { reconfigure(newConfig); } + return peer; } ZookeeperServerConfig activeConfig() { @@ -73,42 +78,30 @@ public class Reconfigurer extends AbstractComponent { } } - private boolean shouldReconfigure(ZookeeperServerConfig newConfig) { - if (!newConfig.dynamicReconfiguration()) return false; - if (activeConfig == null) return false; - return !newConfig.equals(activeConfig()); - } - private ZooKeeperRunner startServer(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) { ZooKeeperRunner runner = new ZooKeeperRunner(zookeeperServerConfig, server); activeConfig = zookeeperServerConfig; return runner; } + // TODO jonmv: read dynamic file, discard if old quorum impossible (config file + .dynamic.) + // TODO jonmv: if dynamic file, all unlisted servers are observers; otherwise joiners are observers + // TODO jonmv: wrap Curator in Provider, for Curator shutdown private void reconfigure(ZookeeperServerConfig newConfig) { Instant reconfigTriggered = Instant.now(); - // No point in trying to reconfigure if there is only one server in the new ensemble, - // the others will be shutdown or are about to be shutdown - if (newConfig.server().size() == 1) shutdownAndDie(Duration.ZERO); - - List newServers = difference(servers(newConfig), servers(activeConfig)); - String leavingServerIds = String.join(",", serverIdsDifference(activeConfig, newConfig)); - String joiningServersSpec = String.join(",", newServers); - leavingServerIds = leavingServerIds.isEmpty() ? null : leavingServerIds; - joiningServersSpec = joiningServersSpec.isEmpty() ? null : joiningServersSpec; - log.log(Level.INFO, "Will reconfigure ZooKeeper cluster. \nJoining servers: " + joiningServersSpec + - "\nleaving servers: " + leavingServerIds + + String newServers = String.join(",", servers(newConfig)); + log.log(Level.INFO, "Will reconfigure ZooKeeper cluster." + "\nServers in active config:" + servers(activeConfig) + "\nServers in new config:" + servers(newConfig)); String connectionSpec = localConnectionSpec(activeConfig); Instant now = Instant.now(); - Duration reconfigTimeout = reconfigTimeout(newServers.size()); + Duration reconfigTimeout = reconfigTimeout(newConfig.server().size()); Instant end = now.plus(reconfigTimeout); // Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed for (int attempt = 1; now.isBefore(end); attempt++) { try { Instant reconfigStarted = Instant.now(); - vespaZooKeeperAdmin.reconfigure(connectionSpec, joiningServersSpec, leavingServerIds); + vespaZooKeeperAdmin.reconfigure(connectionSpec, newServers); Instant reconfigEnded = Instant.now(); log.log(Level.INFO, "Reconfiguration completed in " + Duration.between(reconfigTriggered, reconfigEnded) + @@ -147,24 +140,11 @@ public class Reconfigurer extends AbstractComponent { return HostName.getLocalhost() + ":" + config.clientPort(); } - private static List serverIdsDifference(ZookeeperServerConfig oldConfig, ZookeeperServerConfig newConfig) { - return difference(servers(oldConfig), servers(newConfig)).stream() - .map(server -> server.substring(0, server.indexOf('='))) - .collect(Collectors.toList()); - } - private static List servers(ZookeeperServerConfig config) { - // See https://zookeeper.apache.org/doc/r3.6.3/zookeeperReconfig.html#sc_reconfig_clientport for format return config.server().stream() - .map(server -> server.id() + "=" + server.hostname() + ":" + server.quorumPort() + ":" + - server.electionPort() + ";" + config.clientPort()) - .collect(Collectors.toList()); - } - - private static List difference(List list1, List list2) { - List copy = new ArrayList<>(list1); - copy.removeAll(list2); - return copy; + .filter(server -> ! server.retired()) + .map(server -> serverSpec(server, config.clientPort(), false)) + .collect(toList()); } } diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java index 8809dca0def..59c9628bcab 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java @@ -10,7 +10,7 @@ import java.time.Duration; */ public interface VespaZooKeeperAdmin { - void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException; + void reconfigure(String connectionSpec, String servers) throws ReconfigException; /* Timeout for connecting to ZooKeeper */ default Duration sessionTimeout() { return Duration.ofSeconds(30); } diff --git a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java index 1211624e3d6..760c326cf5d 100644 --- a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java +++ b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java @@ -17,7 +17,6 @@ import java.util.Arrays; import java.util.concurrent.Phaser; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; /** @@ -51,31 +50,26 @@ public class ReconfigurerTest { ZookeeperServerConfig nextConfig = createConfig(5, true); reconfigurer.startOrReconfigure(nextConfig); assertEquals("node1:2181", reconfigurer.connectionSpec()); - assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers()); - assertNull("No servers are leaving", reconfigurer.leavingServers()); - assertEquals(1, reconfigurer.reconfigurations()); - assertSame(nextConfig, reconfigurer.activeConfig()); - - // No reconfiguration happens with same config - reconfigurer.startOrReconfigure(nextConfig); - assertEquals(1, reconfigurer.reconfigurations()); + assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181", + reconfigurer.servers()); + assertEquals(2, reconfigurer.reconfigurations()); assertSame(nextConfig, reconfigurer.activeConfig()); // Cluster shrinks nextConfig = createConfig(3, true); reconfigurer.startOrReconfigure(nextConfig); - assertEquals(2, reconfigurer.reconfigurations()); + assertEquals(3, reconfigurer.reconfigurations()); assertEquals("node1:2181", reconfigurer.connectionSpec()); - assertNull("No servers are joining", reconfigurer.joiningServers()); - assertEquals("3,4", reconfigurer.leavingServers()); + assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181", + reconfigurer.servers()); assertSame(nextConfig, reconfigurer.activeConfig()); // Cluster loses node1, but node3 joins. Indices are shuffled. nextConfig = createConfig(3, true, 1); reconfigurer.startOrReconfigure(nextConfig); - assertEquals(3, reconfigurer.reconfigurations()); - assertEquals("1=node2:2182:2183;2181,2=node3:2182:2183;2181", reconfigurer.joiningServers()); - assertEquals("1,2", reconfigurer.leavingServers()); + assertEquals(4, reconfigurer.reconfigurations()); + assertEquals("server.0=node0:2182:2183;2181,server.1=node2:2182:2183;2181,server.2=node3:2182:2183;2181", + reconfigurer.servers()); assertSame(nextConfig, reconfigurer.activeConfig()); } @@ -89,9 +83,9 @@ public class ReconfigurerTest { ZookeeperServerConfig nextConfig = createConfig(5, true); reconfigurer.startOrReconfigure(nextConfig); assertEquals("node1:2181", reconfigurer.connectionSpec()); - assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers()); - assertNull("No servers are leaving", reconfigurer.leavingServers()); - assertEquals(1, reconfigurer.reconfigurations()); + assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181", + reconfigurer.servers()); + assertEquals(2, reconfigurer.reconfigurations()); assertSame(nextConfig, reconfigurer.activeConfig()); } @@ -112,24 +106,27 @@ public class ReconfigurerTest { reconfigurer.shutdown(); } - private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... skipIndices) { - Arrays.sort(skipIndices); + private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... retiredIndices) { + Arrays.sort(retiredIndices); ZookeeperServerConfig.Builder builder = new ZookeeperServerConfig.Builder(); builder.zooKeeperConfigFile(cfgFile.getAbsolutePath()); builder.myidFile(idFile.getAbsolutePath()); for (int i = 0, index = 0; i < numberOfServers; i++, index++) { - while (Arrays.binarySearch(skipIndices, index) >= 0) index++; - builder.server(newServer(i, "node" + index)); + boolean retired = Arrays.binarySearch(retiredIndices, index) >= 0; + if (retired) i--; + builder.server(newServer(i, "node" + index, retired)); } + builder.myid(0); builder.dynamicReconfiguration(dynamicReconfiguration); return builder.build(); } - private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName) { + private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName, boolean retired) { ZookeeperServerConfig.Server.Builder builder = new ZookeeperServerConfig.Server.Builder(); builder.id(id); builder.hostname(hostName); + builder.retired(retired); return builder; } @@ -142,6 +139,7 @@ public class ReconfigurerTest { private static class TestableReconfigurer extends Reconfigurer implements VespaZooKeeperServer { private final TestableVespaZooKeeperAdmin zooKeeperAdmin; + private final Phaser phaser = new Phaser(2); private QuorumPeer serverPeer; TestableReconfigurer(TestableVespaZooKeeperAdmin zooKeeperAdmin) { @@ -156,19 +154,16 @@ public class ReconfigurerTest { } void startOrReconfigure(ZookeeperServerConfig newConfig) { - startOrReconfigure(newConfig, this, MockQuorumPeer::new, peer -> serverPeer = peer); + serverPeer = startOrReconfigure(newConfig, this, MockQuorumPeer::new); + phaser.arriveAndDeregister(); } String connectionSpec() { return zooKeeperAdmin.connectionSpec; } - String joiningServers() { - return zooKeeperAdmin.joiningServers; - } - - String leavingServers() { - return zooKeeperAdmin.leavingServers; + String servers() { + return zooKeeperAdmin.servers; } int reconfigurations() { @@ -177,10 +172,14 @@ public class ReconfigurerTest { @Override public void shutdown() { + phaser.arriveAndAwaitAdvance(); serverPeer.shutdown(Duration.ofSeconds(1)); } @Override - public void start(Path configFilePath) { serverPeer.start(configFilePath); } + public void start(Path configFilePath) { + phaser.arriveAndAwaitAdvance(); + serverPeer.start(configFilePath); + } @Override public boolean reconfigurable() { @@ -192,8 +191,7 @@ public class ReconfigurerTest { private static class TestableVespaZooKeeperAdmin implements VespaZooKeeperAdmin { String connectionSpec; - String joiningServers; - String leavingServers; + String servers; int reconfigurations = 0; private int failures = 0; @@ -205,12 +203,11 @@ public class ReconfigurerTest { } @Override - public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException { + public void reconfigure(String connectionSpec, String servers) throws ReconfigException { if (++attempts < failures) throw new ReconfigException("Reconfig failed"); this.connectionSpec = connectionSpec; - this.joiningServers = joiningServers; - this.leavingServers = leavingServers; + this.servers = servers; this.reconfigurations++; } -- cgit v1.2.3