aboutsummaryrefslogtreecommitdiffstats
path: root/zookeeper-server
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2024-05-06 15:47:57 +0200
committerjonmv <venstad@gmail.com>2024-05-06 15:47:57 +0200
commit064f4c925303ca73eb6a9a1e9b2f7f5c01ace1a8 (patch)
tree65ba251def425cad3a9d1340774d5d8f25bf7fe9 /zookeeper-server
parent76554d31b3b865ed6095aee8fe67168ea0571368 (diff)
Remove ZK 3.9.1
Diffstat (limited to 'zookeeper-server')
-rw-r--r--zookeeper-server/pom.xml1
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/CMakeLists.txt2
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/pom.xml99
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java45
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java49
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java48
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java60
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java96
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java56
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/common/ClientX509Util.java230
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/common/NetUtils.java94
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java353
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java37
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java2412
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java309
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java928
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java180
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java136
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java2711
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java236
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java83
-rw-r--r--zookeeper-server/zookeeper-server/CMakeLists.txt4
-rw-r--r--zookeeper-server/zookeeper-server/pom.xml4
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java1
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java16
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java3
26 files changed, 16 insertions, 8177 deletions
diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml
index 4b7f4be7a7f..e0838a9eefa 100644
--- a/zookeeper-server/pom.xml
+++ b/zookeeper-server/pom.xml
@@ -14,7 +14,6 @@
<modules>
<module>zookeeper-server-common</module>
<module>zookeeper-server</module>
- <module>zookeeper-server-3.9.2</module>
</modules>
<dependencies>
<dependency>
diff --git a/zookeeper-server/zookeeper-server-3.9.2/CMakeLists.txt b/zookeeper-server/zookeeper-server-3.9.2/CMakeLists.txt
deleted file mode 100644
index de5780610d9..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/CMakeLists.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-install_jar(zookeeper-server-3.9.2-jar-with-dependencies.jar)
diff --git a/zookeeper-server/zookeeper-server-3.9.2/pom.xml b/zookeeper-server/zookeeper-server-3.9.2/pom.xml
deleted file mode 100644
index 791c026234a..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/pom.xml
+++ /dev/null
@@ -1,99 +0,0 @@
-<?xml version="1.0"?>
-<!-- Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>zookeeper-server-parent</artifactId>
- <version>8-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
- <artifactId>zookeeper-server-3.9.2</artifactId>
- <packaging>container-plugin</packaging>
- <version>8-SNAPSHOT</version>
- <properties>
- <zookeeper.version>3.9.2</zookeeper.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>zookeeper-server-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>zookeeper-client-common</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <!-- Don't use ZK version from zookeeper-client-common -->
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>${zookeeper.version}</version>
- <exclusions>
- <!--
- Container provides wiring for all common log libraries
- Duplicate embedding results in various warnings being printed to stderr
- -->
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- snappy-java and metrics-core are included here
- to be able to work with ZooKeeper 3.7.0 due to
- class loading issues -->
- <dependency>
- <groupId>io.dropwizard.metrics</groupId>
- <artifactId>metrics-core</artifactId>
- <scope>compile</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- <scope>compile</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <compilerArgs>
- <!-- Turn off classfile warnings where spotbugs is pulled in transitively. -->
- <arg>-Xlint:all</arg>
- <arg>-Xlint:-classfile</arg>
- <arg>-Werror</arg>
- </compilerArgs>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-install-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>bundle-plugin</artifactId>
- <extensions>true</extensions>
- <configuration>
- <importPackage>com.sun.management</importPackage>
- <bundleSymbolicName>zookeeper-server</bundleSymbolicName>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java
deleted file mode 100644
index a7cd14c415f..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.zookeeper;
-
-import com.yahoo.cloud.config.ZookeeperServerConfig;
-import com.yahoo.component.AbstractComponent;
-import com.yahoo.component.annotation.Inject;
-import com.yahoo.vespa.zookeeper.server.VespaZooKeeperServer;
-
-import java.nio.file.Path;
-
-/**
- *
- * Server used for starting config server, needed to be able to have different behavior for hosted and
- * self-hosted Vespa (controlled by zookeeperServerConfig.dynamicReconfiguration).
- *
- * @author Harald Musum
- */
-public class ConfigServerZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer {
-
- private final VespaZooKeeperServer zooKeeperServer;
-
- @Inject
- public ConfigServerZooKeeperServer(ZookeeperServerConfig zookeeperServerConfig) {
- this.zooKeeperServer = zookeeperServerConfig.dynamicReconfiguration()
- ? new ReconfigurableVespaZooKeeperServer(new Reconfigurer(new VespaZooKeeperAdminImpl()), zookeeperServerConfig)
- : new VespaZooKeeperServerImpl(zookeeperServerConfig);
- }
-
- @Override
- public void deconstruct() { zooKeeperServer.shutdown(); }
-
- @Override
- public void shutdown() {
- zooKeeperServer.shutdown();
- }
-
- @Override
- public void start(Path configFilePath) {
- zooKeeperServer.start(configFilePath);
- }
-
- @Override
- public boolean reconfigurable() { return zooKeeperServer.reconfigurable(); }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
deleted file mode 100644
index d869cbb6938..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.zookeeper;
-
-import ai.vespa.validation.Validation;
-import com.yahoo.cloud.config.ZookeeperServerConfig;
-import com.yahoo.component.AbstractComponent;
-import com.yahoo.component.annotation.Inject;
-import com.yahoo.vespa.zookeeper.server.VespaZooKeeperServer;
-
-import java.nio.file.Path;
-import java.time.Duration;
-
-/**
- * Starts or reconfigures zookeeper cluster.
- * The QuorumPeer conditionally created here is owned by the Reconfigurer;
- * when it already has a peer, that peer is used here in case start or shutdown is required.
- * Guarantees that server is up by writing a node to ZooKeeper successfully before
- * returning from constructor.
- *
- * @author hmusum
- */
-public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer {
-
- private QuorumPeer peer;
-
- @Inject
- public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) {
- Validation.require(zookeeperServerConfig.dynamicReconfiguration(),
- zookeeperServerConfig.dynamicReconfiguration(),
- "dynamicReconfiguration must be true");
- peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer());
- }
-
- @Override
- public void shutdown() {
- peer.shutdown(Duration.ofMinutes(1));
- }
-
- @Override
- public void start(Path configFilePath) {
- peer.start(configFilePath);
- }
-
- @Override
- public boolean reconfigurable() {
- return true;
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java
deleted file mode 100644
index 90554910293..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java
+++ /dev/null
@@ -1,48 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.zookeeper;
-
-import com.yahoo.security.X509SslContext;
-import com.yahoo.security.tls.TlsContext;
-import com.yahoo.security.tls.TransportSecurityUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.common.ClientX509Util;
-import org.apache.zookeeper.common.X509Exception;
-import org.apache.zookeeper.data.Id;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.auth.AuthenticationProvider;
-import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
-
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.X509KeyManager;
-import javax.net.ssl.X509TrustManager;
-import java.security.cert.X509Certificate;
-import java.util.logging.Logger;
-
-/**
- * A {@link AuthenticationProvider} to be used in combination with Vespa mTLS.
- *
- * @author bjorncs
- */
-public class VespaMtlsAuthenticationProvider extends X509AuthenticationProvider {
-
- private static final Logger log = Logger.getLogger(VespaMtlsAuthenticationProvider.class.getName());
-
- public VespaMtlsAuthenticationProvider() {
- super(null, null);
- }
-
- @Override
- public KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte[] authData) {
- // Vespa's mTLS peer authorization rules are performed by the underlying trust manager implementation.
- // The client is authorized once the SSL handshake has completed.
- X509Certificate[] certificateChain = (X509Certificate[]) cnxn.getClientCertificateChain();
- if (certificateChain == null || certificateChain.length == 0) {
- log.warning("Client not authenticated - should not be possible with clientAuth=NEED");
- return KeeperException.Code.AUTHFAILED;
- }
- X509Certificate certificate = certificateChain[0];
- cnxn.addAuthInfo(new Id(getScheme(), certificate.getSubjectX500Principal().getName()));
- return KeeperException.Code.OK;
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java
deleted file mode 100644
index dd5ac4e252b..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.zookeeper;
-
-import com.yahoo.protect.Process;
-import org.apache.zookeeper.server.admin.AdminServer;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.apache.zookeeper.server.quorum.QuorumPeerMain;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Starts or stops a ZooKeeper server. Extends QuorumPeerMain to be able to call initializeAndRun() and wraps
- * exceptions so that it can be used by code that does not depend on ZooKeeper.
- *
- * @author hmusum
- */
-class VespaQuorumPeer extends QuorumPeerMain implements QuorumPeer {
-
- private static final Logger log = java.util.logging.Logger.getLogger(VespaQuorumPeer.class.getName());
-
- @Override
- public void start(Path path) {
- initializeAndRun(new String[]{ path.toFile().getAbsolutePath()});
- }
-
- @Override
- public void shutdown(Duration timeout) {
- if (quorumPeer != null) {
- log.log(Level.FINE, "Shutting down ZooKeeper server");
- try {
- quorumPeer.shutdown();
- quorumPeer.join(timeout.toMillis()); // Wait for shutdown to complete
- if (quorumPeer.isAlive())
- throw new IllegalStateException("Peer still alive after " + timeout);
- } catch (RuntimeException | InterruptedException e) {
- // If shutdown fails, we have no other option than forcing the JVM to stop and letting it be restarted.
- //
- // When a VespaZooKeeperServer component receives a new config, the container will try to start a new
- // server with the new config, this will fail until the old server is deconstructed. If the old server
- // fails to deconstruct/shutdown, the new one will never start and if that happens forcing a restart is
- // the better option.
- Process.logAndDie("Failed to shut down ZooKeeper server properly, forcing shutdown", e);
- }
- }
- }
-
- @Override
- protected void initializeAndRun(String[] args) {
- try {
- super.initializeAndRun(args);
- } catch (QuorumPeerConfig.ConfigException | IOException | AdminServer.AdminServerException e) {
- throw new RuntimeException("Exception when initializing or running ZooKeeper server", e);
- }
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
deleted file mode 100644
index c74a020bcf4..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.zookeeper;
-
-import com.yahoo.cloud.config.ZookeeperServerConfig;
-import com.yahoo.net.HostName;
-import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.admin.ZooKeeperAdmin;
-import org.apache.zookeeper.data.ACL;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import static com.yahoo.yolean.Exceptions.uncheck;
-
-/**
- * @author hmusum
- */
-@SuppressWarnings("unused") // Created by injection
-public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin {
-
- private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName());
-
-
- @SuppressWarnings("try")
- @Override
- public void reconfigure(String connectionSpec, String servers) throws ReconfigException {
- try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(connectionSpec)) {
- long fromConfig = -1;
- // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0).
- log.log(Level.INFO, "Applying ZooKeeper config: " + servers);
- byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null);
- log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
-
- // Verify by issuing a write operation; this is only accepted once new quorum is obtained.
- List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL);
- zooKeeperAdmin.delete(node, -1);
-
- log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8));
- }
- catch ( KeeperException.ReconfigInProgress
- | KeeperException.ConnectionLossException
- | KeeperException.NewConfigNoQuorum e) {
- throw new ReconfigException(e);
- }
- catch (KeeperException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- private ZooKeeperAdmin createAdmin(String connectionSpec) {
- return uncheck(() -> new ZooKeeperAdmin(connectionSpec, (int) sessionTimeout().toMillis(),
- (event) -> log.log(Level.FINE, event.toString()), new ZkClientConfigBuilder().toConfig()));
- }
-
- /** Creates a node in zookeeper, with hostname as part of node name, this ensures that server is up and working before returning */
- @SuppressWarnings("try")
- void createDummyNode(ZookeeperServerConfig zookeeperServerConfig) {
- int sleepTime = 2_000;
- try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(localConnectionSpec(zookeeperServerConfig))) {
- Instant end = Instant.now().plus(Duration.ofMinutes(5));
- Exception exception = null;
- do {
- try {
- zooKeeperAdmin.create("/dummy-node-" + HostName.getLocalhost(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- return;
- } catch (KeeperException e) {
- if (e instanceof KeeperException.NodeExistsException) {
- try {
- zooKeeperAdmin.setData("/dummy-node-" + HostName.getLocalhost(), new byte[0], -1);
- return;
- } catch (KeeperException ex) {
- log.log(Level.FINE, e.getMessage());
- Thread.sleep(sleepTime);
- continue;
- }
- }
- log.log(Level.FINE, e.getMessage());
- exception = e;
- Thread.sleep(sleepTime);
- }
- } while (Instant.now().isBefore(end));
- throw new RuntimeException("Unable to create dummy node: ", exception);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
-}
-
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java
deleted file mode 100644
index 4f93eb0efa5..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.zookeeper;
-
-import ai.vespa.validation.Validation;
-import com.yahoo.cloud.config.ZookeeperServerConfig;
-import com.yahoo.component.AbstractComponent;
-import com.yahoo.component.annotation.Inject;
-import com.yahoo.vespa.zookeeper.server.VespaZooKeeperServer;
-
-import java.nio.file.Path;
-import java.time.Duration;
-
-/**
- * ZooKeeper server. Guarantees that the server is up by writing a node to ZooKeeper successfully before
- * returning from constructor.
- *
- * @author Ulf Lilleengen
- * @author Harald Musum
- */
-public class VespaZooKeeperServerImpl extends AbstractComponent implements VespaZooKeeperServer {
-
- private final VespaQuorumPeer peer;
- private final ZooKeeperRunner runner;
-
- @Inject
- public VespaZooKeeperServerImpl(ZookeeperServerConfig zookeeperServerConfig) {
- Validation.require(! zookeeperServerConfig.dynamicReconfiguration(),
- ! zookeeperServerConfig.dynamicReconfiguration(),
- "dynamicReconfiguration must be false");
- this.peer = new VespaQuorumPeer();
- this.runner = new ZooKeeperRunner(zookeeperServerConfig, this);
- new VespaZooKeeperAdminImpl().createDummyNode(zookeeperServerConfig);
- }
-
- @Override
- public void deconstruct() {
- runner.shutdown();
- super.deconstruct();
- }
-
- @Override
- public void shutdown() {
- peer.shutdown(Duration.ofMinutes(1));
- }
-
- @Override
- public void start(Path configFilePath) {
- peer.start(configFilePath);
- }
-
- @Override
- public boolean reconfigurable() {
- return false;
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/common/ClientX509Util.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/common/ClientX509Util.java
deleted file mode 100644
index f6dfb0fa4d9..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/common/ClientX509Util.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.common;
-
-import com.yahoo.vespa.zookeeper.tls.VespaZookeeperTlsContextUtils;
-import io.netty.handler.ssl.DelegatingSslContext;
-import io.netty.handler.ssl.SslContext;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLParameters;
-import javax.net.ssl.TrustManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * X509 utilities specific for client-server communication framework.
- * <p>
- * <em>Modified to use Vespa's TLS context, whenever it is available, instead of the file-based key and trust stores of ZK 3.9.
- * Based on https://github.com/apache/zookeeper/blob/branch-3.9/zookeeper-server/src/main/java/org/apache/zookeeper/common/ClientX509Util.java</em>
- *
- * @author jonmv
- */
-public class ClientX509Util extends X509Util {
-
- private static final Logger LOG = LoggerFactory.getLogger(ClientX509Util.class);
-
- private final String sslAuthProviderProperty = getConfigPrefix() + "authProvider";
- private final String sslProviderProperty = getConfigPrefix() + "sslProvider";
-
- @Override
- protected String getConfigPrefix() {
- return "zookeeper.ssl.";
- }
-
- @Override
- protected boolean shouldVerifyClientHostname() {
- return false;
- }
-
- public String getSslAuthProviderProperty() {
- return sslAuthProviderProperty;
- }
-
- public String getSslProviderProperty() {
- return sslProviderProperty;
- }
-
- public SslContext createNettySslContextForClient(ZKConfig config)
- throws X509Exception.KeyManagerException, X509Exception.TrustManagerException, SSLException {
- SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
- KeyManager km;
- TrustManager tm;
- if (VespaZookeeperTlsContextUtils.tlsContext().isPresent()) {
- km = VespaZookeeperTlsContextUtils.tlsContext().get().sslContext().keyManager();
- tm = VespaZookeeperTlsContextUtils.tlsContext().get().sslContext().trustManager();
- }
- else {
- String keyStoreLocation = config.getProperty(getSslKeystoreLocationProperty(), "");
- String keyStorePassword = getPasswordFromConfigPropertyOrFile(config, getSslKeystorePasswdProperty(),
- getSslKeystorePasswdPathProperty());
- String keyStoreType = config.getProperty(getSslKeystoreTypeProperty());
-
- if (keyStoreLocation.isEmpty()) {
- LOG.warn("{} not specified", getSslKeystoreLocationProperty());
- km = null;
- }
- else {
- km = createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType);
- }
-
- tm = getTrustManager(config);
- }
-
- if (km != null) {
- sslContextBuilder.keyManager(km);
- }
- if (tm != null) {
- sslContextBuilder.trustManager(tm);
- }
-
- sslContextBuilder.enableOcsp(config.getBoolean(getSslOcspEnabledProperty()));
- sslContextBuilder.protocols(getEnabledProtocols(config));
- Iterable<String> enabledCiphers = getCipherSuites(config);
- if (enabledCiphers != null) {
- sslContextBuilder.ciphers(enabledCiphers);
- }
- sslContextBuilder.sslProvider(getSslProvider(config));
-
- SslContext sslContext1 = sslContextBuilder.build();
-
- if (getFipsMode(config) && isServerHostnameVerificationEnabled(config)) {
- return addHostnameVerification(sslContext1, "Server");
- } else {
- return sslContext1;
- }
- }
-
- public SslContext createNettySslContextForServer(ZKConfig config)
- throws X509Exception.SSLContextException, X509Exception.KeyManagerException, X509Exception.TrustManagerException, SSLException {
- KeyManager km;
- TrustManager tm;
- if (VespaZookeeperTlsContextUtils.tlsContext().isPresent()) {
- km = VespaZookeeperTlsContextUtils.tlsContext().get().sslContext().keyManager();
- tm = VespaZookeeperTlsContextUtils.tlsContext().get().sslContext().trustManager();
- }
- else {
- String keyStoreLocation = config.getProperty(getSslKeystoreLocationProperty(), "");
- String keyStorePassword = getPasswordFromConfigPropertyOrFile(config, getSslKeystorePasswdProperty(),
- getSslKeystorePasswdPathProperty());
- String keyStoreType = config.getProperty(getSslKeystoreTypeProperty());
-
- if (keyStoreLocation.isEmpty()) {
- throw new X509Exception.SSLContextException(
- "Keystore is required for SSL server: " + getSslKeystoreLocationProperty());
- }
- km = createKeyManager(keyStoreLocation, keyStorePassword, keyStoreType);
- tm = getTrustManager(config);
- }
- return createNettySslContextForServer(config, km, tm);
- }
-
- public SslContext createNettySslContextForServer(ZKConfig config, KeyManager keyManager, TrustManager trustManager) throws SSLException {
- SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManager);
-
- if (trustManager != null) {
- sslContextBuilder.trustManager(trustManager);
- }
-
- sslContextBuilder.enableOcsp(config.getBoolean(getSslOcspEnabledProperty()));
- sslContextBuilder.protocols(getEnabledProtocols(config));
- sslContextBuilder.clientAuth(getClientAuth(config).toNettyClientAuth());
- Iterable<String> enabledCiphers = getCipherSuites(config);
- if (enabledCiphers != null) {
- sslContextBuilder.ciphers(enabledCiphers);
- }
- sslContextBuilder.sslProvider(getSslProvider(config));
-
- SslContext sslContext1 = sslContextBuilder.build();
-
- if (getFipsMode(config) && isClientHostnameVerificationEnabled(config)) {
- return addHostnameVerification(sslContext1, "Client");
- } else {
- return sslContext1;
- }
- }
-
- private SslContext addHostnameVerification(SslContext sslContext, String clientOrServer) {
- return new DelegatingSslContext(sslContext) {
- @Override
- protected void initEngine(SSLEngine sslEngine) {
- SSLParameters sslParameters = sslEngine.getSSLParameters();
- sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
- sslEngine.setSSLParameters(sslParameters);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} hostname verification: enabled HTTPS style endpoint identification algorithm", clientOrServer);
- }
- }
- };
- }
-
- private String[] getEnabledProtocols(final ZKConfig config) {
- String enabledProtocolsInput = config.getProperty(getSslEnabledProtocolsProperty());
- if (enabledProtocolsInput == null) {
- return new String[]{ config.getProperty(getSslProtocolProperty(), DEFAULT_PROTOCOL) };
- }
- return enabledProtocolsInput.split(",");
- }
-
- private X509Util.ClientAuth getClientAuth(final ZKConfig config) {
- return X509Util.ClientAuth.fromPropertyValue(config.getProperty(getSslClientAuthProperty()));
- }
-
- private Iterable<String> getCipherSuites(final ZKConfig config) {
- String cipherSuitesInput = config.getProperty(getSslCipherSuitesProperty());
- if (cipherSuitesInput == null) {
- if (getSslProvider(config) != SslProvider.JDK) {
- return null;
- }
- return List.of(X509Util.getDefaultCipherSuites());
- } else {
- return List.of(cipherSuitesInput.split(","));
- }
- }
-
- public SslProvider getSslProvider(ZKConfig config) {
- return SslProvider.valueOf(config.getProperty(getSslProviderProperty(), "JDK"));
- }
-
- private TrustManager getTrustManager(ZKConfig config) throws X509Exception.TrustManagerException {
- String trustStoreLocation = config.getProperty(getSslTruststoreLocationProperty(), "");
- String trustStorePassword = getPasswordFromConfigPropertyOrFile(config, getSslTruststorePasswdProperty(),
- getSslTruststorePasswdPathProperty());
- String trustStoreType = config.getProperty(getSslTruststoreTypeProperty());
-
- boolean sslCrlEnabled = config.getBoolean(getSslCrlEnabledProperty());
- boolean sslOcspEnabled = config.getBoolean(getSslOcspEnabledProperty());
- boolean sslServerHostnameVerificationEnabled = isServerHostnameVerificationEnabled(config);
- boolean sslClientHostnameVerificationEnabled = isClientHostnameVerificationEnabled(config);
-
- if (trustStoreLocation.isEmpty()) {
- LOG.warn("{} not specified", getSslTruststoreLocationProperty());
- return null;
- } else {
- return createTrustManager(trustStoreLocation, trustStorePassword, trustStoreType,
- sslCrlEnabled, sslOcspEnabled, sslServerHostnameVerificationEnabled,
- sslClientHostnameVerificationEnabled, getFipsMode(config));
- }
- }
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/common/NetUtils.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/common/NetUtils.java
deleted file mode 100644
index baa69f12968..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/common/NetUtils.java
+++ /dev/null
@@ -1,94 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.common;
-
-import java.net.Inet6Address;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-/**
- * This class contains common utilities for netstuff. Like printing IPv6 literals correctly
- */
-public class NetUtils {
-
- // Note: Changed from original to use hostname from InetSocketAddress if there exists one
- public static String formatInetAddr(InetSocketAddress addr) {
- String hostName = addr.getHostName();
- if (hostName != null) {
- return String.format("%s:%s", hostName, addr.getPort());
- }
-
- InetAddress ia = addr.getAddress();
-
- if (ia == null) {
- return String.format("%s:%s", addr.getHostString(), addr.getPort());
- }
- if (ia instanceof Inet6Address) {
- return String.format("[%s]:%s", ia.getHostAddress(), addr.getPort());
- } else {
- return String.format("%s:%s", ia.getHostAddress(), addr.getPort());
- }
- }
-
- /**
- * Separates host and port from given host port string if host port string is enclosed
- * within square bracket.
- *
- * @param hostPort host port string
- * @return String[]{host, port} if host port string is host:port
- * or String[] {host, port:port} if host port string is host:port:port
- * or String[] {host} if host port string is host
- * or String[]{} if not a ipv6 host port string.
- */
- public static String[] getIPV6HostAndPort(String hostPort) {
- if (hostPort.startsWith("[")) {
- int i = hostPort.lastIndexOf(']');
- if (i < 0) {
- throw new IllegalArgumentException(
- hostPort + " starts with '[' but has no matching ']'");
- }
- String host = hostPort.substring(1, i);
- if (host.isEmpty()) {
- throw new IllegalArgumentException(host + " is empty.");
- }
- if (hostPort.length() > i + 1) {
- return getHostPort(hostPort, i, host);
- }
- return new String[] { host };
- } else {
- //Not an IPV6 host port string
- return new String[] {};
- }
- }
-
- private static String[] getHostPort(String hostPort, int indexOfClosingBracket, String host) {
- // [127::1]:2181 , check separator : exits
- if (hostPort.charAt(indexOfClosingBracket + 1) != ':') {
- throw new IllegalArgumentException(hostPort + " does not have : after ]");
- }
- // [127::1]: scenario
- if (indexOfClosingBracket + 2 == hostPort.length()) {
- throw new IllegalArgumentException(hostPort + " doesn't have a port after colon.");
- }
- //do not include
- String port = hostPort.substring(indexOfClosingBracket + 2);
- return new String[] { host, port };
- }
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
deleted file mode 100644
index cf7f4c44015..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.io.Flushable;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import org.apache.zookeeper.common.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This RequestProcessor logs requests to disk. It batches the requests to do
- * the io efficiently. The request is not passed to the next RequestProcessor
- * until its log has been synced to disk.
- *
- * SyncRequestProcessor is used in 3 different cases
- * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which
- * send ack back to itself.
- * 2. Follower - Sync request to disk and forward request to
- * SendAckRequestProcessor which send the packets to leader.
- * SendAckRequestProcessor is flushable which allow us to force
- * push packets to leader.
- * 3. Observer - Sync committed request to disk (received as INFORM packet).
- * It never send ack back to the leader, so the nextProcessor will
- * be null. This change the semantic of txnlog on the observer
- * since it only contains committed txns.
- */
-public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
-
- private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
-
- private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;
-
- private static class FlushRequest extends Request {
- private final CountDownLatch latch = new CountDownLatch(1);
- public FlushRequest() {
- super(null, 0, 0, 0, null, null);
- }
- }
-
- private static final Request TURN_FORWARDING_DELAY_ON_REQUEST = new Request(null, 0, 0, 0, null, null);
- private static final Request TURN_FORWARDING_DELAY_OFF_REQUEST = new Request(null, 0, 0, 0, null, null);
-
- private static class DelayingProcessor implements RequestProcessor, Flushable {
- private final RequestProcessor next;
- private Queue<Request> delayed = null;
- private DelayingProcessor(RequestProcessor next) {
- this.next = next;
- }
- @Override
- public void flush() throws IOException {
- if (delayed == null && next instanceof Flushable) {
- ((Flushable) next).flush();
- }
- }
- @Override
- public void processRequest(Request request) throws RequestProcessorException {
- if (delayed == null) {
- next.processRequest(request);
- } else {
- delayed.add(request);
- }
- }
- @Override
- public void shutdown() {
- next.shutdown();
- }
- private void startDelaying() {
- if (delayed == null) {
- delayed = new ArrayDeque<>();
- }
- }
- private void flushAndStopDelaying() throws RequestProcessorException {
- if (delayed != null) {
- for (Request request : delayed) {
- next.processRequest(request);
- }
- delayed = null;
- }
- }
- }
-
- /** The number of log entries to log before starting a snapshot */
- private static int snapCount = ZooKeeperServer.getSnapCount();
-
- /**
- * The total size of log entries before starting a snapshot
- */
- private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes();
-
- /**
- * Random numbers used to vary snapshot timing
- */
- private int randRoll;
- private long randSize;
-
- private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<>();
-
- private final Semaphore snapThreadMutex = new Semaphore(1);
-
- private final ZooKeeperServer zks;
-
- private final DelayingProcessor nextProcessor;
-
- /**
- * Transactions that have been written and are waiting to be flushed to
- * disk. Basically this is the list of SyncItems whose callbacks will be
- * invoked after flush returns successfully.
- */
- private final Queue<Request> toFlush;
- private long lastFlushTime;
-
- public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
- super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());
- this.zks = zks;
- this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor);
- this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
- }
-
- /**
- * used by tests to check for changing
- * snapcounts
- * @param count
- */
- public static void setSnapCount(int count) {
- snapCount = count;
- }
-
- /**
- * used by tests to get the snapcount
- * @return the snapcount
- */
- public static int getSnapCount() {
- return snapCount;
- }
-
- private long getRemainingDelay() {
- long flushDelay = zks.getFlushDelay();
- long duration = Time.currentElapsedTime() - lastFlushTime;
- if (duration < flushDelay) {
- return flushDelay - duration;
- }
- return 0;
- }
-
- /** If both flushDelay and maxMaxBatchSize are set (bigger than 0), flush
- * whenever either condition is hit. If only one or the other is
- * set, flush only when the relevant condition is hit.
- */
- private boolean shouldFlush() {
- long flushDelay = zks.getFlushDelay();
- long maxBatchSize = zks.getMaxBatchSize();
- if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
- return true;
- }
- return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
- }
-
- /**
- * used by tests to check for changing
- * snapcounts
- * @param size
- */
- public static void setSnapSizeInBytes(long size) {
- snapSizeInBytes = size;
- }
-
- private boolean shouldSnapshot() {
- int logCount = zks.getZKDatabase().getTxnCount();
- long logSize = zks.getZKDatabase().getTxnSize();
- return (logCount > (snapCount / 2 + randRoll))
- || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
- }
-
- private void resetSnapshotStats() {
- randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
- randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
- }
-
- @Override
- public void run() {
- try {
- // we do this in an attempt to ensure that not all of the servers
- // in the ensemble take a snapshot at the same time
- resetSnapshotStats();
- lastFlushTime = Time.currentElapsedTime();
- while (true) {
- ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
-
- long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
- Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
- if (si == null) {
- /* We timed out looking for more writes to batch, go ahead and flush immediately */
- flush();
- si = queuedRequests.take();
- }
-
- if (si == REQUEST_OF_DEATH) {
- break;
- }
-
- if (si == TURN_FORWARDING_DELAY_ON_REQUEST) {
- nextProcessor.startDelaying();
- continue;
- }
- if (si == TURN_FORWARDING_DELAY_OFF_REQUEST) {
- nextProcessor.flushAndStopDelaying();
- continue;
- }
-
- if (si instanceof FlushRequest) {
- flush();
- ((FlushRequest) si).latch.countDown();
- continue;
- }
-
- long startProcessTime = Time.currentElapsedTime();
- ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
-
- // track the number of records written to the log
- if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
- if (shouldSnapshot()) {
- resetSnapshotStats();
- // roll the log
- zks.getZKDatabase().rollLog();
- // take a snapshot
- if (!snapThreadMutex.tryAcquire()) {
- LOG.warn("Too busy to snap, skipping");
- } else {
- new ZooKeeperThread("Snapshot Thread") {
- public void run() {
- try {
- zks.takeSnapshot();
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- } finally {
- snapThreadMutex.release();
- }
- }
- }.start();
- }
- }
- } else if (toFlush.isEmpty()) {
- // optimization for read heavy workloads
- // iff this is a read or a throttled request(which doesn't need to be written to the disk),
- // and there are no pending flushes (writes), then just pass this to the next processor
- if (nextProcessor != null) {
- nextProcessor.processRequest(si);
- nextProcessor.flush();
- }
- continue;
- }
- toFlush.add(si);
- if (shouldFlush()) {
- flush();
- }
- ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
- }
- } catch (Throwable t) {
- handleException(this.getName(), t);
- }
- LOG.info("SyncRequestProcessor exited!");
- }
-
- /** Flushes all pending writes, and waits for this to complete. */
- public void syncFlush() throws InterruptedException {
- FlushRequest marker = new FlushRequest();
- queuedRequests.add(marker);
- marker.latch.await();
- }
-
- public void setDelayForwarding(boolean delayForwarding) {
- queuedRequests.add(delayForwarding ? TURN_FORWARDING_DELAY_ON_REQUEST : TURN_FORWARDING_DELAY_OFF_REQUEST);
- }
-
- private void flush() throws IOException, RequestProcessorException {
- if (this.toFlush.isEmpty()) {
- return;
- }
-
- ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
-
- long flushStartTime = Time.currentElapsedTime();
- zks.getZKDatabase().commit();
- ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
-
- if (this.nextProcessor == null) {
- this.toFlush.clear();
- } else {
- while (!this.toFlush.isEmpty()) {
- final Request i = this.toFlush.remove();
- long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
- ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
- this.nextProcessor.processRequest(i);
- }
- nextProcessor.flush();
- }
- lastFlushTime = Time.currentElapsedTime();
- }
-
- public void shutdown() {
- LOG.info("Shutting down");
- queuedRequests.add(REQUEST_OF_DEATH);
- try {
- this.join();
- this.flush();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while wating for {} to finish", this);
- Thread.currentThread().interrupt();
- } catch (IOException e) {
- LOG.warn("Got IO exception during shutdown");
- } catch (RequestProcessorException e) {
- LOG.warn("Got request processor exception during shutdown");
- }
- if (nextProcessor != null) {
- nextProcessor.shutdown();
- }
- }
-
- public void processRequest(final Request request) {
- Objects.requireNonNull(request, "Request cannot be null");
-
- request.syncQueueStartTime = Time.currentElapsedTime();
- queuedRequests.add(request);
- ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java
deleted file mode 100644
index 114d2987fe2..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package org.apache.zookeeper.server;
-
-import com.yahoo.vespa.zookeeper.Configurator;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.logging.Logger;
-
-/**
- * Overrides secure setting with value from {@link Configurator}.
- * Workaround for incorrect handling of clientSecurePort in combination with ZooKeeper Dynamic Reconfiguration in 3.6.2
- * See https://issues.apache.org/jira/browse/ZOOKEEPER-3577.
- *
- * Using package {@link org.apache.zookeeper.server} as {@link NettyServerCnxnFactory#NettyServerCnxnFactory()} is package-private.
- *
- * @author bjorncs
- */
-public class VespaNettyServerCnxnFactory extends NettyServerCnxnFactory {
-
- private static final Logger log = Logger.getLogger(VespaNettyServerCnxnFactory.class.getName());
-
- private final boolean isSecure;
-
- public VespaNettyServerCnxnFactory() {
- super();
- this.isSecure = Configurator.VespaNettyServerCnxnFactory_isSecure;
- boolean portUnificationEnabled = Boolean.getBoolean(NettyServerCnxnFactory.PORT_UNIFICATION_KEY);
- log.info(String.format("For %h: isSecure=%b, portUnification=%b", this, isSecure, portUnificationEnabled));
- }
-
- @Override
- public void configure(InetSocketAddress addr, int maxClientCnxns, int backlog, boolean secure) throws IOException {
- log.info(String.format("For %h: configured() invoked with parameter 'secure'=%b, overridden to %b", this, secure, isSecure));
- super.configure(addr, maxClientCnxns, backlog, isSecure);
- }
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
deleted file mode 100644
index 00af31b46d4..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ /dev/null
@@ -1,2412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-import java.util.zip.Adler32;
-import java.util.zip.CheckedInputStream;
-import javax.security.sasl.SaslException;
-import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.InputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.Environment;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
-import org.apache.zookeeper.Quotas;
-import org.apache.zookeeper.StatsTrack;
-import org.apache.zookeeper.Version;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.ZookeeperBanner;
-import org.apache.zookeeper.common.PathUtils;
-import org.apache.zookeeper.common.StringUtils;
-import org.apache.zookeeper.common.Time;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
-import org.apache.zookeeper.data.StatPersisted;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.metrics.MetricsContext;
-import org.apache.zookeeper.proto.AuthPacket;
-import org.apache.zookeeper.proto.ConnectRequest;
-import org.apache.zookeeper.proto.ConnectResponse;
-import org.apache.zookeeper.proto.CreateRequest;
-import org.apache.zookeeper.proto.DeleteRequest;
-import org.apache.zookeeper.proto.GetSASLRequest;
-import org.apache.zookeeper.proto.ReplyHeader;
-import org.apache.zookeeper.proto.RequestHeader;
-import org.apache.zookeeper.proto.SetACLRequest;
-import org.apache.zookeeper.proto.SetDataRequest;
-import org.apache.zookeeper.proto.SetSASLResponse;
-import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
-import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
-import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
-import org.apache.zookeeper.server.SessionTracker.Session;
-import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
-import org.apache.zookeeper.server.auth.ProviderRegistry;
-import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
-import org.apache.zookeeper.server.util.JvmPauseMonitor;
-import org.apache.zookeeper.server.util.OSMXBean;
-import org.apache.zookeeper.server.util.QuotaMetricsUtils;
-import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
-import org.apache.zookeeper.txn.CreateSessionTxn;
-import org.apache.zookeeper.txn.TxnDigest;
-import org.apache.zookeeper.txn.TxnHeader;
-import org.apache.zookeeper.util.ServiceUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class implements a simple standalone ZooKeeperServer. It sets up the
- * following chain of RequestProcessors to process requests:
- * PrepRequestProcessor -&gt; SyncRequestProcessor -&gt; FinalRequestProcessor
- */
-public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
-
- protected static final Logger LOG;
- private static final RateLogger RATE_LOGGER;
-
- public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit";
-
- public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck";
- public static final String SKIP_ACL = "zookeeper.skipACL";
- public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota";
-
- // When enabled, will check ACL constraints appertained to the requests first,
- // before sending the requests to the quorum.
- static boolean enableEagerACLCheck;
-
- static final boolean skipACL;
-
- public static final boolean enforceQuota;
-
- public static final String SASL_SUPER_USER = "zookeeper.superUser";
-
- public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients";
- public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
- private static boolean digestEnabled;
-
- public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled";
- private static boolean serializeLastProcessedZxidEnabled;
-
- // Add a enable/disable option for now, we should remove this one when
- // this feature is confirmed to be stable
- public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
- private static boolean closeSessionTxnEnabled = true;
- private volatile CountDownLatch restoreLatch;
-
- static {
- LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
-
- RATE_LOGGER = new RateLogger(LOG);
-
- ZookeeperBanner.printBanner(LOG);
-
- Environment.logEnv("Server environment:", LOG);
-
- enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK);
- LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck);
-
- skipACL = System.getProperty(SKIP_ACL, "no").equals("yes");
- if (skipACL) {
- LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL);
- }
-
- enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false"));
- if (enforceQuota) {
- LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota);
- }
-
- digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
- LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
-
- closeSessionTxnEnabled = Boolean.parseBoolean(
- System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
- LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
-
- setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean(
- System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true")));
- }
-
- // @VisibleForTesting
- public static boolean isEnableEagerACLCheck() {
- return enableEagerACLCheck;
- }
-
- // @VisibleForTesting
- public static void setEnableEagerACLCheck(boolean enabled) {
- ZooKeeperServer.enableEagerACLCheck = enabled;
- LOG.info("Update {} to {}", ENABLE_EAGER_ACL_CHECK, enabled);
- }
-
- public static boolean isCloseSessionTxnEnabled() {
- return closeSessionTxnEnabled;
- }
-
- public static void setCloseSessionTxnEnabled(boolean enabled) {
- ZooKeeperServer.closeSessionTxnEnabled = enabled;
- LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED,
- ZooKeeperServer.closeSessionTxnEnabled);
- }
-
- protected ZooKeeperServerBean jmxServerBean;
- protected DataTreeBean jmxDataTreeBean;
-
- public static final int DEFAULT_TICK_TIME = 3000;
- protected int tickTime = DEFAULT_TICK_TIME;
- public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled
- protected static volatile int throttledOpWaitTime =
- Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME);
- /** value of -1 indicates unset, use default */
- protected int minSessionTimeout = -1;
- /** value of -1 indicates unset, use default */
- protected int maxSessionTimeout = -1;
- /** Socket listen backlog. Value of -1 indicates unset */
- protected int listenBacklog = -1;
- protected SessionTracker sessionTracker;
- private FileTxnSnapLog txnLogFactory = null;
- private ZKDatabase zkDb;
- private ResponseCache readResponseCache;
- private ResponseCache getChildrenResponseCache;
- private final AtomicLong hzxid = new AtomicLong(0);
- public static final Exception ok = new Exception("No prob");
- protected RequestProcessor firstProcessor;
- protected JvmPauseMonitor jvmPauseMonitor;
- protected volatile State state = State.INITIAL;
- private boolean isResponseCachingEnabled = true;
- /* contains the configuration file content read at startup */
- protected String initialConfig;
- protected boolean reconfigEnabled;
- private final RequestPathMetricsCollector requestPathMetricsCollector;
- private static final int DEFAULT_SNAP_COUNT = 100000;
- private static final int DEFAULT_GLOBAL_OUTSTANDING_LIMIT = 1000;
-
- private boolean localSessionEnabled = false;
- protected enum State {
- INITIAL,
- RUNNING,
- SHUTDOWN,
- ERROR
- }
-
- /**
- * This is the secret that we use to generate passwords. For the moment,
- * it's more of a checksum that's used in reconnection, which carries no
- * security weight, and is treated internally as if it carries no
- * security weight.
- */
- private static final long superSecret = 0XB3415C00L;
-
- private final AtomicInteger requestsInProcess = new AtomicInteger(0);
- final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
- // this data structure must be accessed under the outstandingChanges lock
- final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();
-
- protected ServerCnxnFactory serverCnxnFactory;
- protected ServerCnxnFactory secureServerCnxnFactory;
-
- private final ServerStats serverStats;
- private final ZooKeeperServerListener listener;
- private ZooKeeperServerShutdownHandler zkShutdownHandler;
- private volatile int createSessionTrackerServerId = 1;
-
- private static final String FLUSH_DELAY = "zookeeper.flushDelay";
- private static volatile long flushDelay;
- private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime";
- private static volatile long maxWriteQueuePollTime;
- private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize";
- private static volatile int maxBatchSize;
-
- /**
- * Starting size of read and write ByteArroyOuputBuffers. Default is 32 bytes.
- * Flag not used for small transfers like connectResponses.
- */
- public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes";
- public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024;
- public static final int intBufferStartingSizeBytes;
-
- public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize";
- public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize";
-
- static {
- long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0);
- setFlushDelay(configuredFlushDelay);
- setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3));
- setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000));
-
- intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE);
-
- if (intBufferStartingSizeBytes < 32) {
- String msg = "Buffer starting size (" + intBufferStartingSizeBytes + ") must be greater than or equal to 32. "
- + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\" ";
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
-
- LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes);
- }
-
- // Connection throttling
- private final BlueThrottle connThrottle = new BlueThrottle();
-
- private RequestThrottler requestThrottler;
- public static final String SNAP_COUNT = "zookeeper.snapCount";
-
- /**
- * This setting sets a limit on the total number of large requests that
- * can be inflight and is designed to prevent ZooKeeper from accepting
- * too many large requests such that the JVM runs out of usable heap and
- * ultimately crashes.
- *
- * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)}
- * method which is called by the connection layer ({@link NIOServerCnxn},
- * {@link NettyServerCnxn}) before allocating a byte buffer and pulling
- * data off the TCP socket. The limit is then checked again by the
- * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which
- * also atomically updates {@link #currentLargeRequestBytes}. The request is
- * then marked as a large request, with the request size stored in the Request
- * object so that it can later be decremented from {@link #currentLargeRequestBytes}.
- *
- * When a request is completed or dropped, the relevant code path calls the
- * {@link #requestFinished(Request)} method which performs the decrement if
- * needed.
- */
- private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
-
- /**
- * The size threshold after which a request is considered a large request
- * and is checked against the large request byte limit.
- */
- private volatile int largeRequestThreshold = -1;
-
- private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);
-
- private final AuthenticationHelper authHelper = new AuthenticationHelper();
-
- void removeCnxn(ServerCnxn cnxn) {
- zkDb.removeCnxn(cnxn);
- }
-
- /**
- * Creates a ZooKeeperServer instance. Nothing is setup, use the setX
- * methods to prepare the instance (eg datadir, datalogdir, ticktime,
- * builder, etc...)
- *
- */
- public ZooKeeperServer() {
- listener = new ZooKeeperServerListenerImpl(this);
- serverStats = new ServerStats(this);
- this.requestPathMetricsCollector = new RequestPathMetricsCollector();
- }
-
- /**
- * Keeping this constructor for backward compatibility
- */
- public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
- this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled());
- }
-
- /**
- * * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
- * actually start listening for clients until run() is invoked.
- *
- */
- public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) {
- serverStats = new ServerStats(this);
- this.txnLogFactory = txnLogFactory;
- this.txnLogFactory.setServerStats(this.serverStats);
- this.zkDb = zkDb;
- this.tickTime = tickTime;
- setMinSessionTimeout(minSessionTimeout);
- setMaxSessionTimeout(maxSessionTimeout);
- this.listenBacklog = clientPortListenBacklog;
- this.reconfigEnabled = reconfigEnabled;
-
- listener = new ZooKeeperServerListenerImpl(this);
-
- readResponseCache = new ResponseCache(Integer.getInteger(
- GET_DATA_RESPONSE_CACHE_SIZE,
- ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getData");
-
- getChildrenResponseCache = new ResponseCache(Integer.getInteger(
- GET_CHILDREN_RESPONSE_CACHE_SIZE,
- ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getChildren");
-
- this.initialConfig = initialConfig;
-
- this.requestPathMetricsCollector = new RequestPathMetricsCollector();
-
- this.initLargeRequestThrottlingSettings();
-
- LOG.info(
- "Created server with"
- + " tickTime {} ms"
- + " minSessionTimeout {} ms"
- + " maxSessionTimeout {} ms"
- + " clientPortListenBacklog {}"
- + " dataLogdir {}"
- + " snapdir {}",
- tickTime,
- getMinSessionTimeout(),
- getMaxSessionTimeout(),
- getClientPortListenBacklog(),
- txnLogFactory.getDataLogDir(),
- txnLogFactory.getSnapDir());
- }
-
- public String getInitialConfig() {
- return initialConfig;
- }
-
- /**
- * Adds JvmPauseMonitor and calls
- * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String)}
- *
- */
- public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
- this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled());
- this.jvmPauseMonitor = jvmPauseMonitor;
- if (jvmPauseMonitor != null) {
- LOG.info("Added JvmPauseMonitor to server");
- }
- }
-
- /**
- * creates a zookeeperserver instance.
- * @param txnLogFactory the file transaction snapshot logging class
- * @param tickTime the ticktime for the server
- */
- public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) {
- this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig, QuorumPeerConfig.isReconfigEnabled());
- }
-
- public ServerStats serverStats() {
- return serverStats;
- }
-
- public RequestPathMetricsCollector getRequestPathMetricsCollector() {
- return requestPathMetricsCollector;
- }
-
- public BlueThrottle connThrottle() {
- return connThrottle;
- }
-
- public void dumpConf(PrintWriter pwriter) {
- pwriter.print("clientPort=");
- pwriter.println(getClientPort());
- pwriter.print("secureClientPort=");
- pwriter.println(getSecureClientPort());
- pwriter.print("dataDir=");
- pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath());
- pwriter.print("dataDirSize=");
- pwriter.println(getDataDirSize());
- pwriter.print("dataLogDir=");
- pwriter.println(zkDb.snapLog.getDataLogDir().getAbsolutePath());
- pwriter.print("dataLogSize=");
- pwriter.println(getLogDirSize());
- pwriter.print("tickTime=");
- pwriter.println(getTickTime());
- pwriter.print("maxClientCnxns=");
- pwriter.println(getMaxClientCnxnsPerHost());
- pwriter.print("minSessionTimeout=");
- pwriter.println(getMinSessionTimeout());
- pwriter.print("maxSessionTimeout=");
- pwriter.println(getMaxSessionTimeout());
- pwriter.print("clientPortListenBacklog=");
- pwriter.println(getClientPortListenBacklog());
-
- pwriter.print("serverId=");
- pwriter.println(getServerId());
- }
-
- public ZooKeeperServerConf getConf() {
- return new ZooKeeperServerConf(
- getClientPort(),
- zkDb.snapLog.getSnapDir().getAbsolutePath(),
- zkDb.snapLog.getDataLogDir().getAbsolutePath(),
- getTickTime(),
- getMaxClientCnxnsPerHost(),
- getMinSessionTimeout(),
- getMaxSessionTimeout(),
- getServerId(),
- getClientPortListenBacklog());
- }
-
- /**
- * This constructor is for backward compatibility with the existing unit
- * test code.
- * It defaults to FileLogProvider persistence provider.
- */
- public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
- this(new FileTxnSnapLog(snapDir, logDir), tickTime, "");
- }
-
- /**
- * Default constructor, relies on the config for its argument values
- *
- * @throws IOException
- */
- public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException {
- this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "", QuorumPeerConfig.isReconfigEnabled());
- }
-
- /**
- * get the zookeeper database for this server
- * @return the zookeeper database for this server
- */
- public ZKDatabase getZKDatabase() {
- return this.zkDb;
- }
-
- /**
- * set the zkdatabase for this zookeeper server
- * @param zkDb
- */
- public void setZKDatabase(ZKDatabase zkDb) {
- this.zkDb = zkDb;
- }
-
- /**
- * Restore sessions and data
- */
- public void loadData() throws IOException, InterruptedException {
- /*
- * When a new leader starts executing Leader#lead, it
- * invokes this method. The database, however, has been
- * initialized before running leader election so that
- * the server could pick its zxid for its initial vote.
- * It does it by invoking QuorumPeer#getLastLoggedZxid.
- * Consequently, we don't need to initialize it once more
- * and avoid the penalty of loading it a second time. Not
- * reloading it is particularly important for applications
- * that host a large database.
- *
- * The following if block checks whether the database has
- * been initialized or not. Note that this method is
- * invoked by at least one other method:
- * ZooKeeperServer#startdata.
- *
- * See ZOOKEEPER-1642 for more detail.
- */
- if (zkDb.isInitialized()) {
- setZxid(zkDb.getDataTreeLastProcessedZxid());
- } else {
- setZxid(zkDb.loadDataBase());
- }
-
- // Clean up dead sessions
- zkDb.getSessions().stream()
- .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null)
- .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid()));
-
- // Make a clean snapshot
- takeSnapshot();
- }
-
- public File takeSnapshot() throws IOException {
- return takeSnapshot(false);
- }
-
- public File takeSnapshot(boolean syncSnap) throws IOException {
- return takeSnapshot(syncSnap, true, false);
- }
-
- /**
- * Takes a snapshot on the server.
- *
- * @param syncSnap syncSnap sync the snapshot immediately after write
- * @param isSevere if true system exist, otherwise throw IOException
- * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions
- *
- * @return file snapshot file object
- * @throws IOException
- */
- public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException {
- long start = Time.currentElapsedTime();
- File snapFile = null;
- try {
- if (fastForwardFromEdits) {
- zkDb.fastForwardDataBase();
- }
- snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
- } catch (IOException e) {
- if (isSevere) {
- LOG.error("Severe unrecoverable error, exiting", e);
- // This is a severe error that we cannot recover from,
- // so we need to exit
- ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
- } else {
- throw e;
- }
- }
- long elapsed = Time.currentElapsedTime() - start;
- LOG.info("Snapshot taken in {} ms", elapsed);
- ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
- return snapFile;
- }
-
- /**
- * Restores database from a snapshot. It is used by the restore admin server command.
- *
- * @param inputStream input stream of snapshot
- * @return last processed zxid
- */
- public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException {
- if (inputStream == null) {
- throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot");
- }
-
- long start = Time.currentElapsedTime();
- LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}",
- getZKDatabase().getDataTreeLastProcessedZxid(),
- getZKDatabase().dataTree.getNodeCount(),
- getZKDatabase().getSessionCount());
-
- // restore to a new zkDatabase
- final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory);
- final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32());
- final InputArchive ia = BinaryInputArchive.getArchive(cis);
- newZKDatabase.deserializeSnapshot(ia, cis);
- LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
- newZKDatabase.getDataTreeLastProcessedZxid(),
- newZKDatabase.dataTree.getNodeCount(),
- newZKDatabase.getSessionCount());
-
- // create a CountDownLatch
- restoreLatch = new CountDownLatch(1);
-
- try {
- // set to the new zkDatabase
- setZKDatabase(newZKDatabase);
-
- // re-create SessionTrack
- createSessionTracker();
- } finally {
- // unblock request submission
- restoreLatch.countDown();
- restoreLatch = null;
- }
-
- LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
- getZKDatabase().getDataTreeLastProcessedZxid(),
- getZKDatabase().dataTree.getNodeCount(),
- getZKDatabase().getSessionCount());
-
- long elapsed = Time.currentElapsedTime() - start;
- LOG.info("Restore taken in {} ms", elapsed);
- ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed);
-
- return getLastProcessedZxid();
- }
-
- public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
- return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection();
- }
-
- @Override
- public long getDataDirSize() {
- if (zkDb == null) {
- return 0L;
- }
- File path = zkDb.snapLog.getSnapDir();
- return getDirSize(path);
- }
-
- @Override
- public long getLogDirSize() {
- if (zkDb == null) {
- return 0L;
- }
- File path = zkDb.snapLog.getDataLogDir();
- return getDirSize(path);
- }
-
- private long getDirSize(File file) {
- long size = 0L;
- if (file.isDirectory()) {
- File[] files = file.listFiles();
- if (files != null) {
- for (File f : files) {
- size += getDirSize(f);
- }
- }
- } else {
- size = file.length();
- }
- return size;
- }
-
- public long getZxid() {
- return hzxid.get();
- }
-
- public SessionTracker getSessionTracker() {
- return sessionTracker;
- }
-
- long getNextZxid() {
- return hzxid.incrementAndGet();
- }
-
- public void setZxid(long zxid) {
- hzxid.set(zxid);
- }
-
- private void close(long sessionId) {
- Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
- submitRequest(si);
- }
-
- public void closeSession(long sessionId) {
- LOG.info("Closing session 0x{}", Long.toHexString(sessionId));
-
- // we do not want to wait for a session close. send it as soon as we
- // detect it!
- close(sessionId);
- }
-
- protected void killSession(long sessionId, long zxid) {
- zkDb.killSession(sessionId, zxid);
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(
- LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId));
- }
- if (sessionTracker != null) {
- sessionTracker.removeSession(sessionId);
- }
- }
-
- public void expire(Session session) {
- long sessionId = session.getSessionId();
- LOG.info(
- "Expiring session 0x{}, timeout of {}ms exceeded",
- Long.toHexString(sessionId),
- session.getTimeout());
- close(sessionId);
- }
-
- public void expire(long sessionId) {
- LOG.info("forcibly expiring session 0x{}", Long.toHexString(sessionId));
-
- close(sessionId);
- }
-
- public static class MissingSessionException extends IOException {
-
- private static final long serialVersionUID = 7467414635467261007L;
-
- public MissingSessionException(String msg) {
- super(msg);
- }
-
- }
-
- void touch(ServerCnxn cnxn) throws MissingSessionException {
- if (cnxn == null) {
- return;
- }
- long id = cnxn.getSessionId();
- int to = cnxn.getSessionTimeout();
- if (!sessionTracker.touchSession(id, to)) {
- throw new MissingSessionException("No session with sessionid 0x"
- + Long.toHexString(id)
- + " exists, probably expired and removed");
- }
- }
-
- protected void registerJMX() {
- // register with JMX
- try {
- jmxServerBean = new ZooKeeperServerBean(this);
- MBeanRegistry.getInstance().register(jmxServerBean, null);
-
- try {
- jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
- MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxDataTreeBean = null;
- }
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxServerBean = null;
- }
- }
-
- public void startdata() throws IOException, InterruptedException {
- //check to see if zkDb is not null
- if (zkDb == null) {
- zkDb = new ZKDatabase(this.txnLogFactory);
- }
- if (!zkDb.isInitialized()) {
- loadData();
- }
- }
-
- public synchronized void startup() {
- startupWithServerState(State.RUNNING);
- }
-
- public synchronized void startupWithoutServing() {
- startupWithServerState(State.INITIAL);
- }
-
- public synchronized void startServing() {
- setState(State.RUNNING);
- notifyAll();
- }
-
- private void startupWithServerState(State state) {
- if (sessionTracker == null) {
- createSessionTracker();
- }
- startSessionTracker();
- setupRequestProcessors();
-
- startRequestThrottler();
-
- registerJMX();
-
- startJvmPauseMonitor();
-
- registerMetrics();
-
- setState(state);
-
- requestPathMetricsCollector.start();
-
- localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
-
- notifyAll();
- }
-
- protected void startJvmPauseMonitor() {
- if (this.jvmPauseMonitor != null) {
- this.jvmPauseMonitor.serviceStart();
- }
- }
-
- protected void startRequestThrottler() {
- requestThrottler = createRequestThrottler();
- requestThrottler.start();
- }
-
- protected RequestThrottler createRequestThrottler() {
- return new RequestThrottler(this);
- }
-
- protected void setupRequestProcessors() {
- RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
- ((SyncRequestProcessor) syncProcessor).start();
- firstProcessor = new PrepRequestProcessor(this, syncProcessor);
- ((PrepRequestProcessor) firstProcessor).start();
- }
-
- public ZooKeeperServerListener getZooKeeperServerListener() {
- return listener;
- }
-
- /**
- * Change the server ID used by {@link #createSessionTracker()}. Must be called prior to
- * {@link #startup()} being called
- *
- * @param newId ID to use
- */
- public void setCreateSessionTrackerServerId(int newId) {
- createSessionTrackerServerId = newId;
- }
-
- protected void createSessionTracker() {
- sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener());
- }
-
- protected void startSessionTracker() {
- ((SessionTrackerImpl) sessionTracker).start();
- }
-
- /**
- * Sets the state of ZooKeeper server. After changing the state, it notifies
- * the server state change to a registered shutdown handler, if any.
- * <p>
- * The following are the server state transitions:
- * <ul><li>During startup the server will be in the INITIAL state.</li>
- * <li>After successfully starting, the server sets the state to RUNNING.
- * </li>
- * <li>The server transitions to the ERROR state if it hits an internal
- * error. {@link ZooKeeperServerListenerImpl} notifies any critical resource
- * error events, e.g., SyncRequestProcessor not being able to write a txn to
- * disk.</li>
- * <li>During shutdown the server sets the state to SHUTDOWN, which
- * corresponds to the server not running.</li>
- *
- * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE
- * </li></ul>
- *
- * @param state new server state.
- */
- protected void setState(State state) {
- this.state = state;
- // Notify server state changes to the registered shutdown handler, if any.
- if (zkShutdownHandler != null) {
- zkShutdownHandler.handle(state);
- } else {
- LOG.debug(
- "ZKShutdownHandler is not registered, so ZooKeeper server"
- + " won't take any action on ERROR or SHUTDOWN server state changes");
- }
- }
-
- /**
- * This can be used while shutting down the server to see whether the server
- * is already shutdown or not.
- *
- * @return true if the server is running or server hits an error, false
- * otherwise.
- */
- protected boolean canShutdown() {
- return state == State.RUNNING || state == State.ERROR;
- }
-
- /**
- * @return true if the server is running, false otherwise.
- */
- public boolean isRunning() {
- return state == State.RUNNING;
- }
-
- public void shutdown() {
- shutdown(false);
- }
-
- /**
- * Shut down the server instance
- * @param fullyShutDown true if another server using the same database will not replace this one in the same process
- */
- public synchronized void shutdown(boolean fullyShutDown) {
- if (!canShutdown()) {
- if (fullyShutDown && zkDb != null) {
- zkDb.clear();
- }
- LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
- return;
- }
- LOG.info("shutting down");
-
- // new RuntimeException("Calling shutdown").printStackTrace();
- setState(State.SHUTDOWN);
-
- // unregister all metrics that are keeping a strong reference to this object
- // subclasses will do their specific clean up
- unregisterMetrics();
-
- if (requestThrottler != null) {
- requestThrottler.shutdown();
- }
-
- // Since sessionTracker and syncThreads poll we just have to
- // set running to false and they will detect it during the poll
- // interval.
- if (sessionTracker != null) {
- sessionTracker.shutdown();
- }
- if (firstProcessor != null) {
- firstProcessor.shutdown();
- }
- if (jvmPauseMonitor != null) {
- jvmPauseMonitor.serviceStop();
- }
-
- if (zkDb != null) {
- if (fullyShutDown) {
- zkDb.clear();
- } else {
- // else there is no need to clear the database
- // * When a new quorum is established we can still apply the diff
- // on top of the same zkDb data
- // * If we fetch a new snapshot from leader, the zkDb will be
- // cleared anyway before loading the snapshot
- try {
- // This will fast-forward the database to the latest recorded transactions
- zkDb.fastForwardDataBase();
- } catch (IOException e) {
- LOG.error("Error updating DB", e);
- zkDb.clear();
- }
- }
- }
-
- requestPathMetricsCollector.shutdown();
- unregisterJMX();
- }
-
- protected void unregisterJMX() {
- // unregister from JMX
- try {
- if (jmxDataTreeBean != null) {
- MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- try {
- if (jmxServerBean != null) {
- MBeanRegistry.getInstance().unregister(jmxServerBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxServerBean = null;
- jmxDataTreeBean = null;
- }
-
- public void incInProcess() {
- requestsInProcess.incrementAndGet();
- }
-
- public void decInProcess() {
- requestsInProcess.decrementAndGet();
- if (requestThrottler != null) {
- requestThrottler.throttleWake();
- }
- }
-
- public int getInProcess() {
- return requestsInProcess.get();
- }
-
- public int getInflight() {
- return requestThrottleInflight();
- }
-
- private int requestThrottleInflight() {
- if (requestThrottler != null) {
- return requestThrottler.getInflight();
- }
- return 0;
- }
-
- static class PrecalculatedDigest {
- final long nodeDigest;
- final long treeDigest;
-
- PrecalculatedDigest(long nodeDigest, long treeDigest) {
- this.nodeDigest = nodeDigest;
- this.treeDigest = treeDigest;
- }
- }
-
-
- /**
- * This structure is used to facilitate information sharing between PrepRP
- * and FinalRP.
- */
- static class ChangeRecord {
- PrecalculatedDigest precalculatedDigest;
- byte[] data;
-
- ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) {
- this.zxid = zxid;
- this.path = path;
- this.stat = stat;
- this.childCount = childCount;
- this.acl = acl;
- }
-
- long zxid;
-
- String path;
-
- StatPersisted stat; /* Make sure to create a new object when changing */
-
- int childCount;
-
- List<ACL> acl; /* Make sure to create a new object when changing */
-
- ChangeRecord duplicate(long zxid) {
- StatPersisted stat = new StatPersisted();
- if (this.stat != null) {
- DataTree.copyStatPersisted(this.stat, stat);
- }
- ChangeRecord changeRecord = new ChangeRecord(zxid, path, stat, childCount,
- acl == null ? new ArrayList<>() : new ArrayList<>(acl));
- changeRecord.precalculatedDigest = precalculatedDigest;
- changeRecord.data = data;
- return changeRecord;
- }
-
- }
-
- byte[] generatePasswd(long id) {
- Random r = new Random(id ^ superSecret);
- byte[] p = new byte[16];
- r.nextBytes(p);
- return p;
- }
-
- protected boolean checkPasswd(long sessionId, byte[] passwd) {
- return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
- }
-
- long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
- if (passwd == null) {
- // Possible since it's just deserialized from a packet on the wire.
- passwd = new byte[0];
- }
- long sessionId = sessionTracker.createSession(timeout);
- Random r = new Random(sessionId ^ superSecret);
- r.nextBytes(passwd);
- CreateSessionTxn txn = new CreateSessionTxn(timeout);
- cnxn.setSessionId(sessionId);
- Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
- submitRequest(si);
- return sessionId;
- }
-
- /**
- * set the owner of this session as owner
- * @param id the session id
- * @param owner the owner of the session
- * @throws SessionExpiredException
- */
- public void setOwner(long id, Object owner) throws SessionExpiredException {
- sessionTracker.setOwner(id, owner);
- }
-
- protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
- boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(
- LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc);
- }
- finishSessionInit(cnxn, rc);
- }
-
- public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
- if (checkPasswd(sessionId, passwd)) {
- revalidateSession(cnxn, sessionId, sessionTimeout);
- } else {
- LOG.warn(
- "Incorrect password from {} for session 0x{}",
- cnxn.getRemoteSocketAddress(),
- Long.toHexString(sessionId));
- finishSessionInit(cnxn, false);
- }
- }
-
- public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
- // register with JMX
- try {
- if (valid) {
- if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
- serverCnxnFactory.registerConnection(cnxn);
- } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
- secureServerCnxnFactory.registerConnection(cnxn);
- }
- }
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- }
-
- try {
- ConnectResponse rsp = new ConnectResponse(
- 0,
- valid ? cnxn.getSessionTimeout() : 0,
- valid ? cnxn.getSessionId() : 0, // send 0 if session is no
- // longer valid
- valid ? generatePasswd(cnxn.getSessionId()) : new byte[16],
- this instanceof ReadOnlyZooKeeperServer);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
- bos.writeInt(-1, "len");
- rsp.serialize(bos, "connect");
- baos.close();
- ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
- bb.putInt(bb.remaining() - 4).rewind();
- cnxn.sendBuffer(bb);
-
- if (valid) {
- LOG.debug(
- "Established session 0x{} with negotiated timeout {} for client {}",
- Long.toHexString(cnxn.getSessionId()),
- cnxn.getSessionTimeout(),
- cnxn.getRemoteSocketAddress());
- cnxn.enableRecv();
- } else {
-
- LOG.info(
- "Invalid session 0x{} for client {}, probably expired",
- Long.toHexString(cnxn.getSessionId()),
- cnxn.getRemoteSocketAddress());
- cnxn.sendBuffer(ServerCnxnFactory.closeConn);
- }
-
- } catch (Exception e) {
- LOG.warn("Exception while establishing session, closing", e);
- cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT);
- }
- }
-
- public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) {
- closeSession(cnxn.getSessionId());
- }
-
- public long getServerId() {
- return 0;
- }
-
- /**
- * If the underlying Zookeeper server support local session, this method
- * will set a isLocalSession to true if a request is associated with
- * a local session.
- *
- * @param si
- */
- protected void setLocalSessionFlag(Request si) {
- }
-
- public void submitRequest(Request si) {
- if (restoreLatch != null) {
- try {
- LOG.info("Blocking request submission while restore is in progress");
- restoreLatch.await();
- } catch (final InterruptedException e) {
- LOG.warn("Unexpected interruption", e);
- }
- }
- enqueueRequest(si);
- }
-
- public void enqueueRequest(Request si) {
- if (requestThrottler == null) {
- synchronized (this) {
- try {
- // Since all requests are passed to the request
- // processor it should wait for setting up the request
- // processor chain. The state will be updated to RUNNING
- // after the setup.
- while (state == State.INITIAL) {
- wait(1000);
- }
- } catch (InterruptedException e) {
- LOG.warn("Unexpected interruption", e);
- }
- if (requestThrottler == null) {
- throw new RuntimeException("Not started");
- }
- }
- }
- requestThrottler.submitRequest(si);
- }
-
- public void submitRequestNow(Request si) {
- if (firstProcessor == null) {
- synchronized (this) {
- try {
- // Since all requests are passed to the request
- // processor it should wait for setting up the request
- // processor chain. The state will be updated to RUNNING
- // after the setup.
- while (state == State.INITIAL) {
- wait(1000);
- }
- } catch (InterruptedException e) {
- LOG.warn("Unexpected interruption", e);
- }
- if (firstProcessor == null || state != State.RUNNING) {
- throw new RuntimeException("Not started");
- }
- }
- }
- try {
- touch(si.cnxn);
- boolean validpacket = Request.isValid(si.type);
- if (validpacket) {
- setLocalSessionFlag(si);
- firstProcessor.processRequest(si);
- if (si.cnxn != null) {
- incInProcess();
- }
- } else {
- LOG.warn("Received packet at server of unknown type {}", si.type);
- // Update request accounting/throttling limits
- requestFinished(si);
- new UnimplementedRequestProcessor().processRequest(si);
- }
- } catch (MissingSessionException e) {
- LOG.debug("Dropping request.", e);
- // Update request accounting/throttling limits
- requestFinished(si);
- } catch (RequestProcessorException e) {
- LOG.error("Unable to process request", e);
- // Update request accounting/throttling limits
- requestFinished(si);
- }
- }
-
- public static int getSnapCount() {
- int snapCount = Integer.getInteger(SNAP_COUNT, DEFAULT_SNAP_COUNT);
- // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor
- if (snapCount < 2) {
- LOG.warn("SnapCount should be 2 or more. Now, snapCount is reset to 2");
- snapCount = 2;
- }
- return snapCount;
- }
-
- public int getGlobalOutstandingLimit() {
- return Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, DEFAULT_GLOBAL_OUTSTANDING_LIMIT);
- }
-
- public static long getSnapSizeInBytes() {
- long size = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default
- if (size <= 0) {
- LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", size);
- }
- return size * 1024; // Convert to bytes
- }
-
- public void setServerCnxnFactory(ServerCnxnFactory factory) {
- serverCnxnFactory = factory;
- }
-
- public ServerCnxnFactory getServerCnxnFactory() {
- return serverCnxnFactory;
- }
-
- public ServerCnxnFactory getSecureServerCnxnFactory() {
- return secureServerCnxnFactory;
- }
-
- public void setSecureServerCnxnFactory(ServerCnxnFactory factory) {
- secureServerCnxnFactory = factory;
- }
-
- /**
- * return the last processed id from the
- * datatree
- */
- public long getLastProcessedZxid() {
- return zkDb.getDataTreeLastProcessedZxid();
- }
-
- /**
- * return the outstanding requests
- * in the queue, which haven't been
- * processed yet
- */
- public long getOutstandingRequests() {
- return getInProcess();
- }
-
- /**
- * return the total number of client connections that are alive
- * to this server
- */
- public int getNumAliveConnections() {
- int numAliveConnections = 0;
-
- if (serverCnxnFactory != null) {
- numAliveConnections += serverCnxnFactory.getNumAliveConnections();
- }
-
- if (secureServerCnxnFactory != null) {
- numAliveConnections += secureServerCnxnFactory.getNumAliveConnections();
- }
-
- return numAliveConnections;
- }
-
- /**
- * truncate the log to get in sync with others
- * if in a quorum
- * @param zxid the zxid that it needs to get in sync
- * with others
- * @throws IOException
- */
- public void truncateLog(long zxid) throws IOException {
- this.zkDb.truncateLog(zxid);
- }
-
- public int getTickTime() {
- return tickTime;
- }
-
- public void setTickTime(int tickTime) {
- LOG.info("tickTime set to {} ms", tickTime);
- this.tickTime = tickTime;
- }
-
- public static int getThrottledOpWaitTime() {
- return throttledOpWaitTime;
- }
-
- public static void setThrottledOpWaitTime(int time) {
- LOG.info("throttledOpWaitTime set to {} ms", time);
- throttledOpWaitTime = time;
- }
-
- public int getMinSessionTimeout() {
- return minSessionTimeout;
- }
-
- public void setMinSessionTimeout(int min) {
- this.minSessionTimeout = min == -1 ? tickTime * 2 : min;
- LOG.info("minSessionTimeout set to {} ms", this.minSessionTimeout);
- }
-
- public int getMaxSessionTimeout() {
- return maxSessionTimeout;
- }
-
- public void setMaxSessionTimeout(int max) {
- this.maxSessionTimeout = max == -1 ? tickTime * 20 : max;
- LOG.info("maxSessionTimeout set to {} ms", this.maxSessionTimeout);
- }
-
- public int getClientPortListenBacklog() {
- return listenBacklog;
- }
-
- public void setClientPortListenBacklog(int backlog) {
- this.listenBacklog = backlog;
- LOG.info("clientPortListenBacklog set to {}", backlog);
- }
-
- public int getClientPort() {
- return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1;
- }
-
- public int getSecureClientPort() {
- return secureServerCnxnFactory != null ? secureServerCnxnFactory.getLocalPort() : -1;
- }
-
- /** Maximum number of connections allowed from particular host (ip) */
- public int getMaxClientCnxnsPerHost() {
- if (serverCnxnFactory != null) {
- return serverCnxnFactory.getMaxClientCnxnsPerHost();
- }
- if (secureServerCnxnFactory != null) {
- return secureServerCnxnFactory.getMaxClientCnxnsPerHost();
- }
- return -1;
- }
-
- public void setTxnLogFactory(FileTxnSnapLog txnLog) {
- this.txnLogFactory = txnLog;
- }
-
- public FileTxnSnapLog getTxnLogFactory() {
- return this.txnLogFactory;
- }
-
- /**
- * Returns the elapsed sync of time of transaction log in milliseconds.
- */
- public long getTxnLogElapsedSyncTime() {
- return txnLogFactory.getTxnLogElapsedSyncTime();
- }
-
- public String getState() {
- return "standalone";
- }
-
- public void dumpEphemerals(PrintWriter pwriter) {
- zkDb.dumpEphemerals(pwriter);
- }
-
- public Map<Long, Set<String>> getEphemerals() {
- return zkDb.getEphemerals();
- }
-
- public double getConnectionDropChance() {
- return connThrottle.getDropChance();
- }
-
- public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
- LOG.debug(
- "Session establishment request from client {} client's lastZxid is 0x{}",
- cnxn.getRemoteSocketAddress(),
- Long.toHexString(request.getLastZxidSeen()));
-
- long sessionId = request.getSessionId();
- int tokensNeeded = 1;
- if (connThrottle.isConnectionWeightEnabled()) {
- if (sessionId == 0) {
- if (localSessionEnabled) {
- tokensNeeded = connThrottle.getRequiredTokensForLocal();
- } else {
- tokensNeeded = connThrottle.getRequiredTokensForGlobal();
- }
- } else {
- tokensNeeded = connThrottle.getRequiredTokensForRenew();
- }
- }
-
- if (!connThrottle.checkLimit(tokensNeeded)) {
- throw new ClientCnxnLimitException();
- }
- ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
- ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
-
- if (!cnxn.protocolManager.isReadonlyAvailable()) {
- LOG.warn(
- "Connection request from old client {}; will be dropped if server is in r-o mode",
- cnxn.getRemoteSocketAddress());
- }
-
- if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
- String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
- LOG.info(msg);
- throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
- }
- if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
- String msg = "Refusing session(0x"
- + Long.toHexString(sessionId)
- + ") request for client "
- + cnxn.getRemoteSocketAddress()
- + " as it has seen zxid 0x"
- + Long.toHexString(request.getLastZxidSeen())
- + " our last zxid is 0x"
- + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
- + " client must try another server";
-
- LOG.info(msg);
- throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
- }
- int sessionTimeout = request.getTimeOut();
- byte[] passwd = request.getPasswd();
- int minSessionTimeout = getMinSessionTimeout();
- if (sessionTimeout < minSessionTimeout) {
- sessionTimeout = minSessionTimeout;
- }
- int maxSessionTimeout = getMaxSessionTimeout();
- if (sessionTimeout > maxSessionTimeout) {
- sessionTimeout = maxSessionTimeout;
- }
- cnxn.setSessionTimeout(sessionTimeout);
- // We don't want to receive any packets until we are sure that the
- // session is setup
- cnxn.disableRecv();
- if (sessionId == 0) {
- long id = createSession(cnxn, passwd, sessionTimeout);
- LOG.debug(
- "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
- Long.toHexString(id),
- Long.toHexString(request.getLastZxidSeen()),
- request.getTimeOut(),
- cnxn.getRemoteSocketAddress());
- } else {
- validateSession(cnxn, sessionId);
- LOG.debug(
- "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
- Long.toHexString(sessionId),
- Long.toHexString(request.getLastZxidSeen()),
- request.getTimeOut(),
- cnxn.getRemoteSocketAddress());
- if (serverCnxnFactory != null) {
- serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
- }
- if (secureServerCnxnFactory != null) {
- secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
- }
- cnxn.setSessionId(sessionId);
- reopenSession(cnxn, sessionId, passwd, sessionTimeout);
- ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
-
- }
- }
-
- /**
- * Validate if a particular session can be reestablished.
- *
- * @param cnxn
- * @param sessionId
- */
- protected void validateSession(ServerCnxn cnxn, long sessionId)
- throws IOException {
- // do nothing
- }
-
- public boolean shouldThrottle(long outStandingCount) {
- int globalOutstandingLimit = getGlobalOutstandingLimit();
- if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) {
- return outStandingCount > 0;
- }
- return false;
- }
-
- long getFlushDelay() {
- return flushDelay;
- }
-
- static void setFlushDelay(long delay) {
- LOG.info("{} = {} ms", FLUSH_DELAY, delay);
- flushDelay = delay;
- }
-
- long getMaxWriteQueuePollTime() {
- return maxWriteQueuePollTime;
- }
-
- static void setMaxWriteQueuePollTime(long maxTime) {
- LOG.info("{} = {} ms", MAX_WRITE_QUEUE_POLL_SIZE, maxTime);
- maxWriteQueuePollTime = maxTime;
- }
-
- int getMaxBatchSize() {
- return maxBatchSize;
- }
-
- static void setMaxBatchSize(int size) {
- LOG.info("{}={}", MAX_BATCH_SIZE, size);
- maxBatchSize = size;
- }
-
- private void initLargeRequestThrottlingSettings() {
- setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes));
- setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1));
- }
-
- public int getLargeRequestMaxBytes() {
- return largeRequestMaxBytes;
- }
-
- public void setLargeRequestMaxBytes(int bytes) {
- if (bytes <= 0) {
- LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes);
- LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes);
- } else {
- largeRequestMaxBytes = bytes;
- LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes);
- }
- }
-
- public int getLargeRequestThreshold() {
- return largeRequestThreshold;
- }
-
- public void setLargeRequestThreshold(int threshold) {
- if (threshold == 0 || threshold < -1) {
- LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold);
- largeRequestThreshold = -1;
- } else {
- largeRequestThreshold = threshold;
- LOG.info("The large request threshold is set to {}", largeRequestThreshold);
- }
- }
-
- public int getLargeRequestBytes() {
- return currentLargeRequestBytes.get();
- }
-
- private boolean isLargeRequest(int length) {
- // The large request limit is disabled when threshold is -1
- if (largeRequestThreshold == -1) {
- return false;
- }
- return length > largeRequestThreshold;
- }
-
- public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException {
- if (!isLargeRequest(length)) {
- return true;
- }
- if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) {
- return true;
- } else {
- ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
- throw new IOException("Rejecting large request");
- }
-
- }
-
- private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException {
- if (!isLargeRequest(length)) {
- return true;
- }
-
- int bytes = currentLargeRequestBytes.addAndGet(length);
- if (bytes > largeRequestMaxBytes) {
- currentLargeRequestBytes.addAndGet(-length);
- ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
- throw new IOException("Rejecting large request");
- }
- return true;
- }
-
- public void requestFinished(Request request) {
- int largeRequestLength = request.getLargeRequestSize();
- if (largeRequestLength != -1) {
- currentLargeRequestBytes.addAndGet(-largeRequestLength);
- }
- }
-
- public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
- // Need to increase the outstanding request count first, otherwise
- // there might be a race condition that it enabled recv after
- // processing request and then disabled when check throttling.
- //
- // Be aware that we're actually checking the global outstanding
- // request before this request.
- //
- // It's fine if the IOException thrown before we decrease the count
- // in cnxn, since it will close the cnxn anyway.
- cnxn.incrOutstandingAndCheckThrottle(h);
-
- if (h.getType() == OpCode.auth) {
- LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
- AuthPacket authPacket = request.readRecord(AuthPacket::new);
- String scheme = authPacket.getScheme();
- ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
- Code authReturn = KeeperException.Code.AUTHFAILED;
- if (ap != null) {
- try {
- // handleAuthentication may close the connection, to allow the client to choose
- // a different server to connect to.
- authReturn = ap.handleAuthentication(
- new ServerAuthenticationProvider.ServerObjs(this, cnxn),
- authPacket.getAuth());
- } catch (RuntimeException e) {
- LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
- authReturn = KeeperException.Code.AUTHFAILED;
- }
- }
- if (authReturn == KeeperException.Code.OK) {
- LOG.info("Session 0x{}: auth success for scheme {} and address {}",
- Long.toHexString(cnxn.getSessionId()), scheme,
- cnxn.getRemoteSocketAddress());
- ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
- cnxn.sendResponse(rh, null, null);
- } else {
- if (ap == null) {
- LOG.warn(
- "No authentication provider for scheme: {} has {}",
- scheme,
- ProviderRegistry.listProviders());
- } else {
- LOG.warn("Authentication failed for scheme: {}", scheme);
- }
- // send a response...
- ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
- cnxn.sendResponse(rh, null, null);
- // ... and close connection
- cnxn.sendBuffer(ServerCnxnFactory.closeConn);
- cnxn.disableRecv();
- }
- return;
- } else if (h.getType() == OpCode.sasl) {
- processSasl(request, cnxn, h);
- } else {
- if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
- // Authentication enforcement is failed
- // Already sent response to user about failure and closed the session, lets return
- return;
- } else {
- Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
- int length = request.limit();
- if (isLargeRequest(length)) {
- // checkRequestSize will throw IOException if request is rejected
- checkRequestSizeWhenMessageReceived(length);
- si.setLargeRequestSize(length);
- }
- si.setOwner(ServerCnxn.me);
- submitRequest(si);
- }
- }
- }
-
- private static boolean isSaslSuperUser(String id) {
- if (id == null || id.isEmpty()) {
- return false;
- }
-
- Properties properties = System.getProperties();
- int prefixLen = SASL_SUPER_USER.length();
-
- for (String k : properties.stringPropertyNames()) {
- if (k.startsWith(SASL_SUPER_USER)
- && (k.length() == prefixLen || k.charAt(prefixLen) == '.')) {
- String value = properties.getProperty(k);
-
- if (value != null && value.equals(id)) {
- return true;
- }
- }
- }
-
- return false;
- }
-
- private static boolean shouldAllowSaslFailedClientsConnect() {
- return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS);
- }
-
- private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
- LOG.debug("Responding to client SASL token.");
- GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new);
- byte[] clientToken = clientTokenRecord.getToken();
- LOG.debug("Size of client SASL token: {}", clientToken.length);
- byte[] responseToken = null;
- try {
- ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer;
- try {
- // note that clientToken might be empty (clientToken.length == 0):
- // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the
- // SASL negotiation process.
- responseToken = saslServer.evaluateResponse(clientToken);
- if (saslServer.isComplete()) {
- String authorizationID = saslServer.getAuthorizationID();
- LOG.info("Session 0x{}: adding SASL authorization for authorizationID: {}",
- Long.toHexString(cnxn.getSessionId()), authorizationID);
- cnxn.addAuthInfo(new Id("sasl", authorizationID));
-
- if (isSaslSuperUser(authorizationID)) {
- cnxn.addAuthInfo(new Id("super", ""));
- LOG.info(
- "Session 0x{}: Authenticated Id '{}' as super user",
- Long.toHexString(cnxn.getSessionId()),
- authorizationID);
- }
- }
- } catch (SaslException e) {
- LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e);
- if (shouldAllowSaslFailedClientsConnect() && !authHelper.isSaslAuthRequired()) {
- LOG.warn("Maintaining client connection despite SASL authentication failure.");
- } else {
- int error;
- if (authHelper.isSaslAuthRequired()) {
- LOG.warn(
- "Closing client connection due to server requires client SASL authenticaiton,"
- + "but client SASL authentication has failed, or client is not configured with SASL "
- + "authentication.");
- error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue();
- } else {
- LOG.warn("Closing client connection due to SASL authentication failure.");
- error = Code.AUTHFAILED.intValue();
- }
-
- ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error);
- cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response");
- cnxn.sendCloseSession();
- cnxn.disableRecv();
- return;
- }
- }
- } catch (NullPointerException e) {
- LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly.");
- }
- if (responseToken != null) {
- LOG.debug("Size of server SASL response: {}", responseToken.length);
- }
-
- ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue());
- Record record = new SetSASLResponse(responseToken);
- cnxn.sendResponse(replyHeader, record, "response");
- }
-
- // entry point for quorum/Learner.java
- public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
- processTxnForSessionEvents(null, hdr, txn);
- return processTxnInDB(hdr, txn, null);
- }
-
- // entry point for FinalRequestProcessor.java
- public ProcessTxnResult processTxn(Request request) {
- TxnHeader hdr = request.getHdr();
- processTxnForSessionEvents(request, hdr, request.getTxn());
-
- final boolean writeRequest = (hdr != null);
- final boolean quorumRequest = request.isQuorum();
-
- // return fast w/o synchronization when we get a read
- if (!writeRequest && !quorumRequest) {
- return new ProcessTxnResult();
- }
- synchronized (outstandingChanges) {
- ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
-
- // request.hdr is set for write requests, which are the only ones
- // that add to outstandingChanges.
- if (writeRequest) {
- long zxid = hdr.getZxid();
- while (!outstandingChanges.isEmpty()
- && outstandingChanges.peek().zxid <= zxid) {
- ChangeRecord cr = outstandingChanges.remove();
- ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
- if (cr.zxid < zxid) {
- LOG.warn(
- "Zxid outstanding 0x{} is less than current 0x{}",
- Long.toHexString(cr.zxid),
- Long.toHexString(zxid));
- }
- if (outstandingChangesForPath.get(cr.path) == cr) {
- outstandingChangesForPath.remove(cr.path);
- }
- }
- }
-
- // do not add non quorum packets to the queue.
- if (quorumRequest) {
- getZKDatabase().addCommittedProposal(request);
- }
- return rc;
- }
- }
-
- private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
- int opCode = (request == null) ? hdr.getType() : request.type;
- long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
-
- if (opCode == OpCode.createSession) {
- if (hdr != null && txn instanceof CreateSessionTxn) {
- CreateSessionTxn cst = (CreateSessionTxn) txn;
- sessionTracker.commitSession(sessionId, cst.getTimeOut());
- } else if (request == null || !request.isLocalSession()) {
- LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString());
- }
- } else if (opCode == OpCode.closeSession) {
- sessionTracker.removeSession(sessionId);
- }
- }
-
- private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
- if (hdr == null) {
- return new ProcessTxnResult();
- } else {
- return getZKDatabase().processTxn(hdr, txn, digest);
- }
- }
-
- public Map<Long, Set<Long>> getSessionExpiryMap() {
- return sessionTracker.getSessionExpiryMap();
- }
-
- /**
- * This method is used to register the ZooKeeperServerShutdownHandler to get
- * server's error or shutdown state change notifications.
- * {@link ZooKeeperServerShutdownHandler#handle(State)} will be called for
- * every server state changes {@link #setState(State)}.
- *
- * @param zkShutdownHandler shutdown handler
- */
- void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) {
- this.zkShutdownHandler = zkShutdownHandler;
- }
-
- public boolean isResponseCachingEnabled() {
- return isResponseCachingEnabled;
- }
-
- public void setResponseCachingEnabled(boolean isEnabled) {
- isResponseCachingEnabled = isEnabled;
- }
-
- public ResponseCache getReadResponseCache() {
- return isResponseCachingEnabled ? readResponseCache : null;
- }
-
- public ResponseCache getGetChildrenResponseCache() {
- return isResponseCachingEnabled ? getChildrenResponseCache : null;
- }
-
- protected void registerMetrics() {
- MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
-
- final ZKDatabase zkdb = this.getZKDatabase();
- final ServerStats stats = this.serverStats();
-
- rootContext.registerGauge("avg_latency", stats::getAvgLatency);
-
- rootContext.registerGauge("max_latency", stats::getMaxLatency);
- rootContext.registerGauge("min_latency", stats::getMinLatency);
-
- rootContext.registerGauge("packets_received", stats::getPacketsReceived);
- rootContext.registerGauge("packets_sent", stats::getPacketsSent);
- rootContext.registerGauge("num_alive_connections", stats::getNumAliveClientConnections);
-
- rootContext.registerGauge("outstanding_requests", stats::getOutstandingRequests);
- rootContext.registerGauge("uptime", stats::getUptime);
-
- rootContext.registerGauge("znode_count", zkdb::getNodeCount);
-
- rootContext.registerGauge("watch_count", zkdb.getDataTree()::getWatchCount);
- rootContext.registerGauge("ephemerals_count", zkdb.getDataTree()::getEphemeralsCount);
-
- rootContext.registerGauge("approximate_data_size", zkdb.getDataTree()::cachedApproximateDataSize);
-
- rootContext.registerGauge("global_sessions", zkdb::getSessionCount);
- rootContext.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount);
-
- OSMXBean osMbean = new OSMXBean();
- rootContext.registerGauge("open_file_descriptor_count", osMbean::getOpenFileDescriptorCount);
- rootContext.registerGauge("max_file_descriptor_count", osMbean::getMaxFileDescriptorCount);
- rootContext.registerGauge("connection_drop_probability", this::getConnectionDropChance);
-
- rootContext.registerGauge("last_client_response_size", stats.getClientResponseStats()::getLastBufferSize);
- rootContext.registerGauge("max_client_response_size", stats.getClientResponseStats()::getMaxBufferSize);
- rootContext.registerGauge("min_client_response_size", stats.getClientResponseStats()::getMinBufferSize);
-
- rootContext.registerGauge("outstanding_tls_handshake", this::getOutstandingHandshakeNum);
- rootContext.registerGauge("auth_failed_count", stats::getAuthFailedCount);
- rootContext.registerGauge("non_mtls_remote_conn_count", stats::getNonMTLSRemoteConnCount);
- rootContext.registerGauge("non_mtls_local_conn_count", stats::getNonMTLSLocalConnCount);
-
- rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE,
- () -> QuotaMetricsUtils.getQuotaCountLimit(zkDb.getDataTree()));
- rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE,
- () -> QuotaMetricsUtils.getQuotaBytesLimit(zkDb.getDataTree()));
- rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE,
- () -> QuotaMetricsUtils.getQuotaCountUsage(zkDb.getDataTree()));
- rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE,
- () -> QuotaMetricsUtils.getQuotaBytesUsage(zkDb.getDataTree()));
- }
-
- protected void unregisterMetrics() {
-
- MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
-
- rootContext.unregisterGauge("avg_latency");
-
- rootContext.unregisterGauge("max_latency");
- rootContext.unregisterGauge("min_latency");
-
- rootContext.unregisterGauge("packets_received");
- rootContext.unregisterGauge("packets_sent");
- rootContext.unregisterGauge("num_alive_connections");
-
- rootContext.unregisterGauge("outstanding_requests");
- rootContext.unregisterGauge("uptime");
-
- rootContext.unregisterGauge("znode_count");
-
- rootContext.unregisterGauge("watch_count");
- rootContext.unregisterGauge("ephemerals_count");
- rootContext.unregisterGauge("approximate_data_size");
-
- rootContext.unregisterGauge("global_sessions");
- rootContext.unregisterGauge("local_sessions");
-
- rootContext.unregisterGauge("open_file_descriptor_count");
- rootContext.unregisterGauge("max_file_descriptor_count");
- rootContext.unregisterGauge("connection_drop_probability");
-
- rootContext.unregisterGauge("last_client_response_size");
- rootContext.unregisterGauge("max_client_response_size");
- rootContext.unregisterGauge("min_client_response_size");
-
- rootContext.unregisterGauge("auth_failed_count");
- rootContext.unregisterGauge("non_mtls_remote_conn_count");
- rootContext.unregisterGauge("non_mtls_local_conn_count");
-
- rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE);
- rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE);
- rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE);
- rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE);
- }
-
- /**
- * Hook into admin server, useful to expose additional data
- * that do not represent metrics.
- *
- * @param response a sink which collects the data.
- */
- public void dumpMonitorValues(BiConsumer<String, Object> response) {
- ServerStats stats = serverStats();
- response.accept("version", Version.getFullVersion());
- response.accept("server_state", stats.getServerState());
- }
-
- /**
- * Grant or deny authorization to an operation on a node as a function of:
- * @param cnxn : the server connection or null for admin server commands
- * @param acl : set of ACLs for the node
- * @param perm : the permission that the client is requesting
- * @param ids : the credentials supplied by the client
- * @param path : the ZNode path
- * @param setAcls : for set ACL operations, the list of ACLs being set. Otherwise null.
- */
- public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls) throws KeeperException.NoAuthException {
- if (skipACL) {
- return;
- }
-
- LOG.debug("Permission requested: {} ", perm);
- LOG.debug("ACLs for node: {}", acl);
- LOG.debug("Client credentials: {}", ids);
-
- if (acl == null || acl.size() == 0) {
- return;
- }
- for (Id authId : ids) {
- if (authId.getScheme().equals("super")) {
- return;
- }
- }
- for (ACL a : acl) {
- Id id = a.getId();
- if ((a.getPerms() & perm) != 0) {
- if (id.getScheme().equals("world") && id.getId().equals("anyone")) {
- return;
- }
- ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(id.getScheme());
- if (ap != null) {
- for (Id authId : ids) {
- if (authId.getScheme().equals(id.getScheme())
- && ap.matches(
- new ServerAuthenticationProvider.ServerObjs(this, cnxn),
- new ServerAuthenticationProvider.MatchValues(path, authId.getId(), id.getId(), perm, setAcls))) {
- return;
- }
- }
- }
- }
- }
- throw new KeeperException.NoAuthException();
- }
-
- /**
- * check a path whether exceeded the quota.
- *
- * @param path
- * the path of the node, used for the quota prefix check
- * @param lastData
- * the current node data, {@code null} for none
- * @param data
- * the data to be set, or {@code null} for none
- * @param type
- * currently, create and setData need to check quota
- */
- public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException {
- if (!enforceQuota) {
- return;
- }
- long dataBytes = (data == null) ? 0 : data.length;
- ZKDatabase zkDatabase = getZKDatabase();
- String lastPrefix = zkDatabase.getDataTree().getMaxPrefixWithQuota(path);
- if (StringUtils.isEmpty(lastPrefix)) {
- return;
- }
-
- final String namespace = PathUtils.getTopNamespace(path);
- switch (type) {
- case OpCode.create:
- checkQuota(lastPrefix, dataBytes, 1, namespace);
- break;
- case OpCode.setData:
- checkQuota(lastPrefix, dataBytes - (lastData == null ? 0 : lastData.length), 0, namespace);
- break;
- default:
- throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type);
- }
- }
-
- /**
- * check a path whether exceeded the quota.
- *
- * @param lastPrefix
- the path of the node which has a quota.
- * @param bytesDiff
- * the diff to be added to number of bytes
- * @param countDiff
- * the diff to be added to the count
- * @param namespace
- * the namespace for collecting quota exceeded errors
- */
- private void checkQuota(String lastPrefix, long bytesDiff, long countDiff, String namespace)
- throws KeeperException.QuotaExceededException {
- LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff);
-
- // now check the quota we set
- String limitNode = Quotas.limitPath(lastPrefix);
- DataNode node = getZKDatabase().getNode(limitNode);
- StatsTrack limitStats;
- if (node == null) {
- // should not happen
- LOG.error("Missing limit node for quota {}", limitNode);
- return;
- }
- synchronized (node) {
- limitStats = new StatsTrack(node.data);
- }
- //check the quota
- boolean checkCountQuota = countDiff != 0 && (limitStats.getCount() > -1 || limitStats.getCountHardLimit() > -1);
- boolean checkByteQuota = bytesDiff != 0 && (limitStats.getBytes() > -1 || limitStats.getByteHardLimit() > -1);
-
- if (!checkCountQuota && !checkByteQuota) {
- return;
- }
-
- //check the statPath quota
- String statNode = Quotas.statPath(lastPrefix);
- node = getZKDatabase().getNode(statNode);
-
- StatsTrack currentStats;
- if (node == null) {
- // should not happen
- LOG.error("Missing node for stat {}", statNode);
- return;
- }
- synchronized (node) {
- currentStats = new StatsTrack(node.data);
- }
-
- //check the Count Quota
- if (checkCountQuota) {
- long newCount = currentStats.getCount() + countDiff;
- boolean isCountHardLimit = limitStats.getCountHardLimit() > -1;
- long countLimit = isCountHardLimit ? limitStats.getCountHardLimit() : limitStats.getCount();
-
- if (newCount > countLimit) {
- String msg = "Quota exceeded: " + lastPrefix + " [current count=" + newCount + ", " + (isCountHardLimit ? "hard" : "soft") + "CountLimit=" + countLimit + "]";
- RATE_LOGGER.rateLimitLog(msg);
- if (isCountHardLimit) {
- updateQuotaExceededMetrics(namespace);
- throw new KeeperException.QuotaExceededException(lastPrefix);
- }
- }
- }
-
- //check the Byte Quota
- if (checkByteQuota) {
- long newBytes = currentStats.getBytes() + bytesDiff;
- boolean isByteHardLimit = limitStats.getByteHardLimit() > -1;
- long byteLimit = isByteHardLimit ? limitStats.getByteHardLimit() : limitStats.getBytes();
- if (newBytes > byteLimit) {
- String msg = "Quota exceeded: " + lastPrefix + " [current bytes=" + newBytes + ", " + (isByteHardLimit ? "hard" : "soft") + "ByteLimit=" + byteLimit + "]";
- RATE_LOGGER.rateLimitLog(msg);
- if (isByteHardLimit) {
- updateQuotaExceededMetrics(namespace);
- throw new KeeperException.QuotaExceededException(lastPrefix);
- }
- }
- }
- }
-
- public static boolean isDigestEnabled() {
- return digestEnabled;
- }
-
- public static void setDigestEnabled(boolean digestEnabled) {
- LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
- ZooKeeperServer.digestEnabled = digestEnabled;
- }
-
- public static boolean isSerializeLastProcessedZxidEnabled() {
- return serializeLastProcessedZxidEnabled;
- }
-
- public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) {
- serializeLastProcessedZxidEnabled = serializeLastZxidEnabled;
- LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled);
- }
-
- /**
- * Trim a path to get the immediate predecessor.
- *
- * @param path
- * @return
- * @throws KeeperException.BadArgumentsException
- */
- private String parentPath(String path) throws KeeperException.BadArgumentsException {
- int lastSlash = path.lastIndexOf('/');
- if (lastSlash == -1 || path.indexOf('\0') != -1 || getZKDatabase().isSpecialPath(path)) {
- throw new KeeperException.BadArgumentsException(path);
- }
- return lastSlash == 0 ? "/" : path.substring(0, lastSlash);
- }
-
- private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException {
- boolean mustCheckACL = false;
- String path = null;
- List<ACL> acl = null;
-
- switch (request.type) {
- case OpCode.create:
- case OpCode.create2: {
- CreateRequest req = request.readRequestRecordNoException(CreateRequest::new);
- if (req != null) {
- mustCheckACL = true;
- acl = req.getAcl();
- path = parentPath(req.getPath());
- }
- break;
- }
- case OpCode.delete: {
- DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new);
- if (req != null) {
- path = parentPath(req.getPath());
- }
- break;
- }
- case OpCode.setData: {
- SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new);
- if (req != null) {
- path = req.getPath();
- }
- break;
- }
- case OpCode.setACL: {
- SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new);
- if (req != null) {
- mustCheckACL = true;
- acl = req.getAcl();
- path = req.getPath();
- }
- break;
- }
- }
-
- if (mustCheckACL) {
- /* we ignore the extrapolated ACL returned by fixupACL because
- * we only care about it being well-formed (and if it isn't, an
- * exception will be raised).
- */
- PrepRequestProcessor.fixupACL(path, request.authInfo, acl);
- }
-
- return path;
- }
-
- private int effectiveACLPerms(Request request) {
- switch (request.type) {
- case OpCode.create:
- case OpCode.create2:
- return ZooDefs.Perms.CREATE;
- case OpCode.delete:
- return ZooDefs.Perms.DELETE;
- case OpCode.setData:
- return ZooDefs.Perms.WRITE;
- case OpCode.setACL:
- return ZooDefs.Perms.ADMIN;
- default:
- return ZooDefs.Perms.ALL;
- }
- }
-
- /**
- * Check Write Requests for Potential Access Restrictions
- * <p>
- * Before a request is being proposed to the quorum, lets check it
- * against local ACLs. Non-write requests (read, session, etc.)
- * are passed along. Invalid requests are sent a response.
- * <p>
- * While we are at it, if the request will set an ACL: make sure it's
- * a valid one.
- *
- * @param request
- * @return true if request is permitted, false if not.
- */
- public boolean authWriteRequest(Request request) {
- int err;
- String pathToCheck;
-
- if (!enableEagerACLCheck) {
- return true;
- }
-
- err = KeeperException.Code.OK.intValue();
-
- try {
- pathToCheck = effectiveACLPath(request);
- if (pathToCheck != null) {
- checkACL(request.cnxn, zkDb.getACL(pathToCheck, null), effectiveACLPerms(request), request.authInfo, pathToCheck, null);
- }
- } catch (KeeperException.NoAuthException e) {
- LOG.debug("Request failed ACL check", e);
- err = e.code().intValue();
- } catch (KeeperException.InvalidACLException e) {
- LOG.debug("Request has an invalid ACL check", e);
- err = e.code().intValue();
- } catch (KeeperException.NoNodeException e) {
- LOG.debug("ACL check against non-existent node: {}", e.getMessage());
- } catch (KeeperException.BadArgumentsException e) {
- LOG.debug("ACL check against illegal node path: {}", e.getMessage());
- } catch (Throwable t) {
- LOG.error("Uncaught exception in authWriteRequest with: ", t);
- throw t;
- } finally {
- if (err != KeeperException.Code.OK.intValue()) {
- /* This request has a bad ACL, so we are dismissing it early. */
- decInProcess();
- ReplyHeader rh = new ReplyHeader(request.cxid, 0, err);
- try {
- request.cnxn.sendResponse(rh, null, null);
- } catch (IOException e) {
- LOG.error("IOException : {}", e);
- }
- }
- }
-
- return err == KeeperException.Code.OK.intValue();
- }
-
- public int getOutstandingHandshakeNum() {
- if (serverCnxnFactory instanceof NettyServerCnxnFactory) {
- return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum();
- } else {
- return 0;
- }
- }
-
- public boolean isReconfigEnabled() {
- return this.reconfigEnabled;
- }
-
- public ZooKeeperServerShutdownHandler getZkShutdownHandler() {
- return zkShutdownHandler;
- }
-
- static void updateQuotaExceededMetrics(final String namespace) {
- if (namespace == null) {
- return;
- }
- ServerMetrics.getMetrics().QUOTA_EXCEEDED_ERROR_PER_NAMESPACE.add(namespace, 1);
- }
-}
-
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
deleted file mode 100644
index 1f629bed73d..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import javax.management.JMException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.metrics.MetricsContext;
-import org.apache.zookeeper.server.ContainerManager;
-import org.apache.zookeeper.server.DataTreeBean;
-import org.apache.zookeeper.server.FinalRequestProcessor;
-import org.apache.zookeeper.server.PrepRequestProcessor;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ServerMetrics;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-/**
- *
- * Just like the standard ZooKeeperServer. We just replace the request
- * processors: PrepRequestProcessor -&gt; ProposalRequestProcessor -&gt;
- * CommitProcessor -&gt; Leader.ToBeAppliedRequestProcessor -&gt;
- * FinalRequestProcessor
- */
-public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
-
- private ContainerManager containerManager; // guarded by sync
-
- CommitProcessor commitProcessor;
-
- PrepRequestProcessor prepRequestProcessor;
-
- /**
- * @throws IOException
- */
- public LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
- super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
- }
-
- public Leader getLeader() {
- return self.leader;
- }
-
- @Override
- protected void setupRequestProcessors() {
- RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
- commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
- commitProcessor.start();
- ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
- proposalProcessor.initialize();
- prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
- prepRequestProcessor.start();
- firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
-
- setupContainerManager();
- }
-
- private synchronized void setupContainerManager() {
- containerManager = new ContainerManager(
- getZKDatabase(),
- prepRequestProcessor,
- Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
- Integer.getInteger("znode.container.maxPerMinute", 10000),
- Long.getLong("znode.container.maxNeverUsedIntervalMs", 0)
- );
- }
-
- @Override
- public synchronized void startup() {
- super.startup();
- if (containerManager != null) {
- containerManager.start();
- }
- }
-
- @Override
- protected void registerMetrics() {
- super.registerMetrics();
-
- MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
- rootContext.registerGauge("learners", gaugeWithLeader(
- (leader) -> leader.getLearners().size())
- );
- rootContext.registerGauge("synced_followers", gaugeWithLeader(
- (leader) -> leader.getForwardingFollowers().size()
- ));
- rootContext.registerGauge("synced_non_voting_followers", gaugeWithLeader(
- (leader) -> leader.getNonVotingFollowers().size()
- ));
- rootContext.registerGauge("synced_observers", self::getSynced_observers_metric);
- rootContext.registerGauge("pending_syncs", gaugeWithLeader(
- (leader) -> leader.getNumPendingSyncs()
- ));
- rootContext.registerGauge("leader_uptime", gaugeWithLeader(
- (leader) -> leader.getUptime()
- ));
- rootContext.registerGauge("last_proposal_size", gaugeWithLeader(
- (leader) -> leader.getProposalStats().getLastBufferSize()
- ));
- rootContext.registerGauge("max_proposal_size", gaugeWithLeader(
- (leader) -> leader.getProposalStats().getMaxBufferSize()
- ));
- rootContext.registerGauge("min_proposal_size", gaugeWithLeader(
- (leader) -> leader.getProposalStats().getMinBufferSize()
- ));
- }
-
- private org.apache.zookeeper.metrics.Gauge gaugeWithLeader(Function<Leader, Number> supplier) {
- return () -> {
- final Leader leader = getLeader();
- if (leader == null) {
- return null;
- }
- return supplier.apply(leader);
- };
- }
-
- @Override
- protected void unregisterMetrics() {
- super.unregisterMetrics();
-
- MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
- rootContext.unregisterGauge("learners");
- rootContext.unregisterGauge("synced_followers");
- rootContext.unregisterGauge("synced_non_voting_followers");
- rootContext.unregisterGauge("synced_observers");
- rootContext.unregisterGauge("pending_syncs");
- rootContext.unregisterGauge("leader_uptime");
-
- rootContext.unregisterGauge("last_proposal_size");
- rootContext.unregisterGauge("max_proposal_size");
- rootContext.unregisterGauge("min_proposal_size");
- }
-
- @Override
- public synchronized void shutdown(boolean fullyShutDown) {
- if (containerManager != null) {
- containerManager.stop();
- }
- super.shutdown(fullyShutDown);
- }
-
- @Override
- public int getGlobalOutstandingLimit() {
- int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1;
- int globalOutstandingLimit = super.getGlobalOutstandingLimit() / divisor;
- return globalOutstandingLimit;
- }
-
- @Override
- public void createSessionTracker() {
- sessionTracker = new LeaderSessionTracker(
- this,
- getZKDatabase().getSessionWithTimeOuts(),
- tickTime,
- self.getMyId(),
- self.areLocalSessionsEnabled(),
- getZooKeeperServerListener());
- }
-
- public boolean touch(long sess, int to) {
- return sessionTracker.touchSession(sess, to);
- }
-
- public boolean checkIfValidGlobalSession(long sess, int to) {
- if (self.areLocalSessionsEnabled() && !upgradeableSessionTracker.isGlobalSession(sess)) {
- return false;
- }
- return sessionTracker.touchSession(sess, to);
- }
-
- /**
- * Requests coming from the learner should go directly to
- * PrepRequestProcessor
- *
- * @param request
- */
- public void submitLearnerRequest(Request request) {
- /*
- * Requests coming from the learner should have gone through
- * submitRequest() on each server which already perform some request
- * validation, so we don't need to do it again.
- *
- * Additionally, LearnerHandler should start submitting requests into
- * the leader's pipeline only when the leader's server is started, so we
- * can submit the request directly into PrepRequestProcessor.
- *
- * This is done so that requests from learners won't go through
- * LeaderRequestProcessor which perform local session upgrade.
- */
- prepRequestProcessor.processRequest(request);
- }
-
- @Override
- protected void registerJMX() {
- // register with JMX
- try {
- jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
- MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxDataTreeBean = null;
- }
- }
-
- public void registerJMX(LeaderBean leaderBean, LocalPeerBean localPeerBean) {
- // register with JMX
- if (self.jmxLeaderElectionBean != null) {
- try {
- MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- }
- self.jmxLeaderElectionBean = null;
- }
-
- try {
- jmxServerBean = leaderBean;
- MBeanRegistry.getInstance().register(leaderBean, localPeerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxServerBean = null;
- }
- }
-
- boolean registerJMX(LearnerHandlerBean handlerBean) {
- try {
- MBeanRegistry.getInstance().register(handlerBean, jmxServerBean);
- return true;
- } catch (JMException e) {
- LOG.warn("Could not register connection", e);
- }
- return false;
- }
-
- @Override
- protected void unregisterJMX() {
- // unregister from JMX
- try {
- if (jmxDataTreeBean != null) {
- MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxDataTreeBean = null;
- }
-
- protected void unregisterJMX(Leader leader) {
- // unregister from JMX
- try {
- if (jmxServerBean != null) {
- MBeanRegistry.getInstance().unregister(jmxServerBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxServerBean = null;
- }
-
- @Override
- public String getState() {
- return "leader";
- }
-
- /**
- * Returns the id of the associated QuorumPeer, which will do for a unique
- * id of this server.
- */
- @Override
- public long getServerId() {
- return self.getMyId();
- }
-
- @Override
- protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
- super.revalidateSession(cnxn, sessionId, sessionTimeout);
- try {
- // setowner as the leader itself, unless updated
- // via the follower handlers
- setOwner(sessionId, ServerCnxn.me);
- } catch (SessionExpiredException e) {
- // this is ok, it just means that the session revalidation failed.
- }
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
deleted file mode 100644
index 3c7b2148400..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ /dev/null
@@ -1,928 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.net.ssl.SSLSocket;
-import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.InputArchive;
-import org.apache.jute.OutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.common.Time;
-import org.apache.zookeeper.common.X509Exception;
-import org.apache.zookeeper.server.ExitCode;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ServerMetrics;
-import org.apache.zookeeper.server.TxnLogEntry;
-import org.apache.zookeeper.server.ZooTrace;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.apache.zookeeper.server.util.ConfigUtils;
-import org.apache.zookeeper.server.util.MessageTracker;
-import org.apache.zookeeper.server.util.SerializeUtils;
-import org.apache.zookeeper.server.util.ZxidUtils;
-import org.apache.zookeeper.txn.SetDataTxn;
-import org.apache.zookeeper.txn.TxnDigest;
-import org.apache.zookeeper.txn.TxnHeader;
-import org.apache.zookeeper.util.ServiceUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is the superclass of two of the three main actors in a ZK
- * ensemble: Followers and Observers. Both Followers and Observers share
- * a good deal of code which is moved into Peer to avoid duplication.
- */
-public class Learner {
-
- static class PacketInFlight {
-
- TxnHeader hdr;
- Record rec;
- TxnDigest digest;
-
- }
-
- QuorumPeer self;
- LearnerZooKeeperServer zk;
-
- protected BufferedOutputStream bufferedOutput;
-
- protected Socket sock;
- protected MultipleAddresses leaderAddr;
- protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
-
- /**
- * Socket getter
- */
- public Socket getSocket() {
- return sock;
- }
-
- LearnerSender sender = null;
- protected InputArchive leaderIs;
- protected OutputArchive leaderOs;
- /** the protocol version of the leader */
- protected int leaderProtocolVersion = 0x01;
-
- private static final int BUFFERED_MESSAGE_SIZE = 10;
- protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
-
- protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);
-
- /**
- * Time to wait after connection attempt with the Leader or LearnerMaster before this
- * Learner tries to connect again.
- */
- private static final int leaderConnectDelayDuringRetryMs = Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100);
-
- private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
-
- public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending";
- private static boolean asyncSending =
- Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING));
- public static final String LEARNER_CLOSE_SOCKET_ASYNC = "zookeeper.learner.closeSocketAsync";
- public static final boolean closeSocketAsync = Boolean
- .parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_CLOSE_SOCKET_ASYNC));
-
- static {
- LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
- LOG.info("TCP NoDelay set to: {}", nodelay);
- LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
- LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync);
- }
-
- final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<>();
-
- public int getPendingRevalidationsCount() {
- return pendingRevalidations.size();
- }
-
- // for testing
- protected static void setAsyncSending(boolean newMode) {
- asyncSending = newMode;
- LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
-
- }
- protected static boolean getAsyncSending() {
- return asyncSending;
- }
- /**
- * validate a session for a client
- *
- * @param clientId
- * the client to be revalidated
- * @param timeout
- * the timeout for which the session is valid
- * @throws IOException
- */
- void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException {
- LOG.info("Revalidating client: 0x{}", Long.toHexString(clientId));
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- dos.writeLong(clientId);
- dos.writeInt(timeout);
- dos.close();
- QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null);
- pendingRevalidations.put(clientId, cnxn);
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(
- LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "To validate session 0x" + Long.toHexString(clientId));
- }
- writePacket(qp, true);
- }
-
- /**
- * write a packet to the leader.
- *
- * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time.
- * When packets are sent synchronously, writing is done within a synchronization block.
- * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe.
- * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only.
- * So we have only one thread writing to leaderOs at a time in either case.
- *
- * @param pp
- * the proposal packet to be sent to the leader
- * @throws IOException
- */
- void writePacket(QuorumPacket pp, boolean flush) throws IOException {
- if (asyncSending) {
- sender.queuePacket(pp);
- } else {
- writePacketNow(pp, flush);
- }
- }
-
- void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
- synchronized (leaderOs) {
- if (pp != null) {
- messageTracker.trackSent(pp.getType());
- leaderOs.writeRecord(pp, "packet");
- }
- if (flush) {
- bufferedOutput.flush();
- }
- }
- }
-
- /**
- * Start thread that will forward any packet in the queue to the leader
- */
- protected void startSendingThread() {
- sender = new LearnerSender(this);
- sender.start();
- }
-
- /**
- * read a packet from the leader
- *
- * @param pp
- * the packet to be instantiated
- * @throws IOException
- */
- void readPacket(QuorumPacket pp) throws IOException {
- synchronized (leaderIs) {
- leaderIs.readRecord(pp, "packet");
- messageTracker.trackReceived(pp.getType());
- }
- if (LOG.isTraceEnabled()) {
- final long traceMask =
- (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
- : ZooTrace.SERVER_PACKET_TRACE_MASK;
-
- ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
- }
- }
-
- /**
- * send a request packet to the leader
- *
- * @param request
- * the request from the client
- * @throws IOException
- */
- void request(Request request) throws IOException {
- if (request.isThrottled()) {
- LOG.error("Throttled request sent to leader: {}. Exiting", request);
- ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
- }
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream oa = new DataOutputStream(baos);
- oa.writeLong(request.sessionId);
- oa.writeInt(request.cxid);
- oa.writeInt(request.type);
- byte[] payload = request.readRequestBytes();
- if (payload != null) {
- oa.write(payload);
- }
- oa.close();
- QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
- writePacket(qp, true);
- }
-
- /**
- * Returns the address of the node we think is the leader.
- */
- protected QuorumServer findLeader() {
- QuorumServer leaderServer = null;
- // Find the leader by id
- Vote current = self.getCurrentVote();
- for (QuorumServer s : self.getView().values()) {
- if (s.id == current.getId()) {
- // Ensure we have the leader's correct IP address before
- // attempting to connect.
- s.recreateSocketAddresses();
- leaderServer = s;
- break;
- }
- }
- if (leaderServer == null) {
- LOG.warn("Couldn't find the leader with id = {}", current.getId());
- }
- return leaderServer;
- }
-
- /**
- * Overridable helper method to return the System.nanoTime().
- * This method behaves identical to System.nanoTime().
- */
- protected long nanoTime() {
- return System.nanoTime();
- }
-
- /**
- * Overridable helper method to simply call sock.connect(). This can be
- * overriden in tests to fake connection success/failure for connectToLeader.
- */
- protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {
- sock.connect(addr, timeout);
- }
-
- /**
- * Establish a connection with the LearnerMaster found by findLearnerMaster.
- * Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
- * Retries until either initLimit time has elapsed or 5 tries have happened.
- * @param multiAddr - the address of the Peer to connect to.
- * @throws IOException - if the socket connection fails on the 5th attempt
- * if there is an authentication failure while connecting to leader
- */
- protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
-
- this.leaderAddr = multiAddr;
- Set<InetSocketAddress> addresses;
- if (self.isMultiAddressReachabilityCheckEnabled()) {
- // even if none of the addresses are reachable, we want to try to establish connection
- // see ZOOKEEPER-3758
- addresses = multiAddr.getAllReachableAddressesOrAll();
- } else {
- addresses = multiAddr.getAllAddresses();
- }
- ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
- CountDownLatch latch = new CountDownLatch(addresses.size());
- AtomicReference<Socket> socket = new AtomicReference<>(null);
- addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);
-
- try {
- latch.await();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while trying to connect to Leader", e);
- } finally {
- executor.shutdown();
- try {
- if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
- LOG.error("not all the LeaderConnector terminated properly");
- }
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while terminating LeaderConnector executor.", ie);
- }
- }
-
- if (socket.get() == null) {
- throw new IOException("Failed connect to " + multiAddr);
- } else {
- sock = socket.get();
- sockBeingClosed.set(false);
- }
-
- self.authLearner.authenticate(sock, hostname);
-
- leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
- bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
- leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
- if (asyncSending) {
- startSendingThread();
- }
- }
-
- class LeaderConnector implements Runnable {
-
- private AtomicReference<Socket> socket;
- private InetSocketAddress address;
- private CountDownLatch latch;
-
- LeaderConnector(InetSocketAddress address, AtomicReference<Socket> socket, CountDownLatch latch) {
- this.address = address;
- this.socket = socket;
- this.latch = latch;
- }
-
- @Override
- public void run() {
- try {
- Thread.currentThread().setName("LeaderConnector-" + address);
- Socket sock = connectToLeader();
-
- if (sock != null && sock.isConnected()) {
- if (socket.compareAndSet(null, sock)) {
- LOG.info("Successfully connected to leader, using address: {}", address);
- } else {
- LOG.info("Connection to the leader is already established, close the redundant connection");
- sock.close();
- }
- }
-
- } catch (Exception e) {
- LOG.error("Failed connect to {}", address, e);
- } finally {
- latch.countDown();
- }
- }
-
- private Socket connectToLeader() throws IOException, X509Exception, InterruptedException {
- Socket sock = createSocket();
-
- // leader connection timeout defaults to tickTime * initLimit
- int connectTimeout = self.tickTime * self.initLimit;
-
- // but if connectToLearnerMasterLimit is specified, use that value to calculate
- // timeout instead of using the initLimit value
- if (self.connectToLearnerMasterLimit > 0) {
- connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
- }
-
- int remainingTimeout;
- long startNanoTime = nanoTime();
-
- for (int tries = 0; tries < 5 && socket.get() == null; tries++) {
- try {
- // recalculate the init limit time because retries sleep for 1000 milliseconds
- remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
- if (remainingTimeout <= 0) {
- LOG.error("connectToLeader exceeded on retries.");
- throw new IOException("connectToLeader exceeded on retries.");
- }
-
- sockConnect(sock, address, Math.min(connectTimeout, remainingTimeout));
- if (self.isSslQuorum()) {
- ((SSLSocket) sock).startHandshake();
- }
- sock.setTcpNoDelay(nodelay);
- break;
- } catch (IOException e) {
- remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
-
- if (remainingTimeout <= leaderConnectDelayDuringRetryMs) {
- LOG.error(
- "Unexpected exception, connectToLeader exceeded. tries={}, remaining init limit={}, connecting to {}",
- tries,
- remainingTimeout,
- address,
- e);
- throw e;
- } else if (tries >= 4) {
- LOG.error(
- "Unexpected exception, retries exceeded. tries={}, remaining init limit={}, connecting to {}",
- tries,
- remainingTimeout,
- address,
- e);
- throw e;
- } else {
- LOG.warn(
- "Unexpected exception, tries={}, remaining init limit={}, connecting to {}",
- tries,
- remainingTimeout,
- address,
- e);
- sock = createSocket();
- }
- }
- Thread.sleep(leaderConnectDelayDuringRetryMs);
- }
-
- return sock;
- }
- }
-
- /**
- * Creating a simple or and SSL socket.
- * This can be overridden in tests to fake already connected sockets for connectToLeader.
- */
- protected Socket createSocket() throws X509Exception, IOException {
- Socket sock;
- if (self.isSslQuorum()) {
- sock = self.getX509Util().createSSLSocket();
- } else {
- sock = new Socket();
- }
- sock.setSoTimeout(self.tickTime * self.initLimit);
- return sock;
- }
-
- /**
- * Once connected to the leader or learner master, perform the handshake
- * protocol to establish a following / observing connection.
- * @param pktType
- * @return the zxid the Leader sends for synchronization purposes.
- * @throws IOException
- */
- protected long registerWithLeader(int pktType) throws IOException {
- /*
- * Send follower info, including last zxid and sid
- */
- long lastLoggedZxid = self.getLastLoggedZxid();
- QuorumPacket qp = new QuorumPacket();
- qp.setType(pktType);
- qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
-
- /*
- * Add sid to payload
- */
- LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion());
- ByteArrayOutputStream bsid = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
- boa.writeRecord(li, "LearnerInfo");
- qp.setData(bsid.toByteArray());
-
- writePacket(qp, true);
- readPacket(qp);
- final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
- if (qp.getType() == Leader.LEADERINFO) {
- // we are connected to a 1.0 server so accept the new epoch and read the next packet
- leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
- byte[] epochBytes = new byte[4];
- final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
- if (newEpoch > self.getAcceptedEpoch()) {
- wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
- self.setAcceptedEpoch(newEpoch);
- } else if (newEpoch == self.getAcceptedEpoch()) {
- // since we have already acked an epoch equal to the leaders, we cannot ack
- // again, but we still need to send our lastZxid to the leader so that we can
- // sync with it if it does assume leadership of the epoch.
- // the -1 indicates that this reply should not count as an ack for the new epoch
- wrappedEpochBytes.putInt(-1);
- } else {
- throw new IOException("Leaders epoch, "
- + newEpoch
- + " is less than accepted epoch, "
- + self.getAcceptedEpoch());
- }
- QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
- writePacket(ackNewEpoch, true);
- return ZxidUtils.makeZxid(newEpoch, 0);
- } else {
- if (newEpoch > self.getAcceptedEpoch()) {
- self.setAcceptedEpoch(newEpoch);
- }
- if (qp.getType() != Leader.NEWLEADER) {
- LOG.error("First packet should have been NEWLEADER");
- throw new IOException("First packet should have been NEWLEADER");
- }
- return qp.getZxid();
- }
- }
-
- /**
- * Finally, synchronize our history with the Leader (if Follower)
- * or the LearnerMaster (if Observer).
- * @param newLeaderZxid
- * @throws IOException
- * @throws InterruptedException
- */
- protected void syncWithLeader(long newLeaderZxid) throws Exception {
- long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
- QuorumVerifier newLeaderQV = null;
-
- class SyncHelper {
-
- // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot.
- // For SNAP and TRUNC the snapshot is needed to save that history.
- boolean willSnapshot = true;
- boolean syncSnapshot = false;
-
- // PROPOSALs received during sync, for matching up with COMMITs.
- Deque<PacketInFlight> proposals = new ArrayDeque<>();
-
- // PROPOSALs we delay forwarding to the ZK server until sync is done.
- Deque<PacketInFlight> delayedProposals = new ArrayDeque<>();
-
- // COMMITs we delay forwarding to the ZK server until sync is done.
- Deque<Long> delayedCommits = new ArrayDeque<>();
-
- void syncSnapshot() {
- syncSnapshot = true;
- }
-
- void noSnapshot() {
- willSnapshot = false;
- }
-
- void propose(PacketInFlight pif) {
- proposals.add(pif);
- delayedProposals.add(pif);
- }
-
- PacketInFlight nextProposal() {
- return proposals.peekFirst();
- }
-
- void commit() {
- PacketInFlight packet = proposals.remove();
- if (willSnapshot) {
- zk.processTxn(packet.hdr, packet.rec);
- delayedProposals.remove();
- } else {
- delayedCommits.add(packet.hdr.getZxid());
- }
- }
-
- void writeState() throws IOException, InterruptedException {
- // Ensure all received transaction PROPOSALs are written before we ACK the NEWLEADER,
- // since this allows the leader to apply those transactions to its served state:
- if (willSnapshot) {
- zk.takeSnapshot(syncSnapshot); // either, the snapshot contains the transactions,
- willSnapshot = false; // but anything after this needs to go to the transaction log; or
- }
-
- sock.setSoTimeout(self.tickTime * self.syncLimit);
- self.setSyncMode(QuorumPeer.SyncMode.NONE);
- zk.startupWithoutServing();
-
- // if we're a follower, we need to ensure the transactions are safely logged before ACK'ing.
- if (zk instanceof FollowerZooKeeperServer) {
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- // The leader expects the NEWLEADER ACK to precede all the PROPOSAL ACKs, so we only write them first.
- fzk.syncProcessor.setDelayForwarding(true);
- for (PacketInFlight p : delayedProposals) {
- fzk.logRequest(p.hdr, p.rec, p.digest);
- }
- delayedProposals.clear();
- fzk.syncProcessor.syncFlush();
- }
-
- self.setCurrentEpoch(newEpoch);
- }
-
- void flushAcks() throws InterruptedException {
- if (zk instanceof FollowerZooKeeperServer) {
- // The NEWLEADER is ACK'ed, and we can now ACK the PROPOSALs we wrote in writeState.
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- fzk.syncProcessor.setDelayForwarding(false);
- fzk.syncProcessor.syncFlush(); // Ensure these are all ACK'ed before the UPTODATE ACK.
- }
- }
-
- void applyDelayedPackets() {
- // Any delayed packets must now be applied: all PROPOSALs first, then any COMMITs.
- if (zk instanceof FollowerZooKeeperServer) {
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- for (PacketInFlight p : delayedProposals) {
- fzk.logRequest(p.hdr, p.rec, p.digest);
- }
- for (Long zxid : delayedCommits) {
- fzk.commit(zxid);
- }
- } else if (zk instanceof ObserverZooKeeperServer) {
- ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
- for (PacketInFlight p : delayedProposals) {
- Long zxid = delayedCommits.peekFirst();
- if (p.hdr.getZxid() != zxid) {
- // log warning message if there is no matching commit
- // old leader send outstanding proposal to observer
- LOG.warn(
- "Committing 0x{}, but next proposal is 0x{}",
- Long.toHexString(zxid),
- Long.toHexString(p.hdr.getZxid()));
- continue;
- }
- delayedCommits.remove();
- Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
- request.setTxnDigest(p.digest);
- ozk.commitRequest(request);
- }
- } else {
- // New server type need to handle in-flight packets
- throw new UnsupportedOperationException("Unknown server type");
- }
- }
-
- }
-
- SyncHelper helper = new SyncHelper();
- QuorumPacket qp = new QuorumPacket();
- readPacket(qp);
- synchronized (zk) {
- if (qp.getType() == Leader.DIFF) {
- LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
- self.setSyncMode(QuorumPeer.SyncMode.DIFF);
- if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
- LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
- helper.syncSnapshot();
- } else {
- helper.noSnapshot();
- }
- } else if (qp.getType() == Leader.SNAP) {
- self.setSyncMode(QuorumPeer.SyncMode.SNAP);
- LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
- // The leader is going to dump the database
- zk.getZKDatabase().deserializeSnapshot(leaderIs);
- // ZOOKEEPER-2819: overwrite config node content extracted
- // from leader snapshot with local config, to avoid potential
- // inconsistency of config node content during rolling restart.
- if (!self.isReconfigEnabled()) {
- LOG.debug("Reset config node content from local config after deserialization of snapshot.");
- zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
- }
- String signature = leaderIs.readString("signature");
- if (!signature.equals("BenWasHere")) {
- LOG.error("Missing signature. Got {}", signature);
- throw new IOException("Missing signature");
- }
- zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
-
- // Immediately persist the latest snapshot when there is txn log gap
- helper.syncSnapshot();
- } else if (qp.getType() == Leader.TRUNC) {
- // We need to truncate the log to the lastZxid of the leader
- self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
- LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
- boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
- if (!truncated) {
- LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
- ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
- }
- zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
- } else {
- LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
- ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
- }
- zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
- zk.createSessionTracker();
-
-
- // we are now going to start getting transactions to apply followed by an UPTODATE
- long lastQueued = 0;
- TxnLogEntry logEntry;
- outerLoop:
- while (self.isRunning()) {
- readPacket(qp);
- switch (qp.getType()) {
- case Leader.PROPOSAL:
- PacketInFlight pif = new PacketInFlight();
- logEntry = SerializeUtils.deserializeTxn(qp.getData());
- pif.hdr = logEntry.getHeader();
- pif.rec = logEntry.getTxn();
- pif.digest = logEntry.getDigest();
- if (pif.hdr.getZxid() != lastQueued + 1) {
- LOG.warn(
- "Got zxid 0x{} expected 0x{}",
- Long.toHexString(pif.hdr.getZxid()),
- Long.toHexString(lastQueued + 1));
- }
- lastQueued = pif.hdr.getZxid();
-
- if (pif.hdr.getType() == OpCode.reconfig) {
- SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
- QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
- self.setLastSeenQuorumVerifier(qv, true);
- }
- helper.propose(pif);
- break;
- case Leader.COMMIT:
- case Leader.COMMITANDACTIVATE:
- pif = helper.nextProposal();
- if (pif.hdr.getZxid() != qp.getZxid()) {
- LOG.warn(
- "Committing 0x{}, but next proposal is 0x{}",
- Long.toHexString(qp.getZxid()),
- Long.toHexString(pif.hdr.getZxid()));
- } else {
- if (qp.getType() == Leader.COMMITANDACTIVATE) {
- tryReconfig(pif, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid());
- }
- helper.commit();
- }
- break;
- case Leader.INFORM:
- case Leader.INFORMANDACTIVATE:
- PacketInFlight packet = new PacketInFlight();
- if (qp.getType() == Leader.INFORMANDACTIVATE) {
- ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
- long suggestedLeaderId = buffer.getLong();
- byte[] remainingData = new byte[buffer.remaining()];
- buffer.get(remainingData);
- logEntry = SerializeUtils.deserializeTxn(remainingData);
- packet.hdr = logEntry.getHeader();
- packet.rec = logEntry.getTxn();
- packet.digest = logEntry.getDigest();
- tryReconfig(packet, suggestedLeaderId, qp.getZxid());
- } else {
- logEntry = SerializeUtils.deserializeTxn(qp.getData());
- packet.rec = logEntry.getTxn();
- packet.hdr = logEntry.getHeader();
- packet.digest = logEntry.getDigest();
- // Log warning message if txn comes out-of-order
- if (packet.hdr.getZxid() != lastQueued + 1) {
- LOG.warn(
- "Got zxid 0x{} expected 0x{}",
- Long.toHexString(packet.hdr.getZxid()),
- Long.toHexString(lastQueued + 1));
- }
- lastQueued = packet.hdr.getZxid();
- }
- helper.propose(packet);
- helper.commit();
- break;
- case Leader.UPTODATE:
- LOG.info("Learner received UPTODATE message");
- if (newLeaderQV != null) {
- boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
- if (majorChange) {
- throw new Exception("changes proposed in reconfig");
- }
- }
- helper.flushAcks();
- self.setZooKeeperServer(zk);
- self.adminServer.setZooKeeperServer(zk);
- break outerLoop;
- case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
- LOG.info("Learner received NEWLEADER message");
- if (qp.getData() != null && qp.getData().length > 1) {
- try {
- QuorumVerifier qv = self.configFromString(new String(qp.getData(), UTF_8));
- self.setLastSeenQuorumVerifier(qv, true);
- newLeaderQV = qv;
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- helper.writeState();
- writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
- break;
- }
- }
- }
- QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
- ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
- writePacket(ack, true);
- zk.startServing();
- /*
- * Update the election vote here to ensure that all members of the
- * ensemble report the same vote to new servers that start up and
- * send leader election notifications to the ensemble.
- *
- * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
- */
- self.updateElectionVote(newEpoch);
-
- helper.applyDelayedPackets();
- }
-
- private void tryReconfig(PacketInFlight pif, long newLeader, long zxid) throws Exception {
- QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
- boolean majorChange = self.processReconfig(qv, newLeader, zxid, true);
- if (majorChange) {
- throw new Exception("changes proposed in reconfig");
- }
- }
-
- protected void revalidate(QuorumPacket qp) throws IOException {
- ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
- DataInputStream dis = new DataInputStream(bis);
- long sessionId = dis.readLong();
- boolean valid = dis.readBoolean();
- ServerCnxn cnxn = pendingRevalidations.remove(sessionId);
- if (cnxn == null) {
- LOG.warn("Missing session 0x{} for validation", Long.toHexString(sessionId));
- } else {
- zk.finishSessionInit(cnxn, valid);
- }
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(
- LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "Session 0x" + Long.toHexString(sessionId) + " is valid: " + valid);
- }
- }
-
- protected void ping(QuorumPacket qp) throws IOException {
- // Send back the ping with our session data
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- Map<Long, Integer> touchTable = zk.getTouchSnapshot();
- for (Entry<Long, Integer> entry : touchTable.entrySet()) {
- dos.writeLong(entry.getKey());
- dos.writeInt(entry.getValue());
- }
-
- QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo());
- writePacket(pingReply, true);
- }
-
- /**
- * Shutdown the Peer
- */
- public void shutdown() {
- self.setZooKeeperServer(null);
- self.closeAllConnections();
- self.adminServer.setZooKeeperServer(null);
-
- if (sender != null) {
- sender.shutdown();
- }
-
- closeSocket();
- // shutdown previous zookeeper
- if (zk != null) {
- // If we haven't finished SNAP sync, force fully shutdown
- // to avoid potential inconsistency
- zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
- }
- }
-
- boolean isRunning() {
- return self.isRunning() && zk.isRunning();
- }
-
- void closeSocket() {
- if (sock != null) {
- if (sockBeingClosed.compareAndSet(false, true)) {
- if (closeSocketAsync) {
- final Thread closingThread = new Thread(() -> closeSockSync(), "CloseSocketThread(sid:" + zk.getServerId());
- closingThread.setDaemon(true);
- closingThread.start();
- } else {
- closeSockSync();
- }
- }
- }
- }
-
- void closeSockSync() {
- try {
- long startTime = Time.currentElapsedTime();
- if (sock != null) {
- sock.close();
- sock = null;
- }
- ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - startTime);
- } catch (IOException e) {
- LOG.warn("Ignoring error closing connection to leader", e);
- }
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
deleted file mode 100644
index 99c4ae16dce..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.IOException;
-import java.util.Map;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.server.DataTreeBean;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.SyncRequestProcessor;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServerBean;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-/**
- * Parent class for all ZooKeeperServers for Learners
- */
-public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
-
- /*
- * Request processors
- */
- protected CommitProcessor commitProcessor;
- protected SyncRequestProcessor syncProcessor;
-
- public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int listenBacklog, ZKDatabase zkDb, QuorumPeer self) throws IOException {
- super(logFactory, tickTime, minSessionTimeout, maxSessionTimeout, listenBacklog, zkDb, self);
- }
-
- /**
- * Abstract method to return the learner associated with this server.
- * Since the Learner may change under our feet (when QuorumPeer reassigns
- * it) we can't simply take a reference here. Instead, we need the
- * subclasses to implement this.
- */
- public abstract Learner getLearner();
-
- /**
- * Returns the current state of the session tracker. This is only currently
- * used by a Learner to build a ping response packet.
- *
- */
- protected Map<Long, Integer> getTouchSnapshot() {
- if (sessionTracker != null) {
- return ((LearnerSessionTracker) sessionTracker).snapshot();
- }
- Map<Long, Integer> map = Map.of();
- return map;
- }
-
- /**
- * Returns the id of the associated QuorumPeer, which will do for a unique
- * id of this server.
- */
- @Override
- public long getServerId() {
- return self.getMyId();
- }
-
- @Override
- public void createSessionTracker() {
- sessionTracker = new LearnerSessionTracker(
- this,
- getZKDatabase().getSessionWithTimeOuts(),
- this.tickTime,
- self.getMyId(),
- self.areLocalSessionsEnabled(),
- getZooKeeperServerListener());
- }
-
- @Override
- protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
- if (upgradeableSessionTracker.isLocalSession(sessionId)) {
- super.revalidateSession(cnxn, sessionId, sessionTimeout);
- } else {
- getLearner().validateSession(cnxn, sessionId, sessionTimeout);
- }
- }
-
- @Override
- protected void registerJMX() {
- // register with JMX
- try {
- jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
- MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxDataTreeBean = null;
- }
- }
-
- public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) {
- // register with JMX
- if (self.jmxLeaderElectionBean != null) {
- try {
- MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- }
- self.jmxLeaderElectionBean = null;
- }
-
- try {
- jmxServerBean = serverBean;
- MBeanRegistry.getInstance().register(serverBean, localPeerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxServerBean = null;
- }
- }
-
- @Override
- protected void unregisterJMX() {
- // unregister from JMX
- try {
- if (jmxDataTreeBean != null) {
- MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxDataTreeBean = null;
- }
-
- protected void unregisterJMX(Learner peer) {
- // unregister from JMX
- try {
- if (jmxServerBean != null) {
- MBeanRegistry.getInstance().unregister(jmxServerBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxServerBean = null;
- }
-
- @Override
- public synchronized void shutdown(boolean fullyShutDown) {
- if (!canShutdown()) {
- LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
- } else {
- LOG.info("Shutting down");
- try {
- if (syncProcessor != null) {
- // Shutting down the syncProcessor here, first, ensures queued transactions here are written to
- // permanent storage, which ensures that crash recovery data is consistent with what is used for a
- // leader election immediately following shutdown, because of the old leader going down; and also
- // that any state on its way to being written is also loaded in the potential call to
- // fast-forward-from-edits, in super.shutdown(...), so we avoid getting a DIFF from the new leader
- // that contains entries we have already written to our transaction log.
- syncProcessor.shutdown();
- }
- } catch (Exception e) {
- LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e);
- }
- }
- try {
- super.shutdown(fullyShutDown);
- } catch (Exception e) {
- LOG.warn("Ignoring unexpected exception during shutdown", e);
- }
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
deleted file mode 100644
index 1a44a98e6e7..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.BiConsumer;
-import org.apache.zookeeper.server.FinalRequestProcessor;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.SyncRequestProcessor;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A ZooKeeperServer for the Observer node type. Not much is different, but
- * we anticipate specializing the request processors in the future.
- *
- */
-public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
-
- private static final Logger LOG = LoggerFactory.getLogger(ObserverZooKeeperServer.class);
-
- /**
- * Enable since request processor for writing txnlog to disk and
- * take periodic snapshot. Default is ON.
- */
-
- private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
-
- /*
- * Pending sync requests
- */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<>();
-
- ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
- super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
- LOG.info("syncEnabled ={}", syncRequestProcessorEnabled);
- }
-
- public Observer getObserver() {
- return self.observer;
- }
-
- @Override
- public Learner getLearner() {
- return self.observer;
- }
-
- /**
- * Unlike a Follower, which sees a full request only during the PROPOSAL
- * phase, Observers get all the data required with the INFORM packet.
- * This method commits a request that has been unpacked by from an INFORM
- * received from the Leader.
- *
- * @param request
- */
- public void commitRequest(Request request) {
- if (syncProcessor != null) {
- // Write to txnlog and take periodic snapshot
- syncProcessor.processRequest(request);
- }
- commitProcessor.commit(request);
- }
-
- /**
- * Set up the request processors for an Observer:
- * firstProcesor-&gt;commitProcessor-&gt;finalProcessor
- */
- @Override
- protected void setupRequestProcessors() {
- // We might consider changing the processor behaviour of
- // Observers to, for example, remove the disk sync requirements.
- // Currently, they behave almost exactly the same as followers.
- RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
- commitProcessor.start();
- firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
- ((ObserverRequestProcessor) firstProcessor).start();
-
- /*
- * Observer should write to disk, so that the it won't request
- * too old txn from the leader which may lead to getting an entire
- * snapshot.
- *
- * However, this may degrade performance as it has to write to disk
- * and do periodic snapshot which may double the memory requirements
- */
- if (syncRequestProcessorEnabled) {
- syncProcessor = new SyncRequestProcessor(this, null);
- syncProcessor.start();
- }
- }
-
- /*
- * Process a sync request
- */
- public synchronized void sync() {
- if (pendingSyncs.size() == 0) {
- LOG.warn("Not expecting a sync.");
- return;
- }
-
- Request r = pendingSyncs.remove();
- commitProcessor.commit(r);
- }
-
- @Override
- public String getState() {
- return "observer";
- }
-
- @Override
- public void dumpMonitorValues(BiConsumer<String, Object> response) {
- super.dumpMonitorValues(response);
- response.accept("observer_master_id", getObserver().getLearnerMasterId());
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
deleted file mode 100644
index f6fc87d7716..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ /dev/null
@@ -1,2711 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import javax.security.sasl.SaslException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException.BadArgumentsException;
-import org.apache.zookeeper.common.AtomicFileOutputStream;
-import org.apache.zookeeper.common.AtomicFileWritingIdiom;
-import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement;
-import org.apache.zookeeper.common.QuorumX509Util;
-import org.apache.zookeeper.common.Time;
-import org.apache.zookeeper.common.X509Exception;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.jmx.ZKMBeanInfo;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ServerMetrics;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperThread;
-import org.apache.zookeeper.server.admin.AdminServer;
-import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
-import org.apache.zookeeper.server.admin.AdminServerFactory;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
-import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthLearner;
-import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthServer;
-import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
-import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
-import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
-import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner;
-import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer;
-import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
-import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.apache.zookeeper.server.util.ConfigUtils;
-import org.apache.zookeeper.server.util.JvmPauseMonitor;
-import org.apache.zookeeper.server.util.ZxidUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class manages the quorum protocol. There are three states this server
- * can be in:
- * <ol>
- * <li>Leader election - each server will elect a leader (proposing itself as a
- * leader initially).</li>
- * <li>Follower - the server will synchronize with the leader and replicate any
- * transactions.</li>
- * <li>Leader - the server will process requests and forward them to followers.
- * A majority of followers must log the request before it can be accepted.
- * </ol>
- *
- * This class will setup a datagram socket that will always respond with its
- * view of the current leader. The response will take the form of:
- *
- * <pre>
- * int xid;
- *
- * long myid;
- *
- * long leader_id;
- *
- * long leader_zxid;
- * </pre>
- *
- * The request for the current leader will consist solely of an xid: int xid;
- */
-public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
-
- private static final Logger LOG = LoggerFactory.getLogger(QuorumPeer.class);
-
- public static final String CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES = "zookeeper.kerberos.canonicalizeHostNames";
- public static final String CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES = "false";
-
- private QuorumBean jmxQuorumBean;
- LocalPeerBean jmxLocalPeerBean;
- private Map<Long, RemotePeerBean> jmxRemotePeerBean;
- LeaderElectionBean jmxLeaderElectionBean;
-
- // The QuorumCnxManager is held through an AtomicReference to ensure cross-thread visibility
- // of updates; see the implementation comment at setLastSeenQuorumVerifier().
- private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();
-
- QuorumAuthServer authServer;
- QuorumAuthLearner authLearner;
-
- /**
- * ZKDatabase is a top level member of quorumpeer
- * which will be used in all the zookeeperservers
- * instantiated later. Also, it is created once on
- * bootup and only thrown away in case of a truncate
- * message from the leader
- */
- private ZKDatabase zkDb;
-
- private JvmPauseMonitor jvmPauseMonitor;
-
- private final AtomicBoolean suspended = new AtomicBoolean(false);
-
- public static final class AddressTuple {
-
- public final MultipleAddresses quorumAddr;
- public final MultipleAddresses electionAddr;
- public final InetSocketAddress clientAddr;
-
- public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
- this.quorumAddr = quorumAddr;
- this.electionAddr = electionAddr;
- this.clientAddr = clientAddr;
- }
-
- }
-
- private int observerMasterPort;
-
- public int getObserverMasterPort() {
- return observerMasterPort;
- }
-
- public void setObserverMasterPort(int observerMasterPort) {
- this.observerMasterPort = observerMasterPort;
- }
-
- public static final String CONFIG_KEY_MULTI_ADDRESS_ENABLED = "zookeeper.multiAddress.enabled";
- public static final String CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED = "false";
-
- private boolean multiAddressEnabled = true;
- public boolean isMultiAddressEnabled() {
- return multiAddressEnabled;
- }
-
- public void setMultiAddressEnabled(boolean multiAddressEnabled) {
- this.multiAddressEnabled = multiAddressEnabled;
- LOG.info("multiAddress.enabled set to {}", multiAddressEnabled);
- }
-
- public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS = "zookeeper.multiAddress.reachabilityCheckTimeoutMs";
-
- private int multiAddressReachabilityCheckTimeoutMs = (int) MultipleAddresses.DEFAULT_TIMEOUT.toMillis();
- public int getMultiAddressReachabilityCheckTimeoutMs() {
- return multiAddressReachabilityCheckTimeoutMs;
- }
-
- public void setMultiAddressReachabilityCheckTimeoutMs(int multiAddressReachabilityCheckTimeoutMs) {
- this.multiAddressReachabilityCheckTimeoutMs = multiAddressReachabilityCheckTimeoutMs;
- LOG.info("multiAddress.reachabilityCheckTimeoutMs set to {}", multiAddressReachabilityCheckTimeoutMs);
- }
-
- public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED = "zookeeper.multiAddress.reachabilityCheckEnabled";
-
- private boolean multiAddressReachabilityCheckEnabled = true;
-
- public boolean isMultiAddressReachabilityCheckEnabled() {
- return multiAddressReachabilityCheckEnabled;
- }
-
- public void setMultiAddressReachabilityCheckEnabled(boolean multiAddressReachabilityCheckEnabled) {
- this.multiAddressReachabilityCheckEnabled = multiAddressReachabilityCheckEnabled;
- LOG.info("multiAddress.reachabilityCheckEnabled set to {}", multiAddressReachabilityCheckEnabled);
- }
-
- public static class QuorumServer {
-
- public MultipleAddresses addr = new MultipleAddresses();
-
- public MultipleAddresses electionAddr = new MultipleAddresses();
-
- public InetSocketAddress clientAddr = null;
-
- public long id;
-
- public String hostname;
-
- public LearnerType type = LearnerType.PARTICIPANT;
-
- public boolean isClientAddrFromStatic = false;
-
- private List<InetSocketAddress> myAddrs;
-
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
- this(id, addr, electionAddr, clientAddr, LearnerType.PARTICIPANT);
- }
-
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr) {
- this(id, addr, electionAddr, null, LearnerType.PARTICIPANT);
- }
-
- // VisibleForTesting
- public QuorumServer(long id, InetSocketAddress addr) {
- this(id, addr, null, null, LearnerType.PARTICIPANT);
- }
-
- public long getId() {
- return id;
- }
-
- /**
- * Performs a DNS lookup for server address and election address.
- *
- * If the DNS lookup fails, this.addr and electionAddr remain
- * unmodified.
- */
- public void recreateSocketAddresses() {
- if (this.addr.isEmpty()) {
- LOG.warn("Server address has not been initialized");
- return;
- }
- if (this.electionAddr.isEmpty()) {
- LOG.warn("Election address has not been initialized");
- return;
- }
- this.addr.recreateSocketAddresses();
- this.electionAddr.recreateSocketAddresses();
- }
-
- private LearnerType getType(String s) throws ConfigException {
- switch (s.trim().toLowerCase()) {
- case "observer":
- return LearnerType.OBSERVER;
- case "participant":
- return LearnerType.PARTICIPANT;
- default:
- throw new ConfigException("Unrecognised peertype: " + s);
- }
- }
-
- public QuorumServer(long sid, String addressStr) throws ConfigException {
- this(sid, addressStr, QuorumServer::getInetAddress);
- }
-
- QuorumServer(long sid, String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
- this.id = sid;
- initializeWithAddressString(addressStr, getInetAddress);
- }
-
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, LearnerType type) {
- this(id, addr, electionAddr, null, type);
- }
-
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
- this.id = id;
- if (addr != null) {
- this.addr.addAddress(addr);
- }
- if (electionAddr != null) {
- this.electionAddr.addAddress(electionAddr);
- }
- this.type = type;
- this.clientAddr = clientAddr;
-
- setMyAddrs();
- }
-
- private static final String wrongFormat =
- " does not have the form server_config or server_config;client_config"
- + " where server_config is the pipe separated list of host:port:port or host:port:port:type"
- + " and client_config is port or host:port";
-
- private void initializeWithAddressString(String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
- LearnerType newType = null;
- String[] serverClientParts = addressStr.split(";");
- String[] serverAddresses = serverClientParts[0].split("\\|");
-
- if (serverClientParts.length == 2) {
- String[] clientParts = ConfigUtils.getHostAndPort(serverClientParts[1]);
- if (clientParts.length > 2) {
- throw new ConfigException(addressStr + wrongFormat);
- }
-
- // is client_config a host:port or just a port
- String clientHostName = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
- try {
- clientAddr = new InetSocketAddress(clientHostName, Integer.parseInt(clientParts[clientParts.length - 1]));
- } catch (NumberFormatException e) {
- throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
- }
- }
-
- boolean multiAddressEnabled = Boolean.parseBoolean(
- System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, QuorumPeer.CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED));
- if (!multiAddressEnabled && serverAddresses.length > 1) {
- throw new ConfigException("Multiple address feature is disabled, but multiple addresses were specified for sid " + this.id);
- }
-
- boolean canonicalize = Boolean.parseBoolean(
- System.getProperty(
- CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES,
- CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES));
-
- for (String serverAddress : serverAddresses) {
- String serverParts[] = ConfigUtils.getHostAndPort(serverAddress);
- if ((serverClientParts.length > 2) || (serverParts.length < 3)
- || (serverParts.length > 4)) {
- throw new ConfigException(addressStr + wrongFormat);
- }
-
- String serverHostName = serverParts[0];
-
- // server_config should be either host:port:port or host:port:port:type
- InetSocketAddress tempAddress;
- InetSocketAddress tempElectionAddress;
- try {
- tempAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[1]));
- addr.addAddress(tempAddress);
- } catch (NumberFormatException e) {
- throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[1]);
- }
- try {
- tempElectionAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[2]));
- electionAddr.addAddress(tempElectionAddress);
- } catch (NumberFormatException e) {
- throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[2]);
- }
-
- if (tempAddress.getPort() == tempElectionAddress.getPort()) {
- throw new ConfigException("Client and election port must be different! Please update the "
- + "configuration file on server." + this.id);
- }
-
- if (canonicalize) {
- InetAddress ia = getInetAddress.apply(tempAddress);
- if (ia == null) {
- throw new ConfigException("Unable to canonicalize address " + serverHostName + " because it's not resolvable");
- }
-
- String canonicalHostName = ia.getCanonicalHostName();
-
- if (!canonicalHostName.equals(serverHostName)
- // Avoid using literal IP address when
- // security check fails
- && !canonicalHostName.equals(ia.getHostAddress())) {
- LOG.info("Host name for quorum server {} "
- + "canonicalized from {} to {}",
- this.id, serverHostName, canonicalHostName);
- serverHostName = canonicalHostName;
- }
- }
-
- if (serverParts.length == 4) {
- LearnerType tempType = getType(serverParts[3]);
- if (newType == null) {
- newType = tempType;
- }
-
- if (newType != tempType) {
- throw new ConfigException("Multiple addresses should have similar roles: " + type + " vs " + tempType);
- }
- }
-
- this.hostname = serverHostName;
- }
-
- if (newType != null) {
- type = newType;
- }
-
- setMyAddrs();
- }
-
- private static InetAddress getInetAddress(InetSocketAddress addr) {
- return addr.getAddress();
- }
-
- private void setMyAddrs() {
- this.myAddrs = new ArrayList<>();
- this.myAddrs.addAll(this.addr.getAllAddresses());
- this.myAddrs.add(this.clientAddr);
- this.myAddrs.addAll(this.electionAddr.getAllAddresses());
- this.myAddrs = excludedSpecialAddresses(this.myAddrs);
- }
-
- public static String delimitedHostString(InetSocketAddress addr) {
- String host = addr.getHostString();
- if (host.contains(":")) {
- return "[" + host + "]";
- } else {
- return host;
- }
- }
-
- public String toString() {
- StringWriter sw = new StringWriter();
-
- List<InetSocketAddress> addrList = new LinkedList<>(addr.getAllAddresses());
- List<InetSocketAddress> electionAddrList = new LinkedList<>(electionAddr.getAllAddresses());
-
- if (addrList.size() > 0 && electionAddrList.size() > 0) {
- addrList.sort(Comparator.comparing(InetSocketAddress::getHostString));
- electionAddrList.sort(Comparator.comparing(InetSocketAddress::getHostString));
- sw.append(IntStream.range(0, addrList.size()).mapToObj(i -> String.format("%s:%d:%d",
- delimitedHostString(addrList.get(i)), addrList.get(i).getPort(), electionAddrList.get(i).getPort()))
- .collect(Collectors.joining("|")));
- }
-
- if (type == LearnerType.OBSERVER) {
- sw.append(":observer");
- } else if (type == LearnerType.PARTICIPANT) {
- sw.append(":participant");
- }
-
- if (clientAddr != null && !isClientAddrFromStatic) {
- sw.append(";");
- sw.append(delimitedHostString(clientAddr));
- sw.append(":");
- sw.append(String.valueOf(clientAddr.getPort()));
- }
-
- return sw.toString();
- }
-
- public int hashCode() {
- assert false : "hashCode not designed";
- return 42; // any arbitrary constant will do
- }
-
- private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2) {
- return (addr1 != null || addr2 == null)
- && (addr1 == null || addr2 != null)
- && (addr1 == null || addr2 == null || addr1.equals(addr2));
- }
-
- public boolean equals(Object o) {
- if (!(o instanceof QuorumServer)) {
- return false;
- }
- QuorumServer qs = (QuorumServer) o;
- if ((qs.id != id) || (qs.type != type)) {
- return false;
- }
- if (!addr.equals(qs.addr)) {
- return false;
- }
- if (!electionAddr.equals(qs.electionAddr)) {
- return false;
- }
- return checkAddressesEqual(clientAddr, qs.clientAddr);
- }
-
- public void checkAddressDuplicate(QuorumServer s) throws BadArgumentsException {
- List<InetSocketAddress> otherAddrs = new ArrayList<>(s.addr.getAllAddresses());
- otherAddrs.add(s.clientAddr);
- otherAddrs.addAll(s.electionAddr.getAllAddresses());
- otherAddrs = excludedSpecialAddresses(otherAddrs);
-
- for (InetSocketAddress my : this.myAddrs) {
-
- for (InetSocketAddress other : otherAddrs) {
- if (my.equals(other)) {
- String error = String.format("%s of server.%d conflicts %s of server.%d", my, this.id, other, s.id);
- throw new BadArgumentsException(error);
- }
- }
- }
- }
-
- private List<InetSocketAddress> excludedSpecialAddresses(List<InetSocketAddress> addrs) {
- List<InetSocketAddress> included = new ArrayList<>();
-
- for (InetSocketAddress addr : addrs) {
- if (addr == null) {
- continue;
- }
- InetAddress inetaddr = addr.getAddress();
-
- if (inetaddr == null || inetaddr.isAnyLocalAddress() // wildCard addresses (0.0.0.0 or [::])
- || inetaddr.isLoopbackAddress()) { // loopback address(localhost/127.0.0.1)
- continue;
- }
- included.add(addr);
- }
- return included;
- }
-
- }
-
- public enum ServerState {
- LOOKING,
- FOLLOWING,
- LEADING,
- OBSERVING
- }
-
- /**
- * (Used for monitoring) shows the current phase of
- * Zab protocol that peer is running.
- */
- public enum ZabState {
- ELECTION,
- DISCOVERY,
- SYNCHRONIZATION,
- BROADCAST
- }
-
- /**
- * (Used for monitoring) When peer is in synchronization phase, this shows
- * which synchronization mechanism is being used
- */
- public enum SyncMode {
- NONE,
- DIFF,
- SNAP,
- TRUNC
- }
-
- /*
- * A peer can either be participating, which implies that it is willing to
- * both vote in instances of consensus and to elect or become a Leader, or
- * it may be observing in which case it isn't.
- *
- * We need this distinction to decide which ServerState to move to when
- * conditions change (e.g. which state to become after LOOKING).
- */
- public enum LearnerType {
- PARTICIPANT,
- OBSERVER
- }
-
- /*
- * To enable observers to have no identifier, we need a generic identifier
- * at least for QuorumCnxManager. We use the following constant to as the
- * value of such a generic identifier.
- */
-
- static final long OBSERVER_ID = Long.MAX_VALUE;
-
- /*
- * Record leader election time
- */
- public long start_fle, end_fle; // fle = fast leader election
- public static final String FLE_TIME_UNIT = "MS";
- private long unavailableStartTime;
-
- /*
- * Default value of peer is participant
- */
- private LearnerType learnerType = LearnerType.PARTICIPANT;
-
- public LearnerType getLearnerType() {
- return learnerType;
- }
-
- /**
- * Sets the LearnerType
- */
- public void setLearnerType(LearnerType p) {
- learnerType = p;
- }
-
- protected synchronized void setConfigFileName(String s) {
- configFilename = s;
- }
-
- private String configFilename = null;
-
- public int getQuorumSize() {
- return getVotingView().size();
- }
-
- public void setJvmPauseMonitor(JvmPauseMonitor jvmPauseMonitor) {
- this.jvmPauseMonitor = jvmPauseMonitor;
- }
-
- /**
- * QuorumVerifier implementation; default (majority).
- */
-
- //last committed quorum verifier
- private QuorumVerifier quorumVerifier;
-
- //last proposed quorum verifier
- private QuorumVerifier lastSeenQuorumVerifier = null;
-
- // Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier.
- final Object QV_LOCK = new Object();
-
- /**
- * My id
- */
- private long myid;
-
- /**
- * get the id of this quorum peer.
- */
- public long getMyId() {
- return myid;
- }
-
- // VisibleForTesting
- void setId(long id) {
- this.myid = id;
- }
-
- private boolean sslQuorum;
- private boolean shouldUsePortUnification;
-
- public boolean isSslQuorum() {
- return sslQuorum;
- }
-
- public boolean shouldUsePortUnification() {
- return shouldUsePortUnification;
- }
-
- private final QuorumX509Util x509Util;
-
- QuorumX509Util getX509Util() {
- return x509Util;
- }
-
- /**
- * This is who I think the leader currently is.
- */
- private volatile Vote currentVote;
-
- public synchronized Vote getCurrentVote() {
- return currentVote;
- }
-
- public synchronized void setCurrentVote(Vote v) {
- currentVote = v;
- }
-
- private volatile boolean running = true;
-
- private String initialConfig;
-
- /**
- * The number of milliseconds of each tick
- */
- protected int tickTime;
-
- /**
- * Whether learners in this quorum should create new sessions as local.
- * False by default to preserve existing behavior.
- */
- protected boolean localSessionsEnabled = false;
-
- /**
- * Whether learners in this quorum should upgrade local sessions to
- * global. Only matters if local sessions are enabled.
- */
- protected boolean localSessionsUpgradingEnabled = true;
-
- /**
- * Minimum number of milliseconds to allow for session timeout.
- * A value of -1 indicates unset, use default.
- */
- protected int minSessionTimeout = -1;
-
- /**
- * Maximum number of milliseconds to allow for session timeout.
- * A value of -1 indicates unset, use default.
- */
- protected int maxSessionTimeout = -1;
-
- /**
- * The ZooKeeper server's socket backlog length. The number of connections
- * that will be queued to be read before new connections are dropped. A
- * value of one indicates the default backlog will be used.
- */
- protected int clientPortListenBacklog = -1;
-
- /**
- * The number of ticks that the initial synchronization phase can take
- */
- protected volatile int initLimit;
-
- /**
- * The number of ticks that can pass between sending a request and getting
- * an acknowledgment
- */
- protected volatile int syncLimit;
-
- /**
- * The number of ticks that can pass before retrying to connect to learner master
- */
- protected volatile int connectToLearnerMasterLimit;
-
- /**
- * Enables/Disables sync request processor. This option is enabled
- * by default and is to be used with observers.
- */
- protected boolean syncEnabled = true;
-
- /**
- * The current tick
- */
- protected AtomicInteger tick = new AtomicInteger();
-
- /**
- * Whether or not to listen on all IPs for the two quorum ports
- * (broadcast and fast leader election).
- */
- protected boolean quorumListenOnAllIPs = false;
-
- /**
- * Keeps time taken for leader election in milliseconds. Sets the value to
- * this variable only after the completion of leader election.
- */
- private long electionTimeTaken = -1;
-
- /**
- * Enable/Disables quorum authentication using sasl. Defaulting to false.
- */
- protected boolean quorumSaslEnableAuth;
-
- /**
- * If this is false, quorum peer server will accept another quorum peer client
- * connection even if the authentication did not succeed. This can be used while
- * upgrading ZooKeeper server. Defaulting to false (required).
- */
- protected boolean quorumServerSaslAuthRequired;
-
- /**
- * If this is false, quorum peer learner will talk to quorum peer server
- * without authentication. This can be used while upgrading ZooKeeper
- * server. Defaulting to false (required).
- */
- protected boolean quorumLearnerSaslAuthRequired;
-
- /**
- * Kerberos quorum service principal. Defaulting to 'zkquorum/localhost'.
- */
- protected String quorumServicePrincipal;
-
- /**
- * Quorum learner login context name in jaas-conf file to read the kerberos
- * security details. Defaulting to 'QuorumLearner'.
- */
- protected String quorumLearnerLoginContext;
-
- /**
- * Quorum server login context name in jaas-conf file to read the kerberos
- * security details. Defaulting to 'QuorumServer'.
- */
- protected String quorumServerLoginContext;
-
- // TODO: need to tune the default value of thread size
- private static final int QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE = 20;
- /**
- * The maximum number of threads to allow in the connectionExecutors thread
- * pool which will be used to initiate quorum server connections.
- */
- protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;
-
- public static final String QUORUM_CNXN_TIMEOUT_MS = "zookeeper.quorumCnxnTimeoutMs";
- private static int quorumCnxnTimeoutMs;
-
- static {
- quorumCnxnTimeoutMs = Integer.getInteger(QUORUM_CNXN_TIMEOUT_MS, -1);
- LOG.info("{}={}", QUORUM_CNXN_TIMEOUT_MS, quorumCnxnTimeoutMs);
- }
-
- /**
- * @deprecated As of release 3.4.0, this class has been deprecated, since
- * it is used with one of the udp-based versions of leader election, which
- * we are also deprecating.
- *
- * This class simply responds to requests for the current leader of this
- * node.
- * <p>
- * The request contains just an xid generated by the requestor.
- * <p>
- * The response has the xid, the id of this server, the id of the leader,
- * and the zxid of the leader.
- *
- *
- */
- @Deprecated
- class ResponderThread extends ZooKeeperThread {
-
- ResponderThread() {
- super("ResponderThread");
- }
-
- volatile boolean running = true;
-
- @Override
- public void run() {
- try {
- byte[] b = new byte[36];
- ByteBuffer responseBuffer = ByteBuffer.wrap(b);
- DatagramPacket packet = new DatagramPacket(b, b.length);
- while (running) {
- udpSocket.receive(packet);
- if (packet.getLength() != 4) {
- LOG.warn("Got more than just an xid! Len = {}", packet.getLength());
- } else {
- responseBuffer.clear();
- responseBuffer.getInt(); // Skip the xid
- responseBuffer.putLong(myid);
- Vote current = getCurrentVote();
- switch (getPeerState()) {
- case LOOKING:
- responseBuffer.putLong(current.getId());
- responseBuffer.putLong(current.getZxid());
- break;
- case LEADING:
- responseBuffer.putLong(myid);
- try {
- long proposed;
- synchronized (leader) {
- proposed = leader.lastProposed;
- }
- responseBuffer.putLong(proposed);
- } catch (NullPointerException npe) {
- // This can happen in state transitions,
- // just ignore the request
- }
- break;
- case FOLLOWING:
- responseBuffer.putLong(current.getId());
- try {
- responseBuffer.putLong(follower.getZxid());
- } catch (NullPointerException npe) {
- // This can happen in state transitions,
- // just ignore the request
- }
- break;
- case OBSERVING:
- // Do nothing, Observers keep themselves to
- // themselves.
- break;
- }
- packet.setData(b);
- udpSocket.send(packet);
- }
- packet.setLength(b.length);
- }
- } catch (RuntimeException e) {
- LOG.warn("Unexpected runtime exception in ResponderThread", e);
- } catch (IOException e) {
- LOG.warn("Unexpected IO exception in ResponderThread", e);
- } finally {
- LOG.warn("QuorumPeer responder thread exited");
- }
- }
-
- }
-
- private ServerState state = ServerState.LOOKING;
-
- private AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION);
- private AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE);
- private AtomicReference<String> leaderAddress = new AtomicReference<>("");
- private AtomicLong leaderId = new AtomicLong(-1);
-
- private boolean reconfigFlag = false; // indicates that a reconfig just committed
-
- public synchronized void setPeerState(ServerState newState) {
- state = newState;
- if (newState == ServerState.LOOKING) {
- setLeaderAddressAndId(null, -1);
- setZabState(ZabState.ELECTION);
- } else {
- LOG.info("Peer state changed: {}", getDetailedPeerState());
- }
- }
-
- public void setZabState(ZabState zabState) {
- if ((zabState == ZabState.BROADCAST) && (unavailableStartTime != 0)) {
- long unavailableTime = Time.currentElapsedTime() - unavailableStartTime;
- ServerMetrics.getMetrics().UNAVAILABLE_TIME.add(unavailableTime);
- if (getPeerState() == ServerState.LEADING) {
- ServerMetrics.getMetrics().LEADER_UNAVAILABLE_TIME.add(unavailableTime);
- }
- unavailableStartTime = 0;
- }
- this.zabState.set(zabState);
- LOG.info("Peer state changed: {}", getDetailedPeerState());
- }
-
- public void setSyncMode(SyncMode syncMode) {
- this.syncMode.set(syncMode);
- LOG.info("Peer state changed: {}", getDetailedPeerState());
- }
-
- public ZabState getZabState() {
- return zabState.get();
- }
-
- public SyncMode getSyncMode() {
- return syncMode.get();
- }
-
- public void setLeaderAddressAndId(MultipleAddresses addr, long newId) {
- if (addr != null) {
- leaderAddress.set(String.join("|", addr.getAllHostStrings()));
- } else {
- leaderAddress.set(null);
- }
- leaderId.set(newId);
- }
-
- public String getLeaderAddress() {
- return leaderAddress.get();
- }
-
- public long getLeaderId() {
- return leaderId.get();
- }
-
- public String getDetailedPeerState() {
- final StringBuilder sb = new StringBuilder(getPeerState().toString().toLowerCase());
- final ZabState zabState = getZabState();
- if (!ZabState.ELECTION.equals(zabState)) {
- sb.append(" - ").append(zabState.toString().toLowerCase());
- }
- final SyncMode syncMode = getSyncMode();
- if (!SyncMode.NONE.equals(syncMode)) {
- sb.append(" - ").append(syncMode.toString().toLowerCase());
- }
- return sb.toString();
- }
-
- public synchronized void reconfigFlagSet() {
- reconfigFlag = true;
- }
- public synchronized void reconfigFlagClear() {
- reconfigFlag = false;
- }
- public synchronized boolean isReconfigStateChange() {
- return reconfigFlag;
- }
- public synchronized ServerState getPeerState() {
- return state;
- }
-
- DatagramSocket udpSocket;
-
- private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>();
-
- /**
- * Resolves hostname for a given server ID.
- *
- * This method resolves hostname for a given server ID in both quorumVerifer
- * and lastSeenQuorumVerifier. If the server ID matches the local server ID,
- * it also updates myAddrs.
- */
- public void recreateSocketAddresses(long id) {
- QuorumVerifier qv = getQuorumVerifier();
- if (qv != null) {
- QuorumServer qs = qv.getAllMembers().get(id);
- if (qs != null) {
- qs.recreateSocketAddresses();
- if (id == getMyId()) {
- setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
- }
- }
- }
- qv = getLastSeenQuorumVerifier();
- if (qv != null) {
- QuorumServer qs = qv.getAllMembers().get(id);
- if (qs != null) {
- qs.recreateSocketAddresses();
- }
- }
- }
-
- private AddressTuple getAddrs() {
- AddressTuple addrs = myAddrs.get();
- if (addrs != null) {
- return addrs;
- }
- try {
- synchronized (QV_LOCK) {
- addrs = myAddrs.get();
- while (addrs == null) {
- QV_LOCK.wait();
- addrs = myAddrs.get();
- }
- return addrs;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
-
- public MultipleAddresses getQuorumAddress() {
- return getAddrs().quorumAddr;
- }
-
- public MultipleAddresses getElectionAddress() {
- return getAddrs().electionAddr;
- }
-
- public InetSocketAddress getClientAddress() {
- final AddressTuple addrs = myAddrs.get();
- return (addrs == null) ? null : addrs.clientAddr;
- }
-
- private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
- synchronized (QV_LOCK) {
- myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr));
- QV_LOCK.notifyAll();
- }
- }
-
- private int electionType;
-
- Election electionAlg;
-
- ServerCnxnFactory cnxnFactory;
- ServerCnxnFactory secureCnxnFactory;
-
- private FileTxnSnapLog logFactory = null;
-
- private final QuorumStats quorumStats;
-
- AdminServer adminServer;
-
- private final boolean reconfigEnabled;
-
- public static QuorumPeer testingQuorumPeer() throws SaslException {
- return new QuorumPeer();
- }
-
- public QuorumPeer() throws SaslException {
- super("QuorumPeer");
- quorumStats = new QuorumStats(this);
- jmxRemotePeerBean = new HashMap<>();
- adminServer = AdminServerFactory.createAdminServer();
- x509Util = createX509Util();
- initialize();
- reconfigEnabled = QuorumPeerConfig.isReconfigEnabled();
- }
-
- // VisibleForTesting
- QuorumX509Util createX509Util() {
- return new QuorumX509Util();
- }
-
- /**
- * For backward compatibility purposes, we instantiate QuorumMaj by default.
- */
-
- public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, ServerCnxnFactory cnxnFactory) throws IOException {
- this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false, cnxnFactory, new QuorumMaj(quorumPeers));
- }
-
- public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException {
- this();
- this.cnxnFactory = cnxnFactory;
- this.electionType = electionType;
- this.myid = myid;
- this.tickTime = tickTime;
- this.initLimit = initLimit;
- this.syncLimit = syncLimit;
- this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
- this.quorumListenOnAllIPs = quorumListenOnAllIPs;
- this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
- this.zkDb = new ZKDatabase(this.logFactory);
- if (quorumConfig == null) {
- quorumConfig = new QuorumMaj(quorumPeers);
- }
- setQuorumVerifier(quorumConfig, false);
- adminServer = AdminServerFactory.createAdminServer();
- }
-
- public void initialize() throws SaslException {
- // init quorum auth server & learner
- if (isQuorumSaslAuthEnabled()) {
- Set<String> authzHosts = new HashSet<>();
- for (QuorumServer qs : getView().values()) {
- authzHosts.add(qs.hostname);
- }
- authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(), quorumServerLoginContext, authzHosts);
- authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(), quorumServicePrincipal, quorumLearnerLoginContext);
- } else {
- authServer = new NullQuorumAuthServer();
- authLearner = new NullQuorumAuthLearner();
- }
- }
-
- QuorumStats quorumStats() {
- return quorumStats;
- }
-
- @Override
- public synchronized void start() {
- if (!getView().containsKey(myid)) {
- throw new RuntimeException("My id " + myid + " not in the peer list");
- }
- loadDataBase();
- startServerCnxnFactory();
- try {
- adminServer.start();
- } catch (AdminServerException e) {
- LOG.warn("Problem starting AdminServer", e);
- }
- startLeaderElection();
- startJvmPauseMonitor();
- super.start();
- }
-
- private void loadDataBase() {
- try {
- zkDb.loadDataBase();
-
- // load the epochs
- long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
- long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
- try {
- currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
- } catch (FileNotFoundException e) {
- // pick a reasonable epoch number
- // this should only happen once when moving to a
- // new code version
- currentEpoch = epochOfZxid;
- LOG.info(
- "{} not found! Creating with a reasonable default of {}. "
- + "This should only happen when you are upgrading your installation",
- CURRENT_EPOCH_FILENAME,
- currentEpoch);
- writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
- }
- if (epochOfZxid > currentEpoch) {
- // acceptedEpoch.tmp file in snapshot directory
- File currentTmp = new File(getTxnFactory().getSnapDir(),
- CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION);
- if (currentTmp.exists()) {
- long epochOfTmp = readLongFromFile(currentTmp.getName());
- LOG.info("{} found. Setting current epoch to {}.", currentTmp, epochOfTmp);
- setCurrentEpoch(epochOfTmp);
- } else {
- throw new IOException(
- "The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
- + ", is older than the last zxid, " + lastProcessedZxid);
- }
- }
- try {
- acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
- } catch (FileNotFoundException e) {
- // pick a reasonable epoch number
- // this should only happen once when moving to a
- // new code version
- acceptedEpoch = epochOfZxid;
- LOG.info(
- "{} not found! Creating with a reasonable default of {}. "
- + "This should only happen when you are upgrading your installation",
- ACCEPTED_EPOCH_FILENAME,
- acceptedEpoch);
- writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
- }
- if (acceptedEpoch < currentEpoch) {
- throw new IOException("The accepted epoch, "
- + ZxidUtils.zxidToString(acceptedEpoch)
- + " is less than the current epoch, "
- + ZxidUtils.zxidToString(currentEpoch));
- }
- } catch (IOException ie) {
- LOG.error("Unable to load database on disk", ie);
- throw new RuntimeException("Unable to run quorum server ", ie);
- }
- }
-
- ResponderThread responder;
-
- public synchronized void stopLeaderElection() {
- responder.running = false;
- responder.interrupt();
- }
- public synchronized void startLeaderElection() {
- try {
- if (getPeerState() == ServerState.LOOKING) {
- currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
- }
- } catch (IOException e) {
- RuntimeException re = new RuntimeException(e.getMessage());
- re.setStackTrace(e.getStackTrace());
- throw re;
- }
-
- this.electionAlg = createElectionAlgorithm(electionType);
- }
-
- private void startJvmPauseMonitor() {
- if (this.jvmPauseMonitor != null) {
- this.jvmPauseMonitor.serviceStart();
- }
- }
-
- /**
- * Count the number of nodes in the map that could be followers.
- * @param peers
- * @return The number of followers in the map
- */
- protected static int countParticipants(Map<Long, QuorumServer> peers) {
- int count = 0;
- for (QuorumServer q : peers.values()) {
- if (q.type == LearnerType.PARTICIPANT) {
- count++;
- }
- }
- return count;
- }
-
- /**
- * This constructor is only used by the existing unit test code.
- * It defaults to FileLogProvider persistence provider.
- */
- public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit) throws IOException {
- this(
- quorumPeers,
- snapDir,
- logDir,
- electionAlg,
- myid,
- tickTime,
- initLimit,
- syncLimit,
- connectToLearnerMasterLimit,
- false,
- ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
- new QuorumMaj(quorumPeers));
- }
-
- public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, String oraclePath) throws IOException {
- this(
- quorumPeers,
- snapDir,
- logDir,
- electionAlg,
- myid,
- tickTime,
- initLimit,
- syncLimit,
- connectToLearnerMasterLimit,
- false,
- ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
- new QuorumOracleMaj(quorumPeers, oraclePath));
- }
-
- /**
- * This constructor is only used by the existing unit test code.
- * It defaults to FileLogProvider persistence provider.
- */
- public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, QuorumVerifier quorumConfig) throws IOException {
- this(
- quorumPeers,
- snapDir,
- logDir,
- electionAlg,
- myid,
- tickTime,
- initLimit,
- syncLimit,
- connectToLearnerMasterLimit,
- false,
- ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
- quorumConfig);
- }
-
- private static InetSocketAddress getClientAddress(Map<Long, QuorumServer> quorumPeers, long myid, int clientPort) throws IOException {
- QuorumServer quorumServer = quorumPeers.get(myid);
- if (null == quorumServer) {
- throw new IOException("No QuorumServer correspoding to myid " + myid);
- }
- if (null == quorumServer.clientAddr) {
- return new InetSocketAddress(clientPort);
- }
- if (quorumServer.clientAddr.getPort() != clientPort) {
- throw new IOException("QuorumServer port "
- + quorumServer.clientAddr.getPort()
- + " does not match with given port "
- + clientPort);
- }
- return quorumServer.clientAddr;
- }
-
- /**
- * returns the highest zxid that this host has seen
- *
- * @return the highest zxid for this host
- */
- public long getLastLoggedZxid() {
- if (!zkDb.isInitialized()) {
- loadDataBase();
- }
- return zkDb.getDataTreeLastProcessedZxid();
- }
-
- public Follower follower;
- public Leader leader;
- public Observer observer;
-
- protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
- return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
- }
-
- protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
- return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
- }
-
- protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
- return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
- }
-
- @SuppressWarnings("deprecation")
- protected Election createElectionAlgorithm(int electionAlgorithm) {
- Election le = null;
-
- //TODO: use a factory rather than a switch
- switch (electionAlgorithm) {
- case 1:
- throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
- case 2:
- throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
- case 3:
- QuorumCnxManager qcm = createCnxnManager();
- QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
- if (oldQcm != null) {
- LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
- oldQcm.halt();
- }
- QuorumCnxManager.Listener listener = qcm.listener;
- if (listener != null) {
- listener.start();
- FastLeaderElection fle = new FastLeaderElection(this, qcm);
- fle.start();
- le = fle;
- } else {
- LOG.error("Null listener when initializing cnx manager");
- }
- break;
- default:
- assert false;
- }
- return le;
- }
-
- @SuppressWarnings("deprecation")
- protected Election makeLEStrategy() {
- LOG.debug("Initializing leader election protocol...");
- return electionAlg;
- }
-
- protected synchronized void setLeader(Leader newLeader) {
- leader = newLeader;
- }
-
- protected synchronized void setFollower(Follower newFollower) {
- follower = newFollower;
- }
-
- protected synchronized void setObserver(Observer newObserver) {
- observer = newObserver;
- }
-
- public synchronized ZooKeeperServer getActiveServer() {
- if (leader != null) {
- return leader.zk;
- } else if (follower != null) {
- return follower.zk;
- } else if (observer != null) {
- return observer.zk;
- }
- return null;
- }
-
- boolean shuttingDownLE = false;
-
- public void setSuspended(boolean suspended) {
- this.suspended.set(suspended);
- }
- private void checkSuspended() {
- try {
- while (suspended.get()) {
- Thread.sleep(10);
- }
- } catch (InterruptedException err) {
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public void run() {
- updateThreadName();
-
- LOG.debug("Starting quorum peer");
- try {
- jmxQuorumBean = new QuorumBean(this);
- MBeanRegistry.getInstance().register(jmxQuorumBean, null);
- for (QuorumServer s : getView().values()) {
- ZKMBeanInfo p;
- if (getMyId() == s.id) {
- p = jmxLocalPeerBean = new LocalPeerBean(this);
- try {
- MBeanRegistry.getInstance().register(p, jmxQuorumBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxLocalPeerBean = null;
- }
- } else {
- RemotePeerBean rBean = new RemotePeerBean(this, s);
- try {
- MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
- jmxRemotePeerBean.put(s.id, rBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- }
- }
- }
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxQuorumBean = null;
- }
-
- try {
- /*
- * Main loop
- */
- while (running) {
- if (unavailableStartTime == 0) {
- unavailableStartTime = Time.currentElapsedTime();
- }
-
- switch (getPeerState()) {
- case LOOKING:
- LOG.info("LOOKING");
- ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
-
- if (Boolean.getBoolean("readonlymode.enabled")) {
- LOG.info("Attempting to start ReadOnlyZooKeeperServer");
-
- // Create read-only server but don't start it immediately
- final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
-
- // Instead of starting roZk immediately, wait some grace
- // period before we decide we're partitioned.
- //
- // Thread is used here because otherwise it would require
- // changes in each of election strategy classes which is
- // unnecessary code coupling.
- Thread roZkMgr = new Thread() {
- public void run() {
- try {
- // lower-bound grace period to 2 secs
- sleep(Math.max(2000, tickTime));
- if (ServerState.LOOKING.equals(getPeerState())) {
- roZk.startup();
- }
- } catch (InterruptedException e) {
- LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
- } catch (Exception e) {
- LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
- }
- }
- };
- try {
- roZkMgr.start();
- reconfigFlagClear();
- if (shuttingDownLE) {
- shuttingDownLE = false;
- startLeaderElection();
- }
- setCurrentVote(makeLEStrategy().lookForLeader());
- checkSuspended();
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- setPeerState(ServerState.LOOKING);
- } finally {
- // If the thread is in the the grace period, interrupt
- // to come out of waiting.
- roZkMgr.interrupt();
- roZk.shutdown();
- }
- } else {
- try {
- reconfigFlagClear();
- if (shuttingDownLE) {
- shuttingDownLE = false;
- startLeaderElection();
- }
- setCurrentVote(makeLEStrategy().lookForLeader());
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- setPeerState(ServerState.LOOKING);
- }
- }
- break;
- case OBSERVING:
- try {
- LOG.info("OBSERVING");
- setObserver(makeObserver(logFactory));
- observer.observeLeader();
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- } finally {
- observer.shutdown();
- setObserver(null);
- updateServerState();
-
- // Add delay jitter before we switch to LOOKING
- // state to reduce the load of ObserverMaster
- if (isRunning()) {
- Observer.waitForObserverElectionDelay();
- }
- }
- break;
- case FOLLOWING:
- try {
- LOG.info("FOLLOWING");
- setFollower(makeFollower(logFactory));
- follower.followLeader();
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- } finally {
- follower.shutdown();
- setFollower(null);
- updateServerState();
- }
- break;
- case LEADING:
- LOG.info("LEADING");
- try {
- setLeader(makeLeader(logFactory));
- leader.lead();
- setLeader(null);
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- } finally {
- if (leader != null) {
- leader.shutdown("Forcing shutdown");
- setLeader(null);
- }
- updateServerState();
- }
- break;
- }
- }
- } finally {
- LOG.warn("QuorumPeer main thread exited");
- MBeanRegistry instance = MBeanRegistry.getInstance();
- instance.unregister(jmxQuorumBean);
- instance.unregister(jmxLocalPeerBean);
-
- for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
- instance.unregister(remotePeerBean);
- }
-
- jmxQuorumBean = null;
- jmxLocalPeerBean = null;
- jmxRemotePeerBean = null;
- }
- }
-
- private synchronized void updateServerState() {
- if (!reconfigFlag) {
- setPeerState(ServerState.LOOKING);
- LOG.warn("PeerState set to LOOKING");
- return;
- }
-
- if (getMyId() == getCurrentVote().getId()) {
- setPeerState(ServerState.LEADING);
- LOG.debug("PeerState set to LEADING");
- } else if (getLearnerType() == LearnerType.PARTICIPANT) {
- setPeerState(ServerState.FOLLOWING);
- LOG.debug("PeerState set to FOLLOWING");
- } else if (getLearnerType() == LearnerType.OBSERVER) {
- setPeerState(ServerState.OBSERVING);
- LOG.debug("PeerState set to OBSERVER");
- } else { // currently shouldn't happen since there are only 2 learner types
- setPeerState(ServerState.LOOKING);
- LOG.debug("Should not be here");
- }
- reconfigFlag = false;
- }
-
- public void shutdown() {
- running = false;
- x509Util.close();
- if (leader != null) {
- leader.shutdown("quorum Peer shutdown");
- }
- if (follower != null) {
- follower.shutdown();
- }
- shutdownServerCnxnFactory();
- if (udpSocket != null) {
- udpSocket.close();
- }
- if (jvmPauseMonitor != null) {
- jvmPauseMonitor.serviceStop();
- }
-
- try {
- adminServer.shutdown();
- } catch (AdminServerException e) {
- LOG.warn("Problem stopping AdminServer", e);
- }
-
- if (getElectionAlg() != null) {
- this.interrupt();
- getElectionAlg().shutdown();
- }
- try {
- zkDb.close();
- } catch (IOException ie) {
- LOG.warn("Error closing logs ", ie);
- }
- }
-
- /**
- * A 'view' is a node's current opinion of the membership of the entire
- * ensemble.
- */
- public Map<Long, QuorumPeer.QuorumServer> getView() {
- return Collections.unmodifiableMap(getQuorumVerifier().getAllMembers());
- }
-
- /**
- * Observers are not contained in this view, only nodes with
- * PeerType=PARTICIPANT.
- */
- public Map<Long, QuorumPeer.QuorumServer> getVotingView() {
- return getQuorumVerifier().getVotingMembers();
- }
-
- /**
- * Returns only observers, no followers.
- */
- public Map<Long, QuorumPeer.QuorumServer> getObservingView() {
- return getQuorumVerifier().getObservingMembers();
- }
-
- public synchronized Set<Long> getCurrentAndNextConfigVoters() {
- Set<Long> voterIds = new HashSet<>(getQuorumVerifier().getVotingMembers().keySet());
- if (getLastSeenQuorumVerifier() != null) {
- voterIds.addAll(getLastSeenQuorumVerifier().getVotingMembers().keySet());
- }
- return voterIds;
- }
-
- /**
- * Check if a node is in the current view. With static membership, the
- * result of this check will never change; only when dynamic membership
- * is introduced will this be more useful.
- */
- public boolean viewContains(Long sid) {
- return this.getView().containsKey(sid);
- }
-
- /**
- * Only used by QuorumStats at the moment
- */
- public String[] getQuorumPeers() {
- List<String> l = new ArrayList<>();
- synchronized (this) {
- if (leader != null) {
- for (LearnerHandler fh : leader.getLearners()) {
- if (fh.getSocket() != null) {
- String s = formatInetAddr((InetSocketAddress) fh.getSocket().getRemoteSocketAddress());
- if (leader.isLearnerSynced(fh)) {
- s += "*";
- }
- l.add(s);
- }
- }
- } else if (follower != null) {
- l.add(formatInetAddr((InetSocketAddress) follower.sock.getRemoteSocketAddress()));
- }
- }
- return l.toArray(new String[0]);
- }
-
- public String getServerState() {
- switch (getPeerState()) {
- case LOOKING:
- return QuorumStats.Provider.LOOKING_STATE;
- case LEADING:
- return QuorumStats.Provider.LEADING_STATE;
- case FOLLOWING:
- return QuorumStats.Provider.FOLLOWING_STATE;
- case OBSERVING:
- return QuorumStats.Provider.OBSERVING_STATE;
- }
- return QuorumStats.Provider.UNKNOWN_STATE;
- }
-
- /**
- * set the id of this quorum peer.
- */
- public void setMyid(long myid) {
- this.myid = myid;
- }
-
- public void setInitialConfig(String initialConfig) {
- this.initialConfig = initialConfig;
- }
-
- public String getInitialConfig() {
- return initialConfig;
- }
-
- /**
- * Get the number of milliseconds of each tick
- */
- public int getTickTime() {
- return tickTime;
- }
-
- /**
- * Set the number of milliseconds of each tick
- */
- public void setTickTime(int tickTime) {
- LOG.info("tickTime set to {}", tickTime);
- this.tickTime = tickTime;
- }
-
- /** Maximum number of connections allowed from particular host (ip) */
- public int getMaxClientCnxnsPerHost() {
- if (cnxnFactory != null) {
- return cnxnFactory.getMaxClientCnxnsPerHost();
- }
- if (secureCnxnFactory != null) {
- return secureCnxnFactory.getMaxClientCnxnsPerHost();
- }
- return -1;
- }
-
- /** Whether local sessions are enabled */
- public boolean areLocalSessionsEnabled() {
- return localSessionsEnabled;
- }
-
- /** Whether to enable local sessions */
- public void enableLocalSessions(boolean flag) {
- LOG.info("Local sessions {}", (flag ? "enabled" : "disabled"));
- localSessionsEnabled = flag;
- }
-
- /** Whether local sessions are allowed to upgrade to global sessions */
- public boolean isLocalSessionsUpgradingEnabled() {
- return localSessionsUpgradingEnabled;
- }
-
- /** Whether to allow local sessions to upgrade to global sessions */
- public void enableLocalSessionsUpgrading(boolean flag) {
- LOG.info("Local session upgrading {}", (flag ? "enabled" : "disabled"));
- localSessionsUpgradingEnabled = flag;
- }
-
- /** minimum session timeout in milliseconds */
- public int getMinSessionTimeout() {
- return minSessionTimeout;
- }
-
- /** minimum session timeout in milliseconds */
- public void setMinSessionTimeout(int min) {
- LOG.info("minSessionTimeout set to {}", min);
- this.minSessionTimeout = min;
- }
-
- /** maximum session timeout in milliseconds */
- public int getMaxSessionTimeout() {
- return maxSessionTimeout;
- }
-
- /** maximum session timeout in milliseconds */
- public void setMaxSessionTimeout(int max) {
- LOG.info("maxSessionTimeout set to {}", max);
- this.maxSessionTimeout = max;
- }
-
- /** The server socket's listen backlog length */
- public int getClientPortListenBacklog() {
- return this.clientPortListenBacklog;
- }
-
- /** Sets the server socket's listen backlog length. */
- public void setClientPortListenBacklog(int backlog) {
- this.clientPortListenBacklog = backlog;
- }
-
- /**
- * Get the number of ticks that the initial synchronization phase can take
- */
- public int getInitLimit() {
- return initLimit;
- }
-
- /**
- * Set the number of ticks that the initial synchronization phase can take
- */
- public void setInitLimit(int initLimit) {
- LOG.info("initLimit set to {}", initLimit);
- this.initLimit = initLimit;
- }
-
- /**
- * Get the current tick
- */
- public int getTick() {
- return tick.get();
- }
-
- public QuorumVerifier configFromString(String s) throws IOException, ConfigException {
- Properties props = new Properties();
- props.load(new StringReader(s));
- return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false, getQuorumVerifier().getOraclePath());
- }
-
- /**
- * Return QuorumVerifier object for the last committed configuration.
- */
- public QuorumVerifier getQuorumVerifier() {
- synchronized (QV_LOCK) {
- return quorumVerifier;
- }
- }
-
- /**
- * Return QuorumVerifier object for the last proposed configuration.
- */
- public QuorumVerifier getLastSeenQuorumVerifier() {
- synchronized (QV_LOCK) {
- return lastSeenQuorumVerifier;
- }
- }
-
- public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW) {
- if (qvOLD == null || !qvOLD.equals(qvNEW)) {
- LOG.warn("Restarting Leader Election");
- getElectionAlg().shutdown();
- shuttingDownLE = false;
- startLeaderElection();
- }
- }
-
- public String getNextDynamicConfigFilename() {
- if (configFilename == null) {
- LOG.warn("configFilename is null! This should only happen in tests.");
- return null;
- }
- return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix;
- }
-
- // On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK
- // must be held. We don't want quorumVerifier/lastSeenQuorumVerifier to change out from
- // under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take
- // the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken
- // qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne().
- private void connectNewPeers(QuorumCnxManager qcm) {
- if (quorumVerifier != null && lastSeenQuorumVerifier != null) {
- Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
- for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
- if (e.getKey() != getMyId() && !committedView.containsKey(e.getKey())) {
- qcm.connectOne(e.getKey());
- }
- }
- }
- }
-
- public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
- if (!isReconfigEnabled()) {
- LOG.info("Dynamic reconfig is disabled, we don't store the last seen config.");
- return;
- }
-
- // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm
- // and then take QV_LOCK. Take the locks in the same order to ensure that we don't
- // deadlock against other callers of connectOne(). If qcmRef gets set in another
- // thread while we're inside the synchronized block, that does no harm; if we didn't
- // take a lock on qcm (because it was null when we sampled it), we won't call
- // connectOne() on it. (Use of an AtomicReference is enough to guarantee visibility
- // of updates that provably happen in another thread before entering this method.)
- QuorumCnxManager qcm = qcmRef.get();
- Object outerLockObject = (qcm != null) ? qcm : QV_LOCK;
- synchronized (outerLockObject) {
- synchronized (QV_LOCK) {
- if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
- LOG.error("setLastSeenQuorumVerifier called with stale config "
- + qv.getVersion()
- + ". Current version: "
- + quorumVerifier.getVersion());
- }
- // assuming that a version uniquely identifies a configuration, so if
- // version is the same, nothing to do here.
- if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
- return;
- }
- lastSeenQuorumVerifier = qv;
- if (qcm != null) {
- connectNewPeers(qcm);
- }
-
- if (writeToDisk) {
- try {
- String fileName = getNextDynamicConfigFilename();
- if (fileName != null) {
- QuorumPeerConfig.writeDynamicConfig(fileName, qv, true);
- }
- } catch (IOException e) {
- LOG.error("Error writing next dynamic config file to disk", e);
- }
- }
- }
- }
- }
-
- public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
- synchronized (QV_LOCK) {
- if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
- // this is normal. For example - server found out about new config through FastLeaderElection gossiping
- // and then got the same config in UPTODATE message so its already known
- LOG.debug(
- "{} setQuorumVerifier called with known or old config {}. Current version: {}",
- getMyId(),
- qv.getVersion(),
- quorumVerifier.getVersion());
- return quorumVerifier;
- }
- QuorumVerifier prevQV = quorumVerifier;
- quorumVerifier = qv;
- if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion())) {
- lastSeenQuorumVerifier = qv;
- }
-
- if (writeToDisk) {
- // some tests initialize QuorumPeer without a static config file
- if (configFilename != null) {
- try {
- String dynamicConfigFilename = makeDynamicConfigFilename(qv.getVersion());
- QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, qv, false);
- QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig());
- } catch (IOException e) {
- LOG.error("Error closing file", e);
- }
- } else {
- LOG.info("writeToDisk == true but configFilename == null");
- }
- }
-
- if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()) {
- QuorumPeerConfig.deleteFile(getNextDynamicConfigFilename());
- }
- QuorumServer qs = qv.getAllMembers().get(getMyId());
- if (qs != null) {
- setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
- }
- updateObserverMasterList();
- return prevQV;
- }
- }
-
- private String makeDynamicConfigFilename(long version) {
- return configFilename + ".dynamic." + Long.toHexString(version);
- }
-
- private boolean needEraseClientInfoFromStaticConfig() {
- QuorumServer server = quorumVerifier.getAllMembers().get(getMyId());
- return (server != null && server.clientAddr != null && !server.isClientAddrFromStatic);
- }
-
- /**
- * Get an instance of LeaderElection
- */
- public Election getElectionAlg() {
- return electionAlg;
- }
-
- /**
- * Get the synclimit
- */
- public int getSyncLimit() {
- return syncLimit;
- }
-
- /**
- * Set the synclimit
- */
- public void setSyncLimit(int syncLimit) {
- LOG.info("syncLimit set to {}", syncLimit);
- this.syncLimit = syncLimit;
- }
-
- /**
- * Get the connectToLearnerMasterLimit
- */
- public int getConnectToLearnerMasterLimit() {
- return connectToLearnerMasterLimit;
- }
-
- /**
- * Set the connectToLearnerMasterLimit
- */
- public void setConnectToLearnerMasterLimit(int connectToLearnerMasterLimit) {
- LOG.info("connectToLearnerMasterLimit set to {}", connectToLearnerMasterLimit);
- this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
- }
-
- /**
- * The syncEnabled can also be set via a system property.
- */
- public static final String SYNC_ENABLED = "zookeeper.observer.syncEnabled";
-
- /**
- * Return syncEnabled.
- */
- public boolean getSyncEnabled() {
- if (System.getProperty(SYNC_ENABLED) != null) {
- LOG.info("{}={}", SYNC_ENABLED, Boolean.getBoolean(SYNC_ENABLED));
- return Boolean.getBoolean(SYNC_ENABLED);
- } else {
- return syncEnabled;
- }
- }
-
- /**
- * Set syncEnabled.
- *
- * @param syncEnabled
- */
- public void setSyncEnabled(boolean syncEnabled) {
- this.syncEnabled = syncEnabled;
- }
-
- /**
- * Gets the election type
- */
- public int getElectionType() {
- return electionType;
- }
-
- /**
- * Sets the election type
- */
- public void setElectionType(int electionType) {
- this.electionType = electionType;
- }
-
- public boolean getQuorumListenOnAllIPs() {
- return quorumListenOnAllIPs;
- }
-
- public void setQuorumListenOnAllIPs(boolean quorumListenOnAllIPs) {
- this.quorumListenOnAllIPs = quorumListenOnAllIPs;
- }
-
- public void setCnxnFactory(ServerCnxnFactory cnxnFactory) {
- this.cnxnFactory = cnxnFactory;
- }
-
- public void setSecureCnxnFactory(ServerCnxnFactory secureCnxnFactory) {
- this.secureCnxnFactory = secureCnxnFactory;
- }
-
- public void setSslQuorum(boolean sslQuorum) {
- if (sslQuorum) {
- LOG.info("Using TLS encrypted quorum communication");
- } else {
- LOG.info("Using insecure (non-TLS) quorum communication");
- }
- this.sslQuorum = sslQuorum;
- }
-
- public void setUsePortUnification(boolean shouldUsePortUnification) {
- LOG.info("Port unification {}", shouldUsePortUnification ? "enabled" : "disabled");
- this.shouldUsePortUnification = shouldUsePortUnification;
- }
-
- private void startServerCnxnFactory() {
- if (cnxnFactory != null) {
- cnxnFactory.start();
- }
- if (secureCnxnFactory != null) {
- secureCnxnFactory.start();
- }
- }
-
- private void shutdownServerCnxnFactory() {
- if (cnxnFactory != null) {
- cnxnFactory.shutdown();
- }
- if (secureCnxnFactory != null) {
- secureCnxnFactory.shutdown();
- }
- }
-
- // Leader and learner will control the zookeeper server and pass it into QuorumPeer.
- public void setZooKeeperServer(ZooKeeperServer zks) {
- if (cnxnFactory != null) {
- cnxnFactory.setZooKeeperServer(zks);
- }
- if (secureCnxnFactory != null) {
- secureCnxnFactory.setZooKeeperServer(zks);
- }
- }
-
- public void closeAllConnections() {
- if (cnxnFactory != null) {
- cnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
- }
- if (secureCnxnFactory != null) {
- secureCnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
- }
- }
-
- public int getClientPort() {
- if (cnxnFactory != null) {
- return cnxnFactory.getLocalPort();
- }
- return -1;
- }
-
- public int getSecureClientPort() {
- if (secureCnxnFactory != null) {
- return secureCnxnFactory.getLocalPort();
- }
- return -1;
- }
-
- public void setTxnFactory(FileTxnSnapLog factory) {
- this.logFactory = factory;
- }
-
- public FileTxnSnapLog getTxnFactory() {
- return this.logFactory;
- }
-
- /**
- * set zk database for this node
- * @param database
- */
- public void setZKDatabase(ZKDatabase database) {
- this.zkDb = database;
- }
-
- protected ZKDatabase getZkDb() {
- return zkDb;
- }
-
- public synchronized void initConfigInZKDatabase() {
- if (zkDb != null) {
- zkDb.initConfigInZKDatabase(getQuorumVerifier());
- }
- }
-
- public boolean isRunning() {
- return running;
- }
-
- /**
- * get reference to QuorumCnxManager
- */
- public QuorumCnxManager getQuorumCnxManager() {
- return qcmRef.get();
- }
- private long readLongFromFile(String name) throws IOException {
- File file = new File(logFactory.getSnapDir(), name);
- BufferedReader br = new BufferedReader(new FileReader(file));
- String line = "";
- try {
- line = br.readLine();
- return Long.parseLong(line);
- } catch (NumberFormatException e) {
- throw new IOException("Found " + line + " in " + file);
- } finally {
- br.close();
- }
- }
-
- private long acceptedEpoch = -1;
- private long currentEpoch = -1;
-
- public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";
-
- public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
-
- /**
- * Write a long value to disk atomically. Either succeeds or an exception
- * is thrown.
- * @param name file name to write the long to
- * @param value the long value to write to the named file
- * @throws IOException if the file cannot be written atomically
- */
- // visibleForTest
- void writeLongToFile(String name, final long value) throws IOException {
- File file = new File(logFactory.getSnapDir(), name);
- new AtomicFileWritingIdiom(file, new WriterStatement() {
- @Override
- public void write(Writer bw) throws IOException {
- bw.write(Long.toString(value));
- }
- });
- }
-
- public long getCurrentEpoch() throws IOException {
- if (currentEpoch == -1) {
- currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
- }
- return currentEpoch;
- }
-
- public long getAcceptedEpoch() throws IOException {
- if (acceptedEpoch == -1) {
- acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
- }
- return acceptedEpoch;
- }
-
- public void setCurrentEpoch(long e) throws IOException {
- writeLongToFile(CURRENT_EPOCH_FILENAME, e);
- currentEpoch = e;
- }
-
- public void setAcceptedEpoch(long e) throws IOException {
- writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
- acceptedEpoch = e;
- }
-
- public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) {
- if (!isReconfigEnabled()) {
- LOG.debug("Reconfig feature is disabled, skip reconfig processing.");
- return false;
- }
-
- InetSocketAddress oldClientAddr = getClientAddress();
-
- // update last committed quorum verifier, write the new config to disk
- // and restart leader election if config changed.
- QuorumVerifier prevQV = setQuorumVerifier(qv, true);
-
- // There is no log record for the initial config, thus after syncing
- // with leader
- // /zookeeper/config is empty! it is also possible that last committed
- // config is propagated during leader election
- // without the propagation the corresponding log records.
- // so we should explicitly do this (this is not necessary when we're
- // already a Follower/Observer, only
- // for Learner):
- initConfigInZKDatabase();
-
- if (prevQV.getVersion() < qv.getVersion() && !prevQV.equals(qv)) {
- Map<Long, QuorumServer> newMembers = qv.getAllMembers();
- updateRemotePeerMXBeans(newMembers);
- if (restartLE) {
- restartLeaderElection(prevQV, qv);
- }
-
- QuorumServer myNewQS = newMembers.get(getMyId());
- if (myNewQS != null && myNewQS.clientAddr != null && !myNewQS.clientAddr.equals(oldClientAddr)) {
- cnxnFactory.reconfigure(myNewQS.clientAddr);
- updateThreadName();
- }
-
- boolean roleChange = updateLearnerType(qv);
- boolean leaderChange = false;
- if (suggestedLeaderId != null) {
- // zxid should be non-null too
- leaderChange = updateVote(suggestedLeaderId, zxid);
- } else {
- long currentLeaderId = getCurrentVote().getId();
- QuorumServer myleaderInCurQV = prevQV.getVotingMembers().get(currentLeaderId);
- QuorumServer myleaderInNewQV = qv.getVotingMembers().get(currentLeaderId);
- leaderChange = (myleaderInCurQV == null
- || myleaderInCurQV.addr == null
- || myleaderInNewQV == null
- || !myleaderInCurQV.addr.equals(myleaderInNewQV.addr));
- // we don't have a designated leader - need to go into leader
- // election
- reconfigFlagClear();
- }
-
- return roleChange || leaderChange;
- }
- return false;
-
- }
-
- private void updateRemotePeerMXBeans(Map<Long, QuorumServer> newMembers) {
- Set<Long> existingMembers = new HashSet<>(newMembers.keySet());
- existingMembers.retainAll(jmxRemotePeerBean.keySet());
- for (Long id : existingMembers) {
- RemotePeerBean rBean = jmxRemotePeerBean.get(id);
- rBean.setQuorumServer(newMembers.get(id));
- }
-
- Set<Long> joiningMembers = new HashSet<>(newMembers.keySet());
- joiningMembers.removeAll(jmxRemotePeerBean.keySet());
- joiningMembers.remove(getMyId()); // remove self as it is local bean
- for (Long id : joiningMembers) {
- QuorumServer qs = newMembers.get(id);
- RemotePeerBean rBean = new RemotePeerBean(this, qs);
- try {
- MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
- jmxRemotePeerBean.put(qs.id, rBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- }
- }
-
- Set<Long> leavingMembers = new HashSet<>(jmxRemotePeerBean.keySet());
- leavingMembers.removeAll(newMembers.keySet());
- for (Long id : leavingMembers) {
- RemotePeerBean rBean = jmxRemotePeerBean.remove(id);
- try {
- MBeanRegistry.getInstance().unregister(rBean);
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- }
- }
-
- private ArrayList<QuorumServer> observerMasters = new ArrayList<>();
- private void updateObserverMasterList() {
- if (observerMasterPort <= 0) {
- return; // observer masters not enabled
- }
- observerMasters.clear();
- StringBuilder sb = new StringBuilder();
- for (QuorumServer server : quorumVerifier.getVotingMembers().values()) {
- InetAddress address = server.addr.getReachableOrOne().getAddress();
- InetSocketAddress addr = new InetSocketAddress(address, observerMasterPort);
- observerMasters.add(new QuorumServer(server.id, addr));
- sb.append(addr).append(",");
- }
- LOG.info("Updated learner master list to be {}", sb.toString());
- Collections.shuffle(observerMasters);
- // Reset the internal index of the observerMaster when
- // the observerMaster List is refreshed
- nextObserverMaster = 0;
- }
-
- private boolean useObserverMasters() {
- return getLearnerType() == LearnerType.OBSERVER && observerMasters.size() > 0;
- }
-
- private int nextObserverMaster = 0;
- private QuorumServer nextObserverMaster() {
- if (nextObserverMaster >= observerMasters.size()) {
- nextObserverMaster = 0;
- // Add a reconnect delay only after the observer
- // has exhausted trying to connect to all the masters
- // from the observerMasterList
- if (isRunning()) {
- Observer.waitForReconnectDelay();
- }
- }
- return observerMasters.get(nextObserverMaster++);
- }
-
- QuorumServer findLearnerMaster(QuorumServer leader) {
- if (useObserverMasters()) {
- return nextObserverMaster();
- } else {
- // Add delay jitter to reduce the load on the leader
- if (isRunning()) {
- Observer.waitForReconnectDelay();
- }
- return leader;
- }
- }
-
- /**
- * Vet a given learner master's information.
- * Allows specification by server id, ip only, or ip and port
- */
- QuorumServer validateLearnerMaster(String desiredMaster) {
- if (useObserverMasters()) {
- Long sid;
- try {
- sid = Long.parseLong(desiredMaster);
- } catch (NumberFormatException e) {
- sid = null;
- }
- for (QuorumServer server : observerMasters) {
- if (sid == null) {
- for (InetSocketAddress address : server.addr.getAllAddresses()) {
- String serverAddr = address.getAddress().getHostAddress() + ':' + address.getPort();
- if (serverAddr.startsWith(desiredMaster)) {
- return server;
- }
- }
- } else {
- if (sid.equals(server.id)) {
- return server;
- }
- }
- }
- if (sid == null) {
- LOG.info("could not find learner master address={}", desiredMaster);
- } else {
- LOG.warn("could not find learner master sid={}", sid);
- }
- } else {
- LOG.info("cannot validate request, observer masters not enabled");
- }
- return null;
- }
-
- private boolean updateLearnerType(QuorumVerifier newQV) {
- //check if I'm an observer in new config
- if (newQV.getObservingMembers().containsKey(getMyId())) {
- if (getLearnerType() != LearnerType.OBSERVER) {
- setLearnerType(LearnerType.OBSERVER);
- LOG.info("Becoming an observer");
- reconfigFlagSet();
- return true;
- } else {
- return false;
- }
- } else if (newQV.getVotingMembers().containsKey(getMyId())) {
- if (getLearnerType() != LearnerType.PARTICIPANT) {
- setLearnerType(LearnerType.PARTICIPANT);
- LOG.info("Becoming a voting participant");
- reconfigFlagSet();
- return true;
- } else {
- return false;
- }
- }
- // I'm not in the view
- if (getLearnerType() != LearnerType.PARTICIPANT) {
- setLearnerType(LearnerType.PARTICIPANT);
- LOG.info("Becoming a non-voting participant");
- reconfigFlagSet();
- return true;
- }
- return false;
- }
-
- private boolean updateVote(long designatedLeader, long zxid) {
- Vote currentVote = getCurrentVote();
- if (currentVote != null && designatedLeader != currentVote.getId()) {
- setCurrentVote(new Vote(designatedLeader, zxid));
- reconfigFlagSet();
- LOG.warn("Suggested leader: {}", designatedLeader);
- return true;
- }
- return false;
- }
-
- /**
- * Updates leader election info to avoid inconsistencies when
- * a new server tries to join the ensemble.
- *
- * Here is the inconsistency scenario we try to solve by updating the peer
- * epoch after following leader:
- *
- * Let's say we have an ensemble with 3 servers z1, z2 and z3.
- *
- * 1. z1, z2 were following z3 with peerEpoch to be 0xb8, the new epoch is
- * 0xb9, aka current accepted epoch on disk.
- * 2. z2 get restarted, which will use 0xb9 as it's peer epoch when loading
- * the current accept epoch from disk.
- * 3. z2 received notification from z1 and z3, which is following z3 with
- * epoch 0xb8, so it started following z3 again with peer epoch 0xb8.
- * 4. before z2 successfully connected to z3, z3 get restarted with new
- * epoch 0xb9.
- * 5. z2 will retry around a few round (default 5s) before giving up,
- * meanwhile it will report z3 as leader.
- * 6. z1 restarted, and looking with peer epoch 0xb9.
- * 7. z1 voted z3, and z3 was elected as leader again with peer epoch 0xb9.
- * 8. z2 successfully connected to z3 before giving up, but with peer
- * epoch 0xb8.
- * 9. z1 get restarted, looking for leader with peer epoch 0xba, but cannot
- * join, because z2 is reporting peer epoch 0xb8, while z3 is reporting
- * 0xb9.
- *
- * By updating the election vote after actually following leader, we can
- * avoid this kind of stuck happened.
- *
- * Btw, the zxid and electionEpoch could be inconsistent because of the same
- * reason, it's better to update these as well after syncing with leader, but
- * that required protocol change which is non trivial. This problem is worked
- * around by skipping comparing the zxid and electionEpoch when counting for
- * votes for out of election servers during looking for leader.
- *
- * See https://issues.apache.org/jira/browse/ZOOKEEPER-1732
- */
- protected void updateElectionVote(long newEpoch) {
- Vote currentVote = getCurrentVote();
- if (currentVote != null) {
- setCurrentVote(new Vote(currentVote.getId(), currentVote.getZxid(), currentVote.getElectionEpoch(), newEpoch, currentVote
- .getState()));
- }
- }
-
- private void updateThreadName() {
- String plain = cnxnFactory != null
- ? cnxnFactory.getLocalAddress() != null
- ? formatInetAddr(cnxnFactory.getLocalAddress())
- : "disabled"
- : "disabled";
- String secure = secureCnxnFactory != null ? formatInetAddr(secureCnxnFactory.getLocalAddress()) : "disabled";
- setName(String.format("QuorumPeer[myid=%d](plain=%s)(secure=%s)", getMyId(), plain, secure));
- }
-
- /**
- * Sets the time taken for leader election in milliseconds.
- *
- * @param electionTimeTaken time taken for leader election
- */
- void setElectionTimeTaken(long electionTimeTaken) {
- this.electionTimeTaken = electionTimeTaken;
- }
-
- /**
- * @return the time taken for leader election in milliseconds.
- */
- long getElectionTimeTaken() {
- return electionTimeTaken;
- }
-
- void setQuorumServerSaslRequired(boolean serverSaslRequired) {
- quorumServerSaslAuthRequired = serverSaslRequired;
- LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, serverSaslRequired);
- }
-
- void setQuorumLearnerSaslRequired(boolean learnerSaslRequired) {
- quorumLearnerSaslAuthRequired = learnerSaslRequired;
- LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, learnerSaslRequired);
- }
-
- void setQuorumSaslEnabled(boolean enableAuth) {
- quorumSaslEnableAuth = enableAuth;
- if (!quorumSaslEnableAuth) {
- LOG.info("QuorumPeer communication is not secured! (SASL auth disabled)");
- } else {
- LOG.info("{} set to {}", QuorumAuth.QUORUM_SASL_AUTH_ENABLED, enableAuth);
- }
- }
-
- void setQuorumServicePrincipal(String servicePrincipal) {
- quorumServicePrincipal = servicePrincipal;
- LOG.info("{} set to {}", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL, quorumServicePrincipal);
- }
-
- void setQuorumLearnerLoginContext(String learnerContext) {
- quorumLearnerLoginContext = learnerContext;
- LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT, quorumLearnerLoginContext);
- }
-
- void setQuorumServerLoginContext(String serverContext) {
- quorumServerLoginContext = serverContext;
- LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT, quorumServerLoginContext);
- }
-
- void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) {
- if (qCnxnThreadsSize > QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE) {
- quorumCnxnThreadsSize = qCnxnThreadsSize;
- }
- LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize);
- }
-
- boolean isQuorumSaslAuthEnabled() {
- return quorumSaslEnableAuth;
- }
-
- private boolean isQuorumServerSaslAuthRequired() {
- return quorumServerSaslAuthRequired;
- }
-
- private boolean isQuorumLearnerSaslAuthRequired() {
- return quorumLearnerSaslAuthRequired;
- }
-
- public QuorumCnxManager createCnxnManager() {
- int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
- LOG.info("Using {}ms as the quorum cnxn socket timeout", timeout);
- return new QuorumCnxManager(
- this,
- this.getMyId(),
- this.getView(),
- this.authServer,
- this.authLearner,
- timeout,
- this.getQuorumListenOnAllIPs(),
- this.quorumCnxnThreadsSize,
- this.isQuorumSaslAuthEnabled());
- }
-
- boolean isLeader(long id) {
- Vote vote = getCurrentVote();
- return vote != null && id == vote.getId();
- }
-
- public boolean isReconfigEnabled() {
- return reconfigEnabled;
- }
-
- @InterfaceAudience.Private
- /**
- * This is a metric that depends on the status of the peer.
- */ public Integer getSynced_observers_metric() {
- if (leader != null) {
- return leader.getObservingLearners().size();
- } else if (follower != null) {
- return follower.getSyncedObserverSize();
- } else {
- return null;
- }
- }
-
- /**
- * Create a new QuorumPeer and apply all the values per the already-parsed config.
- *
- * @param config The appertained quorum peer config.
- * @return A QuorumPeer instantiated with specified peer config. Note this peer
- * is not fully initialized; caller should finish initialization through
- * additional configurations (connection factory settings, etc).
- *
- * @throws IOException
- */
- public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOException {
- QuorumPeer quorumPeer = new QuorumPeer();
- quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
- quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
- quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
- quorumPeer.setElectionType(config.getElectionAlg());
- quorumPeer.setMyid(config.getServerId());
- quorumPeer.setTickTime(config.getTickTime());
- quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
- quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
- quorumPeer.setInitLimit(config.getInitLimit());
- quorumPeer.setSyncLimit(config.getSyncLimit());
- quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
- quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
- quorumPeer.setConfigFileName(config.getConfigFilename());
- quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
- quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
- quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
- if (config.getLastSeenQuorumVerifier() != null) {
- quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
- }
- quorumPeer.initConfigInZKDatabase();
- quorumPeer.setSslQuorum(config.isSslQuorum());
- quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
- quorumPeer.setLearnerType(config.getPeerType());
- quorumPeer.setSyncEnabled(config.getSyncEnabled());
- quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
- if (config.sslQuorumReloadCertFiles) {
- quorumPeer.getX509Util().enableCertFileReloading();
- }
- quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
- quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
- quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
-
- // sets quorum sasl authentication configurations
- quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
- if (quorumPeer.isQuorumSaslAuthEnabled()) {
- quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
- quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
- quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
- quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
- quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
- }
- quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
-
- if (config.jvmPauseMonitorToRun) {
- quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
- }
-
- return quorumPeer;
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
deleted file mode 100644
index a96a395b03b..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Objects;
-import java.util.stream.Collectors;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.server.DataTreeBean;
-import org.apache.zookeeper.server.FinalRequestProcessor;
-import org.apache.zookeeper.server.PrepRequestProcessor;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperServerBean;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-/**
- * A ZooKeeperServer which comes into play when peer is partitioned from the
- * majority. Handles read-only clients, but drops connections from not-read-only
- * ones.
- * <p>
- * The very first processor in the chain of request processors is a
- * ReadOnlyRequestProcessor which drops state-changing requests.
- */
-public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
-
- protected final QuorumPeer self;
- private volatile boolean shutdown = false;
-
- ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) {
- super(
- logFactory,
- self.tickTime,
- self.minSessionTimeout,
- self.maxSessionTimeout,
- self.clientPortListenBacklog,
- zkDb,
- self.getInitialConfig(),
- self.isReconfigEnabled());
- this.self = self;
- }
-
- @Override
- protected void setupRequestProcessors() {
- RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- RequestProcessor prepProcessor = new PrepRequestProcessor(this, finalProcessor);
- ((PrepRequestProcessor) prepProcessor).start();
- firstProcessor = new ReadOnlyRequestProcessor(this, prepProcessor);
- ((ReadOnlyRequestProcessor) firstProcessor).start();
- }
-
- @Override
- public synchronized void startup() {
- // check to avoid startup follows shutdown
- if (shutdown) {
- LOG.warn("Not starting Read-only server as startup follows shutdown!");
- return;
- }
- registerJMX(new ReadOnlyBean(this), self.jmxLocalPeerBean);
- super.startup();
- self.setZooKeeperServer(this);
- self.adminServer.setZooKeeperServer(this);
- LOG.info("Read-only server started");
- }
-
- @Override
- public void createSessionTracker() {
- sessionTracker = new LearnerSessionTracker(
- this, getZKDatabase().getSessionWithTimeOuts(),
- this.tickTime, self.getMyId(), self.areLocalSessionsEnabled(),
- getZooKeeperServerListener());
- }
-
- @Override
- protected void startSessionTracker() {
- ((LearnerSessionTracker) sessionTracker).start();
- }
-
- @Override
- protected void setLocalSessionFlag(Request si) {
- switch (si.type) {
- case OpCode.createSession:
- if (self.areLocalSessionsEnabled()) {
- si.setLocalSession(true);
- }
- break;
- case OpCode.closeSession:
- if (((UpgradeableSessionTracker) sessionTracker).isLocalSession(si.sessionId)) {
- si.setLocalSession(true);
- } else {
- LOG.warn("Submitting global closeSession request for session 0x{} in ReadOnly mode",
- Long.toHexString(si.sessionId));
- }
- break;
- default:
- break;
- }
- }
-
- @Override
- protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException {
- if (((LearnerSessionTracker) sessionTracker).isGlobalSession(sessionId)) {
- String msg = "Refusing global session reconnection in RO mode " + cnxn.getRemoteSocketAddress();
- LOG.info(msg);
- throw new ServerCnxn.CloseRequestException(msg, ServerCnxn.DisconnectReason.RENEW_GLOBAL_SESSION_IN_RO_MODE);
- }
- }
-
- @Override
- protected void registerJMX() {
- // register with JMX
- try {
- jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
- MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxDataTreeBean = null;
- }
- }
-
- public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) {
- // register with JMX
- try {
- jmxServerBean = serverBean;
- MBeanRegistry.getInstance().register(serverBean, localPeerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxServerBean = null;
- }
- }
-
- @Override
- protected void unregisterJMX() {
- // unregister from JMX
- try {
- if (jmxDataTreeBean != null) {
- MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxDataTreeBean = null;
- }
-
- protected void unregisterJMX(ZooKeeperServer zks) {
- // unregister from JMX
- try {
- if (jmxServerBean != null) {
- MBeanRegistry.getInstance().unregister(jmxServerBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxServerBean = null;
- }
-
- @Override
- public String getState() {
- return "read-only";
- }
-
- /**
- * Returns the id of the associated QuorumPeer, which will do for a unique
- * id of this server.
- */
- @Override
- public long getServerId() {
- return self.getMyId();
- }
-
- @Override
- public synchronized void shutdown(boolean fullyShutDown) {
- if (!canShutdown()) {
- LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
- } else {
- shutdown = true;
- unregisterJMX(this);
-
- // set peer's server to null
- self.setZooKeeperServer(null);
- // clear all the connections
- self.closeAllConnections();
-
- self.adminServer.setZooKeeperServer(null);
- }
- // shutdown the server itself
- super.shutdown(fullyShutDown);
- }
-
- @Override
- public void dumpConf(PrintWriter pwriter) {
- super.dumpConf(pwriter);
-
- pwriter.print("initLimit=");
- pwriter.println(self.getInitLimit());
- pwriter.print("syncLimit=");
- pwriter.println(self.getSyncLimit());
- pwriter.print("electionAlg=");
- pwriter.println(self.getElectionType());
- pwriter.print("electionPort=");
- pwriter.println(self.getElectionAddress().getAllPorts()
- .stream().map(Objects::toString).collect(Collectors.joining("|")));
- pwriter.print("quorumPort=");
- pwriter.println(self.getQuorumAddress().getAllPorts()
- .stream().map(Objects::toString).collect(Collectors.joining("|")));
- pwriter.print("peerType=");
- pwriter.println(self.getLearnerType().ordinal());
- }
-
- @Override
- protected void setState(State state) {
- this.state = state;
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
deleted file mode 100644
index d65ead216f0..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import java.io.Flushable;
-import java.io.IOException;
-import java.net.Socket;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.ServerMetrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SendAckRequestProcessor implements RequestProcessor, Flushable {
-
- private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);
-
- final Learner learner;
-
- SendAckRequestProcessor(Learner peer) {
- this.learner = peer;
- }
-
- public void processRequest(Request si) {
- if (si.type != OpCode.sync) {
- QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
- try {
- si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
-
- learner.writePacket(qp, false);
- } catch (IOException e) {
- LOG.warn("Closing connection to leader, exception during packet send", e);
- try {
- if (!learner.sock.isClosed()) {
- learner.sock.close();
- }
- } catch (IOException e1) {
- // Nothing to do, we are shutting things down, so an exception here is irrelevant
- LOG.debug("Ignoring error closing the connection", e1);
- }
- }
- }
- }
-
- public void flush() throws IOException {
- try {
- learner.writePacket(null, true);
- } catch (IOException e) {
- LOG.warn("Closing connection to leader, exception during packet send", e);
- try {
- Socket socket = learner.sock;
- if (socket != null && !socket.isClosed()) {
- learner.sock.close();
- }
- } catch (IOException e1) {
- // Nothing to do, we are shutting things down, so an exception here is irrelevant
- LOG.debug("Ignoring error closing the connection", e1);
- }
- }
- }
-
- public void shutdown() {
- // Nothing needed
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server/CMakeLists.txt b/zookeeper-server/zookeeper-server/CMakeLists.txt
index c7e9679ed24..30ed1ed4404 100644
--- a/zookeeper-server/zookeeper-server/CMakeLists.txt
+++ b/zookeeper-server/zookeeper-server/CMakeLists.txt
@@ -1,4 +1,4 @@
# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-install_jar(zookeeper-server-3.9.1-jar-with-dependencies.jar)
+install_jar(zookeeper-server-3.9.2-jar-with-dependencies.jar)
# Make symlink so that we have a default version, should be done only in zookeeper-server module
-install_symlink(lib/jars/zookeeper-server-3.9.1-jar-with-dependencies.jar lib/jars/zookeeper-server-jar-with-dependencies.jar)
+install_symlink(lib/jars/zookeeper-server-3.9.2-jar-with-dependencies.jar lib/jars/zookeeper-server-jar-with-dependencies.jar)
diff --git a/zookeeper-server/zookeeper-server/pom.xml b/zookeeper-server/zookeeper-server/pom.xml
index f1b33dd0ae7..791c026234a 100644
--- a/zookeeper-server/zookeeper-server/pom.xml
+++ b/zookeeper-server/zookeeper-server/pom.xml
@@ -8,11 +8,11 @@
<version>8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>zookeeper-server-3.9.1</artifactId>
+ <artifactId>zookeeper-server-3.9.2</artifactId>
<packaging>container-plugin</packaging>
<version>8-SNAPSHOT</version>
<properties>
- <zookeeper.version>3.9.1</zookeeper.version>
+ <zookeeper.version>3.9.2</zookeeper.version>
</properties>
<dependencies>
<dependency>
diff --git a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
index 891a35582b3..c74a020bcf4 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
@@ -26,6 +26,7 @@ public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin {
private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName());
+
@SuppressWarnings("try")
@Override
public void reconfigure(String connectionSpec, String servers) throws ReconfigException {
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 895bbeffa5f..00af31b46d4 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -384,13 +384,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
+ " minSessionTimeout {} ms"
+ " maxSessionTimeout {} ms"
+ " clientPortListenBacklog {}"
- + " datadir {}"
+ + " dataLogdir {}"
+ " snapdir {}",
tickTime,
getMinSessionTimeout(),
getMaxSessionTimeout(),
getClientPortListenBacklog(),
- txnLogFactory.getDataDir(),
+ txnLogFactory.getDataLogDir(),
txnLogFactory.getSnapDir());
}
@@ -442,7 +442,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
pwriter.print("dataDirSize=");
pwriter.println(getDataDirSize());
pwriter.print("dataLogDir=");
- pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath());
+ pwriter.println(zkDb.snapLog.getDataLogDir().getAbsolutePath());
pwriter.print("dataLogSize=");
pwriter.println(getLogDirSize());
pwriter.print("tickTime=");
@@ -464,7 +464,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return new ZooKeeperServerConf(
getClientPort(),
zkDb.snapLog.getSnapDir().getAbsolutePath(),
- zkDb.snapLog.getDataDir().getAbsolutePath(),
+ zkDb.snapLog.getDataLogDir().getAbsolutePath(),
getTickTime(),
getMaxClientCnxnsPerHost(),
getMinSessionTimeout(),
@@ -649,7 +649,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
if (zkDb == null) {
return 0L;
}
- File path = zkDb.snapLog.getDataDir();
+ File path = zkDb.snapLog.getSnapDir();
return getDirSize(path);
}
@@ -658,7 +658,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
if (zkDb == null) {
return 0L;
}
- File path = zkDb.snapLog.getSnapDir();
+ File path = zkDb.snapLog.getDataLogDir();
return getDirSize(path);
}
@@ -1506,7 +1506,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}
if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
- String msg = "Refusing session request for client "
+ String msg = "Refusing session(0x"
+ + Long.toHexString(sessionId)
+ + ") request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(request.getLastZxidSeen())
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 8d8b6dabce8..3c7b2148400 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -597,7 +597,6 @@ public class Learner {
willSnapshot = false; // but anything after this needs to go to the transaction log; or
}
- self.setCurrentEpoch(newEpoch);
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
@@ -613,6 +612,8 @@ public class Learner {
delayedProposals.clear();
fzk.syncProcessor.syncFlush();
}
+
+ self.setCurrentEpoch(newEpoch);
}
void flushAcks() throws InterruptedException {