aboutsummaryrefslogtreecommitdiffstats
path: root/zookeeper-server/zookeeper-server-common
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2022-01-14 17:57:43 +0100
committerGitHub <noreply@github.com>2022-01-14 17:57:43 +0100
commit810da357155a349884b862de87e18de87ed54b35 (patch)
treeaba2f3b711660a193686a8567ac3599876028f54 /zookeeper-server/zookeeper-server-common
parentc0a980df17141c55e19dfd98c8610f42c79029a0 (diff)
Revert "Jonmv/reapply zk changes 2"
Diffstat (limited to 'zookeeper-server/zookeeper-server-common')
-rw-r--r--zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java11
-rw-r--r--zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java66
-rw-r--r--zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java2
-rw-r--r--zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java69
4 files changed, 85 insertions, 63 deletions
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
index 8b22f658a94..39d0312915f 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Configurator.java
@@ -86,7 +86,7 @@ public class Configurator {
sb.append("reconfigEnabled=true").append("\n");
sb.append("skipACL=yes").append("\n");
ensureThisServerIsRepresented(config.myid(), config.server());
- config.server().forEach(server -> sb.append(serverSpec(server, config.clientPort(), server.joining())).append("\n"));
+ config.server().forEach(server -> addServerToCfg(sb, server, config.clientPort()));
sb.append(new TlsQuorumConfig().createConfig(vespaTlsConfig));
sb.append(new TlsClientServerConfig().createConfig(vespaTlsConfig));
return sb.toString();
@@ -111,8 +111,7 @@ public class Configurator {
}
}
- static String serverSpec(ZookeeperServerConfig.Server server, int clientPort, boolean joining) {
- StringBuilder sb = new StringBuilder();
+ private void addServerToCfg(StringBuilder sb, ZookeeperServerConfig.Server server, int clientPort) {
sb.append("server.")
.append(server.id())
.append("=")
@@ -121,7 +120,7 @@ public class Configurator {
.append(server.quorumPort())
.append(":")
.append(server.electionPort());
- if (joining) {
+ if (server.joining()) {
// Servers that are joining an existing cluster must be marked as observers. Note that this will NOT
// actually make the server an observer, but prevent it from forming an ensemble independently of the
// existing cluster.
@@ -131,8 +130,8 @@ public class Configurator {
.append("observer");
}
sb.append(";")
- .append(server.clientPort());
- return sb.toString();
+ .append(clientPort)
+ .append("\n");
}
static List<String> zookeeperServerHostnames(ZookeeperServerConfig zookeeperServerConfig) {
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
index 604419c063d..d4223e4d815 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/Reconfigurer.java
@@ -10,14 +10,14 @@ import com.yahoo.yolean.Exceptions;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
-
-import static com.yahoo.vespa.zookeeper.Configurator.serverSpec;
-import static java.util.stream.Collectors.toList;
+import java.util.stream.Collectors;
/**
* Starts zookeeper server and supports reconfiguring zookeeper cluster. Keep this as a component
@@ -50,22 +50,17 @@ public class Reconfigurer extends AbstractComponent {
this.sleeper = Objects.requireNonNull(sleeper);
}
- @Override
- public void deconstruct() {
- shutdown();
- }
-
- QuorumPeer startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server,
- Supplier<QuorumPeer> quorumPeerCreator) {
+ void startOrReconfigure(ZookeeperServerConfig newConfig, VespaZooKeeperServer server,
+ Supplier<QuorumPeer> quorumPeerGetter, Consumer<QuorumPeer> quorumPeerSetter) {
if (zooKeeperRunner == null) {
- peer = quorumPeerCreator.get(); // Obtain the peer from the server. This will be shared with later servers.
+ peer = quorumPeerGetter.get(); // Obtain the peer from the server. This will be shared with later servers.
zooKeeperRunner = startServer(newConfig, server);
}
+ quorumPeerSetter.accept(peer);
- if (newConfig.dynamicReconfiguration()) {
+ if (shouldReconfigure(newConfig)) {
reconfigure(newConfig);
}
- return peer;
}
ZookeeperServerConfig activeConfig() {
@@ -78,30 +73,42 @@ public class Reconfigurer extends AbstractComponent {
}
}
+ private boolean shouldReconfigure(ZookeeperServerConfig newConfig) {
+ if (!newConfig.dynamicReconfiguration()) return false;
+ if (activeConfig == null) return false;
+ return !newConfig.equals(activeConfig());
+ }
+
private ZooKeeperRunner startServer(ZookeeperServerConfig zookeeperServerConfig, VespaZooKeeperServer server) {
ZooKeeperRunner runner = new ZooKeeperRunner(zookeeperServerConfig, server);
activeConfig = zookeeperServerConfig;
return runner;
}
- // TODO jonmv: read dynamic file, discard if old quorum impossible (config file + .dynamic.<id>)
- // TODO jonmv: if dynamic file, all unlisted servers are observers; otherwise joiners are observers
- // TODO jonmv: wrap Curator in Provider, for Curator shutdown
private void reconfigure(ZookeeperServerConfig newConfig) {
Instant reconfigTriggered = Instant.now();
- String newServers = String.join(",", servers(newConfig));
- log.log(Level.INFO, "Will reconfigure ZooKeeper cluster." +
+ // No point in trying to reconfigure if there is only one server in the new ensemble,
+ // the others will be shutdown or are about to be shutdown
+ if (newConfig.server().size() == 1) shutdownAndDie(Duration.ZERO);
+
+ List<String> newServers = difference(servers(newConfig), servers(activeConfig));
+ String leavingServerIds = String.join(",", serverIdsDifference(activeConfig, newConfig));
+ String joiningServersSpec = String.join(",", newServers);
+ leavingServerIds = leavingServerIds.isEmpty() ? null : leavingServerIds;
+ joiningServersSpec = joiningServersSpec.isEmpty() ? null : joiningServersSpec;
+ log.log(Level.INFO, "Will reconfigure ZooKeeper cluster. \nJoining servers: " + joiningServersSpec +
+ "\nleaving servers: " + leavingServerIds +
"\nServers in active config:" + servers(activeConfig) +
"\nServers in new config:" + servers(newConfig));
String connectionSpec = localConnectionSpec(activeConfig);
Instant now = Instant.now();
- Duration reconfigTimeout = reconfigTimeout(newConfig.server().size());
+ Duration reconfigTimeout = reconfigTimeout(newServers.size());
Instant end = now.plus(reconfigTimeout);
// Loop reconfiguring since we might need to wait until another reconfiguration is finished before we can succeed
for (int attempt = 1; now.isBefore(end); attempt++) {
try {
Instant reconfigStarted = Instant.now();
- vespaZooKeeperAdmin.reconfigure(connectionSpec, newServers);
+ vespaZooKeeperAdmin.reconfigure(connectionSpec, joiningServersSpec, leavingServerIds);
Instant reconfigEnded = Instant.now();
log.log(Level.INFO, "Reconfiguration completed in " +
Duration.between(reconfigTriggered, reconfigEnded) +
@@ -140,11 +147,24 @@ public class Reconfigurer extends AbstractComponent {
return HostName.getLocalhost() + ":" + config.clientPort();
}
+ private static List<String> serverIdsDifference(ZookeeperServerConfig oldConfig, ZookeeperServerConfig newConfig) {
+ return difference(servers(oldConfig), servers(newConfig)).stream()
+ .map(server -> server.substring(0, server.indexOf('=')))
+ .collect(Collectors.toList());
+ }
+
private static List<String> servers(ZookeeperServerConfig config) {
+ // See https://zookeeper.apache.org/doc/r3.6.3/zookeeperReconfig.html#sc_reconfig_clientport for format
return config.server().stream()
- .filter(server -> ! server.retired())
- .map(server -> serverSpec(server, config.clientPort(), false))
- .collect(toList());
+ .map(server -> server.id() + "=" + server.hostname() + ":" + server.quorumPort() + ":" +
+ server.electionPort() + ";" + config.clientPort())
+ .collect(Collectors.toList());
+ }
+
+ private static <T> List<T> difference(List<T> list1, List<T> list2) {
+ List<T> copy = new ArrayList<>(list1);
+ copy.removeAll(list2);
+ return copy;
}
}
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java
index 59c9628bcab..8809dca0def 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdmin.java
@@ -10,7 +10,7 @@ import java.time.Duration;
*/
public interface VespaZooKeeperAdmin {
- void reconfigure(String connectionSpec, String servers) throws ReconfigException;
+ void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException;
/* Timeout for connecting to ZooKeeper */
default Duration sessionTimeout() { return Duration.ofSeconds(30); }
diff --git a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
index 760c326cf5d..1211624e3d6 100644
--- a/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
+++ b/zookeeper-server/zookeeper-server-common/src/test/java/com/yahoo/vespa/zookeeper/ReconfigurerTest.java
@@ -17,6 +17,7 @@ import java.util.Arrays;
import java.util.concurrent.Phaser;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
/**
@@ -50,26 +51,31 @@ public class ReconfigurerTest {
ZookeeperServerConfig nextConfig = createConfig(5, true);
reconfigurer.startOrReconfigure(nextConfig);
assertEquals("node1:2181", reconfigurer.connectionSpec());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181",
- reconfigurer.servers());
- assertEquals(2, reconfigurer.reconfigurations());
+ assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers());
+ assertNull("No servers are leaving", reconfigurer.leavingServers());
+ assertEquals(1, reconfigurer.reconfigurations());
+ assertSame(nextConfig, reconfigurer.activeConfig());
+
+ // No reconfiguration happens with same config
+ reconfigurer.startOrReconfigure(nextConfig);
+ assertEquals(1, reconfigurer.reconfigurations());
assertSame(nextConfig, reconfigurer.activeConfig());
// Cluster shrinks
nextConfig = createConfig(3, true);
reconfigurer.startOrReconfigure(nextConfig);
- assertEquals(3, reconfigurer.reconfigurations());
+ assertEquals(2, reconfigurer.reconfigurations());
assertEquals("node1:2181", reconfigurer.connectionSpec());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181",
- reconfigurer.servers());
+ assertNull("No servers are joining", reconfigurer.joiningServers());
+ assertEquals("3,4", reconfigurer.leavingServers());
assertSame(nextConfig, reconfigurer.activeConfig());
// Cluster loses node1, but node3 joins. Indices are shuffled.
nextConfig = createConfig(3, true, 1);
reconfigurer.startOrReconfigure(nextConfig);
- assertEquals(4, reconfigurer.reconfigurations());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node2:2182:2183;2181,server.2=node3:2182:2183;2181",
- reconfigurer.servers());
+ assertEquals(3, reconfigurer.reconfigurations());
+ assertEquals("1=node2:2182:2183;2181,2=node3:2182:2183;2181", reconfigurer.joiningServers());
+ assertEquals("1,2", reconfigurer.leavingServers());
assertSame(nextConfig, reconfigurer.activeConfig());
}
@@ -83,9 +89,9 @@ public class ReconfigurerTest {
ZookeeperServerConfig nextConfig = createConfig(5, true);
reconfigurer.startOrReconfigure(nextConfig);
assertEquals("node1:2181", reconfigurer.connectionSpec());
- assertEquals("server.0=node0:2182:2183;2181,server.1=node1:2182:2183;2181,server.2=node2:2182:2183;2181,server.3=node3:2182:2183;2181,server.4=node4:2182:2183;2181",
- reconfigurer.servers());
- assertEquals(2, reconfigurer.reconfigurations());
+ assertEquals("3=node3:2182:2183;2181,4=node4:2182:2183;2181", reconfigurer.joiningServers());
+ assertNull("No servers are leaving", reconfigurer.leavingServers());
+ assertEquals(1, reconfigurer.reconfigurations());
assertSame(nextConfig, reconfigurer.activeConfig());
}
@@ -106,27 +112,24 @@ public class ReconfigurerTest {
reconfigurer.shutdown();
}
- private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... retiredIndices) {
- Arrays.sort(retiredIndices);
+ private ZookeeperServerConfig createConfig(int numberOfServers, boolean dynamicReconfiguration, int... skipIndices) {
+ Arrays.sort(skipIndices);
ZookeeperServerConfig.Builder builder = new ZookeeperServerConfig.Builder();
builder.zooKeeperConfigFile(cfgFile.getAbsolutePath());
builder.myidFile(idFile.getAbsolutePath());
for (int i = 0, index = 0; i < numberOfServers; i++, index++) {
- boolean retired = Arrays.binarySearch(retiredIndices, index) >= 0;
- if (retired) i--;
- builder.server(newServer(i, "node" + index, retired));
+ while (Arrays.binarySearch(skipIndices, index) >= 0) index++;
+ builder.server(newServer(i, "node" + index));
}
-
builder.myid(0);
builder.dynamicReconfiguration(dynamicReconfiguration);
return builder.build();
}
- private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName, boolean retired) {
+ private ZookeeperServerConfig.Server.Builder newServer(int id, String hostName) {
ZookeeperServerConfig.Server.Builder builder = new ZookeeperServerConfig.Server.Builder();
builder.id(id);
builder.hostname(hostName);
- builder.retired(retired);
return builder;
}
@@ -139,7 +142,6 @@ public class ReconfigurerTest {
private static class TestableReconfigurer extends Reconfigurer implements VespaZooKeeperServer {
private final TestableVespaZooKeeperAdmin zooKeeperAdmin;
- private final Phaser phaser = new Phaser(2);
private QuorumPeer serverPeer;
TestableReconfigurer(TestableVespaZooKeeperAdmin zooKeeperAdmin) {
@@ -154,16 +156,19 @@ public class ReconfigurerTest {
}
void startOrReconfigure(ZookeeperServerConfig newConfig) {
- serverPeer = startOrReconfigure(newConfig, this, MockQuorumPeer::new);
- phaser.arriveAndDeregister();
+ startOrReconfigure(newConfig, this, MockQuorumPeer::new, peer -> serverPeer = peer);
}
String connectionSpec() {
return zooKeeperAdmin.connectionSpec;
}
- String servers() {
- return zooKeeperAdmin.servers;
+ String joiningServers() {
+ return zooKeeperAdmin.joiningServers;
+ }
+
+ String leavingServers() {
+ return zooKeeperAdmin.leavingServers;
}
int reconfigurations() {
@@ -172,14 +177,10 @@ public class ReconfigurerTest {
@Override
public void shutdown() {
- phaser.arriveAndAwaitAdvance();
serverPeer.shutdown(Duration.ofSeconds(1)); }
@Override
- public void start(Path configFilePath) {
- phaser.arriveAndAwaitAdvance();
- serverPeer.start(configFilePath);
- }
+ public void start(Path configFilePath) { serverPeer.start(configFilePath); }
@Override
public boolean reconfigurable() {
@@ -191,7 +192,8 @@ public class ReconfigurerTest {
private static class TestableVespaZooKeeperAdmin implements VespaZooKeeperAdmin {
String connectionSpec;
- String servers;
+ String joiningServers;
+ String leavingServers;
int reconfigurations = 0;
private int failures = 0;
@@ -203,11 +205,12 @@ public class ReconfigurerTest {
}
@Override
- public void reconfigure(String connectionSpec, String servers) throws ReconfigException {
+ public void reconfigure(String connectionSpec, String joiningServers, String leavingServers) throws ReconfigException {
if (++attempts < failures)
throw new ReconfigException("Reconfig failed");
this.connectionSpec = connectionSpec;
- this.servers = servers;
+ this.joiningServers = joiningServers;
+ this.leavingServers = leavingServers;
this.reconfigurations++;
}