diff options
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.6.3')
4 files changed, 291 insertions, 29 deletions
diff --git a/zookeeper-server/zookeeper-server-3.6.3/pom.xml b/zookeeper-server/zookeeper-server-3.6.3/pom.xml index f7e6f512f7c..a8ad183de4e 100644 --- a/zookeeper-server/zookeeper-server-3.6.3/pom.xml +++ b/zookeeper-server/zookeeper-server-3.6.3/pom.xml @@ -11,6 +11,9 @@ <artifactId>zookeeper-server-3.6.3</artifactId> <packaging>container-plugin</packaging> <version>7-SNAPSHOT</version> + <properties> + <zookeeper.version>3.6.3</zookeeper.version> + </properties> <dependencies> <dependency> <groupId>com.yahoo.vespa</groupId> @@ -32,7 +35,7 @@ <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> - <version>3.6.3</version> + <version>${zookeeper.version}</version> <exclusions> <!-- Container provides wiring for all common log libraries @@ -87,7 +90,6 @@ <configuration> <compilerArgs> <arg>-Xlint:all</arg> - <arg>-Werror</arg> </compilerArgs> </configuration> </plugin> @@ -97,6 +99,9 @@ <configuration> <redirectTestOutputToFile>${test.hide}</redirectTestOutputToFile> <forkMode>once</forkMode> + <systemPropertyVariables> + <zk-version>${zookeeper.version}</zk-version> + </systemPropertyVariables> </configuration> </plugin> <plugin> diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java index c002ffa72ce..246911fdfc7 100644 --- a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java @@ -18,21 +18,21 @@ import java.util.concurrent.atomic.AtomicReference; */ public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer { - private final AtomicReference<QuorumPeer> peer = new AtomicReference<>(); + private QuorumPeer peer; @Inject public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) { - reconfigurer.startOrReconfigure(zookeeperServerConfig, this, VespaQuorumPeer::new, peer::set); + peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer()); } @Override public void shutdown() { - peer.get().shutdown(Duration.ofMinutes(1)); + peer.shutdown(Duration.ofMinutes(1)); } @Override public void start(Path configFilePath) { - peer.get().start(configFilePath); + peer.start(configFilePath); } @Override diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java index 27aa18c64c7..f0a95b70e96 100644 --- a/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java +++ b/zookeeper-server/zookeeper-server-3.6.3/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java @@ -2,11 +2,15 @@ package com.yahoo.vespa.zookeeper; import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.apache.zookeeper.data.ACL; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -19,27 +23,28 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin { private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName()); @Override - public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException { - ZooKeeperAdmin zooKeeperAdmin = null; - try { - zooKeeperAdmin = createAdmin(connectionSpec); + public void reconfigure(String connectionSpec, String servers) throws ReconfigException { + try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(connectionSpec)) { long fromConfig = -1; // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0) - byte[] appliedConfig = zooKeeperAdmin.reconfigure(joiningServers, leavingServers, null, fromConfig, null); + log.log(Level.INFO, "Applying ZooKeeper config: " + servers); + byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null); log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); - } catch (KeeperException e) { - if (retryOn(e)) - throw new ReconfigException(e); - else - throw new RuntimeException(e); - } catch (IOException | InterruptedException e) { + + // Verify by issuing a write operation; this is only accepted once new quorum is obtained. + List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL); + zooKeeperAdmin.delete(node, -1); + + log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); + } + catch ( KeeperException.ReconfigInProgress + | KeeperException.ConnectionLossException + | KeeperException.NewConfigNoQuorum e) { + throw new ReconfigException(e); + } + catch (KeeperException | IOException | InterruptedException e) { throw new RuntimeException(e); - } finally { - if (zooKeeperAdmin != null) { - try { - zooKeeperAdmin.close(); - } catch (InterruptedException e) { /* ignore */} - } } } @@ -48,11 +53,5 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin { (event) -> log.log(Level.INFO, event.toString()), new ZkClientConfigBuilder().toConfig()); } - private static boolean retryOn(KeeperException e) { - return e instanceof KeeperException.ReconfigInProgress || - e instanceof KeeperException.ConnectionLossException || - e instanceof KeeperException.NewConfigNoQuorum; - } - } diff --git a/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java b/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java new file mode 100644 index 00000000000..922c389f94a --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.6.3/src/test/java/com/yahoo/vespa/zookeper/VespaZooKeeperTest.java @@ -0,0 +1,258 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeper; + +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.net.HostName; +import com.yahoo.vespa.zookeeper.ReconfigurableVespaZooKeeperServer; +import com.yahoo.vespa.zookeeper.Reconfigurer; +import com.yahoo.vespa.zookeeper.VespaZooKeeperAdminImpl; +import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.ServerSocket; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static org.junit.Assert.assertEquals; + +public class VespaZooKeeperTest { + + static final Path tempDirRoot = getTmpDir(); + static final List<Integer> ports = new ArrayList<>(); + + /** + * Performs dynamic reconfiguration of ZooKeeper servers. + * + * First, a cluster of 3 servers is set up, and some data is written to it. + * Then, 3 new servers are added, and the first 3 marked for retirement; + * this should force the quorum to move the 3 new servers, but not disconnect the old ones. + * Next, the old servers are removed. + * Then, the cluster is reduced to size 1. + * Finally, the cluster grows to size 3 again. + * + * Throughout all of this, quorum should remain, and the data should remain the same. + */ + @Test(timeout = 120_000) + @Ignore // Unstable, some ZK server keeps resetting connections sometimes. + public void testReconfiguration() throws ExecutionException, InterruptedException, IOException, KeeperException, TimeoutException { + List<ZooKeeper> keepers = new ArrayList<>(); + for (int i = 0; i < 8; i++) keepers.add(new ZooKeeper()); + for (int i = 0; i < 8; i++) keepers.get(i).run(); + + // Start the first three servers. + List<ZookeeperServerConfig> configs = getConfigs(0, 0, 3, 0); + for (int i = 0; i < 3; i++) keepers.get(i).config = configs.get(i); + for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Wait for all servers to be up and running. + for (int i = 0; i < 3; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Write data to verify later. + String path = writeData(configs.get(0)); + + // Let three new servers join, causing the three older ones to retire and leave the ensemble. + configs = getConfigs(0, 3, 3, 3); + for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); + // The existing servers can't reconfigure and leave before the joiners are up. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Wait for new quorum to be established. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Verify written data is preserved. + verifyData(path, configs.get(3)); + + // Old servers are removed. + configs = getConfigs(3, 0, 3, 0); + for (int i = 0; i < 6; i++) keepers.get(i).config = configs.get(i); + // Old servers shut down, while the newer servers remain. + for (int i = 0; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // Ensure old servers shut down properly. + for (int i = 0; i < 3; i++) keepers.get(i).await(); + // Ensure new servers have reconfigured. + for (int i = 3; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + + // Verify written data is preserved. + verifyData(path, configs.get(3)); + + + // Cluster shrinks to a single server. + configs = getConfigs(5, 0, 1, 0); + for (int i = 3; i < 6; i++) keepers.get(i).config = configs.get(i); + for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 5; i < 6; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // We let the remaining server reconfigure the others out before they die. + for (int i = 3; i < 5; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 3; i < 5; i++) keepers.get(i).await(); + verifyData(path, configs.get(5)); + + // Cluster grows to 3 servers again. + configs = getConfigs(5, 0, 3, 2); + for (int i = 5; i < 8; i++) keepers.get(i).config = configs.get(i); + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + // Wait for the joiners. + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + verifyData(path, configs.get(7)); + + // Let the remaining servers terminate. + for (int i = 5; i < 8; i++) keepers.get(i).config = null; + for (int i = 5; i < 8; i++) keepers.get(i).phaser.arriveAndAwaitAdvance(); + for (int i = 5; i < 8; i++) keepers.get(i).await(); + } + + static String writeData(ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { + ZooKeeperAdmin admin = createAdmin(config); + List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + String node = admin.create("/test-node", "hi".getBytes(UTF_8), acl, CreateMode.EPHEMERAL_SEQUENTIAL); + String read = new String(admin.getData(node, false, new Stat()), UTF_8); + assertEquals("hi", read); + return node; + } + + static void verifyData(String path, ZookeeperServerConfig config) throws IOException, InterruptedException, KeeperException { + for (int i = 0; i < 10; i++) { + try { + assertEquals("hi", new String(createAdmin(config).getData(path, false, new Stat()), UTF_8)); + return; + } + catch (KeeperException.ConnectionLossException e) { + e.printStackTrace(); + Thread.sleep(10 << i); + } + } + } + + static ZooKeeperAdmin createAdmin(ZookeeperServerConfig config) throws IOException { + return new ZooKeeperAdmin(HostName.getLocalhost() + ":" + config.clientPort(), + 10_000, + System.err::println, + new ZkClientConfigBuilder().toConfig()); + } + + static class ZooKeeper { + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final Phaser phaser = new Phaser(2); + final AtomicReference<Future<?>> future = new AtomicReference<>(); + ZookeeperServerConfig config; + + void run() { + future.set(executor.submit(() -> { + Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl()); + phaser.arriveAndAwaitAdvance(); + while (config != null) { + new ReconfigurableVespaZooKeeperServer(reconfigurer, config); + phaser.arriveAndAwaitAdvance(); // server is now up, let test thread sync here + phaser.arriveAndAwaitAdvance(); // wait before reconfig/teardown to let test thread do stuff + } + reconfigurer.deconstruct(); + })); + } + + void await() throws ExecutionException, InterruptedException, TimeoutException { + future.get().get(30, SECONDS); + } + } + + static List<ZookeeperServerConfig> getConfigs(int removed, int retired, int active, int joining) { + return IntStream.rangeClosed(1, removed + retired + active) + .mapToObj(id -> getConfig(removed, retired, active, joining, id)) + .collect(toList()); + } + + // Config for server #id among retired + active servers, of which the last may be joining, and with offset removed. + static ZookeeperServerConfig getConfig(int removed, int retired, int active, int joining, int id) { + if (id <= removed) + return null; + + Path tempDir = tempDirRoot.resolve("zookeeper-" + id); + return new ZookeeperServerConfig.Builder() + .clientPort(getPorts(id).get(0)) + .dataDir(tempDir.toString()) + .zooKeeperConfigFile(tempDir.resolve("zookeeper.cfg").toString()) + .myid(id) + .myidFile(tempDir.resolve("myid").toString()) + .dynamicReconfiguration(true) + .server(IntStream.rangeClosed(removed + 1, removed + retired + active) + .mapToObj(i -> new ZookeeperServerConfig.Server.Builder() + .id(i) + .clientPort(getPorts(i).get(0)) + .electionPort(getPorts(i).get(1)) + .quorumPort(getPorts(i).get(2)) + .hostname("localhost") + .joining(i - removed > retired + active - joining) + .retired(i - removed <= retired)) + .collect(toList())) + .build(); + } + + static List<Integer> getPorts(int id) { + if (ports.size() < id * 3) { + int previousPort; + if (ports.isEmpty()) { + String[] version = System.getProperty("zk-version").split("\\."); + int versionPortOffset = 0; + for (String part : version) + versionPortOffset = 32 * (versionPortOffset + Integer.parseInt(part)); + previousPort = 20000 + versionPortOffset % 30000; + } + else + previousPort = ports.get(ports.size() - 1); + + for (int i = 0; i < 3; i++) + ports.add(previousPort = nextPort(previousPort)); + } + return ports.subList(id * 3 - 3, id * 3); + } + + static int nextPort(int previousPort) { + for (int j = 1; j <= 30000; j++) { + int port = (previousPort + j); + while (port > 50000) + port -= 30000; + + try (ServerSocket socket = new ServerSocket(port)) { + return socket.getLocalPort(); + } + catch (IOException e) { + System.err.println("Could not bind port " + port + ": " + e); + } + } + throw new RuntimeException("No free ports"); + } + + static Path getTmpDir() { + try { + Path tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "vespa-zk-test"); + tempDir.toFile().deleteOnExit(); + return tempDir.toAbsolutePath(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + +} |