diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-02-19 12:30:30 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-02-19 12:34:16 +0100 |
commit | 896879291572a2de2d421acf552bbf272f8bc44a (patch) | |
tree | 8aff17203bf3c606ed8f84497384891c41a7c838 | |
parent | 1d1390fcfad2ace92795e47a15513a90d493cc95 (diff) |
Move ownership of QuorumPeer to Recoonfigurer
6 files changed, 69 insertions, 23 deletions
diff --git a/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java index d614aecbad2..0b08966e241 100644 --- a/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java @@ -6,29 +6,33 @@ import com.yahoo.cloud.config.ZookeeperServerConfig; import com.yahoo.component.AbstractComponent; import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; /** - * Starts or reconfigures zookeeper cluster + * Starts or reconfigures zookeeper cluster. + * The QuorumPeer conditionally created here is owned by the Reconfigurer; + * when it already has a peer, that peer is used here in case start or shutdown is required. * * @author hmusum */ public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer { - private final VespaQuorumPeer peer; + private final AtomicReference<QuorumPeer> peer = new AtomicReference<>(); @Inject public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) { - this.peer = new VespaQuorumPeer(); - reconfigurer.startOrReconfigure(zookeeperServerConfig, this); + reconfigurer.startOrReconfigure(zookeeperServerConfig, this, VespaQuorumPeer::new, peer::set); } @Override public void shutdown() { - peer.shutdown(); + peer.get().shutdown(Duration.ofMinutes(1)); } + @Override public void start(Path configFilePath) { - peer.start(configFilePath); + peer.get().start(configFilePath); } @Override diff --git a/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java b/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java index ba11286404a..e003aa39f7b 100644 --- a/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java +++ b/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java @@ -18,22 +18,26 @@ import java.util.logging.Logger; * * @author hmusum */ -class VespaQuorumPeer extends QuorumPeerMain { +class VespaQuorumPeer extends QuorumPeerMain implements QuorumPeer { private static final Logger log = java.util.logging.Logger.getLogger(Reconfigurer.class.getName()); private static final Duration timeToWaitForShutdown = Duration.ofSeconds(60); + @Override public void start(Path path) { initializeAndRun(new String[]{ path.toFile().getAbsolutePath()}); } - public void shutdown() { + @Override + public void shutdown(Duration timeout) { if (quorumPeer != null) { log.log(Level.INFO, "Shutting down ZooKeeper server"); try { quorumPeer.shutdown(); - quorumPeer.join(timeToWaitForShutdown.toMillis()); // Wait for shutdown to complete - } catch (RuntimeException|InterruptedException e) { + quorumPeer.join(timeout.toMillis()); // Wait for shutdown to complete + if (quorumPeer.isAlive()) + throw new IllegalStateException("Peer still alive after " + timeout); + } catch (RuntimeException | InterruptedException e) { // If shutdown fails, we have no other option than forcing the JVM to stop and letting it be restarted. // // When a VespaZooKeeperServer component receives a new config, the container will try to start a new diff --git a/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java b/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java index 2a66bebe048..430aab802c2 100644 --- a/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java +++ b/zookeeper-server/zookeeper-server-3.6.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java @@ -6,6 +6,7 @@ import com.yahoo.cloud.config.ZookeeperServerConfig; import com.yahoo.component.AbstractComponent; import java.nio.file.Path; +import java.time.Duration; /** * @author Ulf Lilleengen @@ -30,7 +31,7 @@ public class VespaZooKeeperServerImpl extends AbstractComponent implements Vespa @Override public void shutdown() { - peer.shutdown(); + peer.shutdown(Duration.ofMinutes(1)); } @Override diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/QuorumPeer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/QuorumPeer.java new file mode 100644 index 00000000000..010b6546443 --- /dev/null +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/QuorumPeer.java @@ -0,0 +1,20 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeeper; + +import java.nio.file.Path; +import java.time.Duration; + +/** + * Wraps ZK-version-dependent VespaQuorumPeer + * + * @author jonmv + */ +public interface QuorumPeer { + + /** Starts ZK with config from the given path. */ + void start(Path path); + + /** Shuts down this peer, with the given timeout, or kills the process. */ + void shutdown(Duration timeout); + +} diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java index 5296e3646c8..045538b2d7e 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java @@ -13,6 +13,8 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -34,6 +36,7 @@ public class Reconfigurer extends AbstractComponent { private final VespaZooKeeperAdmin vespaZooKeeperAdmin; private final Sleeper sleeper; + private QuorumPeer peer; private ZooKeeperRunner zooKeeperRunner; private ZookeeperServerConfig activeConfig; @@ -48,12 +51,16 @@ public class Reconfigurer extends AbstractComponent { log.log(Level.FINE, "Created ZooKeeperReconfigurer"); } - void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server) { - if (zooKeeperRunner == null) + void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server, + Supplier<QuorumPeer> quorumPeerGetter, Consumer<QuorumPeer> quorumPeerSetter) { + if (zooKeeperRunner == null) { + peer = quorumPeerGetter.get(); // Obtain the peer from the server. This will be shared with laters servers. zooKeeperRunner = startServer(newConfig, server); + } + quorumPeerSetter.accept(peer); if (shouldReconfigure(newConfig)) { - reconfigure(newConfig, server); + reconfigure(newConfig); } } @@ -79,11 +86,11 @@ public class Reconfigurer extends AbstractComponent { return runner; } - private void reconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server) { + private void reconfigure(ZookeeperServerConfig newConfig) { Instant reconfigTriggered = Instant.now(); // No point in trying to reconfigure if there is only one server in the new ensemble, // the others will be shutdown or are about to be shutdown - if (newConfig.server().size() == 1) shutdownAndDie(server, Duration.ZERO); + if (newConfig.server().size() == 1) shutdownAndDie(Duration.ZERO); List<String> newServers = difference(servers(newConfig), servers(activeConfig)); String leavingServerIds = String.join(",", serverIdsDifference(activeConfig, newConfig)); @@ -122,12 +129,12 @@ public class Reconfigurer extends AbstractComponent { } // Reconfiguration failed - shutdownAndDie(server, reconfigTimeout); + shutdownAndDie(reconfigTimeout); } - private void shutdownAndDie(VespaZooKeeperServer server, Duration reconfigTimeout) { - server.shutdown(); - Process.logAndDie("Reconfiguration did not complete within timeout " + reconfigTimeout + ". Forcing shutdown"); + private void shutdownAndDie(Duration reconfigTimeout) { + shutdown(); + Process.logAndDie("Reconfiguration did not complete within timeout " + reconfigTimeout + ". Forcing container shutdown."); } /** Returns the timeout to use for the given joining server count */ diff --git a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java index 5cee0de2b6e..a2a3fc0a39f 100644 --- a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java +++ b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java @@ -14,10 +14,13 @@ import java.io.IOException; import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; /** * Tests dynamic reconfiguration of zookeeper cluster. @@ -136,9 +139,16 @@ public class ReconfigurerTest { return builder; } + private static class MockQuorumPeer implements QuorumPeer { + final AtomicBoolean on = new AtomicBoolean(); + @Override public void start(Path path) { assertFalse(on.getAndSet(true)); } + @Override public void shutdown(Duration timeout) { assertTrue(on.getAndSet(false)); } + } + private static class TestableReconfigurer extends Reconfigurer implements VespaZooKeeperServer { private final TestableVespaZooKeeperAdmin zooKeeperAdmin; + private QuorumPeer serverPeer; TestableReconfigurer(TestableVespaZooKeeperAdmin zooKeeperAdmin) { super(zooKeeperAdmin, new Sleeper() { @@ -152,7 +162,7 @@ public class ReconfigurerTest { } void startOrReconfigure(ZookeeperServerConfig newConfig) { - startOrReconfigure(newConfig, this); + startOrReconfigure(newConfig, this, MockQuorumPeer::new, peer -> serverPeer = peer); } String connectionSpec() { @@ -172,10 +182,10 @@ public class ReconfigurerTest { } @Override - public void shutdown() {} + public void shutdown() { serverPeer.shutdown(Duration.ofSeconds(1)); } @Override - public void start(Path configFilePath) { } + public void start(Path configFilePath) { serverPeer.start(configFilePath); } @Override public boolean reconfigurable() { |