summaryrefslogtreecommitdiffstats
path: root/zookeeper-server
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2023-01-30 16:28:23 +0100
committerHarald Musum <musum@yahooinc.com>2023-01-30 16:28:23 +0100
commit8973f674dea9cacb820069821ae50035fb96e442 (patch)
treef77d28c5459067bb59e47d97c58482f5d38c1b08 /zookeeper-server
parentb229fe2920007ae37b87fe0b10e52ed7ef68949b (diff)
Add support for ZooKeeper 3.8.1
Diffstat (limited to 'zookeeper-server')
-rw-r--r--zookeeper-server/CMakeLists.txt1
-rw-r--r--zookeeper-server/pom.xml1
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/CMakeLists.txt2
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/pom.xml107
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java43
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java45
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java41
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java60
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java93
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java51
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/common/NetUtils.java94
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java353
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java37
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java2329
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java310
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java920
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java184
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java140
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java239
-rw-r--r--zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java84
20 files changed, 5134 insertions, 0 deletions
diff --git a/zookeeper-server/CMakeLists.txt b/zookeeper-server/CMakeLists.txt
index e88493c1a1b..ea8bf68eba8 100644
--- a/zookeeper-server/CMakeLists.txt
+++ b/zookeeper-server/CMakeLists.txt
@@ -1,3 +1,4 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
add_subdirectory(zookeeper-server-common)
add_subdirectory(zookeeper-server)
+add_subdirectory(zookeeper-server-3.8.1)
diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml
index 69a38ef9e2a..23590a3941f 100644
--- a/zookeeper-server/pom.xml
+++ b/zookeeper-server/pom.xml
@@ -14,6 +14,7 @@
<modules>
<module>zookeeper-server-common</module>
<module>zookeeper-server</module>
+ <module>zookeeper-server-3.8.1</module>
</modules>
<dependencies>
<dependency>
diff --git a/zookeeper-server/zookeeper-server-3.8.1/CMakeLists.txt b/zookeeper-server/zookeeper-server-3.8.1/CMakeLists.txt
new file mode 100644
index 00000000000..72c47b6028b
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/CMakeLists.txt
@@ -0,0 +1,2 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+install_jar(zookeeper-server-3.8.1-jar-with-dependencies.jar)
diff --git a/zookeeper-server/zookeeper-server-3.8.1/pom.xml b/zookeeper-server/zookeeper-server-3.8.1/pom.xml
new file mode 100644
index 00000000000..72da1ba26c4
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0"?>
+<!-- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>zookeeper-server-parent</artifactId>
+ <version>8-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>zookeeper-server-3.8.1</artifactId>
+ <packaging>container-plugin</packaging>
+ <version>8-SNAPSHOT</version>
+ <properties>
+ <zookeeper.version>3.8.1</zookeeper.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>zookeeper-server-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>zookeeper-client-common</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <!-- Don't use ZK version from zookeeper-client-common -->
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${zookeeper.version}</version>
+ <exclusions>
+ <!--
+ Container provides wiring for all common log libraries
+ Duplicate embedding results in various warnings being printed to stderr
+ -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
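+ <!-- snappy-java and metrics-core are included here
+ to be able to work with ZooKeeper 3.7.0 due to
+ class loading issues.
+ NOTE(review): this module builds against ZooKeeper ${zookeeper.version};
+ confirm that the workaround described for 3.7.0 is still needed here. -->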
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <compilerArgs>
+ <arg>-Xlint:all</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-install-plugin</artifactId>
+ <configuration>
+ <updateReleaseInfo>true</updateReleaseInfo>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <importPackage>com.sun.management</importPackage>
+ <bundleSymbolicName>zookeeper-server</bundleSymbolicName>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java
new file mode 100644
index 00000000000..17179aa5e69
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java
@@ -0,0 +1,43 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.zookeeper;
+
+import com.yahoo.cloud.config.ZookeeperServerConfig;
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.component.annotation.Inject;
+import java.nio.file.Path;
+
+/**
+ * ZooKeeper server component used when starting the config server. It selects the concrete
+ * server implementation from {@code zookeeperServerConfig.dynamicReconfiguration()} - the setting
+ * that gives hosted and self-hosted Vespa different behavior - and forwards every call to it.
+ *
+ * @author Harald Musum
+ */
+public class ConfigServerZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer {
+
+    /** The implementation all calls are forwarded to; chosen once, at construction. */
+    private final VespaZooKeeperServer delegate;
+
+    @Inject
+    public ConfigServerZooKeeperServer(ZookeeperServerConfig zookeeperServerConfig) {
+        this.delegate = createDelegate(zookeeperServerConfig);
+    }
+
+    /** Returns the reconfigurable server when dynamic reconfiguration is enabled, the static one otherwise. */
+    private static VespaZooKeeperServer createDelegate(ZookeeperServerConfig config) {
+        if (config.dynamicReconfiguration()) {
+            Reconfigurer reconfigurer = new Reconfigurer(new VespaZooKeeperAdminImpl());
+            return new ReconfigurableVespaZooKeeperServer(reconfigurer, config);
+        }
+        return new VespaZooKeeperServerImpl(config);
+    }
+
+    /** Shuts down the wrapped server when this component is deconstructed. */
+    @Override
+    public void deconstruct() {
+        delegate.shutdown();
+    }
+
+    @Override
+    public void shutdown() {
+        delegate.shutdown();
+    }
+
+    @Override
+    public void start(Path configFilePath) {
+        delegate.start(configFilePath);
+    }
+
+    @Override
+    public boolean reconfigurable() {
+        return delegate.reconfigurable();
+    }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
new file mode 100644
index 00000000000..8161e4bba4b
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
@@ -0,0 +1,45 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.zookeeper;
+
+import ai.vespa.validation.Validation;
+import com.yahoo.cloud.config.ZookeeperServerConfig;
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.component.annotation.Inject;
+import java.nio.file.Path;
+import java.time.Duration;
+
+/**
+ * Starts or reconfigures zookeeper cluster.
+ * The QuorumPeer conditionally created here is owned by the Reconfigurer;
+ * when it already has a peer, that peer is used here in case start or shutdown is required.
+ *
+ * @author hmusum
+ */
+public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer {
+
+ // Not final: assigned from the return value of startOrReconfigure and also from inside
+ // the supplier lambda handed to it (see constructor).
+ private QuorumPeer peer;
+
+ /**
+ * Hands the config, this server and a peer factory to the given reconfigurer, and keeps the peer it returns.
+ *
+ * @param reconfigurer receives the config, this instance and a supplier that creates a new {@link VespaQuorumPeer}
+ * @param zookeeperServerConfig the server config; dynamicReconfiguration is required to be true
+ */
+ @Inject
+ public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) {
+ // The flag is passed both as the condition and as the validated value.
+ // NOTE(review): presumably Validation.require throws when the condition is false - confirm.
+ Validation.require(zookeeperServerConfig.dynamicReconfiguration(),
+ zookeeperServerConfig.dynamicReconfiguration(),
+ "dynamicReconfiguration must be true");
+ // Note that 'this' is passed out before the constructor has completed, and that the supplier
+ // assigns 'peer' as a side effect in addition to the assignment of the returned value.
+ peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer());
+ }
+
+ /** Shuts down the peer, passing a timeout of one minute. */
+ @Override
+ public void shutdown() {
+ peer.shutdown(Duration.ofMinutes(1));
+ }
+
+ /** Starts the peer with the ZooKeeper config file at the given path. */
+ @Override
+ public void start(Path configFilePath) {
+ peer.start(configFilePath);
+ }
+
+ /** Always true for this implementation. */
+ @Override
+ public boolean reconfigurable() {
+ return true;
+ }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java
new file mode 100644
index 00000000000..66742b0e05b
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java
@@ -0,0 +1,41 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.zookeeper;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.auth.AuthenticationProvider;
+import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
+
+import java.security.cert.X509Certificate;
+import java.util.logging.Logger;
+
+/**
+ * An {@link AuthenticationProvider} for use together with Vespa mTLS. It registers the subject of the
+ * client's leaf certificate as the connection's auth identity.
+ *
+ * @author bjorncs
+ */
+public class VespaMtlsAuthenticationProvider extends X509AuthenticationProvider {
+
+    private static final Logger log = Logger.getLogger(VespaMtlsAuthenticationProvider.class.getName());
+
+    public VespaMtlsAuthenticationProvider() throws X509Exception {
+        super(null, null);
+    }
+
+    /**
+     * Adds the subject of the first certificate in the client's chain as auth info on the connection.
+     *
+     * @return {@code OK} when a client certificate is present, {@code AUTHFAILED} when the chain is missing or empty
+     */
+    @Override
+    public KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte[] authData) {
+        // Vespa's mTLS peer authorization rules are performed by the underlying trust manager implementation.
+        // The client is authorized once the SSL handshake has completed.
+        X509Certificate[] chain = (X509Certificate[]) cnxn.getClientCertificateChain();
+        boolean hasCertificate = chain != null && chain.length > 0;
+        if (!hasCertificate) {
+            log.warning("Client not authenticated - should not be possible with clientAuth=NEED");
+            return KeeperException.Code.AUTHFAILED;
+        }
+        String subject = chain[0].getSubjectX500Principal().getName();
+        cnxn.addAuthInfo(new Id(getScheme(), subject));
+        return KeeperException.Code.OK;
+    }
+
+    @Override
+    public String getScheme() {
+        return "x509";
+    }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java
new file mode 100644
index 00000000000..e5c35a185b5
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java
@@ -0,0 +1,60 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.zookeeper;
+
+import com.yahoo.protect.Process;
+import org.apache.zookeeper.server.admin.AdminServer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Starts or stops a ZooKeeper server. Extends QuorumPeerMain to get access to initializeAndRun(), and
+ * wraps its checked exceptions so callers do not need to depend on ZooKeeper types.
+ *
+ * @author hmusum
+ */
+class VespaQuorumPeer extends QuorumPeerMain implements QuorumPeer {
+
+    private static final Logger log = Logger.getLogger(VespaQuorumPeer.class.getName());
+
+    /** Initializes and runs the server, using the absolute path of the given config file as its only argument. */
+    @Override
+    public void start(Path path) {
+        String[] args = { path.toFile().getAbsolutePath() };
+        initializeAndRun(args);
+    }
+
+    /**
+     * Shuts down the quorum peer, if there is one, and waits up to the given timeout for it to finish.
+     * Any failure, including the peer still being alive after the timeout, ends in Process.logAndDie.
+     */
+    @Override
+    public void shutdown(Duration timeout) {
+        if (quorumPeer == null) {
+            return;
+        }
+        log.log(Level.FINE, "Shutting down ZooKeeper server");
+        try {
+            quorumPeer.shutdown();
+            quorumPeer.join(timeout.toMillis()); // Wait for shutdown to complete
+            if (quorumPeer.isAlive()) {
+                throw new IllegalStateException("Peer still alive after " + timeout);
+            }
+        } catch (RuntimeException | InterruptedException e) {
+            // If shutdown fails, we have no other option than forcing the JVM to stop and letting it be restarted.
+            //
+            // When a VespaZooKeeperServer component receives a new config, the container will try to start a new
+            // server with the new config, this will fail until the old server is deconstructed. If the old server
+            // fails to deconstruct/shutdown, the new one will never start and if that happens forcing a restart is
+            // the better option.
+            Process.logAndDie("Failed to shut down ZooKeeper server properly, forcing shutdown", e);
+        }
+    }
+
+    /** Delegates to QuorumPeerMain, rethrowing its checked exceptions as RuntimeException with the cause kept. */
+    @Override
+    protected void initializeAndRun(String[] args) {
+        try {
+            super.initializeAndRun(args);
+        } catch (QuorumPeerConfig.ConfigException | IOException | AdminServer.AdminServerException e) {
+            throw new RuntimeException("Exception when initializing or running ZooKeeper server", e);
+        }
+    }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
new file mode 100644
index 00000000000..be69a12599b
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
@@ -0,0 +1,93 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.zookeeper;
+
+import com.yahoo.cloud.config.ZookeeperServerConfig;
+import com.yahoo.net.HostName;
+import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
+import org.apache.zookeeper.data.ACL;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static com.yahoo.yolean.Exceptions.uncheck;
+
+/**
+ * {@link VespaZooKeeperAdmin} implementation that talks to ZooKeeper through a {@link ZooKeeperAdmin} client.
+ *
+ * @author hmusum
+ */
+@SuppressWarnings("unused") // Created by injection
+public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin {
+
+    private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName());
+
+    /**
+     * Applies the given server list as the new ZooKeeper ensemble config, then verifies it by
+     * creating and deleting an ephemeral node.
+     *
+     * @param connectionSpec connection string used to create the admin client
+     * @param servers the new server config, passed on as ZooKeeper's "new members" string
+     * @throws ReconfigException if a reconfig is already in progress, the connection is lost,
+     *                           or the new config has no quorum
+     * @throws RuntimeException on any other ZooKeeper error, or if the thread is interrupted
+     *                          (the interrupt flag is restored before throwing)
+     */
+    @Override
+    public void reconfigure(String connectionSpec, String servers) throws ReconfigException {
+        try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(connectionSpec)) {
+            long fromConfig = -1;
+            // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0).
+            log.log(Level.INFO, "Applying ZooKeeper config: " + servers);
+            byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null);
+            log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
+
+            // Verify by issuing a write operation; this is only accepted once new quorum is obtained.
+            List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+            String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL);
+            zooKeeperAdmin.delete(node, -1);
+
+            log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
+        }
+        catch (  KeeperException.ReconfigInProgress
+               | KeeperException.ConnectionLossException
+               | KeeperException.NewConfigNoQuorum e) {
+            throw new ReconfigException(e);
+        }
+        catch (KeeperException e) {
+            throw new RuntimeException(e);
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Creates an admin client for the given connection spec; session events are logged at INFO. */
+    private ZooKeeperAdmin createAdmin(String connectionSpec) {
+        return uncheck(() -> new ZooKeeperAdmin(connectionSpec, (int) sessionTimeout().toMillis(),
+                                                (event) -> log.log(Level.INFO, event.toString()), new ZkClientConfigBuilder().toConfig()));
+    }
+
+    /**
+     * Creates a node in zookeeper, with hostname as part of node name, this ensures that server is up and working
+     * before returning. If the node already exists its data is overwritten instead. Failed attempts are retried
+     * every 2 seconds for up to 5 minutes.
+     *
+     * @throws RuntimeException if no attempt succeeded in time (the cause is the last ZooKeeper error seen),
+     *                          or if the thread is interrupted (the interrupt flag is restored before throwing)
+     */
+    void createDummyNode(ZookeeperServerConfig zookeeperServerConfig) {
+        int sleepTime = 2_000;
+        try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(localConnectionSpec(zookeeperServerConfig))) {
+            String path = "/dummy-node-" + HostName.getLocalhost();
+            Instant end = Instant.now().plus(Duration.ofMinutes(5));
+            Exception exception = null;
+            do {
+                try {
+                    zooKeeperAdmin.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                    return;
+                } catch (KeeperException e) {
+                    if (e instanceof KeeperException.NodeExistsException) {
+                        try {
+                            // Node is left over from an earlier run; a successful write proves the server works.
+                            zooKeeperAdmin.setData(path, new byte[0], -1);
+                            return;
+                        } catch (KeeperException ex) {
+                            // Log and remember the setData failure itself, not the NodeExists that led here.
+                            log.log(Level.INFO, ex.getMessage());
+                            exception = ex;
+                            Thread.sleep(sleepTime);
+                            continue;
+                        }
+                    }
+                    log.log(Level.INFO, e.getMessage());
+                    exception = e;
+                    Thread.sleep(sleepTime);
+                }
+            } while (Instant.now().isBefore(end));
+            throw new RuntimeException("Unable to create dummy node: ", exception);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+}
+
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java
new file mode 100644
index 00000000000..8adabeedb1b
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java
@@ -0,0 +1,51 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.zookeeper;
+
+import ai.vespa.validation.Validation;
+import com.yahoo.cloud.config.ZookeeperServerConfig;
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.component.annotation.Inject;
+import java.nio.file.Path;
+import java.time.Duration;
+
+/**
+ * ZooKeeper server component for the case where dynamic reconfiguration is disabled
+ * ({@link #reconfigurable()} always returns false).
+ *
+ * @author Ulf Lilleengen
+ * @author Harald Musum
+ */
+public class VespaZooKeeperServerImpl extends AbstractComponent implements VespaZooKeeperServer {
+
+ private final VespaQuorumPeer peer;
+ private final ZooKeeperRunner runner;
+
+ /**
+ * Creates the peer and the runner, then creates a dummy node in ZooKeeper.
+ *
+ * @param zookeeperServerConfig the server config; dynamicReconfiguration is required to be false
+ */
+ @Inject
+ public VespaZooKeeperServerImpl(ZookeeperServerConfig zookeeperServerConfig) {
+ // The negated flag is passed both as the condition and as the validated value.
+ // NOTE(review): presumably Validation.require throws when the condition is false - confirm.
+ Validation.require(! zookeeperServerConfig.dynamicReconfiguration(),
+ ! zookeeperServerConfig.dynamicReconfiguration(),
+ "dynamicReconfiguration must be false");
+ this.peer = new VespaQuorumPeer();
+ // 'this' is handed to the runner before the constructor has completed.
+ // NOTE(review): presumably the runner calls start(Path) on this server - confirm in ZooKeeperRunner.
+ this.runner = new ZooKeeperRunner(zookeeperServerConfig, this);
+ // createDummyNode retries for up to 5 minutes, so the constructor can block for that long.
+ new VespaZooKeeperAdminImpl().createDummyNode(zookeeperServerConfig);
+ }
+
+ /** Shuts down the runner before delegating to the superclass. */
+ @Override
+ public void deconstruct() {
+ runner.shutdown();
+ super.deconstruct();
+ }
+
+ /** Shuts down the peer, passing a timeout of one minute. */
+ @Override
+ public void shutdown() {
+ peer.shutdown(Duration.ofMinutes(1));
+ }
+
+ /** Starts the peer with the ZooKeeper config file at the given path. */
+ @Override
+ public void start(Path configFilePath) {
+ peer.start(configFilePath);
+ }
+
+ /** Always false for this implementation. */
+ @Override
+ public boolean reconfigurable() {
+ return false;
+ }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/common/NetUtils.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/common/NetUtils.java
new file mode 100644
index 00000000000..33ec9b1303a
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/common/NetUtils.java
@@ -0,0 +1,94 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.common;
+
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Common networking utilities: formatting socket addresses and splitting bracketed
+ * IPv6 {@code [host]:port} strings.
+ */
+public class NetUtils {
+
+    /**
+     * Formats the given address as {@code host:port}.
+     *
+     * Note: Changed from original to use hostname from InetSocketAddress if there exists one
+     *
+     * @param addr the socket address to format
+     * @return {@code hostname:port} when the address has a host name; otherwise the literal address and
+     *         port, with an IPv6 literal enclosed in square brackets
+     */
+    public static String formatInetAddr(InetSocketAddress addr) {
+        // NOTE(review): per the JDK docs getHostName() may trigger a reverse name lookup for addresses
+        // created from a literal IP, and it looks like it never returns null for a constructed
+        // InetSocketAddress - if so, the fallback below is effectively unreachable. Confirm.
+        String hostName = addr.getHostName();
+        if (hostName != null) {
+            return String.format("%s:%s", hostName, addr.getPort());
+        }
+
+        InetAddress ia = addr.getAddress();
+
+        if (ia == null) {
+            return String.format("%s:%s", addr.getHostString(), addr.getPort());
+        }
+        if (ia instanceof Inet6Address) {
+            return String.format("[%s]:%s", ia.getHostAddress(), addr.getPort());
+        } else {
+            return String.format("%s:%s", ia.getHostAddress(), addr.getPort());
+        }
+    }
+
+    /**
+     * Separates host and port from the given string if, and only if, it starts with a host enclosed
+     * in square brackets.
+     *
+     * @param hostPort host port string
+     * @return {@code {host, port}} for {@code [host]:port},
+     *         {@code {host, "port:port"}} for {@code [host]:port:port} (everything after {@code "]:"} is kept as is),
+     *         {@code {host}} for {@code [host]},
+     *         or an empty array if the string does not start with {@code '['}
+     * @throws IllegalArgumentException if the string starts with {@code '['} but has no {@code ']'}, the host
+     *         between the brackets is empty, or {@code ']'} is followed by anything other than {@code ':'} and a port
+     */
+    public static String[] getIPV6HostAndPort(String hostPort) {
+        if (hostPort.startsWith("[")) {
+            int i = hostPort.lastIndexOf(']');
+            if (i < 0) {
+                throw new IllegalArgumentException(
+                        hostPort + " starts with '[' but has no matching ']'");
+            }
+            String host = hostPort.substring(1, i);
+            if (host.isEmpty()) {
+                // Report the offending input; 'host' is empty here and would give a blank message.
+                throw new IllegalArgumentException(hostPort + " has an empty host between '[' and ']'");
+            }
+            if (hostPort.length() > i + 1) {
+                return getHostPort(hostPort, i, host);
+            }
+            return new String[] { host };
+        } else {
+            // Not an IPv6 host port string
+            return new String[] {};
+        }
+    }
+
+    /** Extracts the port part following {@code "]:"}; requires at least one character after the closing bracket. */
+    private static String[] getHostPort(String hostPort, int indexOfClosingBracket, String host) {
+        // [127::1]:2181 , check that the separator ':' exists
+        if (hostPort.charAt(indexOfClosingBracket + 1) != ':') {
+            throw new IllegalArgumentException(hostPort + " does not have : after ]");
+        }
+        // [127::1]: scenario
+        if (indexOfClosingBracket + 2 == hostPort.length()) {
+            throw new IllegalArgumentException(hostPort + " doesn't have a port after colon.");
+        }
+        // Skip the "]:" itself; everything after it is the port part
+        String port = hostPort.substring(indexOfClosingBracket + 2);
+        return new String[] { host, port };
+    }
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
new file mode 100644
index 00000000000..e03e0b07944
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.common.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This RequestProcessor logs requests to disk. It batches the requests to do
+ * the io efficiently. The request is not passed to the next RequestProcessor
+ * until its log has been synced to disk.
+ *
+ * SyncRequestProcessor is used in 3 different cases
+ * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which
+ *             send ack back to itself.
+ * 2. Follower - Sync request to disk and forward request to
+ *             SendAckRequestProcessor which send the packets to leader.
+ *             SendAckRequestProcessor is flushable which allow us to force
+ *             push packets to leader.
+ * 3. Observer - Sync committed request to disk (received as INFORM packet).
+ *             It never send ack back to the leader, so the nextProcessor will
+ *             be null. This change the semantic of txnlog on the observer
+ *             since it only contains committed txns.
+ *
+ * Forwarding to the next processor can be held back and released again through
+ * {@link #setDelayForwarding(boolean)}, and {@link #syncFlush()} lets a caller wait
+ * until everything queued before the call has been flushed.
+ */
+public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
+
+    private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;
+
+    /** Marker request: when the sync thread dequeues it, it flushes and then releases the latch. */
+    private static class FlushRequest extends Request {
+        private final CountDownLatch latch = new CountDownLatch(1);
+        public FlushRequest() {
+            super(null, 0, 0, 0, null, null);
+        }
+    }
+
+    // Marker requests, recognised by identity in run(), that switch delayed forwarding on and off.
+    private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null);
+    private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null);
+
+    /**
+     * Wraps the next processor. After {@link #close()} requests are collected instead of
+     * being forwarded; {@link #open()} forwards the collected requests in arrival order.
+     */
+    private static class DelayingProcessor implements RequestProcessor, Flushable {
+        private final RequestProcessor next;
+        /** Requests held back while forwarding is delayed; null when forwarding is not delayed. */
+        private Queue<Request> delayed = null;
+        private DelayingProcessor(RequestProcessor next) {
+            this.next = next;
+        }
+        /** Flushes the wrapped processor, but only when nothing is being held back. */
+        @Override
+        public void flush() throws IOException {
+            if (delayed == null && next instanceof Flushable) {
+                ((Flushable) next).flush();
+            }
+        }
+        @Override
+        public void processRequest(Request request) throws RequestProcessorException {
+            if (delayed == null) {
+                next.processRequest(request);
+            } else {
+                delayed.add(request);
+            }
+        }
+        @Override
+        public void shutdown() {
+            next.shutdown();
+        }
+        /** Starts holding requests back; has no effect if already doing so. */
+        private void close() {
+            if (delayed == null) {
+                delayed = new ArrayDeque<>();
+            }
+        }
+        /** Forwards all held-back requests and resumes direct forwarding. */
+        private void open() throws RequestProcessorException {
+            if (delayed != null) {
+                for (Request request : delayed) {
+                    next.processRequest(request);
+                }
+                delayed = null;
+            }
+        }
+    }
+
+    /** The number of log entries to log before starting a snapshot */
+    private static int snapCount = ZooKeeperServer.getSnapCount();
+
+    /**
+     * The total size of log entries before starting a snapshot
+     */
+    private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes();
+
+    /**
+     * Random numbers used to vary snapshot timing
+     */
+    private int randRoll;
+    private long randSize;
+
+    private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
+
+    /** Held while a snapshot thread is running, so that at most one snapshot is taken at a time. */
+    private final Semaphore snapThreadMutex = new Semaphore(1);
+
+    private final ZooKeeperServer zks;
+
+    /** Null when no next processor was given to the constructor. */
+    private final DelayingProcessor nextProcessor;
+
+    /**
+     * Transactions that have been written and are waiting to be flushed to
+     * disk. Basically this is the list of SyncItems whose callbacks will be
+     * invoked after flush returns successfully.
+     */
+    private final Queue<Request> toFlush;
+    private long lastFlushTime;
+
+    /**
+     * @param zks the server whose database requests are appended to
+     * @param nextProcessor the processor that receives requests once they are flushed, or null
+     */
+    public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
+        super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());
+        this.zks = zks;
+        this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor);
+        this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
+    }
+
+    /**
+     * used by tests to check for changing
+     * snapcounts
+     * @param count
+     */
+    public static void setSnapCount(int count) {
+        snapCount = count;
+    }
+
+    /**
+     * used by tests to get the snapcount
+     * @return the snapcount
+     */
+    public static int getSnapCount() {
+        return snapCount;
+    }
+
+    /** Returns how much of the configured flush delay is left since the last flush, or 0 if none. */
+    private long getRemainingDelay() {
+        long flushDelay = zks.getFlushDelay();
+        long duration = Time.currentElapsedTime() - lastFlushTime;
+        if (duration < flushDelay) {
+            return flushDelay - duration;
+        }
+        return 0;
+    }
+
+    /** If both flushDelay and maxBatchSize are set (bigger than 0), flush
+     * whenever either condition is hit. If only one or the other is
+     * set, flush only when the relevant condition is hit.
+     */
+    private boolean shouldFlush() {
+        long flushDelay = zks.getFlushDelay();
+        long maxBatchSize = zks.getMaxBatchSize();
+        if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
+            return true;
+        }
+        return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
+    }
+
+    /**
+     * used by tests to check for changing
+     * snapcounts
+     * @param size
+     */
+    public static void setSnapSizeInBytes(long size) {
+        snapSizeInBytes = size;
+    }
+
+    private boolean shouldSnapshot() {
+        int logCount = zks.getZKDatabase().getTxnCount();
+        long logSize = zks.getZKDatabase().getTxnSize();
+        return (logCount > (snapCount / 2 + randRoll))
+               || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
+    }
+
+    /**
+     * Picks new random offsets for the snapshot thresholds. A bound that is not usable
+     * for drawing a random number yields an offset of 0 instead of an exception, so that
+     * small or disabled (see {@link #shouldSnapshot()}) limits cannot kill the sync thread.
+     */
+    private void resetSnapshotStats() {
+        int halfSnapCount = snapCount / 2;
+        // nextInt(bound) rejects bounds that are not positive
+        randRoll = halfSnapCount > 0 ? ThreadLocalRandom.current().nextInt(halfSnapCount) : 0;
+        long halfSnapSize = snapSizeInBytes / 2;
+        // a remainder by zero would throw ArithmeticException
+        randSize = halfSnapSize != 0 ? Math.abs(ThreadLocalRandom.current().nextLong() % halfSnapSize) : 0;
+    }
+
+    @Override
+    public void run() {
+        try {
+            // we do this in an attempt to ensure that not all of the servers
+            // in the ensemble take a snapshot at the same time
+            resetSnapshotStats();
+            lastFlushTime = Time.currentElapsedTime();
+            while (true) {
+                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
+
+                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
+                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
+                if (si == null) {
+                    /* We timed out looking for more writes to batch, go ahead and flush immediately */
+                    flush();
+                    si = queuedRequests.take();
+                }
+
+                if (si == REQUEST_OF_DEATH) {
+                    break;
+                }
+
+                // The forwarding markers only make sense when there is a next processor;
+                // without one (see constructor) they are consumed without effect.
+                if (si == turnForwardingDelayOn) {
+                    if (nextProcessor != null) {
+                        nextProcessor.close();
+                    }
+                    continue;
+                }
+                if (si == turnForwardingDelayOff) {
+                    if (nextProcessor != null) {
+                        nextProcessor.open();
+                    }
+                    continue;
+                }
+
+                if (si instanceof FlushRequest) {
+                    flush();
+                    ((FlushRequest) si).latch.countDown();
+                    continue;
+                }
+
+                long startProcessTime = Time.currentElapsedTime();
+                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
+
+                // track the number of records written to the log
+                if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
+                    if (shouldSnapshot()) {
+                        resetSnapshotStats();
+                        // roll the log
+                        zks.getZKDatabase().rollLog();
+                        // take a snapshot
+                        if (!snapThreadMutex.tryAcquire()) {
+                            LOG.warn("Too busy to snap, skipping");
+                        } else {
+                            new ZooKeeperThread("Snapshot Thread") {
+                                public void run() {
+                                    try {
+                                        zks.takeSnapshot();
+                                    } catch (Exception e) {
+                                        LOG.warn("Unexpected exception", e);
+                                    } finally {
+                                        snapThreadMutex.release();
+                                    }
+                                }
+                            }.start();
+                        }
+                    }
+                } else if (toFlush.isEmpty()) {
+                    // optimization for read heavy workloads
+                    // iff this is a read or a throttled request(which doesn't need to be written to the disk),
+                    // and there are no pending flushes (writes), then just pass this to the next processor
+                    if (nextProcessor != null) {
+                        nextProcessor.processRequest(si);
+                        nextProcessor.flush();
+                    }
+                    continue;
+                }
+                toFlush.add(si);
+                if (shouldFlush()) {
+                    flush();
+                }
+                ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
+            }
+        } catch (Throwable t) {
+            handleException(this.getName(), t);
+        }
+        LOG.info("SyncRequestProcessor exited!");
+    }
+
+    /** Flushes all pending writes, and waits for this to complete. */
+    public void syncFlush() throws InterruptedException {
+        FlushRequest marker = new FlushRequest();
+        queuedRequests.add(marker);
+        marker.latch.await();
+    }
+
+    /**
+     * Queues a marker that makes the sync thread start (true) or stop (false) holding
+     * requests back from the next processor. It takes effect when the marker is dequeued.
+     */
+    public void setDelayForwarding(boolean delayForwarding) {
+        queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff);
+    }
+
+    /** Commits the database and hands every request in toFlush to the next processor, if any. */
+    private void flush() throws IOException, RequestProcessorException {
+        if (this.toFlush.isEmpty()) {
+            return;
+        }
+
+        ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
+
+        long flushStartTime = Time.currentElapsedTime();
+        zks.getZKDatabase().commit();
+        ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
+
+        if (this.nextProcessor == null) {
+            this.toFlush.clear();
+        } else {
+            while (!this.toFlush.isEmpty()) {
+                final Request i = this.toFlush.remove();
+                long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
+                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
+                this.nextProcessor.processRequest(i);
+            }
+            nextProcessor.flush();
+        }
+        lastFlushTime = Time.currentElapsedTime();
+    }
+
+    /**
+     * Stops the sync thread, flushes what is still pending and shuts down the next
+     * processor. Failures while waiting or flushing are logged, not propagated.
+     */
+    public void shutdown() {
+        LOG.info("Shutting down");
+        queuedRequests.add(REQUEST_OF_DEATH);
+        try {
+            this.join();
+            this.flush();
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while wating for {} to finish", this);
+            Thread.currentThread().interrupt();
+        } catch (IOException e) {
+            // keep the cause, it is the only trace of why the final flush failed
+            LOG.warn("Got IO exception during shutdown", e);
+        } catch (RequestProcessorException e) {
+            LOG.warn("Got request processor exception during shutdown", e);
+        }
+        if (nextProcessor != null) {
+            nextProcessor.shutdown();
+        }
+    }
+
+    /** Stamps the request with its queueing time and queues it for the sync thread. */
+    public void processRequest(final Request request) {
+        Objects.requireNonNull(request, "Request cannot be null");
+
+        request.syncQueueStartTime = Time.currentElapsedTime();
+        queuedRequests.add(request);
+        ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
+    }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java
new file mode 100644
index 00000000000..fdfe0fe8467
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java
@@ -0,0 +1,37 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package org.apache.zookeeper.server;
+
+import com.yahoo.vespa.zookeeper.Configurator;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.logging.Logger;
+
+/**
+ * A {@link NettyServerCnxnFactory} whose secure setting always comes from {@link Configurator},
+ * regardless of what is passed to {@code configure}.
+ * Workaround for incorrect handling of clientSecurePort in combination with ZooKeeper Dynamic Reconfiguration in 3.6.2
+ * See https://issues.apache.org/jira/browse/ZOOKEEPER-3577.
+ *
+ * Using package {@link org.apache.zookeeper.server} as {@link NettyServerCnxnFactory#NettyServerCnxnFactory()} is package-private.
+ *
+ * @author bjorncs
+ */
+public class VespaNettyServerCnxnFactory extends NettyServerCnxnFactory {
+
+    private static final Logger log = Logger.getLogger(VespaNettyServerCnxnFactory.class.getName());
+
+    /** The secure setting read from {@link Configurator} at construction time. */
+    private final boolean isSecure;
+
+    public VespaNettyServerCnxnFactory() {
+        super();
+        isSecure = Configurator.VespaNettyServerCnxnFactory_isSecure;
+        final boolean unified = Boolean.getBoolean(NettyServerCnxnFactory.PORT_UNIFICATION_KEY);
+        final String message = String.format("For %h: isSecure=%b, portUnification=%b", this, isSecure, unified);
+        log.info(message);
+    }
+
+    /** Delegates to the superclass, replacing the given {@code secure} value with the configured one. */
+    @Override
+    public void configure(InetSocketAddress addr, int maxClientCnxns, int backlog, boolean secure) throws IOException {
+        final String message = String.format(
+                "For %h: configured() invoked with parameter 'secure'=%b, overridden to %b", this, secure, isSecure);
+        log.info(message);
+        super.configure(addr, maxClientCnxns, backlog, isSecure);
+    }
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
new file mode 100644
index 00000000000..5f408ea58ff
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -0,0 +1,2329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.Environment;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.Quotas;
+import org.apache.zookeeper.StatsTrack;
+import org.apache.zookeeper.Version;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZookeeperBanner;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.common.StringUtils;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.metrics.MetricsContext;
+import org.apache.zookeeper.proto.AuthPacket;
+import org.apache.zookeeper.proto.ConnectRequest;
+import org.apache.zookeeper.proto.ConnectResponse;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.GetSASLRequest;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.proto.SetACLRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+import org.apache.zookeeper.proto.SetSASLResponse;
+import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
+import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
+import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
+import org.apache.zookeeper.server.SessionTracker.Session;
+import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
+import org.apache.zookeeper.server.util.JvmPauseMonitor;
+import org.apache.zookeeper.server.util.OSMXBean;
+import org.apache.zookeeper.server.util.QuotaMetricsUtils;
+import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
+import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.TxnDigest;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.apache.zookeeper.util.ServiceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.sasl.SaslException;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+/**
+ * This class implements a simple standalone ZooKeeperServer. It sets up the
+ * following chain of RequestProcessors to process requests:
+ * PrepRequestProcessor -&gt; SyncRequestProcessor -&gt; FinalRequestProcessor
+ */
+public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
+
+ protected static final Logger LOG;
+ private static final RateLogger RATE_LOGGER;
+
+ public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit";
+
+ public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck";
+ public static final String SKIP_ACL = "zookeeper.skipACL";
+ public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota";
+
+ // When enabled, will check ACL constraints appertained to the requests first,
+ // before sending the requests to the quorum.
+ static final boolean enableEagerACLCheck;
+
+ static final boolean skipACL;
+
+ public static final boolean enforceQuota;
+
+ public static final String SASL_SUPER_USER = "zookeeper.superUser";
+
+ public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients";
+ public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
+ private static boolean digestEnabled;
+
+ // Add a enable/disable option for now, we should remove this one when
+ // this feature is confirmed to be stable
+ public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
+ private static boolean closeSessionTxnEnabled = true;
+
+ static {
+ LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
+
+ RATE_LOGGER = new RateLogger(LOG);
+
+ ZookeeperBanner.printBanner(LOG);
+
+ Environment.logEnv("Server environment:", LOG);
+
+ enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK);
+ LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck);
+
+ skipACL = System.getProperty(SKIP_ACL, "no").equals("yes");
+ if (skipACL) {
+ LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL);
+ }
+
+ enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false"));
+ if (enforceQuota) {
+ LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota);
+ }
+
+ digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
+ LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
+
+ closeSessionTxnEnabled = Boolean.parseBoolean(
+ System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
+ LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
+ }
+
+ /**
+  * @return the current value of the close-session-txn switch
+  */
+ public static boolean isCloseSessionTxnEnabled() {
+     return ZooKeeperServer.closeSessionTxnEnabled;
+ }
+
+ /**
+  * Sets the close-session-txn switch and logs its new value.
+  *
+  * @param enabled the new value of the switch
+  */
+ public static void setCloseSessionTxnEnabled(boolean enabled) {
+     closeSessionTxnEnabled = enabled;
+     LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
+ }
+
+ protected ZooKeeperServerBean jmxServerBean;
+ protected DataTreeBean jmxDataTreeBean;
+
+ public static final int DEFAULT_TICK_TIME = 3000;
+ protected int tickTime = DEFAULT_TICK_TIME;
+ public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled
+ protected static volatile int throttledOpWaitTime =
+ Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME);
+ /** value of -1 indicates unset, use default */
+ protected int minSessionTimeout = -1;
+ /** value of -1 indicates unset, use default */
+ protected int maxSessionTimeout = -1;
+ /** Socket listen backlog. Value of -1 indicates unset */
+ protected int listenBacklog = -1;
+ protected SessionTracker sessionTracker;
+ private FileTxnSnapLog txnLogFactory = null;
+ private ZKDatabase zkDb;
+ private ResponseCache readResponseCache;
+ private ResponseCache getChildrenResponseCache;
+ private final AtomicLong hzxid = new AtomicLong(0);
+ public static final Exception ok = new Exception("No prob");
+ protected RequestProcessor firstProcessor;
+ protected JvmPauseMonitor jvmPauseMonitor;
+ protected volatile State state = State.INITIAL;
+ private boolean isResponseCachingEnabled = true;
+ /* contains the configuration file content read at startup */
+ protected String initialConfig;
+ protected boolean reconfigEnabled;
+ private final RequestPathMetricsCollector requestPathMetricsCollector;
+ private static final int DEFAULT_SNAP_COUNT = 100000;
+ private static final int DEFAULT_GLOBAL_OUTSTANDING_LIMIT = 1000;
+
+ private boolean localSessionEnabled = false;
+ /** Lifecycle states of this server; the {@code state} field starts out as {@link #INITIAL}. */
+ protected enum State {
+ INITIAL,
+ RUNNING,
+ SHUTDOWN,
+ ERROR
+ }
+
+ /**
+ * This is the secret that we use to generate passwords. For the moment,
+ * it's more of a checksum that's used in reconnection, which carries no
+ * security weight, and is treated internally as if it carries no
+ * security weight.
+ */
+ private static final long superSecret = 0XB3415C00L;
+
+ private final AtomicInteger requestsInProcess = new AtomicInteger(0);
+ final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
+ // this data structure must be accessed under the outstandingChanges lock
+ final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();
+
+ protected ServerCnxnFactory serverCnxnFactory;
+ protected ServerCnxnFactory secureServerCnxnFactory;
+
+ private final ServerStats serverStats;
+ private final ZooKeeperServerListener listener;
+ private ZooKeeperServerShutdownHandler zkShutdownHandler;
+ private volatile int createSessionTrackerServerId = 1;
+
+ private static final String FLUSH_DELAY = "zookeeper.flushDelay";
+ private static volatile long flushDelay;
+ private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime";
+ private static volatile long maxWriteQueuePollTime;
+ private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize";
+ private static volatile int maxBatchSize;
+
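+ /**
+ * Starting size of read and write ByteArrayOutputStream buffers. Default is 1024 bytes
+ * ({@link #DEFAULT_STARTING_BUFFER_SIZE}); values below 32 bytes are rejected at class load.
+ * Flag not used for small transfers like connectResponses.
+ */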
+ public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes";
+ public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024;
+ public static final int intBufferStartingSizeBytes;
+
+ public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize";
+ public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize";
+
+ static {
+ long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0);
+ setFlushDelay(configuredFlushDelay);
+ setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3));
+ setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000));
+
+ intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE);
+
+ if (intBufferStartingSizeBytes < 32) {
+ String msg = "Buffer starting size (" + intBufferStartingSizeBytes + ") must be greater than or equal to 32. "
+ + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\" ";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+
+ LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes);
+ }
+
+ // Connection throttling
+ private BlueThrottle connThrottle = new BlueThrottle();
+
+ private RequestThrottler requestThrottler;
+ public static final String SNAP_COUNT = "zookeeper.snapCount";
+
+ /**
+ * This setting sets a limit on the total number of large requests that
+ * can be inflight and is designed to prevent ZooKeeper from accepting
+ * too many large requests such that the JVM runs out of usable heap and
+ * ultimately crashes.
+ *
+ * The limit is enforced by the {@link #checkRequestSize(int, boolean)}
+ * method which is called by the connection layer ({@link NIOServerCnxn},
+ * {@link NettyServerCnxn}) before allocating a byte buffer and pulling
+ * data off the TCP socket. The limit is then checked again by the
+ * ZooKeeper server in {@link #processPacket(ServerCnxn, ByteBuffer)} which
+ * also atomically updates {@link #currentLargeRequestBytes}. The request is
+ * then marked as a large request, with the request size stored in the Request
+ * object so that it can later be decremented from {@link #currentLargeRequestBytes}.
+ *
+ * When a request is completed or dropped, the relevant code path calls the
+ * {@link #requestFinished(Request)} method which performs the decrement if
+ * needed.
+ */
+ private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
+
+ /**
+ * The size threshold after which a request is considered a large request
+ * and is checked against the large request byte limit.
+ */
+ private volatile int largeRequestThreshold = -1;
+
+ private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);
+
+ private AuthenticationHelper authHelper;
+
+ /**
+  * Asks the database to remove the given connection.
+  *
+  * @param cnxn the connection to remove
+  */
+ void removeCnxn(ServerCnxn cnxn) {
+     this.zkDb.removeCnxn(cnxn);
+ }
+
+ /**
+ * Creates a ZooKeeperServer instance. Nothing is setup, use the setX
+ * methods to prepare the instance (eg datadir, datalogdir, ticktime,
+ * builder, etc...)
+ *
+ * Only the listener, the server stats, the request path metrics collector and the
+ * authentication helper are created here; in particular no transaction log factory,
+ * database or response cache is set by this constructor.
+ */
+ public ZooKeeperServer() {
+ listener = new ZooKeeperServerListenerImpl(this);
+ serverStats = new ServerStats(this);
+ this.requestPathMetricsCollector = new RequestPathMetricsCollector();
+ this.authHelper = new AuthenticationHelper();
+ }
+
+ /**
+ * Keeping this constructor for backward compatibility.
+ * Delegates to the constructor that takes an explicit {@code reconfigEnabled} flag,
+ * passing {@link QuorumPeerConfig#isReconfigEnabled()} for it.
+ */
+ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
+ this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled());
+ }
+
+ /**
+ * * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
+ * actually start listening for clients until run() is invoked.
+ *
+ */
+ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) {
+ serverStats = new ServerStats(this);
+ this.txnLogFactory = txnLogFactory;
+ this.txnLogFactory.setServerStats(this.serverStats);
+ this.zkDb = zkDb;
+ this.tickTime = tickTime;
+ setMinSessionTimeout(minSessionTimeout);
+ setMaxSessionTimeout(maxSessionTimeout);
+ this.listenBacklog = clientPortListenBacklog;
+ this.reconfigEnabled = reconfigEnabled;
+
+ listener = new ZooKeeperServerListenerImpl(this);
+
+ readResponseCache = new ResponseCache(Integer.getInteger(
+ GET_DATA_RESPONSE_CACHE_SIZE,
+ ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getData");
+
+ getChildrenResponseCache = new ResponseCache(Integer.getInteger(
+ GET_CHILDREN_RESPONSE_CACHE_SIZE,
+ ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getChildren");
+
+ this.initialConfig = initialConfig;
+
+ this.requestPathMetricsCollector = new RequestPathMetricsCollector();
+
+ this.initLargeRequestThrottlingSettings();
+
+ this.authHelper = new AuthenticationHelper();
+
+ LOG.info(
+ "Created patched server with"
+ + " tickTime {} ms"
+ + " minSessionTimeout {} ms"
+ + " maxSessionTimeout {} ms"
+ + " clientPortListenBacklog {}"
+ + " datadir {}"
+ + " snapdir {}",
+ tickTime,
+ getMinSessionTimeout(),
+ getMaxSessionTimeout(),
+ getClientPortListenBacklog(),
+ txnLogFactory.getDataDir(),
+ txnLogFactory.getSnapDir());
+ }
+
+ /**
+  * @return the configuration file content read at startup
+  */
+ public String getInitialConfig() {
+     return this.initialConfig;
+ }
+
+ /**
+  * Adds JvmPauseMonitor and calls
+  * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String)}
+  *
+  * @param jvmPauseMonitor the monitor to attach to this server, may be null
+  */
+ public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
+     // The delegate supplies QuorumPeerConfig.isReconfigEnabled() for the reconfig flag.
+     this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig);
+     this.jvmPauseMonitor = jvmPauseMonitor;
+     if (this.jvmPauseMonitor != null) {
+         LOG.info("Added JvmPauseMonitor to server");
+     }
+ }
+
+ /**
+ * creates a zookeeperserver instance.
+ * Session timeouts and listen backlog are passed on as -1, and a new {@link ZKDatabase}
+ * is created on top of the given transaction log factory.
+ *
+ * @param txnLogFactory the file transaction snapshot logging class
+ * @param tickTime the ticktime for the server
+ * @param initialConfig the initial configuration, as returned by {@link #getInitialConfig()}
+ */
+ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) {
+ this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig, QuorumPeerConfig.isReconfigEnabled());
+ }
+
+ /**
+  * @return the stats object created for this server in the constructor
+  */
+ public ServerStats serverStats() {
+     return this.serverStats;
+ }
+
+ /**
+  * @return the request path metrics collector created in the constructor
+  */
+ public RequestPathMetricsCollector getRequestPathMetricsCollector() {
+     return this.requestPathMetricsCollector;
+ }
+
+ /**
+  * @return the throttle used for connection throttling
+  */
+ public BlueThrottle connThrottle() {
+     return this.connThrottle;
+ }
+
+ /**
+ * Prints this server's configuration to the given writer, one {@code key=value} pair per line.
+ *
+ * @param pwriter the writer the configuration is printed to
+ */
+ public void dumpConf(PrintWriter pwriter) {
+ pwriter.print("clientPort=");
+ pwriter.println(getClientPort());
+ pwriter.print("secureClientPort=");
+ pwriter.println(getSecureClientPort());
+ // dataDir is reported from the snapshot directory, dataLogDir from the data (log) directory
+ pwriter.print("dataDir=");
+ pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath());
+ pwriter.print("dataDirSize=");
+ pwriter.println(getDataDirSize());
+ pwriter.print("dataLogDir=");
+ pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath());
+ pwriter.print("dataLogSize=");
+ pwriter.println(getLogDirSize());
+ pwriter.print("tickTime=");
+ pwriter.println(getTickTime());
+ pwriter.print("maxClientCnxns=");
+ pwriter.println(getMaxClientCnxnsPerHost());
+ pwriter.print("minSessionTimeout=");
+ pwriter.println(getMinSessionTimeout());
+ pwriter.print("maxSessionTimeout=");
+ pwriter.println(getMaxSessionTimeout());
+ pwriter.print("clientPortListenBacklog=");
+ pwriter.println(getClientPortListenBacklog());
+
+ pwriter.print("serverId=");
+ pwriter.println(getServerId());
+ }
+
+ /**
+ * Builds a {@link ZooKeeperServerConf} from this server's current settings.
+ * Unlike {@link #dumpConf(PrintWriter)}, no secure client port or directory sizes are included.
+ *
+ * @return a new configuration object
+ */
+ public ZooKeeperServerConf getConf() {
+ return new ZooKeeperServerConf(
+ getClientPort(),
+ zkDb.snapLog.getSnapDir().getAbsolutePath(),
+ zkDb.snapLog.getDataDir().getAbsolutePath(),
+ getTickTime(),
+ getMaxClientCnxnsPerHost(),
+ getMinSessionTimeout(),
+ getMaxSessionTimeout(),
+ getServerId(),
+ getClientPortListenBacklog());
+ }
+
+ /**
+ * This constructor is for backward compatibility with the existing unit
+ * test code.
+ * It defaults to FileLogProvider persistence provider.
+ *
+ * @param snapDir the snapshot directory given to the new {@link FileTxnSnapLog}
+ * @param logDir the log directory given to the new {@link FileTxnSnapLog}
+ * @param tickTime the ticktime for the server
+ * @throws IOException declared by this constructor; presumably raised when the
+ * {@link FileTxnSnapLog} cannot be created (TODO confirm)
+ */
+ public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
+ this(new FileTxnSnapLog(snapDir, logDir), tickTime, "");
+ }
+
+ /**
+ * Default constructor, relies on the config for its argument values.
+ * Uses {@link #DEFAULT_TICK_TIME}, passes -1 for the session timeouts and the listen
+ * backlog, an empty initial config, and a new {@link ZKDatabase} on the given factory.
+ *
+ * @param txnLogFactory the file transaction snapshot logging class
+ * @throws IOException
+ */
+ public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException {
+ this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "", QuorumPeerConfig.isReconfigEnabled());
+ }
+
+ /**
+  * Returns the ZooKeeper database currently attached to this server.
+  *
+  * @return the {@link ZKDatabase} of this server
+  */
+ public ZKDatabase getZKDatabase() {
+     return zkDb;
+ }
+
+ /**
+  * Replaces the ZooKeeper database attached to this server.
+  *
+  * @param zkDb the database this server should use from now on
+  */
+ public void setZKDatabase(ZKDatabase zkDb) {
+     this.zkDb = zkDb;
+ }
+
+ /**
+  * Restores sessions and data, kills sessions that have no timeout entry,
+  * and then takes a snapshot.
+  *
+  * @throws IOException declared by this method; presumably raised while
+  *         loading the database (TODO confirm)
+  * @throws InterruptedException declared by this method
+  */
+ public void loadData() throws IOException, InterruptedException {
+     /*
+      * Leader#lead invokes this method when a new leader starts. The
+      * database has, however, already been initialized before leader
+      * election ran, so that the server could pick the zxid for its
+      * initial vote (via QuorumPeer#getLastLoggedZxid). Loading it a
+      * second time is therefore unnecessary, and skipping the reload
+      * matters a lot for applications hosting a large database.
+      *
+      * Hence the check below on whether the database is initialized.
+      * Note that at least one other method calls this one:
+      * ZooKeeperServer#startdata.
+      *
+      * See ZOOKEEPER-1642 for more detail.
+      */
+     final long restoredZxid = zkDb.isInitialized()
+         ? zkDb.getDataTreeLastProcessedZxid()
+         : zkDb.loadDataBase();
+     setZxid(restoredZxid);
+
+     // A session without a timeout entry is dead: kill it
+     zkDb.getSessions()
+         .stream()
+         .filter(id -> zkDb.getSessionWithTimeOuts().get(id) == null)
+         .forEach(id -> killSession(id, zkDb.getDataTreeLastProcessedZxid()));
+
+     // Persist the cleaned-up state as a fresh snapshot
+     takeSnapshot();
+ }
+
+ /**
+  * Takes a snapshot with {@code syncSnap} set to {@code false}.
+  */
+ public void takeSnapshot() {
+     final boolean syncSnap = false;
+     takeSnapshot(syncSnap);
+ }
+
+ /**
+  * Saves the data tree and the session timeouts through the transaction
+  * log factory, then logs and records how long that took. An
+  * {@link IOException} during the save triggers a system exit request.
+  *
+  * @param syncSnap forwarded unchanged to {@code txnLogFactory.save}
+  */
+ public void takeSnapshot(boolean syncSnap) {
+     final long startedAt = Time.currentElapsedTime();
+     try {
+         txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
+     } catch (IOException ioe) {
+         // Unrecoverable: the only option left is to ask for an exit
+         LOG.error("Severe unrecoverable error, exiting", ioe);
+         ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
+     }
+     final long tookMs = Time.currentElapsedTime() - startedAt;
+     LOG.info("Snapshot taken in {} ms", tookMs);
+     ServerMetrics.getMetrics().SNAPSHOT_TIME.add(tookMs);
+ }
+
+ /**
+  * Delegates to the transaction log factory's method of the same name.
+  *
+  * @return the answer given by {@code txnLogFactory}
+  */
+ public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
+     final boolean force = txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection();
+     return force;
+ }
+
+ /**
+  * Returns the size in bytes of the directory this server reports as
+  * {@code dataDir} in {@link #dumpConf(PrintWriter)} and {@link #getConf()},
+  * that is {@code zkDb.snapLog.getSnapDir()}.
+  *
+  * @return the accumulated file size, or 0 when no database is attached
+  */
+ @Override
+ public long getDataDirSize() {
+     if (zkDb == null) {
+         return 0L;
+     }
+     // Must match the directory labelled "dataDir" by dumpConf()/getConf();
+     // previously this measured the directory labelled "dataLogDir".
+     return getDirSize(zkDb.snapLog.getSnapDir());
+ }
+
+ /**
+  * Returns the size in bytes of the directory this server reports as
+  * {@code dataLogDir} in {@link #dumpConf(PrintWriter)} and {@link #getConf()},
+  * that is {@code zkDb.snapLog.getDataDir()}.
+  *
+  * @return the accumulated file size, or 0 when no database is attached
+  */
+ @Override
+ public long getLogDirSize() {
+     if (zkDb == null) {
+         return 0L;
+     }
+     // Must match the directory labelled "dataLogDir" by dumpConf()/getConf();
+     // previously this measured the directory labelled "dataDir".
+     return getDirSize(zkDb.snapLog.getDataDir());
+ }
+
+ private long getDirSize(File file) {
+ long size = 0L;
+ if (file.isDirectory()) {
+ File[] files = file.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ size += getDirSize(f);
+ }
+ }
+ } else {
+ size = file.length();
+ }
+ return size;
+ }
+
+ /**
+  * Reads the current value of the {@code hzxid} counter.
+  *
+  * @return the current zxid
+  */
+ public long getZxid() {
+     final long current = hzxid.get();
+     return current;
+ }
+
+ /**
+  * Gives access to the session tracker of this server.
+  *
+  * @return the {@code sessionTracker} field, as currently set
+  */
+ public SessionTracker getSessionTracker() {
+     return this.sessionTracker;
+ }
+
+ long getNextZxid() {
+ return hzxid.incrementAndGet();
+ }
+
+ /**
+  * Overwrites the {@code hzxid} counter.
+  *
+  * @param zxid the value to store
+  */
+ public void setZxid(long zxid) {
+     this.hzxid.set(zxid);
+ }
+
+ /**
+  * Submits a {@code closeSession} request for the given session.
+  *
+  * @param sessionId id of the session to close
+  */
+ private void close(long sessionId) {
+     submitRequest(new Request(null, sessionId, 0, OpCode.closeSession, null, null));
+ }
+
+ /**
+  * Logs the closing of a session and submits the close request right away.
+  *
+  * @param sessionId id of the session to close
+  */
+ public void closeSession(long sessionId) {
+     final String hexId = Long.toHexString(sessionId);
+     LOG.info("Closing session 0x{}", hexId);
+
+     // No waiting here: the close is sent as soon as it has been detected.
+     close(sessionId);
+ }
+
+ protected void killSession(long sessionId, long zxid) {
+ zkDb.killSession(sessionId, zxid);
+ if (LOG.isTraceEnabled()) {
+ ZooTrace.logTraceMessage(
+ LOG,
+ ZooTrace.SESSION_TRACE_MASK,
+ "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId));
+ }
+ if (sessionTracker != null) {
+ sessionTracker.removeSession(sessionId);
+ }
+ }
+
+ /**
+  * Logs the expiry of the given session and submits a close request for it.
+  *
+  * @param session the session whose timeout was exceeded
+  */
+ public void expire(Session session) {
+     final long expiredId = session.getSessionId();
+     LOG.info("Expiring session 0x{}, timeout of {}ms exceeded", Long.toHexString(expiredId), session.getTimeout());
+     close(expiredId);
+ }
+
+ /**
+  * Logs a forced expiry and submits a close request for the session.
+  *
+  * @param sessionId id of the session to expire
+  */
+ public void expire(long sessionId) {
+     final String hexId = Long.toHexString(sessionId);
+     LOG.info("forcibly expiring session 0x{}", hexId);
+
+     close(sessionId);
+ }
+
+ /**
+  * Thrown by {@code touch} when the session tracker does not accept a
+  * touch for the connection's session.
+  */
+ public static class MissingSessionException extends IOException {
+
+     private static final long serialVersionUID = 7467414635467261007L;
+
+     /**
+      * @param msg the detail message
+      */
+     public MissingSessionException(final String msg) {
+         super(msg);
+     }
+
+ }
+
+ void touch(ServerCnxn cnxn) throws MissingSessionException {
+ if (cnxn == null) {
+ return;
+ }
+ long id = cnxn.getSessionId();
+ int to = cnxn.getSessionTimeout();
+ if (!sessionTracker.touchSession(id, to)) {
+ throw new MissingSessionException("No session with sessionid 0x"
+ + Long.toHexString(id)
+ + " exists, probably expired and removed");
+ }
+ }
+
+ /**
+  * Registers the server bean with JMX and, when that succeeded, the data
+  * tree bean below it. A failure is logged and the affected bean field is
+  * reset to {@code null}.
+  */
+ protected void registerJMX() {
+     // Step 1: the server bean. Nothing else is attempted if this fails.
+     try {
+         jmxServerBean = new ZooKeeperServerBean(this);
+         MBeanRegistry.getInstance().register(jmxServerBean, null);
+     } catch (Exception e) {
+         LOG.warn("Failed to register with JMX", e);
+         jmxServerBean = null;
+         return;
+     }
+
+     // Step 2: the data tree bean, registered under the server bean.
+     try {
+         jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
+         MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
+     } catch (Exception e) {
+         LOG.warn("Failed to register with JMX", e);
+         jmxDataTreeBean = null;
+     }
+ }
+
+ /**
+  * Creates the database if none is attached yet and loads the data unless
+  * the database is already initialized.
+  *
+  * @throws IOException propagated from {@link #loadData()}
+  * @throws InterruptedException propagated from {@link #loadData()}
+  */
+ public void startdata() throws IOException, InterruptedException {
+     // Create the database lazily on top of the transaction log factory
+     if (zkDb == null) {
+         zkDb = new ZKDatabase(this.txnLogFactory);
+     }
+     if (zkDb.isInitialized()) {
+         return;
+     }
+     loadData();
+ }
+
+ /**
+  * Starts the server and puts it into the {@code RUNNING} state.
+  */
+ public synchronized void startup() {
+     final State target = State.RUNNING;
+     startupWithServerState(target);
+ }
+
+ /**
+  * Starts the server but leaves it in the {@code INITIAL} state.
+  */
+ public synchronized void startupWithoutServing() {
+     final State target = State.INITIAL;
+     startupWithServerState(target);
+ }
+
+ /**
+  * Switches the server to the {@code RUNNING} state and wakes up every
+  * thread waiting on this object.
+  */
+ public synchronized void startServing() {
+     final State target = State.RUNNING;
+     setState(target);
+     this.notifyAll();
+ }
+
+ private void startupWithServerState(State state) { // starts the server components in order, then publishes the given state
+ if (sessionTracker == null) { // a tracker may already be set; only create one when missing
+ createSessionTracker();
+ }
+ startSessionTracker();
+ setupRequestProcessors();
+
+ startRequestThrottler();
+
+ registerJMX();
+
+ startJvmPauseMonitor();
+
+ registerMetrics();
+
+ setState(state);
+
+ requestPathMetricsCollector.start();
+
+ localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
+
+ notifyAll(); // wakes threads blocked in wait() on this object; the synchronized callers startup()/startupWithoutServing() hold the monitor
+ }
+
+ /**
+  * Starts the JVM pause monitor when one is configured.
+  */
+ protected void startJvmPauseMonitor() {
+     if (this.jvmPauseMonitor == null) {
+         return;
+     }
+     this.jvmPauseMonitor.serviceStart();
+ }
+
+ /**
+  * Creates a new {@link RequestThrottler} for this server, stores it in
+  * {@code requestThrottler}, and starts it.
+  */
+ protected void startRequestThrottler() {
+     this.requestThrottler = new RequestThrottler(this);
+     this.requestThrottler.start();
+ }
+
+ protected void setupRequestProcessors() {
+ RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+ RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
+ ((SyncRequestProcessor) syncProcessor).start();
+ firstProcessor = new PrepRequestProcessor(this, syncProcessor);
+ ((PrepRequestProcessor) firstProcessor).start();
+ }
+
+ /**
+  * Gives access to the listener held in the {@code listener} field.
+  *
+  * @return the server listener
+  */
+ public ZooKeeperServerListener getZooKeeperServerListener() {
+     return this.listener;
+ }
+
+ /**
+  * Sets the server ID that {@link #createSessionTracker()} passes to the
+  * session tracker it creates. Call this before {@link #startup()}.
+  *
+  * @param newId the server ID to use
+  */
+ public void setCreateSessionTrackerServerId(int newId) {
+     this.createSessionTrackerServerId = newId;
+ }
+
+ /**
+  * Creates a {@link SessionTrackerImpl} from the database's session
+  * timeouts, the tick time, the configured tracker server ID and the
+  * server listener, and stores it in {@code sessionTracker}.
+  */
+ protected void createSessionTracker() {
+     sessionTracker = new SessionTrackerImpl(
+         this,
+         zkDb.getSessionWithTimeOuts(),
+         tickTime,
+         createSessionTrackerServerId,
+         getZooKeeperServerListener());
+ }
+
+ protected void startSessionTracker() {
+ ((SessionTrackerImpl) sessionTracker).start();
+ }
+
+ /**
+  * Updates the state of the ZooKeeper server and then reports the new
+  * state to the registered shutdown handler, if there is one.
+  * <p>
+  * Server state transitions:
+  * <ul><li>The server is in the INITIAL state during startup.</li>
+  * <li>Once it has started successfully, the state becomes RUNNING.</li>
+  * <li>On an internal error the server moves to the ERROR state.
+  * {@link ZooKeeperServerListenerImpl} notifies any critical resource
+  * error events, e.g., SyncRequestProcessor not being able to write a txn
+  * to disk.</li>
+  * <li>During shutdown the state becomes SHUTDOWN, meaning the server is
+  * not running.</li></ul>
+  *
+  * @param state new server state.
+  */
+ protected void setState(State state) {
+     this.state = state;
+     if (zkShutdownHandler == null) {
+         LOG.debug(
+             "ZKShutdownHandler is not registered, so ZooKeeper server"
+                 + " won't take any action on ERROR or SHUTDOWN server state changes");
+         return;
+     }
+     // Forward the state change to the registered shutdown handler.
+     zkShutdownHandler.handle(state);
+ }
+
+ /**
+  * Tells, while shutting down, whether there is still something to shut
+  * down.
+  *
+  * @return true if the server is in the RUNNING or ERROR state, false
+  *         otherwise.
+  */
+ protected boolean canShutdown() {
+     if (state == State.RUNNING) {
+         return true;
+     }
+     return state == State.ERROR;
+ }
+
+ /**
+  * @return true when the server state is RUNNING, false otherwise.
+  */
+ public boolean isRunning() {
+     return State.RUNNING == state;
+ }
+
+ /**
+  * Shuts the server down with {@code fullyShutDown} set to {@code false}.
+  */
+ public void shutdown() {
+     final boolean fullyShutDown = false;
+     shutdown(fullyShutDown);
+ }
+
+ /**
+ * Shut down the server instance: set the state to SHUTDOWN, unregister
+ * metrics, stop the request throttler, session tracker, request processor
+ * chain and JVM pause monitor, then either clear or fast-forward the
+ * database, and finally unregister from JMX.
+ * If the server cannot be shut down (see {@link #canShutdown()}), only the
+ * database is cleared, and only when {@code fullyShutDown} is true.
+ *
+ * @param fullyShutDown true if another server using the same database will not replace this one in the same process
+ */
+ public synchronized void shutdown(boolean fullyShutDown) {
+ if (!canShutdown()) {
+ if (fullyShutDown && zkDb != null) {
+ zkDb.clear();
+ }
+ LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
+ return;
+ }
+ LOG.info("shutting down");
+
+ // new RuntimeException("Calling shutdown").printStackTrace();
+ setState(State.SHUTDOWN);
+
+ // unregister all metrics that are keeping a strong reference to this object
+ // subclasses will do their specific clean up
+ unregisterMetrics();
+
+ if (requestThrottler != null) {
+ requestThrottler.shutdown();
+ }
+
+ // Since sessionTracker and syncThreads poll we just have to
+ // set running to false and they will detect it during the poll
+ // interval.
+ if (sessionTracker != null) {
+ sessionTracker.shutdown();
+ }
+ if (firstProcessor != null) {
+ firstProcessor.shutdown();
+ }
+ if (jvmPauseMonitor != null) {
+ jvmPauseMonitor.serviceStop();
+ }
+
+ if (zkDb != null) {
+ if (fullyShutDown) {
+ zkDb.clear();
+ } else {
+ // else there is no need to clear the database
+ // * When a new quorum is established we can still apply the diff
+ // on top of the same zkDb data
+ // * If we fetch a new snapshot from leader, the zkDb will be
+ // cleared anyway before loading the snapshot
+ try {
+ // This will fast-forward the database to the latest recorded transactions
+ zkDb.fastForwardDataBase();
+ } catch (IOException e) {
+ LOG.error("Error updating DB", e);
+ zkDb.clear(); // fall back to clearing the database when fast-forwarding failed
+ }
+ }
+ }
+
+ requestPathMetricsCollector.shutdown();
+ unregisterJMX();
+ }
+
+ /**
+  * Unregisters the data tree bean and the server bean from JMX, in that
+  * order, and resets both bean fields to {@code null}. A failure is only
+  * logged.
+  */
+ protected void unregisterJMX() {
+     if (jmxDataTreeBean != null) {
+         try {
+             MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
+         } catch (Exception e) {
+             LOG.warn("Failed to unregister with JMX", e);
+         }
+     }
+     if (jmxServerBean != null) {
+         try {
+             MBeanRegistry.getInstance().unregister(jmxServerBean);
+         } catch (Exception e) {
+             LOG.warn("Failed to unregister with JMX", e);
+         }
+     }
+     jmxServerBean = null;
+     jmxDataTreeBean = null;
+ }
+
+ /**
+  * Increments the {@code requestsInProcess} counter by one.
+  */
+ public void incInProcess() {
+     this.requestsInProcess.incrementAndGet();
+ }
+
+ /**
+  * Decrements the {@code requestsInProcess} counter by one and, when a
+  * request throttler exists, calls its {@code throttleWake()}.
+  */
+ public void decInProcess() {
+     this.requestsInProcess.decrementAndGet();
+     if (requestThrottler == null) {
+         return;
+     }
+     requestThrottler.throttleWake();
+ }
+
+ /**
+  * Reads the current value of the {@code requestsInProcess} counter.
+  *
+  * @return the number of requests in process
+  */
+ public int getInProcess() {
+     final int inProcess = requestsInProcess.get();
+     return inProcess;
+ }
+
+ /**
+  * Returns the in-flight count reported by the request throttler, or 0
+  * when there is no throttler.
+  *
+  * @return the in-flight request count
+  */
+ public int getInflight() {
+     final int inflight = requestThrottleInflight();
+     return inflight;
+ }
+
+ /**
+  * Asks the request throttler for its in-flight count.
+  *
+  * @return the throttler's in-flight count, or 0 when no throttler is set
+  */
+ private int requestThrottleInflight() {
+     return requestThrottler != null ? requestThrottler.getInflight() : 0;
+ }
+
+ /**
+  * Immutable pair of two digest values: one for a node, one for the tree.
+  */
+ static class PrecalculatedDigest {
+
+     final long nodeDigest;
+     final long treeDigest;
+
+     /**
+      * @param nodeDigest value stored in {@link #nodeDigest}
+      * @param treeDigest value stored in {@link #treeDigest}
+      */
+     PrecalculatedDigest(final long nodeDigest, final long treeDigest) {
+         this.treeDigest = treeDigest;
+         this.nodeDigest = nodeDigest;
+     }
+ }
+
+
+ /**
+  * Record used to share information between PrepRP and FinalRP.
+  */
+ static class ChangeRecord {
+
+     long zxid;
+
+     String path;
+
+     /* Always replace with a new object when changing; never mutate in place */
+     StatPersisted stat;
+
+     int childCount;
+
+     /* Always replace with a new object when changing; never mutate in place */
+     List<ACL> acl;
+
+     PrecalculatedDigest precalculatedDigest;
+
+     byte[] data;
+
+     ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) {
+         this.zxid = zxid;
+         this.path = path;
+         this.stat = stat;
+         this.childCount = childCount;
+         this.acl = acl;
+     }
+
+     /**
+      * Creates a copy of this record carrying the given zxid. The stat and
+      * the ACL list are copied into new objects, while the digest and the
+      * data array are shared with this record.
+      *
+      * @param zxid zxid of the copy
+      * @return the new record
+      */
+     ChangeRecord duplicate(long zxid) {
+         final StatPersisted statCopy = new StatPersisted();
+         if (this.stat != null) {
+             DataTree.copyStatPersisted(this.stat, statCopy);
+         }
+         final List<ACL> aclCopy;
+         if (acl == null) {
+             aclCopy = new ArrayList<>();
+         } else {
+             aclCopy = new ArrayList<>(acl);
+         }
+         final ChangeRecord copy = new ChangeRecord(zxid, path, statCopy, childCount, aclCopy);
+         copy.precalculatedDigest = precalculatedDigest;
+         copy.data = data;
+         return copy;
+     }
+
+ }
+
+ byte[] generatePasswd(long id) {
+ Random r = new Random(id ^ superSecret);
+ byte[] p = new byte[16];
+ r.nextBytes(p);
+ return p;
+ }
+
+ /**
+  * Checks a password against the one generated for the session id.
+  *
+  * @param sessionId the session id; 0 never matches
+  * @param passwd the password to verify
+  * @return true when the session id is non-zero and the password equals
+  *         {@code generatePasswd(sessionId)}
+  */
+ protected boolean checkPasswd(long sessionId, byte[] passwd) {
+     if (sessionId == 0) {
+         return false;
+     }
+     return Arrays.equals(passwd, generatePasswd(sessionId));
+ }
+
+ long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
+ if (passwd == null) {
+ // Possible since it's just deserialized from a packet on the wire.
+ passwd = new byte[0];
+ }
+ long sessionId = sessionTracker.createSession(timeout);
+ Random r = new Random(sessionId ^ superSecret);
+ r.nextBytes(passwd);
+ ByteBuffer to = ByteBuffer.allocate(4);
+ to.putInt(timeout);
+ cnxn.setSessionId(sessionId);
+ Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
+ submitRequest(si);
+ return sessionId;
+ }
+
+ /**
+  * Records {@code owner} as the owner of the given session by delegating
+  * to the session tracker.
+  *
+  * @param id the session id
+  * @param owner the owner of the session
+  * @throws SessionExpiredException propagated from the session tracker
+  */
+ public void setOwner(long id, Object owner) throws SessionExpiredException {
+     this.sessionTracker.setOwner(id, owner);
+ }
+
+ protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
+ boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
+ if (LOG.isTraceEnabled()) {
+ ZooTrace.logTraceMessage(
+ LOG,
+ ZooTrace.SESSION_TRACE_MASK,
+ "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc);
+ }
+ finishSessionInit(cnxn, rc);
+ }
+
+ /**
+  * Reopens a session: with a correct password the session is revalidated,
+  * otherwise the session initialization is finished as invalid.
+  *
+  * @param cnxn the connection asking to reopen the session
+  * @param sessionId the session id
+  * @param passwd the password supplied by the client
+  * @param sessionTimeout the session timeout
+  * @throws IOException propagated from {@code revalidateSession}
+  */
+ public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
+     if (!checkPasswd(sessionId, passwd)) {
+         LOG.warn(
+             "Incorrect password from {} for session 0x{}",
+             cnxn.getRemoteSocketAddress(),
+             Long.toHexString(sessionId));
+         finishSessionInit(cnxn, false);
+         return;
+     }
+     revalidateSession(cnxn, sessionId, sessionTimeout);
+ }
+
+ public void finishSessionInit(ServerCnxn cnxn, boolean valid) { // sends the ConnectResponse; valid=false answers with zeroed fields and a close marker
+ // register with JMX
+ try {
+ if (valid) {
+ if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) { // register with whichever factory holds this connection
+ serverCnxnFactory.registerConnection(cnxn);
+ } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
+ secureServerCnxnFactory.registerConnection(cnxn);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ }
+
+ try {
+ ConnectResponse rsp = new ConnectResponse(
+ 0,
+ valid ? cnxn.getSessionTimeout() : 0,
+ valid ? cnxn.getSessionId() : 0, // send 0 if session is no
+ // longer valid
+ valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
+ bos.writeInt(-1, "len"); // placeholder; overwritten below once the payload size is known
+ rsp.serialize(bos, "connect");
+ if (!cnxn.isOldClient) { // the readOnly flag is only written when isOldClient is false
+ bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
+ }
+ baos.close();
+ ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+ bb.putInt(bb.remaining() - 4).rewind(); // patch the leading int with the size of what follows it, then rewind for sending
+ cnxn.sendBuffer(bb);
+
+ if (valid) {
+ LOG.debug(
+ "Established session 0x{} with negotiated timeout {} for client {}",
+ Long.toHexString(cnxn.getSessionId()),
+ cnxn.getSessionTimeout(),
+ cnxn.getRemoteSocketAddress());
+ cnxn.enableRecv();
+ } else {
+
+ LOG.info(
+ "Invalid session 0x{} for client {}, probably expired",
+ Long.toHexString(cnxn.getSessionId()),
+ cnxn.getRemoteSocketAddress());
+ cnxn.sendBuffer(ServerCnxnFactory.closeConn);
+ }
+
+ } catch (Exception e) {
+ LOG.warn("Exception while establishing session, closing", e);
+ cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT);
+ }
+ }
+
+ /**
+  * Closes the session of the given connection.
+  *
+  * @param cnxn the connection whose session is closed
+  * @param requestHeader not used by this implementation
+  */
+ public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) {
+     final long sessionId = cnxn.getSessionId();
+     closeSession(sessionId);
+ }
+
+ /**
+  * Returns the id of this server; this implementation always answers 0.
+  *
+  * @return 0
+  */
+ public long getServerId() {
+     return 0L;
+ }
+
+ /**
+  * Hook for servers that support local sessions: such a server marks the
+  * request as belonging to a local session when that is the case. This
+  * implementation does nothing.
+  *
+  * @param si the request to flag
+  */
+ protected void setLocalSessionFlag(Request si) {
+     // intentionally empty
+ }
+
+ /**
+  * Submits a request by enqueueing it, see {@link #enqueueRequest(Request)}.
+  *
+  * @param si the request to submit
+  */
+ public void submitRequest(Request si) {
+     this.enqueueRequest(si);
+ }
+
+ /**
+  * Hands the request to the request throttler. When no throttler exists
+  * yet, the caller first waits for the server to leave the
+  * {@code INITIAL} state.
+  *
+  * @param si the request to enqueue
+  * @throws RuntimeException with message "Not started" when there is still
+  *         no request throttler after waiting
+  */
+ public void enqueueRequest(Request si) {
+     if (requestThrottler == null) {
+         synchronized (this) {
+             try {
+                 // Since all requests are passed to the request
+                 // processor it should wait for setting up the request
+                 // processor chain. The state will be updated to RUNNING
+                 // after the setup.
+                 while (state == State.INITIAL) {
+                     wait(1000);
+                 }
+             } catch (InterruptedException e) {
+                 LOG.warn("Unexpected interruption", e);
+                 // wait() cleared the interrupt status; restore it so that the
+                 // interruption stays visible to the code running this thread.
+                 Thread.currentThread().interrupt();
+             }
+             if (requestThrottler == null) {
+                 throw new RuntimeException("Not started");
+             }
+         }
+     }
+     requestThrottler.submitRequest(si);
+ }
+
+ /**
+  * Passes the request to the first request processor right away. When the
+  * processor chain does not exist yet, the caller first waits for the
+  * server to leave the {@code INITIAL} state. A request of an unknown type
+  * is answered through an {@link UnimplementedRequestProcessor}.
+  *
+  * @param si the request to process
+  * @throws RuntimeException with message "Not started" when, after waiting,
+  *         there is no first processor or the server is not RUNNING
+  */
+ public void submitRequestNow(Request si) {
+     if (firstProcessor == null) {
+         synchronized (this) {
+             try {
+                 // Since all requests are passed to the request
+                 // processor it should wait for setting up the request
+                 // processor chain. The state will be updated to RUNNING
+                 // after the setup.
+                 while (state == State.INITIAL) {
+                     wait(1000);
+                 }
+             } catch (InterruptedException e) {
+                 LOG.warn("Unexpected interruption", e);
+                 // wait() cleared the interrupt status; restore it so that the
+                 // interruption stays visible to the code running this thread.
+                 Thread.currentThread().interrupt();
+             }
+             if (firstProcessor == null || state != State.RUNNING) {
+                 throw new RuntimeException("Not started");
+             }
+         }
+     }
+     try {
+         touch(si.cnxn);
+         boolean validpacket = Request.isValid(si.type);
+         if (validpacket) {
+             setLocalSessionFlag(si);
+             firstProcessor.processRequest(si);
+             if (si.cnxn != null) {
+                 incInProcess();
+             }
+         } else {
+             LOG.warn("Received packet at server of unknown type {}", si.type);
+             // Update request accounting/throttling limits
+             requestFinished(si);
+             new UnimplementedRequestProcessor().processRequest(si);
+         }
+     } catch (MissingSessionException e) {
+         LOG.debug("Dropping request.", e);
+         // Update request accounting/throttling limits
+         requestFinished(si);
+     } catch (RequestProcessorException e) {
+         LOG.error("Unable to process request", e);
+         // Update request accounting/throttling limits
+         requestFinished(si);
+     }
+ }
+
+ /**
+  * Reads the snapshot count from the {@code SNAP_COUNT} system property,
+  * falling back to {@code DEFAULT_SNAP_COUNT}. Values below 2 are raised
+  * to 2.
+  *
+  * @return the snapshot count, at least 2
+  */
+ public static int getSnapCount() {
+     final int configured = Integer.getInteger(SNAP_COUNT, DEFAULT_SNAP_COUNT);
+     // 2 is the minimum, see org.apache.zookeeper.server.SyncRequestProcessor
+     if (configured >= 2) {
+         return configured;
+     }
+     LOG.warn("SnapCount should be 2 or more. Now, snapCount is reset to 2");
+     return 2;
+ }
+
+ /**
+  * Reads the global outstanding limit from the
+  * {@code GLOBAL_OUTSTANDING_LIMIT} system property, falling back to
+  * {@code DEFAULT_GLOBAL_OUTSTANDING_LIMIT}.
+  *
+  * @return the global outstanding limit
+  */
+ public int getGlobalOutstandingLimit() {
+     final int limit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, DEFAULT_GLOBAL_OUTSTANDING_LIMIT);
+     return limit;
+ }
+
+ /**
+  * Reads the {@code zookeeper.snapSizeLimitInKb} system property and
+  * converts it to bytes. A non-positive value is logged as disabling the
+  * feature and is still returned multiplied by 1024.
+  *
+  * @return the configured limit in bytes
+  */
+ public static long getSnapSizeInBytes() {
+     final long limitKb = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default
+     if (limitKb <= 0) {
+         LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", limitKb);
+     }
+     // KB -> bytes
+     return limitKb * 1024;
+ }
+
+ /**
+  * Sets the connection factory stored in {@code serverCnxnFactory}.
+  *
+  * @param factory the factory to use
+  */
+ public void setServerCnxnFactory(ServerCnxnFactory factory) {
+     this.serverCnxnFactory = factory;
+ }
+
+ /**
+  * Gives access to the connection factory stored in {@code serverCnxnFactory}.
+  *
+  * @return the factory, as currently set
+  */
+ public ServerCnxnFactory getServerCnxnFactory() {
+     return this.serverCnxnFactory;
+ }
+
+ /**
+  * Gives access to the connection factory stored in
+  * {@code secureServerCnxnFactory}.
+  *
+  * @return the secure factory, as currently set
+  */
+ public ServerCnxnFactory getSecureServerCnxnFactory() {
+     return this.secureServerCnxnFactory;
+ }
+
+ /**
+  * Sets the connection factory stored in {@code secureServerCnxnFactory}.
+  *
+  * @param factory the secure factory to use
+  */
+ public void setSecureServerCnxnFactory(ServerCnxnFactory factory) {
+     this.secureServerCnxnFactory = factory;
+ }
+
+ /**
+  * Returns the last processed zxid as reported by the database for its
+  * data tree.
+  *
+  * @return the last processed zxid
+  */
+ public long getLastProcessedZxid() {
+     final long lastProcessed = zkDb.getDataTreeLastProcessedZxid();
+     return lastProcessed;
+ }
+
+ /**
+  * Returns the number of outstanding requests, i.e. requests that are
+  * queued but not processed yet. The value is taken from
+  * {@link #getInProcess()}.
+  *
+  * @return the number of outstanding requests
+  */
+ public long getOutstandingRequests() {
+     final int inProcess = getInProcess();
+     return inProcess;
+ }
+
+ /**
+  * Returns the total number of alive client connections to this server,
+  * summed over the plain and the secure connection factory (whichever of
+  * them are set).
+  *
+  * @return the number of alive connections
+  */
+ public int getNumAliveConnections() {
+     int alive = (serverCnxnFactory != null) ? serverCnxnFactory.getNumAliveConnections() : 0;
+     if (secureServerCnxnFactory != null) {
+         alive += secureServerCnxnFactory.getNumAliveConnections();
+     }
+     return alive;
+ }
+
+ /**
+  * Truncates the log so that this server gets in sync with the others
+  * when it is part of a quorum. Delegates to the database.
+  *
+  * @param zxid the zxid the log has to be truncated to
+  * @throws IOException propagated from the database
+  */
+ public void truncateLog(long zxid) throws IOException {
+     zkDb.truncateLog(zxid);
+ }
+
+ /**
+  * @return the tick time stored in {@code tickTime}
+  */
+ public int getTickTime() {
+     return this.tickTime;
+ }
+
+ /**
+  * Logs and stores the new tick time.
+  *
+  * @param tickTime the tick time; the log message reports it in ms
+  */
+ public void setTickTime(int tickTime) {
+     final Object logged = tickTime;
+     LOG.info("tickTime set to {} ms", logged);
+     this.tickTime = tickTime;
+ }
+
+ /**
+  * @return the value of the static {@code throttledOpWaitTime} setting
+  */
+ public static int getThrottledOpWaitTime() {
+     final int waitTime = throttledOpWaitTime;
+     return waitTime;
+ }
+
+ /**
+  * Logs and stores the static {@code throttledOpWaitTime} setting.
+  *
+  * @param time the new value; the log message reports it in ms
+  */
+ public static void setThrottledOpWaitTime(int time) {
+     final Object logged = time;
+     LOG.info("throttledOpWaitTime set to {} ms", logged);
+     throttledOpWaitTime = time;
+ }
+
+ /**
+  * @return the minimum session timeout stored in {@code minSessionTimeout}
+  */
+ public int getMinSessionTimeout() {
+     return this.minSessionTimeout;
+ }
+
+ /**
+  * Stores the minimum session timeout and logs the effective value.
+  *
+  * @param min the minimum timeout; -1 selects {@code tickTime * 2}
+  */
+ public void setMinSessionTimeout(int min) {
+     if (min == -1) {
+         this.minSessionTimeout = tickTime * 2;
+     } else {
+         this.minSessionTimeout = min;
+     }
+     LOG.info("minSessionTimeout set to {} ms", this.minSessionTimeout);
+ }
+
+ /**
+  * @return the maximum session timeout stored in {@code maxSessionTimeout}
+  */
+ public int getMaxSessionTimeout() {
+     return this.maxSessionTimeout;
+ }
+
+ /**
+  * Stores the maximum session timeout and logs the effective value.
+  *
+  * @param max the maximum timeout; -1 selects {@code tickTime * 20}
+  */
+ public void setMaxSessionTimeout(int max) {
+     if (max == -1) {
+         this.maxSessionTimeout = tickTime * 20;
+     } else {
+         this.maxSessionTimeout = max;
+     }
+     LOG.info("maxSessionTimeout set to {} ms", this.maxSessionTimeout);
+ }
+
+ /**
+  * @return the client port listen backlog stored in {@code listenBacklog}
+  */
+ public int getClientPortListenBacklog() {
+     return this.listenBacklog;
+ }
+
+ /**
+  * Stores and logs the client port listen backlog.
+  *
+  * @param backlog the new backlog value
+  */
+ public void setClientPortListenBacklog(int backlog) {
+     listenBacklog = backlog;
+     final Object logged = backlog;
+     LOG.info("clientPortListenBacklog set to {}", logged);
+ }
+
+ /**
+  * @return the local port of the connection factory, or -1 when no
+  *         factory is set
+  */
+ public int getClientPort() {
+     if (serverCnxnFactory != null) {
+         return serverCnxnFactory.getLocalPort();
+     }
+     return -1;
+ }
+
+ /**
+  * @return the local port of the secure connection factory, or -1 when no
+  *         secure factory is set
+  */
+ public int getSecureClientPort() {
+     if (secureServerCnxnFactory != null) {
+         return secureServerCnxnFactory.getLocalPort();
+     }
+     return -1;
+ }
+
+ /**
+  * Maximum number of connections allowed from a particular host (ip). The
+  * plain connection factory is asked first, then the secure one.
+  *
+  * @return the limit reported by the first factory that is set, or -1
+  *         when neither is set
+  */
+ public int getMaxClientCnxnsPerHost() {
+     if (serverCnxnFactory == null) {
+         return secureServerCnxnFactory != null
+             ? secureServerCnxnFactory.getMaxClientCnxnsPerHost()
+             : -1;
+     }
+     return serverCnxnFactory.getMaxClientCnxnsPerHost();
+ }
+
+ /**
+  * Sets the transaction/snapshot log stored in {@code txnLogFactory}.
+  *
+  * @param txnLog the log to use
+  */
+ public void setTxnLogFactory(FileTxnSnapLog txnLog) {
+     txnLogFactory = txnLog;
+ }
+
+ /**
+  * @return the transaction/snapshot log stored in {@code txnLogFactory}
+  */
+ public FileTxnSnapLog getTxnLogFactory() {
+     return txnLogFactory;
+ }
+
+ /**
+  * Returns the elapsed sync time of the transaction log in milliseconds,
+  * as reported by the transaction log factory.
+  *
+  * @return the elapsed sync time
+  */
+ public long getTxnLogElapsedSyncTime() {
+     final long elapsed = txnLogFactory.getTxnLogElapsedSyncTime();
+     return elapsed;
+ }
+
+ /**
+  * Returns the state label of this server; this implementation always
+  * answers {@code "standalone"}.
+  *
+  * @return the state label
+  */
+ public String getState() {
+     final String label = "standalone";
+     return label;
+ }
+
+ /**
+  * Lets the database dump its ephemerals to the given writer.
+  *
+  * @param pwriter destination of the dump
+  */
+ public void dumpEphemerals(PrintWriter pwriter) {
+     this.zkDb.dumpEphemerals(pwriter);
+ }
+
+ /**
+  * Returns the ephemerals map provided by the database.
+  *
+  * @return the map returned by {@code zkDb.getEphemerals()}
+  */
+ public Map<Long, Set<String>> getEphemerals() {
+     final Map<Long, Set<String>> ephemerals = zkDb.getEphemerals();
+     return ephemerals;
+ }
+
+ /**
+  * Returns the drop chance reported by the connection throttle.
+  *
+  * @return the value of {@code connThrottle.getDropChance()}
+  */
+ public double getConnectionDropChance() {
+     final double dropChance = connThrottle.getDropChance();
+     return dropChance;
+ }
+
+ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) // handles a ConnectRequest: throttle check, compatibility checks, then create or reopen a session
+ throws IOException, ClientCnxnLimitException {
+
+ BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
+ ConnectRequest connReq = new ConnectRequest();
+ connReq.deserialize(bia, "connect");
+ LOG.debug(
+ "Session establishment request from client {} client's lastZxid is 0x{}",
+ cnxn.getRemoteSocketAddress(),
+ Long.toHexString(connReq.getLastZxidSeen()));
+
+ long sessionId = connReq.getSessionId();
+ int tokensNeeded = 1; // cost used when connection weighting is disabled
+ if (connThrottle.isConnectionWeightEnabled()) {
+ if (sessionId == 0) { // session id 0 means a new session is requested (see the branch at the end of this method)
+ if (localSessionEnabled) {
+ tokensNeeded = connThrottle.getRequiredTokensForLocal();
+ } else {
+ tokensNeeded = connThrottle.getRequiredTokensForGlobal();
+ }
+ } else {
+ tokensNeeded = connThrottle.getRequiredTokensForRenew();
+ }
+ }
+
+ if (!connThrottle.checkLimit(tokensNeeded)) {
+ throw new ClientCnxnLimitException();
+ }
+ ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
+
+ ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
+
+ boolean readOnly = false;
+ try {
+ readOnly = bia.readBool("readOnly");
+ cnxn.isOldClient = false;
+ } catch (IOException e) {
+ // this is ok -- just a packet from an old client which
+ // doesn't contain readOnly field
+ LOG.warn(
+ "Connection request from old client {}; will be dropped if server is in r-o mode",
+ cnxn.getRemoteSocketAddress());
+ }
+ if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
+ String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
+ LOG.info(msg);
+ throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
+ }
+ if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { // the client has seen a newer zxid than this server has processed
+ String msg = "Refusing session request for client "
+ + cnxn.getRemoteSocketAddress()
+ + " as it has seen zxid 0x"
+ + Long.toHexString(connReq.getLastZxidSeen())
+ + " our last zxid is 0x"
+ + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ + " client must try another server";
+
+ LOG.info(msg);
+ throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
+ }
+ int sessionTimeout = connReq.getTimeOut();
+ byte[] passwd = connReq.getPasswd();
+ int minSessionTimeout = getMinSessionTimeout();
+ if (sessionTimeout < minSessionTimeout) { // clamp the requested timeout into [minSessionTimeout, maxSessionTimeout]
+ sessionTimeout = minSessionTimeout;
+ }
+ int maxSessionTimeout = getMaxSessionTimeout();
+ if (sessionTimeout > maxSessionTimeout) {
+ sessionTimeout = maxSessionTimeout;
+ }
+ cnxn.setSessionTimeout(sessionTimeout);
+ // We don't want to receive any packets until we are sure that the
+ // session is setup
+ cnxn.disableRecv();
+ if (sessionId == 0) {
+ long id = createSession(cnxn, passwd, sessionTimeout);
+ LOG.debug(
+ "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
+ Long.toHexString(id),
+ Long.toHexString(connReq.getLastZxidSeen()),
+ connReq.getTimeOut(),
+ cnxn.getRemoteSocketAddress());
+ } else {
+ validateSession(cnxn, sessionId);
+ LOG.debug(
+ "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
+ Long.toHexString(sessionId),
+ Long.toHexString(connReq.getLastZxidSeen()),
+ connReq.getTimeOut(),
+ cnxn.getRemoteSocketAddress());
+ if (serverCnxnFactory != null) { // ask both factories to close the session with reason CLIENT_RECONNECT before reopening it on this connection
+ serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
+ }
+ if (secureServerCnxnFactory != null) {
+ secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
+ }
+ cnxn.setSessionId(sessionId);
+ reopenSession(cnxn, sessionId, passwd, sessionTimeout);
+ ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
+
+ }
+ }
+
+ /**
+  * Checks whether a particular session can be reestablished. This
+  * implementation performs no check.
+  *
+  * @param cnxn the connection asking to reestablish the session
+  * @param sessionId the id of the session
+  * @throws IOException declared for overriding implementations; never
+  *         thrown here
+  */
+ protected void validateSession(ServerCnxn cnxn, long sessionId)
+     throws IOException {
+     // intentionally empty
+ }
+
+ /**
+  * Decides whether to throttle: the global outstanding limit must be
+  * below the in-flight or the in-process count, and the given
+  * outstanding count must be positive.
+  *
+  * @param outStandingCount the caller's number of outstanding requests
+  * @return true when throttling applies
+  */
+ public boolean shouldThrottle(long outStandingCount) {
+     final int limit = getGlobalOutstandingLimit();
+     final boolean limitExceeded = limit < getInflight() || limit < getInProcess();
+     return limitExceeded && outStandingCount > 0;
+ }
+
+ long getFlushDelay() {
+ return flushDelay;
+ }
+
+ static void setFlushDelay(long delay) {
+ LOG.info("{} = {} ms", FLUSH_DELAY, delay);
+ flushDelay = delay;
+ }
+
+ long getMaxWriteQueuePollTime() {
+ return maxWriteQueuePollTime;
+ }
+
+ static void setMaxWriteQueuePollTime(long maxTime) {
+ LOG.info("{} = {} ms", MAX_WRITE_QUEUE_POLL_SIZE, maxTime);
+ maxWriteQueuePollTime = maxTime;
+ }
+
+ int getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ static void setMaxBatchSize(int size) {
+ LOG.info("{}={}", MAX_BATCH_SIZE, size);
+ maxBatchSize = size;
+ }
+
+ private void initLargeRequestThrottlingSettings() {
+ setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes));
+ setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1));
+ }
+
+ /**
+  * @return the value of the {@code largeRequestMaxBytes} setting
+  */
+ public int getLargeRequestMaxBytes() {
+     return this.largeRequestMaxBytes;
+ }
+
+ /**
+  * Sets the maximum number of bytes allowed for all large requests. A
+  * non-positive value is rejected with two warnings and leaves the
+  * setting unchanged.
+  *
+  * @param bytes the new limit; must be positive to take effect
+  */
+ public void setLargeRequestMaxBytes(int bytes) {
+     if (bytes > 0) {
+         largeRequestMaxBytes = bytes;
+         LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes);
+         return;
+     }
+     LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes);
+     LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes);
+ }
+
+ /**
+  * @return the value of the {@code largeRequestThreshold} setting
+  */
+ public int getLargeRequestThreshold() {
+     return this.largeRequestThreshold;
+ }
+
+ /**
+  * Sets the large request threshold. Valid values are -1 and positive
+  * numbers; any other value is reported with a warning and replaced by -1.
+  *
+  * @param threshold the new threshold
+  */
+ public void setLargeRequestThreshold(int threshold) {
+     final boolean valid = threshold == -1 || threshold > 0;
+     if (valid) {
+         largeRequestThreshold = threshold;
+         LOG.info("The large request threshold is set to {}", largeRequestThreshold);
+         return;
+     }
+     LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold);
+     largeRequestThreshold = -1;
+ }
+
+ /**
+  * @return the current value of the {@code currentLargeRequestBytes} counter
+  */
+ public int getLargeRequestBytes() {
+     final int current = currentLargeRequestBytes.get();
+     return current;
+ }
+
+ /**
+  * Tells whether a request of the given length counts as large. A
+  * threshold of -1 disables the large request limit.
+  *
+  * @param length the request length
+  * @return true when the threshold is not -1 and the length exceeds it
+  */
+ private boolean isLargeRequest(int length) {
+     return largeRequestThreshold != -1 && length > largeRequestThreshold;
+ }
+
+ /**
+  * Checks, without reserving anything, whether a request of the given
+  * length still fits into the budget for large requests. Requests that
+  * are not large always pass.
+  *
+  * @param length the length of the incoming request
+  * @return true when the request may be received
+  * @throws IOException with message "Rejecting large request" when the
+  *         request is large and would exceed {@code largeRequestMaxBytes}
+  */
+ public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException {
+     if (!isLargeRequest(length)) {
+         return true;
+     }
+     // Add as long: an int sum could overflow, wrap around to a negative
+     // number and wrongly pass the limit check.
+     final long projectedBytes = (long) currentLargeRequestBytes.get() + length;
+     if (projectedBytes <= largeRequestMaxBytes) {
+         return true;
+     }
+     ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
+     throw new IOException("Rejecting large request");
+ }
+
+ private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException {
+ if (!isLargeRequest(length)) {
+ return true;
+ }
+
+ int bytes = currentLargeRequestBytes.addAndGet(length);
+ if (bytes > largeRequestMaxBytes) {
+ currentLargeRequestBytes.addAndGet(-length);
+ ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
+ throw new IOException("Rejecting large request");
+ }
+ return true;
+ }
+
+ /**
+  * Releases the bytes a request holds in the
+  * {@code currentLargeRequestBytes} counter. A large request size of -1
+  * means there is nothing to release.
+  *
+  * @param request the finished request
+  */
+ public void requestFinished(Request request) {
+     final int reservedBytes = request.getLargeRequestSize();
+     if (reservedBytes == -1) {
+         return;
+     }
+     currentLargeRequestBytes.addAndGet(-reservedBytes);
+ }
+
+ /**
+ * Deserializes the request header from {@code incomingBuffer} and dispatches the packet:
+ * auth packets are answered inline, SASL packets are handed to {@code processSasl}, and
+ * every other packet is wrapped in a {@link Request} and passed to {@code submitRequest}.
+ *
+ * @param cnxn the connection the packet arrived on
+ * @param incomingBuffer buffer holding the request header followed by the request body
+ * @throws IOException from deserialization or response sending, or when a large request is rejected
+ */
+ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
+ // We have the request, now process and setup for next
+ InputStream bais = new ByteBufferInputStream(incomingBuffer);
+ BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
+ RequestHeader h = new RequestHeader();
+ h.deserialize(bia, "header");
+
+ // Need to increase the outstanding request count first, otherwise
+ // there might be a race condition that it enabled recv after
+ // processing request and then disabled when check throttling.
+ //
+ // Be aware that we're actually checking the global outstanding
+ // request before this request.
+ //
+ // It's fine if the IOException thrown before we decrease the count
+ // in cnxn, since it will close the cnxn anyway.
+ cnxn.incrOutstandingAndCheckThrottle(h);
+
+ // Through the magic of byte buffers, the slice will now be
+ // pointing to the start of the request body (the header has
+ // already been consumed above).
+ incomingBuffer = incomingBuffer.slice();
+ if (h.getType() == OpCode.auth) {
+ LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
+ AuthPacket authPacket = new AuthPacket();
+ ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
+ String scheme = authPacket.getScheme();
+ ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
+ // Default to failure; only a provider returning OK changes the outcome.
+ Code authReturn = Code.AUTHFAILED;
+ if (ap != null) {
+ try {
+ // handleAuthentication may close the connection, to allow the client to choose
+ // a different server to connect to.
+ authReturn = ap.handleAuthentication(
+ new ServerAuthenticationProvider.ServerObjs(this, cnxn),
+ authPacket.getAuth());
+ } catch (RuntimeException e) {
+ LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
+ authReturn = Code.AUTHFAILED;
+ }
+ }
+ if (authReturn == Code.OK) {
+ LOG.info("Session 0x{}: auth success for scheme {} and address {}",
+ Long.toHexString(cnxn.getSessionId()), scheme,
+ cnxn.getRemoteSocketAddress());
+ ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.OK.intValue());
+ cnxn.sendResponse(rh, null, null);
+ } else {
+ if (ap == null) {
+ LOG.warn(
+ "No authentication provider for scheme: {} has {}",
+ scheme,
+ ProviderRegistry.listProviders());
+ } else {
+ LOG.warn("Authentication failed for scheme: {}", scheme);
+ }
+ // send a response...
+ ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.AUTHFAILED.intValue());
+ cnxn.sendResponse(rh, null, null);
+ // ... and close connection
+ cnxn.sendBuffer(ServerCnxnFactory.closeConn);
+ cnxn.disableRecv();
+ }
+ // Auth packets are fully handled here and never enter the request pipeline.
+ return;
+ } else if (h.getType() == OpCode.sasl) {
+ processSasl(incomingBuffer, cnxn, h);
+ } else {
+ if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
+ // Authentication enforcement is failed
+ // Already sent response to user about failure and closed the session, lets return
+ return;
+ } else {
+ Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
+ int length = incomingBuffer.limit();
+ if (isLargeRequest(length)) {
+ // checkRequestSize will throw IOException if request is rejected
+ checkRequestSizeWhenMessageReceived(length);
+ // Record the size on the request so requestFinished() can subtract it later.
+ si.setLargeRequestSize(length);
+ }
+ si.setOwner(ServerCnxn.me);
+ submitRequest(si);
+ }
+ }
+ }
+
+ private static boolean isSaslSuperUser(String id) {
+ if (id == null || id.isEmpty()) {
+ return false;
+ }
+
+ Properties properties = System.getProperties();
+ int prefixLen = SASL_SUPER_USER.length();
+
+ for (String k : properties.stringPropertyNames()) {
+ if (k.startsWith(SASL_SUPER_USER)
+ && (k.length() == prefixLen || k.charAt(prefixLen) == '.')) {
+ String value = properties.getProperty(k);
+
+ if (value != null && value.equals(id)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ private static boolean shouldAllowSaslFailedClientsConnect() {
+ return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS);
+ }
+
+ /**
+ * Handles one step of the SASL exchange: evaluates the client token with the connection's
+ * SASL server, records the resulting auth info once the exchange is complete, and sends
+ * the server's response token back to the client.
+ *
+ * @param incomingBuffer buffer positioned at the serialized {@code GetSASLRequest}
+ * @param cnxn the connection performing SASL authentication
+ * @param requestHeader header of the SASL request; its xid is echoed in the reply
+ * @throws IOException from deserializing the request or sending the response
+ */
+ private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
+ LOG.debug("Responding to client SASL token.");
+ GetSASLRequest clientTokenRecord = new GetSASLRequest();
+ ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord);
+ byte[] clientToken = clientTokenRecord.getToken();
+ LOG.debug("Size of client SASL token: {}", clientToken.length);
+ byte[] responseToken = null;
+ try {
+ ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer;
+ try {
+ // note that clientToken might be empty (clientToken.length == 0):
+ // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the
+ // SASL negotiation process.
+ responseToken = saslServer.evaluateResponse(clientToken);
+ if (saslServer.isComplete()) {
+ String authorizationID = saslServer.getAuthorizationID();
+ LOG.info("Session 0x{}: adding SASL authorization for authorizationID: {}",
+ Long.toHexString(cnxn.getSessionId()), authorizationID);
+ cnxn.addAuthInfo(new Id("sasl", authorizationID));
+
+ // A configured super user additionally gets the "super" identity.
+ if (isSaslSuperUser(authorizationID)) {
+ cnxn.addAuthInfo(new Id("super", ""));
+ LOG.info(
+ "Session 0x{}: Authenticated Id '{}' as super user",
+ Long.toHexString(cnxn.getSessionId()),
+ authorizationID);
+ }
+ }
+ } catch (SaslException e) {
+ // NOTE(review): the exception is the last argument here; SLF4J presumably treats it as
+ // the throwable rather than filling the second placeholder -- confirm.
+ LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e);
+ // The connection is kept only when failed clients are allowed AND SASL is not required.
+ if (shouldAllowSaslFailedClientsConnect() && !authHelper.isSaslAuthRequired()) {
+ LOG.warn("Maintaining client connection despite SASL authentication failure.");
+ } else {
+ int error;
+ if (authHelper.isSaslAuthRequired()) {
+ LOG.warn(
+ "Closing client connection due to server requires client SASL authenticaiton,"
+ + "but client SASL authentication has failed, or client is not configured with SASL "
+ + "authentication.");
+ error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue();
+ } else {
+ LOG.warn("Closing client connection due to SASL authentication failure.");
+ error = Code.AUTHFAILED.intValue();
+ }
+
+ // Report the failure, close the session and stop reading from this connection.
+ ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error);
+ cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response");
+ cnxn.sendCloseSession();
+ cnxn.disableRecv();
+ return;
+ }
+ }
+ } catch (NullPointerException e) {
+ // NOTE(review): any NullPointerException raised inside the block above lands here, not only
+ // a null saslServer. Execution then continues below and an OK reply is sent with
+ // whatever responseToken holds (null if evaluateResponse did not complete).
+ LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly.");
+ }
+ if (responseToken != null) {
+ LOG.debug("Size of server SASL response: {}", responseToken.length);
+ }
+
+ ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue());
+ Record record = new SetSASLResponse(responseToken);
+ cnxn.sendResponse(replyHeader, record, "response");
+ }
+
+ /**
+  * Entry point for quorum/Learner.java: applies a transaction that has no
+  * originating {@link Request}.
+  *
+  * @param hdr the transaction header
+  * @param txn the transaction record
+  * @return the result of applying the transaction to the database
+  */
+ public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
+     // There is no Request on this path, so session handling works from the header alone
+     // and no transaction digest is supplied.
+     processTxnForSessionEvents(null, hdr, txn);
+     final ProcessTxnResult result = processTxnInDB(hdr, txn, null);
+     return result;
+ }
+
+ // entry point for FinalRequestProcessor.java
+ /**
+ * Applies the transaction carried by {@code request}: handles session events, applies the
+ * transaction to the database, drops outstanding change records up to the request's zxid,
+ * and adds quorum requests to the committed proposals.
+ *
+ * @param request the request to process
+ * @return the database result, or an empty result for requests that are neither writes nor quorum requests
+ */
+ public ProcessTxnResult processTxn(Request request) {
+ TxnHeader hdr = request.getHdr();
+ processTxnForSessionEvents(request, hdr, request.getTxn());
+
+ // A request is treated as a write exactly when it carries a transaction header.
+ final boolean writeRequest = (hdr != null);
+ final boolean quorumRequest = request.isQuorum();
+
+ // return fast w/o synchronization when we get a read
+ if (!writeRequest && !quorumRequest) {
+ return new ProcessTxnResult();
+ }
+ synchronized (outstandingChanges) {
+ ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
+
+ // request.hdr is set for write requests, which are the only ones
+ // that add to outstandingChanges.
+ if (writeRequest) {
+ long zxid = hdr.getZxid();
+ // Remove every queued change record whose zxid is at or below this transaction's zxid.
+ while (!outstandingChanges.isEmpty()
+ && outstandingChanges.peek().zxid <= zxid) {
+ ChangeRecord cr = outstandingChanges.remove();
+ ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
+ if (cr.zxid < zxid) {
+ LOG.warn(
+ "Zxid outstanding 0x{} is less than current 0x{}",
+ Long.toHexString(cr.zxid),
+ Long.toHexString(zxid));
+ }
+ // Only drop the per-path entry if it still refers to this exact record.
+ if (outstandingChangesForPath.get(cr.path) == cr) {
+ outstandingChangesForPath.remove(cr.path);
+ }
+ }
+ }
+
+ // do not add non quorum packets to the queue.
+ if (quorumRequest) {
+ getZKDatabase().addCommittedProposal(request);
+ }
+ return rc;
+ }
+ }
+
+ private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
+ int opCode = (request == null) ? hdr.getType() : request.type;
+ long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
+
+ if (opCode == OpCode.createSession) {
+ if (hdr != null && txn instanceof CreateSessionTxn) {
+ CreateSessionTxn cst = (CreateSessionTxn) txn;
+ sessionTracker.commitSession(sessionId, cst.getTimeOut());
+ } else if (request == null || !request.isLocalSession()) {
+ LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString());
+ }
+ } else if (opCode == OpCode.closeSession) {
+ sessionTracker.removeSession(sessionId);
+ }
+ }
+
+ /**
+  * Applies the transaction to the database when a header is present.
+  *
+  * @param hdr the transaction header, or null when there is nothing to apply
+  * @param txn the transaction record
+  * @param digest the transaction digest, may be null
+  * @return the database result, or an empty result when {@code hdr} is null
+  */
+ private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
+     if (hdr != null) {
+         return getZKDatabase().processTxn(hdr, txn, digest);
+     }
+     return new ProcessTxnResult();
+ }
+
+ /**
+  * Returns the session expiry map as reported by the session tracker.
+  *
+  * @return the map obtained from {@code sessionTracker.getSessionExpiryMap()}
+  */
+ public Map<Long, Set<Long>> getSessionExpiryMap() {
+     final Map<Long, Set<Long>> expiryMap = sessionTracker.getSessionExpiryMap();
+     return expiryMap;
+ }
+
+ /**
+  * Installs the {@link ZooKeeperServerShutdownHandler} that receives the server's
+  * error and shutdown state change notifications. The handler's
+  * {@link ZooKeeperServerShutdownHandler#handle(State)} is invoked on every
+  * state change made through {@link #setState(State)}.
+  *
+  * @param zkShutdownHandler the handler to install; replaces any previous one
+  */
+ void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) {
+     this.zkShutdownHandler = zkShutdownHandler;
+ }
+
+ /**
+  * @return the current value of the response caching flag
+  */
+ public boolean isResponseCachingEnabled() {
+     return this.isResponseCachingEnabled;
+ }
+
+ /**
+  * Sets the response caching flag.
+  *
+  * @param isEnabled the new value of the flag
+  */
+ public void setResponseCachingEnabled(boolean isEnabled) {
+     this.isResponseCachingEnabled = isEnabled;
+ }
+
+ /**
+  * @return the read response cache, or null when response caching is disabled
+  */
+ public ResponseCache getReadResponseCache() {
+     if (isResponseCachingEnabled) {
+         return readResponseCache;
+     }
+     return null;
+ }
+
+ /**
+  * @return the getChildren response cache, or null when response caching is disabled
+  */
+ public ResponseCache getGetChildrenResponseCache() {
+     if (isResponseCachingEnabled) {
+         return getChildrenResponseCache;
+     }
+     return null;
+ }
+
+ /**
+ * Registers this server's gauges and quota gauge sets on the root metrics context.
+ * Each name registered here is expected to be removed again by {@code unregisterMetrics()}.
+ */
+ protected void registerMetrics() {
+ MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
+
+ final ZKDatabase zkdb = this.getZKDatabase();
+ final ServerStats stats = this.serverStats();
+
+ // Latency gauges.
+ rootContext.registerGauge("avg_latency", stats::getAvgLatency);
+
+ rootContext.registerGauge("max_latency", stats::getMaxLatency);
+ rootContext.registerGauge("min_latency", stats::getMinLatency);
+
+ // Traffic and connection gauges.
+ rootContext.registerGauge("packets_received", stats::getPacketsReceived);
+ rootContext.registerGauge("packets_sent", stats::getPacketsSent);
+ rootContext.registerGauge("num_alive_connections", stats::getNumAliveClientConnections);
+
+ rootContext.registerGauge("outstanding_requests", stats::getOutstandingRequests);
+ rootContext.registerGauge("uptime", stats::getUptime);
+
+ // Data tree gauges. The method references below bind to the DataTree instance that
+ // zkdb.getDataTree() returns at registration time.
+ rootContext.registerGauge("znode_count", zkdb::getNodeCount);
+
+ rootContext.registerGauge("watch_count", zkdb.getDataTree()::getWatchCount);
+ rootContext.registerGauge("ephemerals_count", zkdb.getDataTree()::getEphemeralsCount);
+
+ rootContext.registerGauge("approximate_data_size", zkdb.getDataTree()::cachedApproximateDataSize);
+
+ // Session gauges.
+ rootContext.registerGauge("global_sessions", zkdb::getSessionCount);
+ rootContext.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount);
+
+ // Operating system level gauges.
+ OSMXBean osMbean = new OSMXBean();
+ rootContext.registerGauge("open_file_descriptor_count", osMbean::getOpenFileDescriptorCount);
+ rootContext.registerGauge("max_file_descriptor_count", osMbean::getMaxFileDescriptorCount);
+ rootContext.registerGauge("connection_drop_probability", this::getConnectionDropChance);
+
+ // Client response buffer size gauges.
+ rootContext.registerGauge("last_client_response_size", stats.getClientResponseStats()::getLastBufferSize);
+ rootContext.registerGauge("max_client_response_size", stats.getClientResponseStats()::getMaxBufferSize);
+ rootContext.registerGauge("min_client_response_size", stats.getClientResponseStats()::getMinBufferSize);
+
+ // TLS handshake and authentication gauges.
+ rootContext.registerGauge("outstanding_tls_handshake", this::getOutstandingHandshakeNum);
+ rootContext.registerGauge("auth_failed_count", stats::getAuthFailedCount);
+ rootContext.registerGauge("non_mtls_remote_conn_count", stats::getNonMTLSRemoteConnCount);
+ rootContext.registerGauge("non_mtls_local_conn_count", stats::getNonMTLSLocalConnCount);
+
+ // Quota gauge sets. Unlike the method references above, these lambdas read the zkDb
+ // field each time the gauge set is evaluated.
+ rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE,
+ () -> QuotaMetricsUtils.getQuotaCountLimit(zkDb.getDataTree()));
+ rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE,
+ () -> QuotaMetricsUtils.getQuotaBytesLimit(zkDb.getDataTree()));
+ rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE,
+ () -> QuotaMetricsUtils.getQuotaCountUsage(zkDb.getDataTree()));
+ rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE,
+ () -> QuotaMetricsUtils.getQuotaBytesUsage(zkDb.getDataTree()));
+ }
+
+ /**
+  * Removes every gauge and gauge set that {@code registerMetrics()} registers on the
+  * root metrics context. The names are listed in registration order so the two methods
+  * can be compared line by line.
+  */
+ protected void unregisterMetrics() {
+
+     MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
+
+     rootContext.unregisterGauge("avg_latency");
+
+     rootContext.unregisterGauge("max_latency");
+     rootContext.unregisterGauge("min_latency");
+
+     rootContext.unregisterGauge("packets_received");
+     rootContext.unregisterGauge("packets_sent");
+     rootContext.unregisterGauge("num_alive_connections");
+
+     rootContext.unregisterGauge("outstanding_requests");
+     rootContext.unregisterGauge("uptime");
+
+     rootContext.unregisterGauge("znode_count");
+
+     rootContext.unregisterGauge("watch_count");
+     rootContext.unregisterGauge("ephemerals_count");
+     rootContext.unregisterGauge("approximate_data_size");
+
+     rootContext.unregisterGauge("global_sessions");
+     rootContext.unregisterGauge("local_sessions");
+
+     rootContext.unregisterGauge("open_file_descriptor_count");
+     rootContext.unregisterGauge("max_file_descriptor_count");
+     rootContext.unregisterGauge("connection_drop_probability");
+
+     rootContext.unregisterGauge("last_client_response_size");
+     rootContext.unregisterGauge("max_client_response_size");
+     rootContext.unregisterGauge("min_client_response_size");
+
+     // Registered by registerMetrics() but previously never removed here, which left a
+     // gauge bound to this server instance behind after shutdown.
+     rootContext.unregisterGauge("outstanding_tls_handshake");
+     rootContext.unregisterGauge("auth_failed_count");
+     rootContext.unregisterGauge("non_mtls_remote_conn_count");
+     rootContext.unregisterGauge("non_mtls_local_conn_count");
+
+     rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE);
+     rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE);
+     rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE);
+     rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE);
+ }
+
+ /**
+  * Hook for the admin server to expose additional values that are not metrics.
+  * Currently reports the full version string and the server state.
+  *
+  * @param response a sink which collects the key/value pairs
+  */
+ public void dumpMonitorValues(BiConsumer<String, Object> response) {
+     final ServerStats serverStats = serverStats();
+     final String fullVersion = Version.getFullVersion();
+     response.accept("version", fullVersion);
+     response.accept("server_state", serverStats.getServerState());
+ }
+
+ /**
+  * Grants or denies an operation on a node. Returns normally when access is granted.
+  * Access is granted when ACL checking is skipped, the node has no ACLs, the client holds
+  * a "super" credential, or some ACL entry covering {@code perm} is either world:anyone
+  * or matched by one of the client's credentials through its authentication provider.
+  *
+  * @param cnxn the server connection
+  * @param acl set of ACLs for the node
+  * @param perm the permission that the client is requesting
+  * @param ids the credentials supplied by the client
+  * @param path the ZNode path
+  * @param setAcls for set ACL operations, the list of ACLs being set. Otherwise null.
+  * @throws KeeperException.NoAuthException when no ACL entry grants the permission
+  */
+ public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls) throws KeeperException.NoAuthException {
+     if (skipACL) {
+         return;
+     }
+
+     LOG.debug("Permission requested: {} ", perm);
+     LOG.debug("ACLs for node: {}", acl);
+     LOG.debug("Client credentials: {}", ids);
+
+     if (acl == null || acl.isEmpty()) {
+         return;
+     }
+
+     // A "super" credential bypasses the per-entry evaluation below.
+     for (Id credential : ids) {
+         if (credential.getScheme().equals("super")) {
+             return;
+         }
+     }
+
+     for (ACL entry : acl) {
+         // Entries that do not cover the requested permission cannot grant it.
+         if ((entry.getPerms() & perm) == 0) {
+             continue;
+         }
+         final Id aclId = entry.getId();
+         if (aclId.getScheme().equals("world") && aclId.getId().equals("anyone")) {
+             return;
+         }
+         final ServerAuthenticationProvider provider = ProviderRegistry.getServerProvider(aclId.getScheme());
+         if (provider == null) {
+             continue;
+         }
+         for (Id credential : ids) {
+             if (!credential.getScheme().equals(aclId.getScheme())) {
+                 continue;
+             }
+             final boolean matched = provider.matches(
+                 new ServerAuthenticationProvider.ServerObjs(this, cnxn),
+                 new ServerAuthenticationProvider.MatchValues(path, credential.getId(), aclId.getId(), perm, setAcls));
+             if (matched) {
+                 return;
+             }
+         }
+     }
+     throw new KeeperException.NoAuthException();
+ }
+
+ /**
+  * Checks whether an operation on {@code path} would exceed a quota. Does nothing when
+  * quota enforcement is off or when no quota prefix applies to the path.
+  *
+  * @param path
+  *            the path of the node, used for the quota prefix check
+  * @param lastData
+  *            the current node data, {@code null} for none
+  * @param data
+  *            the data to be set, or {@code null} for none
+  * @param type
+  *            the operation type; only create and setData are supported
+  * @throws KeeperException.QuotaExceededException when a hard quota limit would be exceeded
+  * @throws IllegalArgumentException for any other operation type on a path that has a quota
+  */
+ public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException {
+     if (!enforceQuota) {
+         return;
+     }
+     final long newLength = (data == null) ? 0 : data.length;
+     final String quotaPrefix = getZKDatabase().getDataTree().getMaxPrefixWithQuota(path);
+     if (StringUtils.isEmpty(quotaPrefix)) {
+         return;
+     }
+
+     final String namespace = PathUtils.getTopNamespace(path);
+     if (type == OpCode.create) {
+         // A create adds one node and all of its bytes.
+         checkQuota(quotaPrefix, newLength, 1, namespace);
+     } else if (type == OpCode.setData) {
+         // A setData changes only the byte usage, by the difference in data length.
+         final long oldLength = (lastData == null) ? 0 : lastData.length;
+         checkQuota(quotaPrefix, newLength - oldLength, 0, namespace);
+     } else {
+         throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type);
+     }
+ }
+
+ /**
+ * Checks whether applying the given count and byte differences under a quota path
+ * would exceed its limits. Exceeding any limit is logged through the rate-limited logger;
+ * only hard limits cause an exception.
+ *
+ * @param lastPrefix
+ * the path of the node which has a quota.
+ * @param bytesDiff
+ * the diff to be added to number of bytes
+ * @param countDiff
+ * the diff to be added to the count
+ * @param namespace
+ * the namespace for collecting quota exceeded errors
+ * @throws KeeperException.QuotaExceededException when a hard count or byte limit would be exceeded
+ */
+ private void checkQuota(String lastPrefix, long bytesDiff, long countDiff, String namespace)
+ throws KeeperException.QuotaExceededException {
+ LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff);
+
+ // now check the quota we set
+ String limitNode = Quotas.limitPath(lastPrefix);
+ DataNode node = getZKDatabase().getNode(limitNode);
+ StatsTrack limitStats;
+ if (node == null) {
+ // should not happen
+ LOG.error("Missing limit node for quota {}", limitNode);
+ return;
+ }
+ // Parse the limit node's data while holding the node's monitor.
+ synchronized (node) {
+ limitStats = new StatsTrack(node.data);
+ }
+ //check the quota
+ // A dimension is checked only when it actually changes and a soft or hard limit is set (> -1).
+ boolean checkCountQuota = countDiff != 0 && (limitStats.getCount() > -1 || limitStats.getCountHardLimit() > -1);
+ boolean checkByteQuota = bytesDiff != 0 && (limitStats.getBytes() > -1 || limitStats.getByteHardLimit() > -1);
+
+ if (!checkCountQuota && !checkByteQuota) {
+ return;
+ }
+
+ //check the statPath quota
+ String statNode = Quotas.statPath(lastPrefix);
+ node = getZKDatabase().getNode(statNode);
+
+ StatsTrack currentStats;
+ if (node == null) {
+ // should not happen
+ LOG.error("Missing node for stat {}", statNode);
+ return;
+ }
+ synchronized (node) {
+ currentStats = new StatsTrack(node.data);
+ }
+
+ //check the Count Quota
+ if (checkCountQuota) {
+ long newCount = currentStats.getCount() + countDiff;
+ // The hard limit, when set, takes precedence over the soft limit.
+ boolean isCountHardLimit = limitStats.getCountHardLimit() > -1;
+ long countLimit = isCountHardLimit ? limitStats.getCountHardLimit() : limitStats.getCount();
+
+ if (newCount > countLimit) {
+ String msg = "Quota exceeded: " + lastPrefix + " [current count=" + newCount + ", " + (isCountHardLimit ? "hard" : "soft") + "CountLimit=" + countLimit + "]";
+ RATE_LOGGER.rateLimitLog(msg);
+ // Soft limits are only logged; hard limits reject the operation.
+ if (isCountHardLimit) {
+ updateQuotaExceededMetrics(namespace);
+ throw new KeeperException.QuotaExceededException(lastPrefix);
+ }
+ }
+ }
+
+ //check the Byte Quota
+ if (checkByteQuota) {
+ long newBytes = currentStats.getBytes() + bytesDiff;
+ // The hard limit, when set, takes precedence over the soft limit.
+ boolean isByteHardLimit = limitStats.getByteHardLimit() > -1;
+ long byteLimit = isByteHardLimit ? limitStats.getByteHardLimit() : limitStats.getBytes();
+ if (newBytes > byteLimit) {
+ String msg = "Quota exceeded: " + lastPrefix + " [current bytes=" + newBytes + ", " + (isByteHardLimit ? "hard" : "soft") + "ByteLimit=" + byteLimit + "]";
+ RATE_LOGGER.rateLimitLog(msg);
+ // Soft limits are only logged; hard limits reject the operation.
+ if (isByteHardLimit) {
+ updateQuotaExceededMetrics(namespace);
+ throw new KeeperException.QuotaExceededException(lastPrefix);
+ }
+ }
+ }
+ }
+
+ /**
+  * @return the current value of the static digest flag
+  */
+ public static boolean isDigestEnabled() {
+     return ZooKeeperServer.digestEnabled;
+ }
+
+ /**
+  * Logs and stores the new value of the static digest flag.
+  *
+  * @param digestEnabled the new value of the flag
+  */
+ public static void setDigestEnabled(boolean digestEnabled) {
+     // Log first so the configured value is visible even if later startup steps fail.
+     LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
+     ZooKeeperServer.digestEnabled = digestEnabled;
+ }
+
+ /**
+ * Trim a path to get the immediate predecessor.
+ *
+ * @param path
+ * @return
+ * @throws KeeperException.BadArgumentsException
+ */
+ private String parentPath(String path) throws KeeperException.BadArgumentsException {
+ int lastSlash = path.lastIndexOf('/');
+ if (lastSlash == -1 || path.indexOf('\0') != -1 || getZKDatabase().isSpecialPath(path)) {
+ throw new KeeperException.BadArgumentsException(path);
+ }
+ return lastSlash == 0 ? "/" : path.substring(0, lastSlash);
+ }
+
+ private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException {
+ boolean mustCheckACL = false;
+ String path = null;
+ List<ACL> acl = null;
+
+ switch (request.type) {
+ case OpCode.create:
+ case OpCode.create2: {
+ CreateRequest req = new CreateRequest();
+ if (buffer2Record(request.request, req)) {
+ mustCheckACL = true;
+ acl = req.getAcl();
+ path = parentPath(req.getPath());
+ }
+ break;
+ }
+ case OpCode.delete: {
+ DeleteRequest req = new DeleteRequest();
+ if (buffer2Record(request.request, req)) {
+ path = parentPath(req.getPath());
+ }
+ break;
+ }
+ case OpCode.setData: {
+ SetDataRequest req = new SetDataRequest();
+ if (buffer2Record(request.request, req)) {
+ path = req.getPath();
+ }
+ break;
+ }
+ case OpCode.setACL: {
+ SetACLRequest req = new SetACLRequest();
+ if (buffer2Record(request.request, req)) {
+ mustCheckACL = true;
+ acl = req.getAcl();
+ path = req.getPath();
+ }
+ break;
+ }
+ }
+
+ if (mustCheckACL) {
+ /* we ignore the extrapolated ACL returned by fixupACL because
+ * we only care about it being well-formed (and if it isn't, an
+ * exception will be raised).
+ */
+ PrepRequestProcessor.fixupACL(path, request.authInfo, acl);
+ }
+
+ return path;
+ }
+
+ private int effectiveACLPerms(Request request) {
+ switch (request.type) {
+ case OpCode.create:
+ case OpCode.create2:
+ return ZooDefs.Perms.CREATE;
+ case OpCode.delete:
+ return ZooDefs.Perms.DELETE;
+ case OpCode.setData:
+ return ZooDefs.Perms.WRITE;
+ case OpCode.setACL:
+ return ZooDefs.Perms.ADMIN;
+ default:
+ return ZooDefs.Perms.ALL;
+ }
+ }
+
+ /**
+ * Check Write Requests for Potential Access Restrictions
+ * <p>
+ * Before a request is being proposed to the quorum, lets check it
+ * against local ACLs. Non-write requests (read, session, etc.)
+ * are passed along. Invalid requests are sent a response.
+ * <p>
+ * While we are at it, if the request will set an ACL: make sure it's
+ * a valid one.
+ *
+ * @param request
+ * @return true if request is permitted, false if not.
+ */
+ public boolean authWriteRequest(Request request) {
+ int err;
+ String pathToCheck;
+
+ if (!enableEagerACLCheck) {
+ return true;
+ }
+
+ err = Code.OK.intValue();
+
+ try {
+ pathToCheck = effectiveACLPath(request);
+ if (pathToCheck != null) {
+ checkACL(request.cnxn, zkDb.getACL(pathToCheck, null), effectiveACLPerms(request), request.authInfo, pathToCheck, null);
+ }
+ } catch (KeeperException.NoAuthException e) {
+ LOG.debug("Request failed ACL check", e);
+ err = e.code().intValue();
+ } catch (KeeperException.InvalidACLException e) {
+ LOG.debug("Request has an invalid ACL check", e);
+ err = e.code().intValue();
+ } catch (KeeperException.NoNodeException e) {
+ LOG.debug("ACL check against non-existent node: {}", e.getMessage());
+ } catch (KeeperException.BadArgumentsException e) {
+ LOG.debug("ACL check against illegal node path: {}", e.getMessage());
+ } catch (Throwable t) {
+ LOG.error("Uncaught exception in authWriteRequest with: ", t);
+ throw t;
+ } finally {
+ if (err != Code.OK.intValue()) {
+ /* This request has a bad ACL, so we are dismissing it early. */
+ decInProcess();
+ ReplyHeader rh = new ReplyHeader(request.cxid, 0, err);
+ try {
+ request.cnxn.sendResponse(rh, null, null);
+ } catch (IOException e) {
+ LOG.error("IOException : {}", e);
+ }
+ }
+ }
+
+ return err == Code.OK.intValue();
+ }
+
+ /**
+  * Deserializes {@code request} into {@code record} and rewinds the buffer so later
+  * readers see it from the start again.
+  * <p>
+  * The buffer is rewound on failure as well, so a malformed payload does not leave the
+  * buffer positioned mid-record for whoever reads it next. Deserialization failures are
+  * reported through the return value and logged at debug level instead of being
+  * dropped silently.
+  *
+  * @param request the buffer holding the serialized request payload
+  * @param record the record to populate
+  * @return true when deserialization succeeded, false when it failed with an IOException
+  */
+ private boolean buffer2Record(ByteBuffer request, Record record) {
+     try {
+         ByteBufferInputStream.byteBuffer2Record(request, record);
+         return true;
+     } catch (IOException ex) {
+         LOG.debug("Failed to deserialize request into {}", record.getClass().getSimpleName(), ex);
+         return false;
+     } finally {
+         request.rewind();
+     }
+ }
+
+ /**
+  * @return the outstanding handshake number reported by the connection factory when it is a
+  *         {@code NettyServerCnxnFactory}, otherwise 0
+  */
+ public int getOutstandingHandshakeNum() {
+     if (!(serverCnxnFactory instanceof NettyServerCnxnFactory)) {
+         return 0;
+     }
+     final NettyServerCnxnFactory nettyFactory = (NettyServerCnxnFactory) serverCnxnFactory;
+     return nettyFactory.getOutstandingHandshakeNum();
+ }
+
+ /**
+  * @return the current value of the {@code reconfigEnabled} field
+  */
+ public boolean isReconfigEnabled() {
+     final boolean enabled = this.reconfigEnabled;
+     return enabled;
+ }
+
+ /**
+  * @return the currently stored shutdown handler, as set by
+  *         {@code registerServerShutdownHandler}
+  */
+ public ZooKeeperServerShutdownHandler getZkShutdownHandler() {
+     return this.zkShutdownHandler;
+ }
+
+ /**
+  * Increments the per-namespace quota-exceeded counter by one.
+  *
+  * @param namespace the namespace to count against; a null namespace is ignored
+  */
+ static void updateQuotaExceededMetrics(final String namespace) {
+     if (namespace != null) {
+         ServerMetrics.getMetrics().QUOTA_EXCEEDED_ERROR_PER_NAMESPACE.add(namespace, 1);
+     }
+ }
+}
+
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
new file mode 100644
index 00000000000..fd2ea277a40
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.metrics.MetricsContext;
+import org.apache.zookeeper.server.ContainerManager;
+import org.apache.zookeeper.server.DataTreeBean;
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+import javax.management.JMException;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+/**
+ *
+ * Just like the standard ZooKeeperServer. We just replace the request
+ * processors: PrepRequestProcessor -&gt; ProposalRequestProcessor -&gt;
+ * CommitProcessor -&gt; Leader.ToBeAppliedRequestProcessor -&gt;
+ * FinalRequestProcessor
+ */
+public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
+
+ private ContainerManager containerManager; // guarded by sync
+
+ CommitProcessor commitProcessor;
+
+ PrepRequestProcessor prepRequestProcessor;
+
+ /**
+ * @throws IOException
+ */
+ public LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
+ super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
+ }
+
+ public Leader getLeader() {
+ return self.leader;
+ }
+
+ @Override
+ protected void setupRequestProcessors() {
+ RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+ RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
+ commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
+ commitProcessor.start();
+ ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
+ proposalProcessor.initialize();
+ prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
+ prepRequestProcessor.start();
+ firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
+
+ setupContainerManager();
+ }
+
+ private synchronized void setupContainerManager() {
+ containerManager = new ContainerManager(
+ getZKDatabase(),
+ prepRequestProcessor,
+ Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
+ Integer.getInteger("znode.container.maxPerMinute", 10000),
+ Long.getLong("znode.container.maxNeverUsedIntervalMs", 0)
+ );
+ }
+
+ @Override
+ public synchronized void startup() {
+ super.startup();
+ if (containerManager != null) {
+ containerManager.start();
+ }
+ }
+
+ @Override
+ protected void registerMetrics() {
+ super.registerMetrics();
+
+ MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
+ rootContext.registerGauge("learners", gaugeWithLeader(
+ (leader) -> leader.getLearners().size())
+ );
+ rootContext.registerGauge("synced_followers", gaugeWithLeader(
+ (leader) -> leader.getForwardingFollowers().size()
+ ));
+ rootContext.registerGauge("synced_non_voting_followers", gaugeWithLeader(
+ (leader) -> leader.getNonVotingFollowers().size()
+ ));
+ rootContext.registerGauge("synced_observers", self::getSynced_observers_metric);
+ rootContext.registerGauge("pending_syncs", gaugeWithLeader(
+ (leader) -> leader.getNumPendingSyncs()
+ ));
+ rootContext.registerGauge("leader_uptime", gaugeWithLeader(
+ (leader) -> leader.getUptime()
+ ));
+ rootContext.registerGauge("last_proposal_size", gaugeWithLeader(
+ (leader) -> leader.getProposalStats().getLastBufferSize()
+ ));
+ rootContext.registerGauge("max_proposal_size", gaugeWithLeader(
+ (leader) -> leader.getProposalStats().getMaxBufferSize()
+ ));
+ rootContext.registerGauge("min_proposal_size", gaugeWithLeader(
+ (leader) -> leader.getProposalStats().getMinBufferSize()
+ ));
+ }
+
+ private org.apache.zookeeper.metrics.Gauge gaugeWithLeader(Function<Leader, Number> supplier) {
+ return () -> {
+ final Leader leader = getLeader();
+ if (leader == null) {
+ return null;
+ }
+ return supplier.apply(leader);
+ };
+ }
+
+ @Override
+ protected void unregisterMetrics() {
+ super.unregisterMetrics();
+
+ MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
+ rootContext.unregisterGauge("learners");
+ rootContext.unregisterGauge("synced_followers");
+ rootContext.unregisterGauge("synced_non_voting_followers");
+ rootContext.unregisterGauge("synced_observers");
+ rootContext.unregisterGauge("pending_syncs");
+ rootContext.unregisterGauge("leader_uptime");
+
+ rootContext.unregisterGauge("last_proposal_size");
+ rootContext.unregisterGauge("max_proposal_size");
+ rootContext.unregisterGauge("min_proposal_size");
+ }
+
+ @Override
+ public synchronized void shutdown(boolean fullyShutDown) {
+ if (containerManager != null) {
+ containerManager.stop();
+ }
+ super.shutdown(fullyShutDown);
+ }
+
+ @Override
+ public int getGlobalOutstandingLimit() {
+ int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1;
+ int globalOutstandingLimit = super.getGlobalOutstandingLimit() / divisor;
+ return globalOutstandingLimit;
+ }
+
+ @Override
+ public void createSessionTracker() {
+ sessionTracker = new LeaderSessionTracker(
+ this,
+ getZKDatabase().getSessionWithTimeOuts(),
+ tickTime,
+ self.getId(),
+ self.areLocalSessionsEnabled(),
+ getZooKeeperServerListener());
+ }
+
+ public boolean touch(long sess, int to) {
+ return sessionTracker.touchSession(sess, to);
+ }
+
+ public boolean checkIfValidGlobalSession(long sess, int to) {
+ if (self.areLocalSessionsEnabled() && !upgradeableSessionTracker.isGlobalSession(sess)) {
+ return false;
+ }
+ return sessionTracker.touchSession(sess, to);
+ }
+
+ /**
+ * Requests coming from the learner should go directly to
+ * PrepRequestProcessor
+ *
+ * @param request
+ */
+ public void submitLearnerRequest(Request request) {
+ /*
+ * Requests coming from the learner should have gone through
+ * submitRequest() on each server which already perform some request
+ * validation, so we don't need to do it again.
+ *
+ * Additionally, LearnerHandler should start submitting requests into
+ * the leader's pipeline only when the leader's server is started, so we
+ * can submit the request directly into PrepRequestProcessor.
+ *
+ * This is done so that requests from learners won't go through
+ * LeaderRequestProcessor which perform local session upgrade.
+ */
+ prepRequestProcessor.processRequest(request);
+ }
+
+ @Override
+ protected void registerJMX() {
+ // register with JMX
+ try {
+ jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
+ MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ jmxDataTreeBean = null;
+ }
+ }
+
+ public void registerJMX(LeaderBean leaderBean, LocalPeerBean localPeerBean) {
+ // register with JMX
+ if (self.jmxLeaderElectionBean != null) {
+ try {
+ MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ }
+ self.jmxLeaderElectionBean = null;
+ }
+
+ try {
+ jmxServerBean = leaderBean;
+ MBeanRegistry.getInstance().register(leaderBean, localPeerBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ jmxServerBean = null;
+ }
+ }
+
+ boolean registerJMX(LearnerHandlerBean handlerBean) {
+ try {
+ MBeanRegistry.getInstance().register(handlerBean, jmxServerBean);
+ return true;
+ } catch (JMException e) {
+ LOG.warn("Could not register connection", e);
+ }
+ return false;
+ }
+
+ @Override
+ protected void unregisterJMX() {
+ // unregister from JMX
+ try {
+ if (jmxDataTreeBean != null) {
+ MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
+ }
+ jmxDataTreeBean = null;
+ }
+
+ protected void unregisterJMX(Leader leader) {
+ // unregister from JMX
+ try {
+ if (jmxServerBean != null) {
+ MBeanRegistry.getInstance().unregister(jmxServerBean);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
+ }
+ jmxServerBean = null;
+ }
+
+ @Override
+ public String getState() {
+ return "leader";
+ }
+
+ /**
+ * Returns the id of the associated QuorumPeer, which will do for a unique
+ * id of this server.
+ */
+ @Override
+ public long getServerId() {
+ return self.getId();
+ }
+
+ @Override
+ protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
+ super.revalidateSession(cnxn, sessionId, sessionTimeout);
+ try {
+ // setowner as the leader itself, unless updated
+ // via the follower handlers
+ setOwner(sessionId, ServerCnxn.me);
+ } catch (SessionExpiredException e) {
+ // this is ok, it just means that the session revalidation failed.
+ }
+ }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
new file mode 100644
index 00000000000..8e80fae57dc
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -0,0 +1,920 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.net.ssl.SSLSocket;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.server.ExitCode;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.TxnLogEntry;
+import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.ConfigUtils;
+import org.apache.zookeeper.server.util.MessageTracker;
+import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnDigest;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.apache.zookeeper.util.ServiceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the superclass of two of the three main actors in a ZK
+ * ensemble: Followers and Observers. Both Followers and Observers share
+ * a good deal of code, which lives in this class to avoid duplication.
+ */
+public class Learner {
+
+    /**
+     * Plain holder for the three parts of a deserialized transaction
+     * (header, record and digest) while it is tracked during synchronization.
+     */
+    static class PacketInFlight {
+
+        /** Header of the transaction. */
+        TxnHeader hdr;
+
+        /** Body of the transaction. */
+        Record rec;
+
+        /** Digest that accompanied the transaction. */
+        TxnDigest digest;
+
+    }
+
+ // The quorum peer this learner belongs to; source of ids, epochs, timeouts and configuration.
+ QuorumPeer self;
+ // The ZooKeeper server driven by this learner.
+ LearnerZooKeeperServer zk;
+
+ // Buffered stream wrapping the socket's output; created in connectToLeader and flushed in writePacketNow.
+ protected BufferedOutputStream bufferedOutput;
+
+ // Connection to the leader / learner master; assigned in connectToLeader, cleared in closeSockSync.
+ protected Socket sock;
+ // Addresses handed to the most recent connectToLeader call.
+ protected MultipleAddresses leaderAddr;
+ // Set by closeSocket so that only one caller closes the socket; reset when a new connection is established.
+ protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
+
+    /**
+     * Socket getter.
+     *
+     * @return the current value of the {@code sock} field, which may be {@code null}
+     */
+    public Socket getSocket() {
+        return this.sock;
+    }
+
+ // Sender thread used for asynchronous sending; created by startSendingThread, null until then.
+ LearnerSender sender = null;
+ // Archive reading packets from the socket; created in connectToLeader.
+ protected InputArchive leaderIs;
+ // Archive writing packets to bufferedOutput; created in connectToLeader and used as the write lock.
+ protected OutputArchive leaderOs;
+ /** the protocol version of the leader; overwritten from the LEADERINFO packet in registerWithLeader */
+ protected int leaderProtocolVersion = 0x01;
+
+ // Size argument passed to the MessageTracker below.
+ private static final int BUFFERED_MESSAGE_SIZE = 10;
+ // Records the type of every packet sent (writePacketNow) and received (readPacket).
+ protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+
+ protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);
+
+ /**
+ * Time to wait after connection attempt with the Leader or LearnerMaster before this
+ * Learner tries to connect again. Read from the system property
+ * "zookeeper.leaderConnectDelayDuringRetryMs"; defaults to 100.
+ */
+ private static final int leaderConnectDelayDuringRetryMs = Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100);
+
+ // Value passed to Socket.setTcpNoDelay for the leader connection; true unless the
+ // system property "follower.nodelay" is set to something other than "true".
+ private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
+
+ // Name of the property that switches writePacket to queueing packets on the sender thread.
+ public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending";
+ // Not final: setAsyncSending may change it.
+ private static boolean asyncSending =
+ Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING));
+ // Name of the property that makes closeSocket close the socket on a separate daemon thread.
+ public static final String LEARNER_CLOSE_SOCKET_ASYNC = "zookeeper.learner.closeSocketAsync";
+ public static final boolean closeSocketAsync = Boolean
+ .parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_CLOSE_SOCKET_ASYNC));
+
+ // Log the effective configuration once, when the class is initialized.
+ static {
+ LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
+ LOG.info("TCP NoDelay set to: {}", nodelay);
+ LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
+ LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync);
+ }
+
+    /**
+     * Connections waiting for the answer to a REVALIDATE packet, keyed by session id.
+     * Entries are added by validateSession and removed by revalidate.
+     */
+    final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<>();
+
+    /**
+     * @return the number of entries currently held in {@code pendingRevalidations}
+     */
+    public int getPendingRevalidationsCount() {
+        final int pending = pendingRevalidations.size();
+        return pending;
+    }
+
+    /**
+     * Overrides the asynchronous-sending mode and logs the new value. For testing.
+     *
+     * @param newMode the new value of the {@code asyncSending} flag
+     */
+    protected static void setAsyncSending(boolean newMode) {
+        Learner.asyncSending = newMode;
+        LOG.info("{} = {}", LEARNER_ASYNC_SENDING, Learner.asyncSending);
+    }
+    /**
+     * @return the current value of the {@code asyncSending} flag
+     */
+    protected static boolean getAsyncSending() {
+        return Learner.asyncSending;
+    }
+    /**
+     * Asks the leader to validate a session for a client by sending a REVALIDATE
+     * packet. The connection is remembered in {@code pendingRevalidations} before
+     * the packet is written.
+     *
+     * @param cnxn
+     *                the connection to remember for the session
+     * @param clientId
+     *                the client to be revalidated
+     * @param timeout
+     *                the timeout for which the session is valid
+     * @throws IOException
+     *                 if writing the packet fails
+     */
+    void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException {
+        LOG.info("Revalidating client: 0x{}", Long.toHexString(clientId));
+        // Payload layout: 8-byte session id followed by 4-byte timeout, big-endian.
+        final byte[] payload = ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
+                .putLong(clientId)
+                .putInt(timeout)
+                .array();
+        final QuorumPacket revalidatePacket = new QuorumPacket(Leader.REVALIDATE, -1, payload, null);
+        pendingRevalidations.put(clientId, cnxn);
+        if (LOG.isTraceEnabled()) {
+            final String traceMessage = "To validate session 0x" + Long.toHexString(clientId);
+            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, traceMessage);
+        }
+        writePacket(revalidatePacket, true);
+    }
+
+    /**
+     * Write a packet to the leader.
+     *
+     * This method is called by multiple threads, so only one thread may write to leaderOs at a time.
+     * In synchronous mode the write happens inside a synchronized block (see writePacketNow).
+     * In asynchronous mode the packet is handed to sender.queuePacket(); the flush argument
+     * is not used on that path.
+     *
+     * @param pp
+     *                the packet to be sent to the leader
+     * @param flush
+     *                whether the buffered output is flushed after a synchronous write
+     * @throws IOException
+     *                 if the packet cannot be written or queued
+     */
+    void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+        if (!asyncSending) {
+            writePacketNow(pp, flush);
+            return;
+        }
+        sender.queuePacket(pp);
+    }
+
+    /**
+     * Writes a packet to the leader immediately, holding the lock on leaderOs.
+     *
+     * @param pp
+     *                the packet to write; when {@code null} nothing is written and only the flush may happen
+     * @param flush
+     *                whether to flush the buffered output afterwards
+     * @throws IOException
+     *                 if writing or flushing fails
+     */
+    void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
+        synchronized (leaderOs) {
+            final boolean hasPacket = pp != null;
+            if (hasPacket) {
+                messageTracker.trackSent(pp.getType());
+                leaderOs.writeRecord(pp, "packet");
+            }
+            if (flush) {
+                bufferedOutput.flush();
+            }
+        }
+    }
+
+    /**
+     * Creates the LearnerSender for this learner, stores it in {@code sender}
+     * and starts it; it forwards queued packets to the leader.
+     */
+    protected void startSendingThread() {
+        final LearnerSender newSender = new LearnerSender(this);
+        sender = newSender;
+        newSender.start();
+    }
+
+    /**
+     * Read a packet from the leader into the given instance and record its type
+     * with the message tracker.
+     *
+     * @param pp
+     *                the packet to be populated
+     * @throws IOException
+     *                 if reading from the leader fails
+     */
+    void readPacket(QuorumPacket pp) throws IOException {
+        synchronized (leaderIs) {
+            leaderIs.readRecord(pp, "packet");
+            messageTracker.trackReceived(pp.getType());
+        }
+        if (!LOG.isTraceEnabled()) {
+            return;
+        }
+        // Pings are traced under their own mask, separate from other packets.
+        final long traceMask;
+        if (pp.getType() == Leader.PING) {
+            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
+        } else {
+            traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
+        }
+        ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
+    }
+
+    /**
+     * Send a request packet to the leader. The payload is the session id, cxid and
+     * type of the request followed by the raw request bytes, if any.
+     *
+     * @param request
+     *                the request from the client
+     * @throws IOException
+     *                 if the packet cannot be written
+     */
+    void request(Request request) throws IOException {
+        if (request.isThrottled()) {
+            LOG.error("Throttled request sent to leader: {}. Exiting", request);
+            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
+        final ByteArrayOutputStream payload = new ByteArrayOutputStream();
+        final DataOutputStream out = new DataOutputStream(payload);
+        out.writeLong(request.sessionId);
+        out.writeInt(request.cxid);
+        out.writeInt(request.type);
+        if (request.request != null) {
+            // Copy the whole buffer and leave its position rewound afterwards.
+            request.request.rewind();
+            final byte[] body = new byte[request.request.remaining()];
+            request.request.get(body);
+            request.request.rewind();
+            out.write(body);
+        }
+        out.close();
+        writePacket(new QuorumPacket(Leader.REQUEST, -1, payload.toByteArray(), request.authInfo), true);
+    }
+
+    /**
+     * Returns the server in the current view whose id matches the current vote,
+     * i.e. the node we think is the leader.
+     *
+     * @return the matching server, or {@code null} (after logging a warning) when the view has no such id
+     */
+    protected QuorumServer findLeader() {
+        final Vote current = self.getCurrentVote();
+        for (QuorumServer candidate : self.getView().values()) {
+            if (candidate.id != current.getId()) {
+                continue;
+            }
+            // Ensure we have the leader's correct IP address before
+            // attempting to connect.
+            candidate.recreateSocketAddresses();
+            return candidate;
+        }
+        LOG.warn("Couldn't find the leader with id = {}", current.getId());
+        return null;
+    }
+
+    /**
+     * Overridable source of the time used for connection timeouts.
+     *
+     * @return the value of {@link System#nanoTime()}
+     */
+    protected long nanoTime() {
+        final long now = System.nanoTime();
+        return now;
+    }
+
+    /**
+     * Overridable helper that simply calls {@code sock.connect(addr, timeout)}. This can be
+     * overridden in tests to fake connection success/failure for connectToLeader.
+     *
+     * @param sock the socket to connect
+     * @param addr the address to connect to
+     * @param timeout the connect timeout passed to the socket
+     * @throws IOException if the connect call fails
+     */
+    protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {
+        sock.connect(addr, timeout);
+    }
+
+    /**
+     * Establish a connection with the given Leader or LearnerMaster.
+     * One LeaderConnector is started per candidate address; the first socket that
+     * connects is kept. After the connectors finish, the socket is authenticated and
+     * the input/output archives are created. When asynchronous sending is enabled the
+     * sender thread is started as well.
+     *
+     * @param multiAddr - the addresses of the Peer to connect to.
+     * @param hostname - the host name passed to the learner authenticator.
+     * @throws IOException - if no connector produced a connected socket,
+     *  or if authenticating or opening the socket streams fails
+     */
+    protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
+
+        this.leaderAddr = multiAddr;
+        // even if none of the addresses are reachable, we want to try to establish connection
+        // see ZOOKEEPER-3758
+        final Set<InetSocketAddress> addresses = self.isMultiAddressReachabilityCheckEnabled()
+                ? multiAddr.getAllReachableAddressesOrAll()
+                : multiAddr.getAllAddresses();
+        final int addressCount = addresses.size();
+        final ExecutorService executor = Executors.newFixedThreadPool(addressCount);
+        final CountDownLatch latch = new CountDownLatch(addressCount);
+        final AtomicReference<Socket> socket = new AtomicReference<>(null);
+        for (InetSocketAddress address : addresses) {
+            executor.submit(new LeaderConnector(address, socket, latch));
+        }
+
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while trying to connect to Leader", e);
+        } finally {
+            executor.shutdown();
+            try {
+                final boolean terminated = executor.awaitTermination(1, TimeUnit.SECONDS);
+                if (!terminated) {
+                    LOG.error("not all the LeaderConnector terminated properly");
+                }
+            } catch (InterruptedException ie) {
+                LOG.error("Interrupted while terminating LeaderConnector executor.", ie);
+            }
+        }
+
+        final Socket connected = socket.get();
+        if (connected == null) {
+            throw new IOException("Failed connect to " + multiAddr);
+        }
+        sock = connected;
+        sockBeingClosed.set(false);
+
+        self.authLearner.authenticate(sock, hostname);
+
+        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
+        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
+        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
+        if (asyncSending) {
+            startSendingThread();
+        }
+    }
+
+    /**
+     * Task that tries to connect to one address of the leader. The first connector
+     * that obtains a connected socket publishes it through the shared
+     * {@code AtomicReference}; every other socket is closed. The latch is counted
+     * down when the task ends, whatever the outcome.
+     */
+    class LeaderConnector implements Runnable {
+
+        private final AtomicReference<Socket> socket;
+        private final InetSocketAddress address;
+        private final CountDownLatch latch;
+
+        /**
+         * @param address the address this connector tries to reach
+         * @param socket shared holder for the winning socket
+         * @param latch counted down once when this connector finishes
+         */
+        LeaderConnector(InetSocketAddress address, AtomicReference<Socket> socket, CountDownLatch latch) {
+            this.address = address;
+            this.socket = socket;
+            this.latch = latch;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Thread.currentThread().setName("LeaderConnector-" + address);
+                Socket sock = connectToLeader();
+
+                if (sock != null && sock.isConnected()) {
+                    if (socket.compareAndSet(null, sock)) {
+                        LOG.info("Successfully connected to leader, using address: {}", address);
+                    } else {
+                        LOG.info("Connection to the leader is already established, close the redundant connection");
+                        sock.close();
+                    }
+                } else if (sock != null) {
+                    // The retry loop ended because another connector already published a socket,
+                    // so this one was never connected. Close it rather than leaking it.
+                    sock.close();
+                }
+
+            } catch (Exception e) {
+                LOG.error("Failed connect to {}", address, e);
+            } finally {
+                latch.countDown();
+            }
+        }
+
+        /**
+         * Tries up to five times to connect to {@code address}, stopping early when another
+         * connector has already published a socket or when the connect timeout is used up.
+         *
+         * @return the socket of the last attempt; it is not connected when the loop stopped
+         *         because another connector succeeded first
+         * @throws IOException if the timeout is exhausted or the last allowed attempt fails
+         * @throws X509Exception if creating an SSL socket fails
+         * @throws InterruptedException if interrupted while sleeping between attempts
+         */
+        private Socket connectToLeader() throws IOException, X509Exception, InterruptedException {
+            Socket sock = createSocket();
+
+            // leader connection timeout defaults to tickTime * initLimit
+            int connectTimeout = self.tickTime * self.initLimit;
+
+            // but if connectToLearnerMasterLimit is specified, use that value to calculate
+            // timeout instead of using the initLimit value
+            if (self.connectToLearnerMasterLimit > 0) {
+                connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
+            }
+
+            int remainingTimeout;
+            long startNanoTime = nanoTime();
+
+            for (int tries = 0; tries < 5 && socket.get() == null; tries++) {
+                try {
+                    // recalculate the remaining time because earlier attempts and the
+                    // sleep between retries (leaderConnectDelayDuringRetryMs) consume it
+                    remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
+                    if (remainingTimeout <= 0) {
+                        LOG.error("connectToLeader exceeded on retries.");
+                        throw new IOException("connectToLeader exceeded on retries.");
+                    }
+
+                    sockConnect(sock, address, Math.min(connectTimeout, remainingTimeout));
+                    if (self.isSslQuorum()) {
+                        ((SSLSocket) sock).startHandshake();
+                    }
+                    sock.setTcpNoDelay(nodelay);
+                    break;
+                } catch (IOException e) {
+                    remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
+
+                    if (remainingTimeout <= leaderConnectDelayDuringRetryMs) {
+                        LOG.error(
+                            "Unexpected exception, connectToLeader exceeded. tries={}, remaining init limit={}, connecting to {}",
+                            tries,
+                            remainingTimeout,
+                            address,
+                            e);
+                        throw e;
+                    } else if (tries >= 4) {
+                        LOG.error(
+                            "Unexpected exception, retries exceeded. tries={}, remaining init limit={}, connecting to {}",
+                            tries,
+                            remainingTimeout,
+                            address,
+                            e);
+                        throw e;
+                    } else {
+                        LOG.warn(
+                            "Unexpected exception, tries={}, remaining init limit={}, connecting to {}",
+                            tries,
+                            remainingTimeout,
+                            address,
+                            e);
+                        // a socket whose connect failed cannot be reused; start over with a new one
+                        sock = createSocket();
+                    }
+                }
+                Thread.sleep(leaderConnectDelayDuringRetryMs);
+            }
+
+            return sock;
+        }
+    }
+
+    /**
+     * Creates a plain socket or, when the quorum uses SSL, an SSL socket, and sets
+     * its read timeout to {@code tickTime * initLimit}.
+     * This can be overridden in tests to fake already connected sockets for connectToLeader.
+     *
+     * @return the new, not yet connected socket
+     * @throws X509Exception if the SSL socket cannot be created
+     * @throws IOException if the socket cannot be created or configured
+     */
+    protected Socket createSocket() throws X509Exception, IOException {
+        final Socket created = self.isSslQuorum()
+                ? self.getX509Util().createSSLSocket()
+                : new Socket();
+        created.setSoTimeout(self.tickTime * self.initLimit);
+        return created;
+    }
+
+    /**
+     * Once connected to the leader or learner master, perform the handshake
+     * protocol to establish a following / observing connection.
+     *
+     * @param pktType the type of the first packet sent to the leader
+     * @return the zxid the Leader sends for synchronization purposes.
+     * @throws IOException if the exchange fails, if the leader's epoch is lower than the
+     *  accepted epoch, or if the reply is neither LEADERINFO nor NEWLEADER
+     */
+    protected long registerWithLeader(int pktType) throws IOException {
+        final long lastLoggedZxid = self.getLastLoggedZxid();
+
+        // Send follower info, including the accepted epoch and, in the payload, our sid.
+        QuorumPacket qp = new QuorumPacket();
+        qp.setType(pktType);
+        qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
+
+        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
+        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
+        BinaryOutputArchive.getArchive(bsid).writeRecord(li, "LearnerInfo");
+        qp.setData(bsid.toByteArray());
+
+        writePacket(qp, true);
+        readPacket(qp);
+        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
+
+        if (qp.getType() != Leader.LEADERINFO) {
+            // No LEADERINFO: the only other acceptable first packet is NEWLEADER.
+            if (newEpoch > self.getAcceptedEpoch()) {
+                self.setAcceptedEpoch(newEpoch);
+            }
+            if (qp.getType() != Leader.NEWLEADER) {
+                LOG.error("First packet should have been NEWLEADER");
+                throw new IOException("First packet should have been NEWLEADER");
+            }
+            return qp.getZxid();
+        }
+
+        // we are connected to a 1.0 server so accept the new epoch and read the next packet
+        leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
+        final byte[] epochBytes = new byte[4];
+        final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
+        if (newEpoch > self.getAcceptedEpoch()) {
+            wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
+            self.setAcceptedEpoch(newEpoch);
+        } else if (newEpoch == self.getAcceptedEpoch()) {
+            // since we have already acked an epoch equal to the leaders, we cannot ack
+            // again, but we still need to send our lastZxid to the leader so that we can
+            // sync with it if it does assume leadership of the epoch.
+            // the -1 indicates that this reply should not count as an ack for the new epoch
+            wrappedEpochBytes.putInt(-1);
+        } else {
+            throw new IOException("Leaders epoch, "
+                                  + newEpoch
+                                  + " is less than accepted epoch, "
+                                  + self.getAcceptedEpoch());
+        }
+        writePacket(new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null), true);
+        return ZxidUtils.makeZxid(newEpoch, 0);
+    }
+
+    /**
+     * Finally, synchronize our history with the Leader (if Follower)
+     * or the LearnerMaster (if Observer).
+     *
+     * The first packet selects the sync mode (DIFF, SNAP or TRUNC). After that,
+     * packets are processed until UPTODATE arrives; transactions received in the
+     * meantime are either applied directly or kept for logging and committing once
+     * the server is serving.
+     *
+     * @param newLeaderZxid the zxid obtained from the leader during registration; its epoch
+     *  becomes the current epoch
+     * @throws IOException if reading from or writing to the leader fails, or the snapshot
+     *  signature is missing
+     * @throws Exception if a reconfiguration received during sync reports a major change
+     *  ("changes proposed in reconfig")
+     */
+    protected void syncWithLeader(long newLeaderZxid) throws Exception {
+        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
+        QuorumPacket qp = new QuorumPacket();
+        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
+
+        QuorumVerifier newLeaderQV = null;
+
+        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
+        // For SNAP and TRUNC the snapshot is needed to save that history
+        boolean snapshotNeeded = true;
+        boolean syncSnapshot = false;
+        readPacket(qp);
+        Deque<Long> packetsCommitted = new ArrayDeque<>();
+        Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
+        Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
+        synchronized (zk) {
+            if (qp.getType() == Leader.DIFF) {
+                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
+                self.setSyncMode(QuorumPeer.SyncMode.DIFF);
+                if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
+                    LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
+                    snapshotNeeded = true;
+                    syncSnapshot = true;
+                } else {
+                    snapshotNeeded = false;
+                }
+            } else if (qp.getType() == Leader.SNAP) {
+                self.setSyncMode(QuorumPeer.SyncMode.SNAP);
+                LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
+                // The leader is going to dump the database
+                // db is clear as part of deserializeSnapshot()
+                zk.getZKDatabase().deserializeSnapshot(leaderIs);
+                // ZOOKEEPER-2819: overwrite config node content extracted
+                // from leader snapshot with local config, to avoid potential
+                // inconsistency of config node content during rolling restart.
+                if (!self.isReconfigEnabled()) {
+                    LOG.debug("Reset config node content from local config after deserialization of snapshot.");
+                    zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
+                }
+                String signature = leaderIs.readString("signature");
+                if (!signature.equals("BenWasHere")) {
+                    LOG.error("Missing signature. Got {}", signature);
+                    throw new IOException("Missing signature");
+                }
+                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+
+                // immediately persist the latest snapshot when there is txn log gap
+                syncSnapshot = true;
+            } else if (qp.getType() == Leader.TRUNC) {
+                //we need to truncate the log to the lastzxid of the leader
+                self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
+                LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
+                boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
+                if (!truncated) {
+                    // not able to truncate the log
+                    LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
+                    ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
+                }
+                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+
+            } else {
+                LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
+                ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
+            }
+            zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
+            zk.createSessionTracker();
+
+            long lastQueued = 0;
+
+            // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
+            // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
+            // we need to make sure that we don't take the snapshot twice.
+            boolean isPreZAB1_0 = true;
+            //If we are not going to take the snapshot be sure the transactions are not applied in memory
+            // but written out to the transaction log
+            boolean writeToTxnLog = !snapshotNeeded;
+            TxnLogEntry logEntry;
+            // we are now going to start getting transactions to apply followed by an UPTODATE
+            outerLoop:
+            while (self.isRunning()) {
+                readPacket(qp);
+                switch (qp.getType()) {
+                case Leader.PROPOSAL:
+                    PacketInFlight pif = new PacketInFlight();
+                    logEntry = SerializeUtils.deserializeTxn(qp.getData());
+                    pif.hdr = logEntry.getHeader();
+                    pif.rec = logEntry.getTxn();
+                    pif.digest = logEntry.getDigest();
+                    if (pif.hdr.getZxid() != lastQueued + 1) {
+                        LOG.warn(
+                            "Got zxid 0x{} expected 0x{}",
+                            Long.toHexString(pif.hdr.getZxid()),
+                            Long.toHexString(lastQueued + 1));
+                    }
+                    lastQueued = pif.hdr.getZxid();
+
+                    if (pif.hdr.getType() == OpCode.reconfig) {
+                        SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
+                        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
+                        self.setLastSeenQuorumVerifier(qv, true);
+                    }
+
+                    packetsNotLogged.add(pif);
+                    packetsNotCommitted.add(pif);
+                    break;
+                case Leader.COMMIT:
+                case Leader.COMMITANDACTIVATE:
+                    pif = packetsNotCommitted.peekFirst();
+                    if (pif == null) {
+                        // A commit without any pending proposal: nothing to apply, so warn
+                        // instead of failing with a NullPointerException.
+                        LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(qp.getZxid()));
+                    } else if (pif.hdr.getZxid() != qp.getZxid()) {
+                        LOG.warn(
+                            "Committing 0x{}, but next proposal is 0x{}",
+                            Long.toHexString(qp.getZxid()),
+                            Long.toHexString(pif.hdr.getZxid()));
+                    } else {
+                        if (qp.getType() == Leader.COMMITANDACTIVATE) {
+                            QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
+                            boolean majorChange = self.processReconfig(
+                                qv,
+                                ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
+                                true);
+                            if (majorChange) {
+                                throw new Exception("changes proposed in reconfig");
+                            }
+                        }
+                        if (!writeToTxnLog) {
+                            zk.processTxn(pif.hdr, pif.rec);
+                            packetsNotLogged.remove();
+                            packetsNotCommitted.remove();
+                        } else {
+                            packetsNotCommitted.remove();
+                            packetsCommitted.add(qp.getZxid());
+                        }
+                    }
+                    break;
+                case Leader.INFORM:
+                case Leader.INFORMANDACTIVATE:
+                    PacketInFlight packet = new PacketInFlight();
+
+                    if (qp.getType() == Leader.INFORMANDACTIVATE) {
+                        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
+                        long suggestedLeaderId = buffer.getLong();
+                        byte[] remainingdata = new byte[buffer.remaining()];
+                        buffer.get(remainingdata);
+                        logEntry = SerializeUtils.deserializeTxn(remainingdata);
+                        packet.hdr = logEntry.getHeader();
+                        packet.rec = logEntry.getTxn();
+                        packet.digest = logEntry.getDigest();
+                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8));
+                        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
+                        if (majorChange) {
+                            throw new Exception("changes proposed in reconfig");
+                        }
+                    } else {
+                        logEntry = SerializeUtils.deserializeTxn(qp.getData());
+                        packet.rec = logEntry.getTxn();
+                        packet.hdr = logEntry.getHeader();
+                        packet.digest = logEntry.getDigest();
+                        // Log warning message if txn comes out-of-order
+                        if (packet.hdr.getZxid() != lastQueued + 1) {
+                            LOG.warn(
+                                "Got zxid 0x{} expected 0x{}",
+                                Long.toHexString(packet.hdr.getZxid()),
+                                Long.toHexString(lastQueued + 1));
+                        }
+                        lastQueued = packet.hdr.getZxid();
+                    }
+                    if (!writeToTxnLog) {
+                        // Apply to db directly if we haven't taken the snapshot
+                        zk.processTxn(packet.hdr, packet.rec);
+                    } else {
+                        packetsNotLogged.add(packet);
+                        packetsCommitted.add(qp.getZxid());
+                    }
+
+                    break;
+                case Leader.UPTODATE:
+                    LOG.info("Learner received UPTODATE message");
+                    if (newLeaderQV != null) {
+                        boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
+                        if (majorChange) {
+                            throw new Exception("changes proposed in reconfig");
+                        }
+                    }
+                    if (isPreZAB1_0) {
+                        zk.takeSnapshot(syncSnapshot);
+                        self.setCurrentEpoch(newEpoch);
+                    }
+                    self.setZooKeeperServer(zk);
+                    self.adminServer.setZooKeeperServer(zk);
+                    break outerLoop;
+                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
+                    // means this is Zab 1.0
+                    LOG.info("Learner received NEWLEADER message");
+                    if (qp.getData() != null && qp.getData().length > 1) {
+                        try {
+                            QuorumVerifier qv = self.configFromString(new String(qp.getData(), UTF_8));
+                            self.setLastSeenQuorumVerifier(qv, true);
+                            newLeaderQV = qv;
+                        } catch (Exception e) {
+                            // Best effort, as before: the sync continues without the new config,
+                            // but the failure now goes to the log instead of stderr.
+                            LOG.warn("Failed to apply quorum config received in NEWLEADER packet", e);
+                        }
+                    }
+
+                    if (snapshotNeeded) {
+                        zk.takeSnapshot(syncSnapshot);
+                    }
+
+                    self.setCurrentEpoch(newEpoch);
+                    writeToTxnLog = true;
+                    //Anything after this needs to go to the transaction log, not applied directly in memory
+                    isPreZAB1_0 = false;
+
+                    // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
+                    sock.setSoTimeout(self.tickTime * self.syncLimit);
+                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
+                    zk.startupWithoutServing();
+                    if (zk instanceof FollowerZooKeeperServer) {
+                        FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+                        fzk.syncProcessor.setDelayForwarding(true);
+                        for (PacketInFlight p : packetsNotLogged) {
+                            fzk.logRequest(p.hdr, p.rec, p.digest);
+                        }
+                        packetsNotLogged.clear();
+                        fzk.syncProcessor.syncFlush();
+                    }
+
+                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
+
+                    if (zk instanceof FollowerZooKeeperServer) {
+                        FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+                        fzk.syncProcessor.setDelayForwarding(false);
+                        fzk.syncProcessor.syncFlush();
+                    }
+                    break;
+                }
+            }
+        }
+        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
+        writePacket(ack, true);
+        zk.startServing();
+        /*
+         * Update the election vote here to ensure that all members of the
+         * ensemble report the same vote to new servers that start up and
+         * send leader election notifications to the ensemble.
+         *
+         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
+         */
+        self.updateElectionVote(newEpoch);
+
+        // We need to log the stuff that came in between the snapshot and the uptodate
+        if (zk instanceof FollowerZooKeeperServer) {
+            FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+            for (PacketInFlight p : packetsNotLogged) {
+                fzk.logRequest(p.hdr, p.rec, p.digest);
+            }
+            for (Long zxid : packetsCommitted) {
+                fzk.commit(zxid);
+            }
+        } else if (zk instanceof ObserverZooKeeperServer) {
+            // Similar to follower, we need to log requests between the snapshot
+            // and UPTODATE
+            ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
+            for (PacketInFlight p : packetsNotLogged) {
+                Long zxid = packetsCommitted.peekFirst();
+                if (zxid == null) {
+                    // No commit is left for this proposal; skip it instead of failing
+                    // with a NullPointerException while unboxing.
+                    LOG.warn(
+                        "No commit received for proposal 0x{}, skipping it",
+                        Long.toHexString(p.hdr.getZxid()));
+                    continue;
+                }
+                if (p.hdr.getZxid() != zxid) {
+                    // log warning message if there is no matching commit
+                    // old leader send outstanding proposal to observer
+                    LOG.warn(
+                        "Committing 0x{}, but next proposal is 0x{}",
+                        Long.toHexString(zxid),
+                        Long.toHexString(p.hdr.getZxid()));
+                    continue;
+                }
+                packetsCommitted.remove();
+                Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null);
+                request.setTxn(p.rec);
+                request.setHdr(p.hdr);
+                request.setTxnDigest(p.digest);
+                ozk.commitRequest(request);
+            }
+        } else {
+            // New server type need to handle in-flight packets
+            throw new UnsupportedOperationException("Unknown server type");
+        }
+    }
+
+    /**
+     * Handles the leader's answer to a REVALIDATE packet: reads the session id and the
+     * validity flag from the payload and finishes session initialization for the
+     * connection that was waiting on it.
+     *
+     * @param qp the packet whose data holds a long session id followed by a boolean
+     * @throws IOException if the payload cannot be read
+     */
+    protected void revalidate(QuorumPacket qp) throws IOException {
+        final DataInputStream payload = new DataInputStream(new ByteArrayInputStream(qp.getData()));
+        final long sessionId = payload.readLong();
+        final boolean valid = payload.readBoolean();
+        final ServerCnxn cnxn = pendingRevalidations.remove(sessionId);
+        if (cnxn != null) {
+            zk.finishSessionInit(cnxn, valid);
+        } else {
+            LOG.warn("Missing session 0x{} for validation", Long.toHexString(sessionId));
+        }
+        if (LOG.isTraceEnabled()) {
+            final String traceMessage = "Session 0x" + Long.toHexString(sessionId) + " is valid: " + valid;
+            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, traceMessage);
+        }
+    }
+
+    /**
+     * Answers a ping by sending back a packet with the same type, zxid and auth info
+     * whose payload is our session data: one (session id, int) pair per entry of the
+     * server's touch snapshot.
+     *
+     * @param qp the ping received from the leader
+     * @throws IOException if the reply cannot be written
+     */
+    protected void ping(QuorumPacket qp) throws IOException {
+        final ByteArrayOutputStream sessionData = new ByteArrayOutputStream();
+        final DataOutputStream out = new DataOutputStream(sessionData);
+        for (Entry<Long, Integer> touched : zk.getTouchSnapshot().entrySet()) {
+            out.writeLong(touched.getKey());
+            out.writeInt(touched.getValue());
+        }
+
+        writePacket(new QuorumPacket(qp.getType(), qp.getZxid(), sessionData.toByteArray(), qp.getAuthinfo()), true);
+    }
+
+    /**
+     * Shutdown the Peer: detaches the ZooKeeper server from the quorum peer and the admin
+     * server, closes all connections, stops the sender (if any), closes the socket to the
+     * leader and finally shuts the ZooKeeper server down.
+     */
+    public void shutdown() {
+        self.setZooKeeperServer(null);
+        self.closeAllConnections();
+        self.adminServer.setZooKeeperServer(null);
+
+        if (sender != null) {
+            sender.shutdown();
+        }
+
+        closeSocket();
+
+        // shutdown previous zookeeper
+        if (zk == null) {
+            return;
+        }
+        // If we haven't finished SNAP sync, force fully shutdown
+        // to avoid potential inconsistency
+        final boolean snapSyncInProgress = self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP);
+        zk.shutdown(snapSyncInProgress);
+    }
+
+    /**
+     * @return {@code true} only when both the quorum peer and the ZooKeeper server report running
+     */
+    boolean isRunning() {
+        if (!self.isRunning()) {
+            return false;
+        }
+        return zk.isRunning();
+    }
+
+    /**
+     * Closes the socket to the leader at most once per connection. The
+     * {@code sockBeingClosed} flag makes concurrent or repeated calls no-ops until
+     * connectToLeader resets it. When {@code closeSocketAsync} is set, the close runs
+     * on a separate daemon thread; otherwise it runs on the calling thread.
+     */
+    void closeSocket() {
+        if (sock == null) {
+            return;
+        }
+        if (!sockBeingClosed.compareAndSet(false, true)) {
+            // Somebody else is already closing (or has closed) this socket.
+            return;
+        }
+        if (closeSocketAsync) {
+            // shutdown() treats zk as possibly null after calling this method, so do not
+            // dereference it blindly just to build the thread name.
+            final long sid = (zk != null) ? zk.getServerId() : self.getId();
+            final Thread closingThread = new Thread(this::closeSockSync, "CloseSocketThread(sid:" + sid + ")");
+            closingThread.setDaemon(true);
+            closingThread.start();
+        } else {
+            closeSockSync();
+        }
+    }
+
+    /**
+     * Closes the socket on the calling thread, clears the {@code sock} field and records
+     * how long the close took in the SOCKET_CLOSING_TIME metric. An IOException from the
+     * close is logged and ignored; in that case the field is left untouched and no metric
+     * is recorded.
+     */
+    void closeSockSync() {
+        final long startTime = Time.currentElapsedTime();
+        try {
+            if (sock != null) {
+                sock.close();
+                sock = null;
+            }
+            final long elapsed = Time.currentElapsedTime() - startTime;
+            ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(elapsed);
+        } catch (IOException e) {
+            LOG.warn("Ignoring error closing connection to leader", e);
+        }
+    }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
new file mode 100644
index 00000000000..990f75bee9e
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.DataTreeBean;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServerBean;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Parent class for all ZooKeeperServers for Learners
+ */
+public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
+
+ /*
+ * Request processors
+ */
+ protected CommitProcessor commitProcessor;
+ protected SyncRequestProcessor syncProcessor;
+
+ public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int listenBacklog, ZKDatabase zkDb, QuorumPeer self) throws IOException {
+ super(logFactory, tickTime, minSessionTimeout, maxSessionTimeout, listenBacklog, zkDb, self);
+ }
+
+ /**
+ * Abstract method to return the learner associated with this server.
+ * Since the Learner may change under our feet (when QuorumPeer reassigns
+ * it) we can't simply take a reference here. Instead, we need the
+ * subclasses to implement this.
+ */
+ public abstract Learner getLearner();
+
+ /**
+ * Returns the current state of the session tracker. This is only currently
+ * used by a Learner to build a ping response packet.
+ *
+ */
+ protected Map<Long, Integer> getTouchSnapshot() {
+ if (sessionTracker != null) {
+ return ((LearnerSessionTracker) sessionTracker).snapshot();
+ }
+ Map<Long, Integer> map = Collections.emptyMap();
+ return map;
+ }
+
+ /**
+ * Returns the id of the associated QuorumPeer, which will do for a unique
+ * id of this server.
+ */
+ @Override
+ public long getServerId() {
+ return self.getId();
+ }
+
+ @Override
+ public void createSessionTracker() {
+ sessionTracker = new LearnerSessionTracker(
+ this,
+ getZKDatabase().getSessionWithTimeOuts(),
+ this.tickTime,
+ self.getId(),
+ self.areLocalSessionsEnabled(),
+ getZooKeeperServerListener());
+ }
+
+ @Override
+ protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
+ if (upgradeableSessionTracker.isLocalSession(sessionId)) {
+ super.revalidateSession(cnxn, sessionId, sessionTimeout);
+ } else {
+ getLearner().validateSession(cnxn, sessionId, sessionTimeout);
+ }
+ }
+
+ @Override
+ protected void registerJMX() {
+ // register with JMX
+ try {
+ jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
+ MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ jmxDataTreeBean = null;
+ }
+ }
+
+ public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) {
+ // register with JMX
+ if (self.jmxLeaderElectionBean != null) {
+ try {
+ MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ }
+ self.jmxLeaderElectionBean = null;
+ }
+
+ try {
+ jmxServerBean = serverBean;
+ MBeanRegistry.getInstance().register(serverBean, localPeerBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ jmxServerBean = null;
+ }
+ }
+
+ @Override
+ protected void unregisterJMX() {
+ // unregister from JMX
+ try {
+ if (jmxDataTreeBean != null) {
+ MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
+ }
+ jmxDataTreeBean = null;
+ }
+
+ protected void unregisterJMX(Learner peer) {
+ // unregister from JMX
+ try {
+ if (jmxServerBean != null) {
+ MBeanRegistry.getInstance().unregister(jmxServerBean);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
+ }
+ jmxServerBean = null;
+ }
+
+ @Override
+ public synchronized void shutdown(boolean fullyShutDown) {
+ if (!canShutdown()) {
+ LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
+ }
+ else {
+ LOG.info("Shutting down");
+ try {
+ if (syncProcessor != null) {
+ // Shutting down the syncProcessor here, first, ensures queued transactions here are written to
+ // permanent storage, which ensures that crash recovery data is consistent with what is used for a
+ // leader election immediately following shutdown, because of the old leader going down; and also
+ // that any state on its way to being written is also loaded in the potential call to
+ // fast-forward-from-edits, in super.shutdown(...), so we avoid getting a DIFF from the new leader
+ // that contains entries we have already written to our transaction log.
+ syncProcessor.shutdown();
+ }
+ }
+ catch (Exception e) {
+ LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e);
+ }
+ }
+ try {
+ super.shutdown(fullyShutDown);
+ } catch (Exception e) {
+ LOG.warn("Ignoring unexpected exception during shutdown", e);
+ }
+ }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
new file mode 100644
index 00000000000..b71720aec89
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiConsumer;
+
+/**
+ * A ZooKeeperServer for the Observer node type. Not much is different, but
+ * we anticipate specializing the request processors in the future.
+ *
+ */
+public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ObserverZooKeeperServer.class);
+
+ /**
+ * Enable since request processor for writing txnlog to disk and
+ * take periodic snapshot. Default is ON.
+ */
+
+ private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
+
+ /*
+ * Pending sync requests
+ */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<Request>();
+
+ ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
+ super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
+ LOG.info("syncEnabled ={}", syncRequestProcessorEnabled);
+ }
+
+ public Observer getObserver() {
+ return self.observer;
+ }
+
+ @Override
+ public Learner getLearner() {
+ return self.observer;
+ }
+
+ /**
+ * Unlike a Follower, which sees a full request only during the PROPOSAL
+ * phase, Observers get all the data required with the INFORM packet.
+ * This method commits a request that has been unpacked by from an INFORM
+ * received from the Leader.
+ *
+ * @param request
+ */
+ public void commitRequest(Request request) {
+ if (syncProcessor != null) {
+ // Write to txnlog and take periodic snapshot
+ syncProcessor.processRequest(request);
+ }
+ commitProcessor.commit(request);
+ }
+
+ /**
+ * Set up the request processors for an Observer:
+ * firstProcesor-&gt;commitProcessor-&gt;finalProcessor
+ */
+ @Override
+ protected void setupRequestProcessors() {
+ // We might consider changing the processor behaviour of
+ // Observers to, for example, remove the disk sync requirements.
+ // Currently, they behave almost exactly the same as followers.
+ RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+ commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
+ commitProcessor.start();
+ firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
+ ((ObserverRequestProcessor) firstProcessor).start();
+
+ /*
+ * Observer should write to disk, so that the it won't request
+ * too old txn from the leader which may lead to getting an entire
+ * snapshot.
+ *
+ * However, this may degrade performance as it has to write to disk
+ * and do periodic snapshot which may double the memory requirements
+ */
+ if (syncRequestProcessorEnabled) {
+ syncProcessor = new SyncRequestProcessor(this, null);
+ syncProcessor.start();
+ }
+ else {
+ syncProcessor = null;
+ }
+ }
+
+ /*
+ * Process a sync request
+ */
+ public synchronized void sync() {
+ if (pendingSyncs.size() == 0) {
+ LOG.warn("Not expecting a sync.");
+ return;
+ }
+
+ Request r = pendingSyncs.remove();
+ commitProcessor.commit(r);
+ }
+
+ @Override
+ public String getState() {
+ return "observer";
+ }
+
+ @Override
+ public void dumpMonitorValues(BiConsumer<String, Object> response) {
+ super.dumpMonitorValues(response);
+ response.accept("observer_master_id", getObserver().getLearnerMasterId());
+ }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
new file mode 100644
index 00000000000..b1403ecd59b
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.DataTreeBean;
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerBean;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * A ZooKeeperServer which comes into play when peer is partitioned from the
+ * majority. Handles read-only clients, but drops connections from not-read-only
+ * ones.
+ * <p>
+ * The very first processor in the chain of request processors is a
+ * ReadOnlyRequestProcessor which drops state-changing requests.
+ */
+public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
+
+ protected final QuorumPeer self;
+ private volatile boolean shutdown = false;
+
+ ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) {
+ super(
+ logFactory,
+ self.tickTime,
+ self.minSessionTimeout,
+ self.maxSessionTimeout,
+ self.clientPortListenBacklog,
+ zkDb,
+ self.getInitialConfig(),
+ self.isReconfigEnabled());
+ this.self = self;
+ }
+
+ @Override
+ protected void setupRequestProcessors() {
+ RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+ RequestProcessor prepProcessor = new PrepRequestProcessor(this, finalProcessor);
+ ((PrepRequestProcessor) prepProcessor).start();
+ firstProcessor = new ReadOnlyRequestProcessor(this, prepProcessor);
+ ((ReadOnlyRequestProcessor) firstProcessor).start();
+ }
+
+ @Override
+ public synchronized void startup() {
+ // check to avoid startup follows shutdown
+ if (shutdown) {
+ LOG.warn("Not starting Read-only server as startup follows shutdown!");
+ return;
+ }
+ registerJMX(new ReadOnlyBean(this), self.jmxLocalPeerBean);
+ super.startup();
+ self.setZooKeeperServer(this);
+ self.adminServer.setZooKeeperServer(this);
+ LOG.info("Read-only server started");
+ }
+
+ @Override
+ public void createSessionTracker() {
+ sessionTracker = new LearnerSessionTracker(
+ this, getZKDatabase().getSessionWithTimeOuts(),
+ this.tickTime, self.getId(), self.areLocalSessionsEnabled(),
+ getZooKeeperServerListener());
+ }
+
+ @Override
+ protected void startSessionTracker() {
+ ((LearnerSessionTracker) sessionTracker).start();
+ }
+
+ @Override
+ protected void setLocalSessionFlag(Request si) {
+ switch (si.type) {
+ case OpCode.createSession:
+ if (self.areLocalSessionsEnabled()) {
+ si.setLocalSession(true);
+ }
+ break;
+ case OpCode.closeSession:
+ if (((UpgradeableSessionTracker) sessionTracker).isLocalSession(si.sessionId)) {
+ si.setLocalSession(true);
+ } else {
+ LOG.warn("Submitting global closeSession request for session 0x{} in ReadOnly mode",
+ Long.toHexString(si.sessionId));
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException {
+ if (((LearnerSessionTracker) sessionTracker).isGlobalSession(sessionId)) {
+ String msg = "Refusing global session reconnection in RO mode " + cnxn.getRemoteSocketAddress();
+ LOG.info(msg);
+ throw new ServerCnxn.CloseRequestException(msg, ServerCnxn.DisconnectReason.RENEW_GLOBAL_SESSION_IN_RO_MODE);
+ }
+ }
+
+ @Override
+ protected void registerJMX() {
+ // register with JMX
+ try {
+ jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
+ MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ jmxDataTreeBean = null;
+ }
+ }
+
+ public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) {
+ // register with JMX
+ try {
+ jmxServerBean = serverBean;
+ MBeanRegistry.getInstance().register(serverBean, localPeerBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ jmxServerBean = null;
+ }
+ }
+
+ @Override
+ protected void unregisterJMX() {
+ // unregister from JMX
+ try {
+ if (jmxDataTreeBean != null) {
+ MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
+ }
+ jmxDataTreeBean = null;
+ }
+
+ protected void unregisterJMX(ZooKeeperServer zks) {
+ // unregister from JMX
+ try {
+ if (jmxServerBean != null) {
+ MBeanRegistry.getInstance().unregister(jmxServerBean);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
+ }
+ jmxServerBean = null;
+ }
+
+ @Override
+ public String getState() {
+ return "read-only";
+ }
+
+ /**
+ * Returns the id of the associated QuorumPeer, which will do for a unique
+ * id of this server.
+ */
+ @Override
+ public long getServerId() {
+ return self.getId();
+ }
+
+ @Override
+ public synchronized void shutdown(boolean fullyShutDown) {
+ if (!canShutdown()) {
+ super.shutdown(fullyShutDown);
+ LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
+ }
+ else {
+ shutdown = true;
+ unregisterJMX(this);
+
+ // set peer's server to null
+ self.setZooKeeperServer(null);
+ // clear all the connections
+ self.closeAllConnections();
+
+ self.adminServer.setZooKeeperServer(null);
+ }
+ // shutdown the server itself
+ super.shutdown(fullyShutDown);
+ }
+
+ @Override
+ public void dumpConf(PrintWriter pwriter) {
+ super.dumpConf(pwriter);
+
+ pwriter.print("initLimit=");
+ pwriter.println(self.getInitLimit());
+ pwriter.print("syncLimit=");
+ pwriter.println(self.getSyncLimit());
+ pwriter.print("electionAlg=");
+ pwriter.println(self.getElectionType());
+ pwriter.print("electionPort=");
+ pwriter.println(self.getElectionAddress().getAllPorts()
+ .stream().map(Objects::toString).collect(Collectors.joining("|")));
+ pwriter.print("quorumPort=");
+ pwriter.println(self.getQuorumAddress().getAllPorts()
+ .stream().map(Objects::toString).collect(Collectors.joining("|")));
+ pwriter.print("peerType=");
+ pwriter.println(self.getLearnerType().ordinal());
+ }
+
+ @Override
+ protected void setState(State state) {
+ this.state = state;
+ }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
new file mode 100644
index 00000000000..3b7a9dfc331
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.net.Socket;
+
+ /**
+  * Request processor that writes an ACK packet to the learner's connection for
+  * every non-sync request, and flushes that connection on {@link #flush()}.
+  * When a write fails, the learner's socket is closed.
+  */
+ public class SendAckRequestProcessor implements RequestProcessor, Flushable {
+
+     private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);
+
+     Learner learner;
+
+     SendAckRequestProcessor(Learner peer) {
+         this.learner = peer;
+     }
+
+     /**
+      * Writes an ACK for the given request's zxid, without flushing. Requests of
+      * type {@code OpCode.sync} are ignored.
+      */
+     @Override
+     public void processRequest(Request si) {
+         if (si.type == OpCode.sync) {
+             return;
+         }
+         QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
+         try {
+             si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
+
+             learner.writePacket(qp, false);
+         } catch (IOException e) {
+             LOG.warn("Closing connection to leader, exception during packet send", e);
+             closeLearnerSocket();
+         }
+     }
+
+     /**
+      * Flushes the learner's connection by writing a null packet with the flush
+      * flag set.
+      */
+     @Override
+     public void flush() throws IOException {
+         try {
+             learner.writePacket(null, true);
+         } catch (IOException e) {
+             LOG.warn("Closing connection to leader, exception during packet send", e);
+             closeLearnerSocket();
+         }
+     }
+
+     @Override
+     public void shutdown() {
+         // Nothing needed
+     }
+
+     /**
+      * Closes the learner's socket if it is set and still open. Works on a local
+      * snapshot of the field, since the learner may set it to null concurrently.
+      */
+     private void closeLearnerSocket() {
+         try {
+             Socket socket = learner.sock;
+             if (socket != null && !socket.isClosed()) {
+                 socket.close();
+             }
+         } catch (IOException e1) {
+             // Nothing to do, we are shutting things down, so an exception here is irrelevant
+             LOG.debug("Ignoring error closing the connection", e1);
+         }
+     }
+
+ }