aboutsummaryrefslogtreecommitdiffstats
path: root/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
diff options
context:
space:
mode:
Diffstat (limited to 'zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java')
-rw-r--r--zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java66
1 files changed, 23 insertions, 43 deletions
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
index d4223e4d815..604419c063d 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
@@ -10,14 +10,14 @@ import com.yahoo.yolean.Exceptions;
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
+
+import static com.yahoo.vespa.zookeeper.Configurator.serverSpec;
+import static java.util.stream.Collectors.toList;
/**
* Starts zookeeper server and supports reconfiguring zookeeper cluster. Keep this as a component
@@ -50,17 +50,22 @@ public class Reconfigurer extends AbstractComponent {
this.sleeper = Objects.requireNonNull(sleeper);
}
- void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server,
- Supplier<QuorumPeer> quorumPeerGetter, Consumer<QuorumPeer> quorumPeerSetter) {
+ @Override
+ public void deconstruct() {
+ shutdown();
+ }
+
+ QuorumPeer startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server,
+ Supplier<QuorumPeer> quorumPeerCreator) {
if (zooKeeperRunner == null) {
- peer = quorumPeerGetter.get(); // Obtain the peer from the server. This will be shared with later servers.
+ peer = quorumPeerCreator.get(); // Obtain the peer from the server. This will be shared with later servers.
zooKeeperRunner = startServer(newConfig, server);
}
- quorumPeerSetter.accept(peer);
- if (shouldReconfigure(newConfig)) {
+ if (newConfig.dynamicReconfiguration()) {
reconfigure(newConfig);
}
+ return peer;
}
ZookeeperServerConfig activeConfig() {
@@ -73,42 +78,30 @@ public class Reconfigurer extends AbstractComponent {
}
}
- private boolean shouldReconfigure(ZookeeperServerConfig newConfig) {
- if (!newConfig.dynamicReconfiguration()) return false;
- if (activeConfig == null) return false;
- return !newConfig.equals(activeConfig());
- }
-
private ZooKeeperRunner startServer(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) {
ZooKeeperRunner runner = new ZooKeeperRunner(zookeeperServerConfig, server);
activeConfig = zookeeperServerConfig;
return runner;
}
+ // TODO jonmv: read dynamic file, discard if old quorum impossible (config file + .dynamic.<id>)
+ // TODO jonmv: if dynamic file, all unlisted servers are observers; otherwise joiners are observers
+ // TODO jonmv: wrap Curator in Provider, for Curator shutdown
private void reconfigure(ZookeeperServerConfig newConfig) {
Instant reconfigTriggered = Instant.now();
- // No point in trying to reconfigure if there is only one server in the new ensemble,
- // the others will be shutdown or are about to be shutdown
- if (newConfig.server().size() == 1) shutdownAndDie(Duration.ZERO);
-
- List<String> newServers = difference(servers(newConfig), servers(activeConfig));
- String leavingServerIds = String.join(",", serverIdsDifference(activeConfig, newConfig));
- String joiningServersSpec = String.join(",", newServers);
- leavingServerIds = leavingServerIds.isEmpty() ? null : leavingServerIds;
- joiningServersSpec = joiningServersSpec.isEmpty() ? null : joiningServersSpec;
- log.log(Level.INFO, "Will reconfigure ZooKeeper cluster. \nJoining servers: " + joiningServersSpec +
- "\nleaving servers: " + leavingServerIds +
+ String newServers = String.join(",", servers(newConfig));
+ log.log(Level.INFO, "Will reconfigure ZooKeeper cluster." +
"\nServers in active config:" + servers(activeConfig) +
"\nServers in new config:" + servers(newConfig));
String connectionSpec = localConnectionSpec(activeConfig);
Instant now = Instant.now();
- Duration reconfigTimeout = reconfigTimeout(newServers.size());
+ Duration reconfigTimeout = reconfigTimeout(newConfig.server().size());
Instant end = now.plus(reconfigTimeout);
// Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed
for (int attempt = 1; now.isBefore(end); attempt++) {
try {
Instant reconfigStarted = Instant.now();
- vespaZooKeeperAdmin.reconfigure(connectionSpec, joiningServersSpec, leavingServerIds);
+ vespaZooKeeperAdmin.reconfigure(connectionSpec, newServers);
Instant reconfigEnded = Instant.now();
log.log(Level.INFO, "Reconfiguration completed in " +
Duration.between(reconfigTriggered, reconfigEnded) +
@@ -147,24 +140,11 @@ public class Reconfigurer extends AbstractComponent {
return HostName.getLocalhost() + ":" + config.clientPort();
}
- private static List<String> serverIdsDifference(ZookeeperServerConfig oldConfig, ZookeeperServerConfig newConfig) {
- return difference(servers(oldConfig), servers(newConfig)).stream()
- .map(server -> server.substring(0, server.indexOf('=')))
- .collect(Collectors.toList());
- }
-
private static List<String> servers(ZookeeperServerConfig config) {
- // See https://zookeeper.apache.org/doc/r3.6.3/zookeeperReconfig.html#sc_reconfig_clientport for format
return config.server().stream()
- .map(server -> server.id() + "=" + server.hostname() + ":" + server.quorumPort() + ":" +
- server.electionPort() + ";" + config.clientPort())
- .collect(Collectors.toList());
- }
-
- private static <T> List<T> difference(List<T> list1, List<T> list2) {
- List<T> copy = new ArrayList<>(list1);
- copy.removeAll(list2);
- return copy;
+ .filter(server -> ! server.retired())
+ .map(server -> serverSpec(server, config.clientPort(), false))
+ .collect(toList());
}
}