From 8973f674dea9cacb820069821ae50035fb96e442 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Mon, 30 Jan 2023 16:28:23 +0100 Subject: Add support for ZooKeeper 3.8.1 --- zookeeper-server/CMakeLists.txt | 1 + zookeeper-server/pom.xml | 1 + .../zookeeper-server-3.8.1/CMakeLists.txt | 2 + zookeeper-server/zookeeper-server-3.8.1/pom.xml | 107 + .../zookeeper/ConfigServerZooKeeperServer.java | 43 + .../ReconfigurableVespaZooKeeperServer.java | 45 + .../zookeeper/VespaMtlsAuthenticationProvider.java | 41 + .../com/yahoo/vespa/zookeeper/VespaQuorumPeer.java | 60 + .../vespa/zookeeper/VespaZooKeeperAdminImpl.java | 93 + .../vespa/zookeeper/VespaZooKeeperServerImpl.java | 51 + .../java/org/apache/zookeeper/common/NetUtils.java | 94 + .../zookeeper/server/SyncRequestProcessor.java | 353 +++ .../server/VespaNettyServerCnxnFactory.java | 37 + .../apache/zookeeper/server/ZooKeeperServer.java | 2329 ++++++++++++++++++++ .../server/quorum/LeaderZooKeeperServer.java | 310 +++ .../apache/zookeeper/server/quorum/Learner.java | 920 ++++++++ .../server/quorum/LearnerZooKeeperServer.java | 184 ++ .../server/quorum/ObserverZooKeeperServer.java | 140 ++ .../server/quorum/ReadOnlyZooKeeperServer.java | 239 ++ .../server/quorum/SendAckRequestProcessor.java | 84 + 20 files changed, 5134 insertions(+) create mode 100644 zookeeper-server/zookeeper-server-3.8.1/CMakeLists.txt create mode 100644 zookeeper-server/zookeeper-server-3.8.1/pom.xml create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java create mode 100644 
zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/common/NetUtils.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java create mode 100644 zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java diff --git a/zookeeper-server/CMakeLists.txt b/zookeeper-server/CMakeLists.txt index e88493c1a1b..ea8bf68eba8 100644 --- a/zookeeper-server/CMakeLists.txt +++ b/zookeeper-server/CMakeLists.txt @@ -1,3 +1,4 @@ # Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
add_subdirectory(zookeeper-server-common) add_subdirectory(zookeeper-server) +add_subdirectory(zookeeper-server-3.8.1) diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml index 69a38ef9e2a..23590a3941f 100644 --- a/zookeeper-server/pom.xml +++ b/zookeeper-server/pom.xml @@ -14,6 +14,7 @@ zookeeper-server-common zookeeper-server + zookeeper-server-3.8.1 diff --git a/zookeeper-server/zookeeper-server-3.8.1/CMakeLists.txt b/zookeeper-server/zookeeper-server-3.8.1/CMakeLists.txt new file mode 100644 index 00000000000..72c47b6028b --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/CMakeLists.txt @@ -0,0 +1,2 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +install_jar(zookeeper-server-3.8.1-jar-with-dependencies.jar) diff --git a/zookeeper-server/zookeeper-server-3.8.1/pom.xml b/zookeeper-server/zookeeper-server-3.8.1/pom.xml new file mode 100644 index 00000000000..72da1ba26c4 --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/pom.xml @@ -0,0 +1,107 @@ + + + + 4.0.0 + + com.yahoo.vespa + zookeeper-server-parent + 8-SNAPSHOT + ../pom.xml + + zookeeper-server-3.8.1 + container-plugin + 8-SNAPSHOT + + 3.8.1 + + + + com.yahoo.vespa + zookeeper-server-common + ${project.version} + + + com.yahoo.vespa + zookeeper-client-common + ${project.version} + + + + org.apache.zookeeper + zookeeper + + + + + org.apache.zookeeper + zookeeper + ${zookeeper.version} + + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + + + + io.dropwizard.metrics + metrics-core + compile + + + org.slf4j + slf4j-api + + + + + org.xerial.snappy + snappy-java + compile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + -Xlint:all + + + + + org.apache.maven.plugins + maven-install-plugin + + true + + + + com.yahoo.vespa + bundle-plugin + true + + com.sun.management + zookeeper-server + + + + + diff --git 
a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java new file mode 100644 index 00000000000..17179aa5e69 --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java @@ -0,0 +1,43 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeeper; + +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.component.AbstractComponent; +import com.yahoo.component.annotation.Inject; +import java.nio.file.Path; + +/** + * + * Server used for starting config server, needed to be able to have different behavior for hosted and + * self-hosted Vespa (controlled by zookeeperServerConfig.dynamicReconfiguration). + * + * @author Harald Musum + */ +public class ConfigServerZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer { + + private final VespaZooKeeperServer zooKeeperServer; + + @Inject + public ConfigServerZooKeeperServer(ZookeeperServerConfig zookeeperServerConfig) { + this.zooKeeperServer = zookeeperServerConfig.dynamicReconfiguration() + ? 
new ReconfigurableVespaZooKeeperServer(new Reconfigurer(new VespaZooKeeperAdminImpl()), zookeeperServerConfig) + : new VespaZooKeeperServerImpl(zookeeperServerConfig); + } + + @Override + public void deconstruct() { zooKeeperServer.shutdown(); } + + @Override + public void shutdown() { + zooKeeperServer.shutdown(); + } + + @Override + public void start(Path configFilePath) { + zooKeeperServer.start(configFilePath); + } + + @Override + public boolean reconfigurable() { return zooKeeperServer.reconfigurable(); } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java new file mode 100644 index 00000000000..8161e4bba4b --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java @@ -0,0 +1,45 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeeper; + +import ai.vespa.validation.Validation; +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.component.AbstractComponent; +import com.yahoo.component.annotation.Inject; +import java.nio.file.Path; +import java.time.Duration; + +/** + * Starts or reconfigures zookeeper cluster. + * The QuorumPeer conditionally created here is owned by the Reconfigurer; + * when it already has a peer, that peer is used here in case start or shutdown is required. 
+ * + * @author hmusum + */ +public class ReconfigurableVespaZooKeeperServer extends AbstractComponent implements VespaZooKeeperServer { + + private QuorumPeer peer; + + @Inject + public ReconfigurableVespaZooKeeperServer(Reconfigurer reconfigurer, ZookeeperServerConfig zookeeperServerConfig) { + Validation.require(zookeeperServerConfig.dynamicReconfiguration(), + zookeeperServerConfig.dynamicReconfiguration(), + "dynamicReconfiguration must be true"); + peer = reconfigurer.startOrReconfigure(zookeeperServerConfig, this, () -> peer = new VespaQuorumPeer()); + } + + @Override + public void shutdown() { + peer.shutdown(Duration.ofMinutes(1)); + } + + @Override + public void start(Path configFilePath) { + peer.start(configFilePath); + } + + @Override + public boolean reconfigurable() { + return true; + } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java new file mode 100644 index 00000000000..66742b0e05b --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java @@ -0,0 +1,41 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.zookeeper; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.auth.AuthenticationProvider; +import org.apache.zookeeper.server.auth.X509AuthenticationProvider; + +import java.security.cert.X509Certificate; +import java.util.logging.Logger; + +/** + * A {@link AuthenticationProvider} to be used in combination with Vespa mTLS + * + * @author bjorncs + */ +public class VespaMtlsAuthenticationProvider extends X509AuthenticationProvider { + + private static final Logger log = Logger.getLogger(VespaMtlsAuthenticationProvider.class.getName()); + + public VespaMtlsAuthenticationProvider() throws X509Exception { super(null, null);} + + @Override + public KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte[] authData) { + // Vespa's mTLS peer authorization rules are performed by the underlying trust manager implementation. + // The client is authorized once the SSL handshake has completed. 
+ X509Certificate[] certificateChain = (X509Certificate[]) cnxn.getClientCertificateChain(); + if (certificateChain == null || certificateChain.length == 0) { + log.warning("Client not authenticated - should not be possible with clientAuth=NEED"); + return KeeperException.Code.AUTHFAILED; + } + X509Certificate certificate = certificateChain[0]; + cnxn.addAuthInfo(new Id(getScheme(), certificate.getSubjectX500Principal().getName())); + return KeeperException.Code.OK; + } + + @Override public String getScheme() { return "x509"; } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java new file mode 100644 index 00000000000..e5c35a185b5 --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java @@ -0,0 +1,60 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeeper; + +import com.yahoo.protect.Process; +import org.apache.zookeeper.server.admin.AdminServer; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerMain; + +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Starts or stops a ZooKeeper server. Extends QuorumPeerMain to be able to call initializeAndRun() and wraps + * exceptions so that it can be used by code that does not depend on ZooKeeper. 
+ * + * @author hmusum + */ +class VespaQuorumPeer extends QuorumPeerMain implements QuorumPeer { + + private static final Logger log = java.util.logging.Logger.getLogger(VespaQuorumPeer.class.getName()); + + @Override + public void start(Path path) { + initializeAndRun(new String[]{ path.toFile().getAbsolutePath()}); + } + + @Override + public void shutdown(Duration timeout) { + if (quorumPeer != null) { + log.log(Level.FINE, "Shutting down ZooKeeper server"); + try { + quorumPeer.shutdown(); + quorumPeer.join(timeout.toMillis()); // Wait for shutdown to complete + if (quorumPeer.isAlive()) + throw new IllegalStateException("Peer still alive after " + timeout); + } catch (RuntimeException | InterruptedException e) { + // If shutdown fails, we have no other option than forcing the JVM to stop and letting it be restarted. + // + // When a VespaZooKeeperServer component receives a new config, the container will try to start a new + // server with the new config, this will fail until the old server is deconstructed. If the old server + // fails to deconstruct/shutdown, the new one will never start and if that happens forcing a restart is + // the better option. 
+ Process.logAndDie("Failed to shut down ZooKeeper server properly, forcing shutdown", e); + } + } + } + + @Override + protected void initializeAndRun(String[] args) { + try { + super.initializeAndRun(args); + } catch (QuorumPeerConfig.ConfigException | IOException | AdminServer.AdminServerException e) { + throw new RuntimeException("Exception when initializing or running ZooKeeper server", e); + } + } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java new file mode 100644 index 00000000000..be69a12599b --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java @@ -0,0 +1,93 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeeper; + +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.net.HostName; +import com.yahoo.vespa.zookeeper.client.ZkClientConfigBuilder; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.apache.zookeeper.data.ACL; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static com.yahoo.yolean.Exceptions.uncheck; + +/** + * @author hmusum + */ +@SuppressWarnings("unused") // Created by injection +public class VespaZooKeeperAdminImpl implements VespaZooKeeperAdmin { + + private static final Logger log = java.util.logging.Logger.getLogger(VespaZooKeeperAdminImpl.class.getName()); + + @Override + public void reconfigure(String connectionSpec, String servers) throws ReconfigException { + try (ZooKeeperAdmin 
zooKeeperAdmin = createAdmin(connectionSpec)) { + long fromConfig = -1; + // Using string parameters because the List variant of reconfigure fails to join empty lists (observed on 3.5.6, fixed in 3.7.0). + log.log(Level.INFO, "Applying ZooKeeper config: " + servers); + byte[] appliedConfig = zooKeeperAdmin.reconfigure(null, null, servers, fromConfig, null); + log.log(Level.INFO, "Applied ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); + + // Verify by issuing a write operation; this is only accepted once new quorum is obtained. + List acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + String node = zooKeeperAdmin.create("/reconfigure-dummy-node", new byte[0], acl, CreateMode.EPHEMERAL_SEQUENTIAL); + zooKeeperAdmin.delete(node, -1); + + log.log(Level.INFO, "Verified ZooKeeper config: " + new String(appliedConfig, StandardCharsets.UTF_8)); + } + catch ( KeeperException.ReconfigInProgress + | KeeperException.ConnectionLossException + | KeeperException.NewConfigNoQuorum e) { + throw new ReconfigException(e); + } + catch (KeeperException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private ZooKeeperAdmin createAdmin(String connectionSpec) { + return uncheck(() -> new ZooKeeperAdmin(connectionSpec, (int) sessionTimeout().toMillis(), + (event) -> log.log(Level.INFO, event.toString()), new ZkClientConfigBuilder().toConfig())); + } + + /** Creates a node in zookeeper, with hostname as part of node name, this ensures that server is up and working before returning */ + void createDummyNode(ZookeeperServerConfig zookeeperServerConfig) { + int sleepTime = 2_000; + try (ZooKeeperAdmin zooKeeperAdmin = createAdmin(localConnectionSpec(zookeeperServerConfig))) { + Instant end = Instant.now().plus(Duration.ofMinutes(5)); + Exception exception = null; + do { + try { + zooKeeperAdmin.create("/dummy-node-" + HostName.getLocalhost(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + return; + } catch (KeeperException e) { + if (e 
instanceof KeeperException.NodeExistsException) { + try { + zooKeeperAdmin.setData("/dummy-node-" + HostName.getLocalhost(), new byte[0], -1); + return; + } catch (KeeperException ex) { + log.log(Level.INFO, e.getMessage()); + Thread.sleep(sleepTime); + continue; + } + } + log.log(Level.INFO, e.getMessage()); + exception = e; + Thread.sleep(sleepTime); + } + } while (Instant.now().isBefore(end)); + throw new RuntimeException("Unable to create dummy node: ", exception); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + +} + diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java new file mode 100644 index 00000000000..8adabeedb1b --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java @@ -0,0 +1,51 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.zookeeper; + +import ai.vespa.validation.Validation; +import com.yahoo.cloud.config.ZookeeperServerConfig; +import com.yahoo.component.AbstractComponent; +import com.yahoo.component.annotation.Inject; +import java.nio.file.Path; +import java.time.Duration; + +/** + * @author Ulf Lilleengen + * @author Harald Musum + */ +public class VespaZooKeeperServerImpl extends AbstractComponent implements VespaZooKeeperServer { + + private final VespaQuorumPeer peer; + private final ZooKeeperRunner runner; + + @Inject + public VespaZooKeeperServerImpl(ZookeeperServerConfig zookeeperServerConfig) { + Validation.require(! zookeeperServerConfig.dynamicReconfiguration(), + ! 
zookeeperServerConfig.dynamicReconfiguration(), + "dynamicReconfiguration must be false"); + this.peer = new VespaQuorumPeer(); + this.runner = new ZooKeeperRunner(zookeeperServerConfig, this); + new VespaZooKeeperAdminImpl().createDummyNode(zookeeperServerConfig); + } + + @Override + public void deconstruct() { + runner.shutdown(); + super.deconstruct(); + } + + @Override + public void shutdown() { + peer.shutdown(Duration.ofMinutes(1)); + } + + @Override + public void start(Path configFilePath) { + peer.start(configFilePath); + } + + @Override + public boolean reconfigurable() { + return false; + } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/common/NetUtils.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/common/NetUtils.java new file mode 100644 index 00000000000..33ec9b1303a --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/common/NetUtils.java @@ -0,0 +1,94 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * This class contains common utilities for netstuff. Like printing IPv6 literals correctly
 */
public class NetUtils {

    /**
     * Formats a socket address as {@code host:port}.
     *
     * Note: Changed from original to use hostname from InetSocketAddress if there exists one.
     * The address-based formatting below (with brackets around IPv6 literals) is only reached
     * when {@link InetSocketAddress#getHostName()} returns null.
     *
     * @param addr the socket address to format
     * @return the formatted address
     */
    public static String formatInetAddr(InetSocketAddress addr) {
        String hostName = addr.getHostName();
        if (hostName != null) {
            return String.format("%s:%s", hostName, addr.getPort());
        }

        InetAddress ia = addr.getAddress();

        if (ia == null) {
            return String.format("%s:%s", addr.getHostString(), addr.getPort());
        }
        if (ia instanceof Inet6Address) {
            return String.format("[%s]:%s", ia.getHostAddress(), addr.getPort());
        } else {
            return String.format("%s:%s", ia.getHostAddress(), addr.getPort());
        }
    }

    /**
     * Separates host and port from given host port string if host port string is enclosed
     * within square bracket.
     *
     * @param hostPort host port string
     * @return String[]{host, port} if host port string is [host]:port
     * or String[] {host, port:port} if host port string is [host]:port:port
     * or String[] {host} if host port string is [host]
     * or String[]{} if not a ipv6 host port string.
     * @throws IllegalArgumentException if the string starts with '[' but has no closing ']',
     * has nothing between the brackets, has something other than ':' after the closing bracket,
     * or has nothing after that ':'
     */
    public static String[] getIPV6HostAndPort(String hostPort) {
        if (hostPort.startsWith("[")) {
            int i = hostPort.lastIndexOf(']');
            if (i < 0) {
                throw new IllegalArgumentException(
                    hostPort + " starts with '[' but has no matching ']'");
            }
            String host = hostPort.substring(1, i);
            if (host.isEmpty()) {
                // Report the full input; the host itself is the empty string and says nothing.
                throw new IllegalArgumentException(hostPort + " has an empty host.");
            }
            if (hostPort.length() > i + 1) {
                return getHostPort(hostPort, i, host);
            }
            return new String[] { host };
        } else {
            //Not an IPV6 host port string
            return new String[] {};
        }
    }

    /** Extracts whatever follows "]:" as the port part, after checking that it is present. */
    private static String[] getHostPort(String hostPort, int indexOfClosingBracket, String host) {
        // [127::1]:2181 , check separator : exists
        if (hostPort.charAt(indexOfClosingBracket + 1) != ':') {
            throw new IllegalArgumentException(hostPort + " does not have : after ]");
        }
        // [127::1]: scenario
        if (indexOfClosingBracket + 2 == hostPort.length()) {
            throw new IllegalArgumentException(hostPort + " doesn't have a port after colon.");
        }
        // do not include the separator itself
        String port = hostPort.substring(indexOfClosingBracket + 2);
        return new String[] { host, port };
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.Flushable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.common.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This RequestProcessor logs requests to disk. It batches the requests to do + * the io efficiently. The request is not passed to the next RequestProcessor + * until its log has been synced to disk. + * + * SyncRequestProcessor is used in 3 different cases + * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which + * send ack back to itself. + * 2. Follower - Sync request to disk and forward request to + * SendAckRequestProcessor which send the packets to leader. + * SendAckRequestProcessor is flushable which allow us to force + * push packets to leader. + * 3. Observer - Sync committed request to disk (received as INFORM packet). + * It never send ack back to the leader, so the nextProcessor will + * be null. This change the semantic of txnlog on the observer + * since it only contains committed txns. 
+ */ +public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class); + + private static final Request REQUEST_OF_DEATH = Request.requestOfDeath; + + private static class FlushRequest extends Request { + private final CountDownLatch latch = new CountDownLatch(1); + public FlushRequest() { + super(null, 0, 0, 0, null, null); + } + } + + private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null); + private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null); + + private static class DelayingProcessor implements RequestProcessor, Flushable { + private final RequestProcessor next; + private Queue delayed = null; + private DelayingProcessor(RequestProcessor next) { + this.next = next; + } + @Override + public void flush() throws IOException { + if (delayed == null && next instanceof Flushable) { + ((Flushable) next).flush(); + } + } + @Override + public void processRequest(Request request) throws RequestProcessorException { + if (delayed == null) { + next.processRequest(request); + } else { + delayed.add(request); + } + } + @Override + public void shutdown() { + next.shutdown(); + } + private void close() { + if (delayed == null) { + delayed = new ArrayDeque<>(); + } + } + private void open() throws RequestProcessorException { + if (delayed != null) { + for (Request request : delayed) { + next.processRequest(request); + } + delayed = null; + } + } + } + + /** The number of log entries to log before starting a snapshot */ + private static int snapCount = ZooKeeperServer.getSnapCount(); + + /** + * The total size of log entries before starting a snapshot + */ + private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes(); + + /** + * Random numbers used to vary snapshot timing + */ + private int randRoll; + private long randSize; + + private final BlockingQueue 
queuedRequests = new LinkedBlockingQueue(); + + private final Semaphore snapThreadMutex = new Semaphore(1); + + private final ZooKeeperServer zks; + + private final DelayingProcessor nextProcessor; + + /** + * Transactions that have been written and are waiting to be flushed to + * disk. Basically this is the list of SyncItems whose callbacks will be + * invoked after flush returns successfully. + */ + private final Queue toFlush; + private long lastFlushTime; + + public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { + super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener()); + this.zks = zks; + this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor); + this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize()); + } + + /** + * used by tests to check for changing + * snapcounts + * @param count + */ + public static void setSnapCount(int count) { + snapCount = count; + } + + /** + * used by tests to get the snapcount + * @return the snapcount + */ + public static int getSnapCount() { + return snapCount; + } + + private long getRemainingDelay() { + long flushDelay = zks.getFlushDelay(); + long duration = Time.currentElapsedTime() - lastFlushTime; + if (duration < flushDelay) { + return flushDelay - duration; + } + return 0; + } + + /** If both flushDelay and maxMaxBatchSize are set (bigger than 0), flush + * whenever either condition is hit. If only one or the other is + * set, flush only when the relevant condition is hit. 
+ */ + private boolean shouldFlush() { + long flushDelay = zks.getFlushDelay(); + long maxBatchSize = zks.getMaxBatchSize(); + if ((flushDelay > 0) && (getRemainingDelay() == 0)) { + return true; + } + return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize); + } + + /** + * used by tests to check for changing + * snapcounts + * @param size + */ + public static void setSnapSizeInBytes(long size) { + snapSizeInBytes = size; + } + + private boolean shouldSnapshot() { + int logCount = zks.getZKDatabase().getTxnCount(); + long logSize = zks.getZKDatabase().getTxnSize(); + return (logCount > (snapCount / 2 + randRoll)) + || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize)); + } + + private void resetSnapshotStats() { + randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2); + randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2)); + } + + @Override + public void run() { + try { + // we do this in an attempt to ensure that not all of the servers + // in the ensemble take a snapshot at the same time + resetSnapshotStats(); + lastFlushTime = Time.currentElapsedTime(); + while (true) { + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size()); + + long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay()); + Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS); + if (si == null) { + /* We timed out looking for more writes to batch, go ahead and flush immediately */ + flush(); + si = queuedRequests.take(); + } + + if (si == REQUEST_OF_DEATH) { + break; + } + + if (si == turnForwardingDelayOn) { + nextProcessor.close(); + continue; + } + if (si == turnForwardingDelayOff) { + nextProcessor.open(); + continue; + } + + if (si instanceof FlushRequest) { + flush(); + ((FlushRequest) si).latch.countDown(); + continue; + } + + long startProcessTime = Time.currentElapsedTime(); + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - 
si.syncQueueStartTime); + + // track the number of records written to the log + if (!si.isThrottled() && zks.getZKDatabase().append(si)) { + if (shouldSnapshot()) { + resetSnapshotStats(); + // roll the log + zks.getZKDatabase().rollLog(); + // take a snapshot + if (!snapThreadMutex.tryAcquire()) { + LOG.warn("Too busy to snap, skipping"); + } else { + new ZooKeeperThread("Snapshot Thread") { + public void run() { + try { + zks.takeSnapshot(); + } catch (Exception e) { + LOG.warn("Unexpected exception", e); + } finally { + snapThreadMutex.release(); + } + } + }.start(); + } + } + } else if (toFlush.isEmpty()) { + // optimization for read heavy workloads + // iff this is a read or a throttled request(which doesn't need to be written to the disk), + // and there are no pending flushes (writes), then just pass this to the next processor + if (nextProcessor != null) { + nextProcessor.processRequest(si); + nextProcessor.flush(); + } + continue; + } + toFlush.add(si); + if (shouldFlush()) { + flush(); + } + ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); + } + } catch (Throwable t) { + handleException(this.getName(), t); + } + LOG.info("SyncRequestProcessor exited!"); + } + + /** Flushes all pending writes, and waits for this to complete. */ + public void syncFlush() throws InterruptedException { + FlushRequest marker = new FlushRequest(); + queuedRequests.add(marker); + marker.latch.await(); + } + + public void setDelayForwarding(boolean delayForwarding) { + queuedRequests.add(delayForwarding ? 
turnForwardingDelayOn : turnForwardingDelayOff); + } + + private void flush() throws IOException, RequestProcessorException { + if (this.toFlush.isEmpty()) { + return; + } + + ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size()); + + long flushStartTime = Time.currentElapsedTime(); + zks.getZKDatabase().commit(); + ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime); + + if (this.nextProcessor == null) { + this.toFlush.clear(); + } else { + while (!this.toFlush.isEmpty()) { + final Request i = this.toFlush.remove(); + long latency = Time.currentElapsedTime() - i.syncQueueStartTime; + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); + this.nextProcessor.processRequest(i); + } + nextProcessor.flush(); + } + lastFlushTime = Time.currentElapsedTime(); + } + + public void shutdown() { + LOG.info("Shutting down"); + queuedRequests.add(REQUEST_OF_DEATH); + try { + this.join(); + this.flush(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while wating for {} to finish", this); + Thread.currentThread().interrupt(); + } catch (IOException e) { + LOG.warn("Got IO exception during shutdown"); + } catch (RequestProcessorException e) { + LOG.warn("Got request processor exception during shutdown"); + } + if (nextProcessor != null) { + nextProcessor.shutdown(); + } + } + + public void processRequest(final Request request) { + Objects.requireNonNull(request, "Request cannot be null"); + + request.syncQueueStartTime = Time.currentElapsedTime(); + queuedRequests.add(request); + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1); + } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java new file mode 100644 index 00000000000..fdfe0fe8467 --- /dev/null +++ 
b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java @@ -0,0 +1,37 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package org.apache.zookeeper.server; + +import com.yahoo.vespa.zookeeper.Configurator; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.logging.Logger; + +/** + * Overrides secure setting with value from {@link Configurator}. + * Workaround for incorrect handling of clientSecurePort in combination with ZooKeeper Dynamic Reconfiguration in 3.6.2 + * See https://issues.apache.org/jira/browse/ZOOKEEPER-3577. + * + * Using package {@link org.apache.zookeeper.server} as {@link NettyServerCnxnFactory#NettyServerCnxnFactory()} is package-private. + * + * @author bjorncs + */ +public class VespaNettyServerCnxnFactory extends NettyServerCnxnFactory { + + private static final Logger log = Logger.getLogger(VespaNettyServerCnxnFactory.class.getName()); + + private final boolean isSecure; + + public VespaNettyServerCnxnFactory() { + super(); + this.isSecure = Configurator.VespaNettyServerCnxnFactory_isSecure; + boolean portUnificationEnabled = Boolean.getBoolean(NettyServerCnxnFactory.PORT_UNIFICATION_KEY); + log.info(String.format("For %h: isSecure=%b, portUnification=%b", this, isSecure, portUnificationEnabled)); + } + + @Override + public void configure(InetSocketAddress addr, int maxClientCnxns, int backlog, boolean secure) throws IOException { + log.info(String.format("For %h: configured() invoked with parameter 'secure'=%b, overridden to %b", this, secure, isSecure)); + super.configure(addr, maxClientCnxns, backlog, isSecure); + } +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java new file mode 100644 index 
00000000000..5f408ea58ff --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -0,0 +1,2329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.Environment; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.Quotas; +import org.apache.zookeeper.StatsTrack; +import org.apache.zookeeper.Version; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.ZookeeperBanner; +import org.apache.zookeeper.common.PathUtils; +import org.apache.zookeeper.common.StringUtils; +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.data.StatPersisted; +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.metrics.MetricsContext; +import 
org.apache.zookeeper.proto.AuthPacket; +import org.apache.zookeeper.proto.ConnectRequest; +import org.apache.zookeeper.proto.ConnectResponse; +import org.apache.zookeeper.proto.CreateRequest; +import org.apache.zookeeper.proto.DeleteRequest; +import org.apache.zookeeper.proto.GetSASLRequest; +import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.proto.RequestHeader; +import org.apache.zookeeper.proto.SetACLRequest; +import org.apache.zookeeper.proto.SetDataRequest; +import org.apache.zookeeper.proto.SetSASLResponse; +import org.apache.zookeeper.server.DataTree.ProcessTxnResult; +import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException; +import org.apache.zookeeper.server.ServerCnxn.CloseRequestException; +import org.apache.zookeeper.server.SessionTracker.Session; +import org.apache.zookeeper.server.SessionTracker.SessionExpirer; +import org.apache.zookeeper.server.auth.ProviderRegistry; +import org.apache.zookeeper.server.auth.ServerAuthenticationProvider; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; +import org.apache.zookeeper.server.util.JvmPauseMonitor; +import org.apache.zookeeper.server.util.OSMXBean; +import org.apache.zookeeper.server.util.QuotaMetricsUtils; +import org.apache.zookeeper.server.util.RequestPathMetricsCollector; +import org.apache.zookeeper.txn.CreateSessionTxn; +import org.apache.zookeeper.txn.TxnDigest; +import org.apache.zookeeper.txn.TxnHeader; +import org.apache.zookeeper.util.ServiceUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.sasl.SaslException; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; 
+import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +/** + * This class implements a simple standalone ZooKeeperServer. It sets up the + * following chain of RequestProcessors to process requests: + * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor + */ +public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { + + protected static final Logger LOG; + private static final RateLogger RATE_LOGGER; + + public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit"; + + public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck"; + public static final String SKIP_ACL = "zookeeper.skipACL"; + public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota"; + + // When enabled, will check ACL constraints appertained to the requests first, + // before sending the requests to the quorum. 
+ static final boolean enableEagerACLCheck; + + static final boolean skipACL; + + public static final boolean enforceQuota; + + public static final String SASL_SUPER_USER = "zookeeper.superUser"; + + public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients"; + public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled"; + private static boolean digestEnabled; + + // Add a enable/disable option for now, we should remove this one when + // this feature is confirmed to be stable + public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; + private static boolean closeSessionTxnEnabled = true; + + static { + LOG = LoggerFactory.getLogger(ZooKeeperServer.class); + + RATE_LOGGER = new RateLogger(LOG); + + ZookeeperBanner.printBanner(LOG); + + Environment.logEnv("Server environment:", LOG); + + enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK); + LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck); + + skipACL = System.getProperty(SKIP_ACL, "no").equals("yes"); + if (skipACL) { + LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL); + } + + enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false")); + if (enforceQuota) { + LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota); + } + + digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true")); + LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); + + closeSessionTxnEnabled = Boolean.parseBoolean( + System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); + LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); + } + + public static boolean isCloseSessionTxnEnabled() { + return closeSessionTxnEnabled; + } + + public static void setCloseSessionTxnEnabled(boolean enabled) { + ZooKeeperServer.closeSessionTxnEnabled = enabled; + LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED, + 
ZooKeeperServer.closeSessionTxnEnabled); + } + + protected ZooKeeperServerBean jmxServerBean; + protected DataTreeBean jmxDataTreeBean; + + public static final int DEFAULT_TICK_TIME = 3000; + protected int tickTime = DEFAULT_TICK_TIME; + public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled + protected static volatile int throttledOpWaitTime = + Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME); + /** value of -1 indicates unset, use default */ + protected int minSessionTimeout = -1; + /** value of -1 indicates unset, use default */ + protected int maxSessionTimeout = -1; + /** Socket listen backlog. Value of -1 indicates unset */ + protected int listenBacklog = -1; + protected SessionTracker sessionTracker; + private FileTxnSnapLog txnLogFactory = null; + private ZKDatabase zkDb; + private ResponseCache readResponseCache; + private ResponseCache getChildrenResponseCache; + private final AtomicLong hzxid = new AtomicLong(0); + public static final Exception ok = new Exception("No prob"); + protected RequestProcessor firstProcessor; + protected JvmPauseMonitor jvmPauseMonitor; + protected volatile State state = State.INITIAL; + private boolean isResponseCachingEnabled = true; + /* contains the configuration file content read at startup */ + protected String initialConfig; + protected boolean reconfigEnabled; + private final RequestPathMetricsCollector requestPathMetricsCollector; + private static final int DEFAULT_SNAP_COUNT = 100000; + private static final int DEFAULT_GLOBAL_OUTSTANDING_LIMIT = 1000; + + private boolean localSessionEnabled = false; + protected enum State { + INITIAL, + RUNNING, + SHUTDOWN, + ERROR + } + + /** + * This is the secret that we use to generate passwords. For the moment, + * it's more of a checksum that's used in reconnection, which carries no + * security weight, and is treated internally as if it carries no + * security weight. 
+ */ + private static final long superSecret = 0XB3415C00L; + + private final AtomicInteger requestsInProcess = new AtomicInteger(0); + final Deque outstandingChanges = new ArrayDeque<>(); + // this data structure must be accessed under the outstandingChanges lock + final Map outstandingChangesForPath = new HashMap(); + + protected ServerCnxnFactory serverCnxnFactory; + protected ServerCnxnFactory secureServerCnxnFactory; + + private final ServerStats serverStats; + private final ZooKeeperServerListener listener; + private ZooKeeperServerShutdownHandler zkShutdownHandler; + private volatile int createSessionTrackerServerId = 1; + + private static final String FLUSH_DELAY = "zookeeper.flushDelay"; + private static volatile long flushDelay; + private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime"; + private static volatile long maxWriteQueuePollTime; + private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize"; + private static volatile int maxBatchSize; + + /** + * Starting size of read and write ByteArrayOutputStream buffers. Default is 1024 bytes; the minimum is 32 bytes. + * Flag not used for small transfers like connectResponses.
+ */ + public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes"; + public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024; + public static final int intBufferStartingSizeBytes; + + public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize"; + public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize"; + + static { + long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0); + setFlushDelay(configuredFlushDelay); + setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3)); + setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000)); + + intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE); + + if (intBufferStartingSizeBytes < 32) { + String msg = "Buffer starting size (" + intBufferStartingSizeBytes + ") must be greater than or equal to 32. " + + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=\" "; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + + LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes); + } + + // Connection throttling + private BlueThrottle connThrottle = new BlueThrottle(); + + private RequestThrottler requestThrottler; + public static final String SNAP_COUNT = "zookeeper.snapCount"; + + /** + * This setting sets a limit on the total number of large requests that + * can be inflight and is designed to prevent ZooKeeper from accepting + * too many large requests such that the JVM runs out of usable heap and + * ultimately crashes. + * + * The limit is enforced by the {@link checkRequestSize(int, boolean)} + * method which is called by the connection layer ({@link NIOServerCnxn}, + * {@link NettyServerCnxn}) before allocating a byte buffer and pulling + * data off the TCP socket. 
The limit is then checked again by the + * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which + * also atomically updates {@link currentLargeRequestBytes}. The request is + * then marked as a large request, with the request size stored in the Request + * object so that it can later be decremented from {@link currentLargeRequestBytes}. + * + * When a request is completed or dropped, the relevant code path calls the + * {@link requestFinished(Request)} method which performs the decrement if + * needed. + */ + private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; + + /** + * The size threshold after which a request is considered a large request + * and is checked against the large request byte limit. + */ + private volatile int largeRequestThreshold = -1; + + private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0); + + private AuthenticationHelper authHelper; + + void removeCnxn(ServerCnxn cnxn) { + zkDb.removeCnxn(cnxn); + } + + /** + * Creates a ZooKeeperServer instance. Nothing is setup, use the setX + * methods to prepare the instance (eg datadir, datalogdir, ticktime, + * builder, etc...) + * + */ + public ZooKeeperServer() { + listener = new ZooKeeperServerListenerImpl(this); + serverStats = new ServerStats(this); + this.requestPathMetricsCollector = new RequestPathMetricsCollector(); + this.authHelper = new AuthenticationHelper(); + } + + /** + * Keeping this constructor for backward compatibility + */ + public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) { + this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled()); + } + + /** + * Creates a ZooKeeperServer instance. It sets everything up, but doesn't + * actually start listening for clients until run() is invoked.
+ * + */ + public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) { + serverStats = new ServerStats(this); + this.txnLogFactory = txnLogFactory; + this.txnLogFactory.setServerStats(this.serverStats); + this.zkDb = zkDb; + this.tickTime = tickTime; + setMinSessionTimeout(minSessionTimeout); + setMaxSessionTimeout(maxSessionTimeout); + this.listenBacklog = clientPortListenBacklog; + this.reconfigEnabled = reconfigEnabled; + + listener = new ZooKeeperServerListenerImpl(this); + + readResponseCache = new ResponseCache(Integer.getInteger( + GET_DATA_RESPONSE_CACHE_SIZE, + ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getData"); + + getChildrenResponseCache = new ResponseCache(Integer.getInteger( + GET_CHILDREN_RESPONSE_CACHE_SIZE, + ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getChildren"); + + this.initialConfig = initialConfig; + + this.requestPathMetricsCollector = new RequestPathMetricsCollector(); + + this.initLargeRequestThrottlingSettings(); + + this.authHelper = new AuthenticationHelper(); + + LOG.info( + "Created patched server with" + + " tickTime {} ms" + + " minSessionTimeout {} ms" + + " maxSessionTimeout {} ms" + + " clientPortListenBacklog {}" + + " datadir {}" + + " snapdir {}", + tickTime, + getMinSessionTimeout(), + getMaxSessionTimeout(), + getClientPortListenBacklog(), + txnLogFactory.getDataDir(), + txnLogFactory.getSnapDir()); + } + + public String getInitialConfig() { + return initialConfig; + } + + /** + * Adds JvmPauseMonitor and calls + * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String)} + * + */ + public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) { + this(txnLogFactory, tickTime, minSessionTimeout, 
maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled()); + this.jvmPauseMonitor = jvmPauseMonitor; + if (jvmPauseMonitor != null) { + LOG.info("Added JvmPauseMonitor to server"); + } + } + + /** + * creates a zookeeperserver instance. + * @param txnLogFactory the file transaction snapshot logging class + * @param tickTime the ticktime for the server + */ + public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) { + this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig, QuorumPeerConfig.isReconfigEnabled()); + } + + public ServerStats serverStats() { + return serverStats; + } + + public RequestPathMetricsCollector getRequestPathMetricsCollector() { + return requestPathMetricsCollector; + } + + public BlueThrottle connThrottle() { + return connThrottle; + } + + public void dumpConf(PrintWriter pwriter) { + pwriter.print("clientPort="); + pwriter.println(getClientPort()); + pwriter.print("secureClientPort="); + pwriter.println(getSecureClientPort()); + pwriter.print("dataDir="); + pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath()); + pwriter.print("dataDirSize="); + pwriter.println(getDataDirSize()); + pwriter.print("dataLogDir="); + pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath()); + pwriter.print("dataLogSize="); + pwriter.println(getLogDirSize()); + pwriter.print("tickTime="); + pwriter.println(getTickTime()); + pwriter.print("maxClientCnxns="); + pwriter.println(getMaxClientCnxnsPerHost()); + pwriter.print("minSessionTimeout="); + pwriter.println(getMinSessionTimeout()); + pwriter.print("maxSessionTimeout="); + pwriter.println(getMaxSessionTimeout()); + pwriter.print("clientPortListenBacklog="); + pwriter.println(getClientPortListenBacklog()); + + pwriter.print("serverId="); + pwriter.println(getServerId()); + } + + public ZooKeeperServerConf getConf() { + return new ZooKeeperServerConf( + getClientPort(), + 
zkDb.snapLog.getSnapDir().getAbsolutePath(), + zkDb.snapLog.getDataDir().getAbsolutePath(), + getTickTime(), + getMaxClientCnxnsPerHost(), + getMinSessionTimeout(), + getMaxSessionTimeout(), + getServerId(), + getClientPortListenBacklog()); + } + + /** + * This constructor is for backward compatibility with the existing unit + * test code. + * It defaults to FileLogProvider persistence provider. + */ + public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException { + this(new FileTxnSnapLog(snapDir, logDir), tickTime, ""); + } + + /** + * Default constructor, relies on the config for its argument values + * + * @throws IOException + */ + public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException { + this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "", QuorumPeerConfig.isReconfigEnabled()); + } + + /** + * get the zookeeper database for this server + * @return the zookeeper database for this server + */ + public ZKDatabase getZKDatabase() { + return this.zkDb; + } + + /** + * set the zkdatabase for this zookeeper server + * @param zkDb + */ + public void setZKDatabase(ZKDatabase zkDb) { + this.zkDb = zkDb; + } + + /** + * Restore sessions and data + */ + public void loadData() throws IOException, InterruptedException { + /* + * When a new leader starts executing Leader#lead, it + * invokes this method. The database, however, has been + * initialized before running leader election so that + * the server could pick its zxid for its initial vote. + * It does it by invoking QuorumPeer#getLastLoggedZxid. + * Consequently, we don't need to initialize it once more + * and avoid the penalty of loading it a second time. Not + * reloading it is particularly important for applications + * that host a large database. + * + * The following if block checks whether the database has + * been initialized or not. Note that this method is + * invoked by at least one other method: + * ZooKeeperServer#startdata. 
+ * + * See ZOOKEEPER-1642 for more detail. + */ + if (zkDb.isInitialized()) { + setZxid(zkDb.getDataTreeLastProcessedZxid()); + } else { + setZxid(zkDb.loadDataBase()); + } + + // Clean up dead sessions + zkDb.getSessions().stream() + .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null) + .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid())); + + // Make a clean snapshot + takeSnapshot(); + } + + public void takeSnapshot() { + takeSnapshot(false); + } + + public void takeSnapshot(boolean syncSnap) { + long start = Time.currentElapsedTime(); + try { + txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); + } catch (IOException e) { + LOG.error("Severe unrecoverable error, exiting", e); + // This is a severe error that we cannot recover from, + // so we need to exit + ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); + } + long elapsed = Time.currentElapsedTime() - start; + LOG.info("Snapshot taken in {} ms", elapsed); + ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed); + } + + public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() { + return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection(); + } + + @Override + public long getDataDirSize() { + if (zkDb == null) { + return 0L; + } + File path = zkDb.snapLog.getDataDir(); + return getDirSize(path); + } + + @Override + public long getLogDirSize() { + if (zkDb == null) { + return 0L; + } + File path = zkDb.snapLog.getSnapDir(); + return getDirSize(path); + } + + private long getDirSize(File file) { + long size = 0L; + if (file.isDirectory()) { + File[] files = file.listFiles(); + if (files != null) { + for (File f : files) { + size += getDirSize(f); + } + } + } else { + size = file.length(); + } + return size; + } + + public long getZxid() { + return hzxid.get(); + } + + public SessionTracker getSessionTracker() { + return sessionTracker; + } + + long getNextZxid() { + return 
hzxid.incrementAndGet(); + } + + public void setZxid(long zxid) { + hzxid.set(zxid); + } + + private void close(long sessionId) { + Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null); + submitRequest(si); + } + + public void closeSession(long sessionId) { + LOG.info("Closing session 0x{}", Long.toHexString(sessionId)); + + // we do not want to wait for a session close. send it as soon as we + // detect it! + close(sessionId); + } + + protected void killSession(long sessionId, long zxid) { + zkDb.killSession(sessionId, zxid); + if (LOG.isTraceEnabled()) { + ZooTrace.logTraceMessage( + LOG, + ZooTrace.SESSION_TRACE_MASK, + "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId)); + } + if (sessionTracker != null) { + sessionTracker.removeSession(sessionId); + } + } + + public void expire(Session session) { + long sessionId = session.getSessionId(); + LOG.info( + "Expiring session 0x{}, timeout of {}ms exceeded", + Long.toHexString(sessionId), + session.getTimeout()); + close(sessionId); + } + + public void expire(long sessionId) { + LOG.info("forcibly expiring session 0x{}", Long.toHexString(sessionId)); + + close(sessionId); + } + + public static class MissingSessionException extends IOException { + + private static final long serialVersionUID = 7467414635467261007L; + + public MissingSessionException(String msg) { + super(msg); + } + + } + + void touch(ServerCnxn cnxn) throws MissingSessionException { + if (cnxn == null) { + return; + } + long id = cnxn.getSessionId(); + int to = cnxn.getSessionTimeout(); + if (!sessionTracker.touchSession(id, to)) { + throw new MissingSessionException("No session with sessionid 0x" + + Long.toHexString(id) + + " exists, probably expired and removed"); + } + } + + protected void registerJMX() { + // register with JMX + try { + jmxServerBean = new ZooKeeperServerBean(this); + MBeanRegistry.getInstance().register(jmxServerBean, null); + + try { + jmxDataTreeBean = new 
DataTreeBean(zkDb.getDataTree()); + MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxDataTreeBean = null; + } + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxServerBean = null; + } + } + + public void startdata() throws IOException, InterruptedException { + //check to see if zkDb is not null + if (zkDb == null) { + zkDb = new ZKDatabase(this.txnLogFactory); + } + if (!zkDb.isInitialized()) { + loadData(); + } + } + + public synchronized void startup() { + startupWithServerState(State.RUNNING); + } + + public synchronized void startupWithoutServing() { + startupWithServerState(State.INITIAL); + } + + public synchronized void startServing() { + setState(State.RUNNING); + notifyAll(); + } + + private void startupWithServerState(State state) { + if (sessionTracker == null) { + createSessionTracker(); + } + startSessionTracker(); + setupRequestProcessors(); + + startRequestThrottler(); + + registerJMX(); + + startJvmPauseMonitor(); + + registerMetrics(); + + setState(state); + + requestPathMetricsCollector.start(); + + localSessionEnabled = sessionTracker.isLocalSessionsEnabled(); + + notifyAll(); + } + + protected void startJvmPauseMonitor() { + if (this.jvmPauseMonitor != null) { + this.jvmPauseMonitor.serviceStart(); + } + } + + protected void startRequestThrottler() { + requestThrottler = new RequestThrottler(this); + requestThrottler.start(); + + } + + protected void setupRequestProcessors() { + RequestProcessor finalProcessor = new FinalRequestProcessor(this); + RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); + ((SyncRequestProcessor) syncProcessor).start(); + firstProcessor = new PrepRequestProcessor(this, syncProcessor); + ((PrepRequestProcessor) firstProcessor).start(); + } + + public ZooKeeperServerListener getZooKeeperServerListener() { + return listener; + } + + /** + * Change the server ID used by 
{@link #createSessionTracker()}. Must be called prior to + * {@link #startup()} being called + * + * @param newId ID to use + */ + public void setCreateSessionTrackerServerId(int newId) { + createSessionTrackerServerId = newId; + } + + protected void createSessionTracker() { + sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener()); + } + + protected void startSessionTracker() { + ((SessionTrackerImpl) sessionTracker).start(); + } + + /** + * Sets the state of ZooKeeper server. After changing the state, it notifies + * the server state change to a registered shutdown handler, if any. + *

<p> + * The following are the server state transitions: + *
<ul>
<li>During startup the server will be in the INITIAL state.</li>
+ *
<li>After successfully starting, the server sets the state to RUNNING. + * </li>
+ *
<li>The server transitions to the ERROR state if it hits an internal + * error. {@link ZooKeeperServerListenerImpl} notifies any critical resource + * error events, e.g., SyncRequestProcessor not being able to write a txn to + * disk.</li>
+ *
<li>During shutdown the server sets the state to SHUTDOWN, which + * corresponds to the server not running.</li></ul>
+ * + * @param state new server state. + */ + protected void setState(State state) { + this.state = state; + // Notify server state changes to the registered shutdown handler, if any. + if (zkShutdownHandler != null) { + zkShutdownHandler.handle(state); + } else { + LOG.debug( + "ZKShutdownHandler is not registered, so ZooKeeper server" + + " won't take any action on ERROR or SHUTDOWN server state changes"); + } + } + + /** + * This can be used while shutting down the server to see whether the server + * is already shutdown or not. + * + * @return true if the server is running or server hits an error, false + * otherwise. + */ + protected boolean canShutdown() { + return state == State.RUNNING || state == State.ERROR; + } + + /** + * @return true if the server is running, false otherwise. + */ + public boolean isRunning() { + return state == State.RUNNING; + } + + public void shutdown() { + shutdown(false); + } + + /** + * Shut down the server instance + * @param fullyShutDown true if another server using the same database will not replace this one in the same process + */ + public synchronized void shutdown(boolean fullyShutDown) { + if (!canShutdown()) { + if (fullyShutDown && zkDb != null) { + zkDb.clear(); + } + LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); + return; + } + LOG.info("shutting down"); + + // new RuntimeException("Calling shutdown").printStackTrace(); + setState(State.SHUTDOWN); + + // unregister all metrics that are keeping a strong reference to this object + // subclasses will do their specific clean up + unregisterMetrics(); + + if (requestThrottler != null) { + requestThrottler.shutdown(); + } + + // Since sessionTracker and syncThreads poll we just have to + // set running to false and they will detect it during the poll + // interval. 
+ if (sessionTracker != null) { + sessionTracker.shutdown(); + } + if (firstProcessor != null) { + firstProcessor.shutdown(); + } + if (jvmPauseMonitor != null) { + jvmPauseMonitor.serviceStop(); + } + + if (zkDb != null) { + if (fullyShutDown) { + zkDb.clear(); + } else { + // else there is no need to clear the database + // * When a new quorum is established we can still apply the diff + // on top of the same zkDb data + // * If we fetch a new snapshot from leader, the zkDb will be + // cleared anyway before loading the snapshot + try { + // This will fast-forward the database to the latest recorded transactions + zkDb.fastForwardDataBase(); + } catch (IOException e) { + LOG.error("Error updating DB", e); + zkDb.clear(); + } + } + } + + requestPathMetricsCollector.shutdown(); + unregisterJMX(); + } + + protected void unregisterJMX() { + // unregister from JMX + try { + if (jmxDataTreeBean != null) { + MBeanRegistry.getInstance().unregister(jmxDataTreeBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + try { + if (jmxServerBean != null) { + MBeanRegistry.getInstance().unregister(jmxServerBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + jmxServerBean = null; + jmxDataTreeBean = null; + } + + public void incInProcess() { + requestsInProcess.incrementAndGet(); + } + + public void decInProcess() { + requestsInProcess.decrementAndGet(); + if (requestThrottler != null) { + requestThrottler.throttleWake(); + } + } + + public int getInProcess() { + return requestsInProcess.get(); + } + + public int getInflight() { + return requestThrottleInflight(); + } + + private int requestThrottleInflight() { + if (requestThrottler != null) { + return requestThrottler.getInflight(); + } + return 0; + } + + static class PrecalculatedDigest { + final long nodeDigest; + final long treeDigest; + + PrecalculatedDigest(long nodeDigest, long treeDigest) { + this.nodeDigest = nodeDigest; + this.treeDigest = 
treeDigest; + } + } + + + /** + * This structure is used to facilitate information sharing between PrepRP + * and FinalRP. + */ + static class ChangeRecord { + PrecalculatedDigest precalculatedDigest; + byte[] data; + + ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List acl) { + this.zxid = zxid; + this.path = path; + this.stat = stat; + this.childCount = childCount; + this.acl = acl; + } + + long zxid; + + String path; + + StatPersisted stat; /* Make sure to create a new object when changing */ + + int childCount; + + List acl; /* Make sure to create a new object when changing */ + + ChangeRecord duplicate(long zxid) { + StatPersisted stat = new StatPersisted(); + if (this.stat != null) { + DataTree.copyStatPersisted(this.stat, stat); + } + ChangeRecord changeRecord = new ChangeRecord(zxid, path, stat, childCount, + acl == null ? new ArrayList<>() : new ArrayList<>(acl)); + changeRecord.precalculatedDigest = precalculatedDigest; + changeRecord.data = data; + return changeRecord; + } + + } + + byte[] generatePasswd(long id) { + Random r = new Random(id ^ superSecret); + byte[] p = new byte[16]; + r.nextBytes(p); + return p; + } + + protected boolean checkPasswd(long sessionId, byte[] passwd) { + return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId)); + } + + long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) { + if (passwd == null) { + // Possible since it's just deserialized from a packet on the wire. 
+ passwd = new byte[0]; + } + long sessionId = sessionTracker.createSession(timeout); + Random r = new Random(sessionId ^ superSecret); + r.nextBytes(passwd); + ByteBuffer to = ByteBuffer.allocate(4); + to.putInt(timeout); + cnxn.setSessionId(sessionId); + Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null); + submitRequest(si); + return sessionId; + } + + /** + * set the owner of this session as owner + * @param id the session id + * @param owner the owner of the session + * @throws SessionExpiredException + */ + public void setOwner(long id, Object owner) throws SessionExpiredException { + sessionTracker.setOwner(id, owner); + } + + protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { + boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout); + if (LOG.isTraceEnabled()) { + ZooTrace.logTraceMessage( + LOG, + ZooTrace.SESSION_TRACE_MASK, + "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc); + } + finishSessionInit(cnxn, rc); + } + + public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException { + if (checkPasswd(sessionId, passwd)) { + revalidateSession(cnxn, sessionId, sessionTimeout); + } else { + LOG.warn( + "Incorrect password from {} for session 0x{}", + cnxn.getRemoteSocketAddress(), + Long.toHexString(sessionId)); + finishSessionInit(cnxn, false); + } + } + + public void finishSessionInit(ServerCnxn cnxn, boolean valid) { + // register with JMX + try { + if (valid) { + if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) { + serverCnxnFactory.registerConnection(cnxn); + } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) { + secureServerCnxnFactory.registerConnection(cnxn); + } + } + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + } + + try { + ConnectResponse rsp = new ConnectResponse( + 0, + valid ? 
cnxn.getSessionTimeout() : 0, + valid ? cnxn.getSessionId() : 0, // send 0 if session is no + // longer valid + valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); + bos.writeInt(-1, "len"); + rsp.serialize(bos, "connect"); + if (!cnxn.isOldClient) { + bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly"); + } + baos.close(); + ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); + bb.putInt(bb.remaining() - 4).rewind(); + cnxn.sendBuffer(bb); + + if (valid) { + LOG.debug( + "Established session 0x{} with negotiated timeout {} for client {}", + Long.toHexString(cnxn.getSessionId()), + cnxn.getSessionTimeout(), + cnxn.getRemoteSocketAddress()); + cnxn.enableRecv(); + } else { + + LOG.info( + "Invalid session 0x{} for client {}, probably expired", + Long.toHexString(cnxn.getSessionId()), + cnxn.getRemoteSocketAddress()); + cnxn.sendBuffer(ServerCnxnFactory.closeConn); + } + + } catch (Exception e) { + LOG.warn("Exception while establishing session, closing", e); + cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT); + } + } + + public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) { + closeSession(cnxn.getSessionId()); + } + + public long getServerId() { + return 0; + } + + /** + * If the underlying Zookeeper server support local session, this method + * will set a isLocalSession to true if a request is associated with + * a local session. + * + * @param si + */ + protected void setLocalSessionFlag(Request si) { + } + + public void submitRequest(Request si) { + enqueueRequest(si); + } + + public void enqueueRequest(Request si) { + if (requestThrottler == null) { + synchronized (this) { + try { + // Since all requests are passed to the request + // processor it should wait for setting up the request + // processor chain. The state will be updated to RUNNING + // after the setup. 
+ while (state == State.INITIAL) { + wait(1000); + } + } catch (InterruptedException e) { + LOG.warn("Unexpected interruption", e); + } + if (requestThrottler == null) { + throw new RuntimeException("Not started"); + } + } + } + requestThrottler.submitRequest(si); + } + + public void submitRequestNow(Request si) { + if (firstProcessor == null) { + synchronized (this) { + try { + // Since all requests are passed to the request + // processor it should wait for setting up the request + // processor chain. The state will be updated to RUNNING + // after the setup. + while (state == State.INITIAL) { + wait(1000); + } + } catch (InterruptedException e) { + LOG.warn("Unexpected interruption", e); + } + if (firstProcessor == null || state != State.RUNNING) { + throw new RuntimeException("Not started"); + } + } + } + try { + touch(si.cnxn); + boolean validpacket = Request.isValid(si.type); + if (validpacket) { + setLocalSessionFlag(si); + firstProcessor.processRequest(si); + if (si.cnxn != null) { + incInProcess(); + } + } else { + LOG.warn("Received packet at server of unknown type {}", si.type); + // Update request accounting/throttling limits + requestFinished(si); + new UnimplementedRequestProcessor().processRequest(si); + } + } catch (MissingSessionException e) { + LOG.debug("Dropping request.", e); + // Update request accounting/throttling limits + requestFinished(si); + } catch (RequestProcessorException e) { + LOG.error("Unable to process request", e); + // Update request accounting/throttling limits + requestFinished(si); + } + } + + public static int getSnapCount() { + int snapCount = Integer.getInteger(SNAP_COUNT, DEFAULT_SNAP_COUNT); + // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor + if (snapCount < 2) { + LOG.warn("SnapCount should be 2 or more. 
Now, snapCount is reset to 2"); + snapCount = 2; + } + return snapCount; + } + + public int getGlobalOutstandingLimit() { + return Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, DEFAULT_GLOBAL_OUTSTANDING_LIMIT); + } + + public static long getSnapSizeInBytes() { + long size = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default + if (size <= 0) { + LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", size); + } + return size * 1024; // Convert to bytes + } + + public void setServerCnxnFactory(ServerCnxnFactory factory) { + serverCnxnFactory = factory; + } + + public ServerCnxnFactory getServerCnxnFactory() { + return serverCnxnFactory; + } + + public ServerCnxnFactory getSecureServerCnxnFactory() { + return secureServerCnxnFactory; + } + + public void setSecureServerCnxnFactory(ServerCnxnFactory factory) { + secureServerCnxnFactory = factory; + } + + /** + * return the last processed id from the + * datatree + */ + public long getLastProcessedZxid() { + return zkDb.getDataTreeLastProcessedZxid(); + } + + /** + * return the outstanding requests + * in the queue, which haven't been + * processed yet + */ + public long getOutstandingRequests() { + return getInProcess(); + } + + /** + * return the total number of client connections that are alive + * to this server + */ + public int getNumAliveConnections() { + int numAliveConnections = 0; + + if (serverCnxnFactory != null) { + numAliveConnections += serverCnxnFactory.getNumAliveConnections(); + } + + if (secureServerCnxnFactory != null) { + numAliveConnections += secureServerCnxnFactory.getNumAliveConnections(); + } + + return numAliveConnections; + } + + /** + * truncate the log to get in sync with others + * if in a quorum + * @param zxid the zxid that it needs to get in sync + * with others + * @throws IOException + */ + public void truncateLog(long zxid) throws IOException { + this.zkDb.truncateLog(zxid); + } + + public int getTickTime() { + return tickTime; + 
} + + public void setTickTime(int tickTime) { + LOG.info("tickTime set to {} ms", tickTime); + this.tickTime = tickTime; + } + + public static int getThrottledOpWaitTime() { + return throttledOpWaitTime; + } + + public static void setThrottledOpWaitTime(int time) { + LOG.info("throttledOpWaitTime set to {} ms", time); + throttledOpWaitTime = time; + } + + public int getMinSessionTimeout() { + return minSessionTimeout; + } + + public void setMinSessionTimeout(int min) { + this.minSessionTimeout = min == -1 ? tickTime * 2 : min; + LOG.info("minSessionTimeout set to {} ms", this.minSessionTimeout); + } + + public int getMaxSessionTimeout() { + return maxSessionTimeout; + } + + public void setMaxSessionTimeout(int max) { + this.maxSessionTimeout = max == -1 ? tickTime * 20 : max; + LOG.info("maxSessionTimeout set to {} ms", this.maxSessionTimeout); + } + + public int getClientPortListenBacklog() { + return listenBacklog; + } + + public void setClientPortListenBacklog(int backlog) { + this.listenBacklog = backlog; + LOG.info("clientPortListenBacklog set to {}", backlog); + } + + public int getClientPort() { + return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1; + } + + public int getSecureClientPort() { + return secureServerCnxnFactory != null ? secureServerCnxnFactory.getLocalPort() : -1; + } + + /** Maximum number of connections allowed from particular host (ip) */ + public int getMaxClientCnxnsPerHost() { + if (serverCnxnFactory != null) { + return serverCnxnFactory.getMaxClientCnxnsPerHost(); + } + if (secureServerCnxnFactory != null) { + return secureServerCnxnFactory.getMaxClientCnxnsPerHost(); + } + return -1; + } + + public void setTxnLogFactory(FileTxnSnapLog txnLog) { + this.txnLogFactory = txnLog; + } + + public FileTxnSnapLog getTxnLogFactory() { + return this.txnLogFactory; + } + + /** + * Returns the elapsed sync of time of transaction log in milliseconds. 
+ */ + public long getTxnLogElapsedSyncTime() { + return txnLogFactory.getTxnLogElapsedSyncTime(); + } + + public String getState() { + return "standalone"; + } + + public void dumpEphemerals(PrintWriter pwriter) { + zkDb.dumpEphemerals(pwriter); + } + + public Map> getEphemerals() { + return zkDb.getEphemerals(); + } + + public double getConnectionDropChance() { + return connThrottle.getDropChance(); + } + + public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) + throws IOException, ClientCnxnLimitException { + + BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); + ConnectRequest connReq = new ConnectRequest(); + connReq.deserialize(bia, "connect"); + LOG.debug( + "Session establishment request from client {} client's lastZxid is 0x{}", + cnxn.getRemoteSocketAddress(), + Long.toHexString(connReq.getLastZxidSeen())); + + long sessionId = connReq.getSessionId(); + int tokensNeeded = 1; + if (connThrottle.isConnectionWeightEnabled()) { + if (sessionId == 0) { + if (localSessionEnabled) { + tokensNeeded = connThrottle.getRequiredTokensForLocal(); + } else { + tokensNeeded = connThrottle.getRequiredTokensForGlobal(); + } + } else { + tokensNeeded = connThrottle.getRequiredTokensForRenew(); + } + } + + if (!connThrottle.checkLimit(tokensNeeded)) { + throw new ClientCnxnLimitException(); + } + ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); + + ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); + + boolean readOnly = false; + try { + readOnly = bia.readBool("readOnly"); + cnxn.isOldClient = false; + } catch (IOException e) { + // this is ok -- just a packet from an old client which + // doesn't contain readOnly field + LOG.warn( + "Connection request from old client {}; will be dropped if server is in r-o mode", + cnxn.getRemoteSocketAddress()); + } + if (!readOnly && this instanceof ReadOnlyZooKeeperServer) { + String msg = "Refusing session request for 
not-read-only client " + cnxn.getRemoteSocketAddress(); + LOG.info(msg); + throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); + } + if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { + String msg = "Refusing session request for client " + + cnxn.getRemoteSocketAddress() + + " as it has seen zxid 0x" + + Long.toHexString(connReq.getLastZxidSeen()) + + " our last zxid is 0x" + + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) + + " client must try another server"; + + LOG.info(msg); + throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); + } + int sessionTimeout = connReq.getTimeOut(); + byte[] passwd = connReq.getPasswd(); + int minSessionTimeout = getMinSessionTimeout(); + if (sessionTimeout < minSessionTimeout) { + sessionTimeout = minSessionTimeout; + } + int maxSessionTimeout = getMaxSessionTimeout(); + if (sessionTimeout > maxSessionTimeout) { + sessionTimeout = maxSessionTimeout; + } + cnxn.setSessionTimeout(sessionTimeout); + // We don't want to receive any packets until we are sure that the + // session is setup + cnxn.disableRecv(); + if (sessionId == 0) { + long id = createSession(cnxn, passwd, sessionTimeout); + LOG.debug( + "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", + Long.toHexString(id), + Long.toHexString(connReq.getLastZxidSeen()), + connReq.getTimeOut(), + cnxn.getRemoteSocketAddress()); + } else { + validateSession(cnxn, sessionId); + LOG.debug( + "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", + Long.toHexString(sessionId), + Long.toHexString(connReq.getLastZxidSeen()), + connReq.getTimeOut(), + cnxn.getRemoteSocketAddress()); + if (serverCnxnFactory != null) { + serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); + } + if (secureServerCnxnFactory != null) { + secureServerCnxnFactory.closeSession(sessionId, 
ServerCnxn.DisconnectReason.CLIENT_RECONNECT); + } + cnxn.setSessionId(sessionId); + reopenSession(cnxn, sessionId, passwd, sessionTimeout); + ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1); + + } + } + + /** + * Validate if a particular session can be reestablished. + * + * @param cnxn + * @param sessionId + */ + protected void validateSession(ServerCnxn cnxn, long sessionId) + throws IOException { + // do nothing + } + + public boolean shouldThrottle(long outStandingCount) { + int globalOutstandingLimit = getGlobalOutstandingLimit(); + if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) { + return outStandingCount > 0; + } + return false; + } + + long getFlushDelay() { + return flushDelay; + } + + static void setFlushDelay(long delay) { + LOG.info("{} = {} ms", FLUSH_DELAY, delay); + flushDelay = delay; + } + + long getMaxWriteQueuePollTime() { + return maxWriteQueuePollTime; + } + + static void setMaxWriteQueuePollTime(long maxTime) { + LOG.info("{} = {} ms", MAX_WRITE_QUEUE_POLL_SIZE, maxTime); + maxWriteQueuePollTime = maxTime; + } + + int getMaxBatchSize() { + return maxBatchSize; + } + + static void setMaxBatchSize(int size) { + LOG.info("{}={}", MAX_BATCH_SIZE, size); + maxBatchSize = size; + } + + private void initLargeRequestThrottlingSettings() { + setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes)); + setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1)); + } + + public int getLargeRequestMaxBytes() { + return largeRequestMaxBytes; + } + + public void setLargeRequestMaxBytes(int bytes) { + if (bytes <= 0) { + LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes); + LOG.warn("Will not change the setting. 
The max bytes stay at {}", largeRequestMaxBytes); + } else { + largeRequestMaxBytes = bytes; + LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes); + } + } + + public int getLargeRequestThreshold() { + return largeRequestThreshold; + } + + public void setLargeRequestThreshold(int threshold) { + if (threshold == 0 || threshold < -1) { + LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold); + largeRequestThreshold = -1; + } else { + largeRequestThreshold = threshold; + LOG.info("The large request threshold is set to {}", largeRequestThreshold); + } + } + + public int getLargeRequestBytes() { + return currentLargeRequestBytes.get(); + } + + private boolean isLargeRequest(int length) { + // The large request limit is disabled when threshold is -1 + if (largeRequestThreshold == -1) { + return false; + } + return length > largeRequestThreshold; + } + + public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException { + if (!isLargeRequest(length)) { + return true; + } + if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) { + return true; + } else { + ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); + throw new IOException("Rejecting large request"); + } + + } + + private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException { + if (!isLargeRequest(length)) { + return true; + } + + int bytes = currentLargeRequestBytes.addAndGet(length); + if (bytes > largeRequestMaxBytes) { + currentLargeRequestBytes.addAndGet(-length); + ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); + throw new IOException("Rejecting large request"); + } + return true; + } + + public void requestFinished(Request request) { + int largeRequestLength = request.getLargeRequestSize(); + if (largeRequestLength != -1) { + currentLargeRequestBytes.addAndGet(-largeRequestLength); + } + } + + public void processPacket(ServerCnxn cnxn, ByteBuffer 
incomingBuffer) throws IOException { + // We have the request, now process and setup for next + InputStream bais = new ByteBufferInputStream(incomingBuffer); + BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); + RequestHeader h = new RequestHeader(); + h.deserialize(bia, "header"); + + // Need to increase the outstanding request count first, otherwise + // there might be a race condition that it enabled recv after + // processing request and then disabled when check throttling. + // + // Be aware that we're actually checking the global outstanding + // request before this request. + // + // It's fine if the IOException thrown before we decrease the count + // in cnxn, since it will close the cnxn anyway. + cnxn.incrOutstandingAndCheckThrottle(h); + + // Through the magic of byte buffers, txn will not be + // pointing + // to the start of the txn + incomingBuffer = incomingBuffer.slice(); + if (h.getType() == OpCode.auth) { + LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress()); + AuthPacket authPacket = new AuthPacket(); + ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket); + String scheme = authPacket.getScheme(); + ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); + Code authReturn = Code.AUTHFAILED; + if (ap != null) { + try { + // handleAuthentication may close the connection, to allow the client to choose + // a different server to connect to. 
+ authReturn = ap.handleAuthentication( + new ServerAuthenticationProvider.ServerObjs(this, cnxn), + authPacket.getAuth()); + } catch (RuntimeException e) { + LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e); + authReturn = Code.AUTHFAILED; + } + } + if (authReturn == Code.OK) { + LOG.info("Session 0x{}: auth success for scheme {} and address {}", + Long.toHexString(cnxn.getSessionId()), scheme, + cnxn.getRemoteSocketAddress()); + ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.OK.intValue()); + cnxn.sendResponse(rh, null, null); + } else { + if (ap == null) { + LOG.warn( + "No authentication provider for scheme: {} has {}", + scheme, + ProviderRegistry.listProviders()); + } else { + LOG.warn("Authentication failed for scheme: {}", scheme); + } + // send a response... + ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.AUTHFAILED.intValue()); + cnxn.sendResponse(rh, null, null); + // ... and close connection + cnxn.sendBuffer(ServerCnxnFactory.closeConn); + cnxn.disableRecv(); + } + return; + } else if (h.getType() == OpCode.sasl) { + processSasl(incomingBuffer, cnxn, h); + } else { + if (!authHelper.enforceAuthentication(cnxn, h.getXid())) { + // Authentication enforcement is failed + // Already sent response to user about failure and closed the session, lets return + return; + } else { + Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); + int length = incomingBuffer.limit(); + if (isLargeRequest(length)) { + // checkRequestSize will throw IOException if request is rejected + checkRequestSizeWhenMessageReceived(length); + si.setLargeRequestSize(length); + } + si.setOwner(ServerCnxn.me); + submitRequest(si); + } + } + } + + private static boolean isSaslSuperUser(String id) { + if (id == null || id.isEmpty()) { + return false; + } + + Properties properties = System.getProperties(); + int prefixLen = SASL_SUPER_USER.length(); + + for (String k : 
properties.stringPropertyNames()) { + if (k.startsWith(SASL_SUPER_USER) + && (k.length() == prefixLen || k.charAt(prefixLen) == '.')) { + String value = properties.getProperty(k); + + if (value != null && value.equals(id)) { + return true; + } + } + } + + return false; + } + + private static boolean shouldAllowSaslFailedClientsConnect() { + return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS); + } + + private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { + LOG.debug("Responding to client SASL token."); + GetSASLRequest clientTokenRecord = new GetSASLRequest(); + ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord); + byte[] clientToken = clientTokenRecord.getToken(); + LOG.debug("Size of client SASL token: {}", clientToken.length); + byte[] responseToken = null; + try { + ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer; + try { + // note that clientToken might be empty (clientToken.length == 0): + // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the + // SASL negotiation process. 
+ responseToken = saslServer.evaluateResponse(clientToken); + if (saslServer.isComplete()) { + String authorizationID = saslServer.getAuthorizationID(); + LOG.info("Session 0x{}: adding SASL authorization for authorizationID: {}", + Long.toHexString(cnxn.getSessionId()), authorizationID); + cnxn.addAuthInfo(new Id("sasl", authorizationID)); + + if (isSaslSuperUser(authorizationID)) { + cnxn.addAuthInfo(new Id("super", "")); + LOG.info( + "Session 0x{}: Authenticated Id '{}' as super user", + Long.toHexString(cnxn.getSessionId()), + authorizationID); + } + } + } catch (SaslException e) { + LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e); + if (shouldAllowSaslFailedClientsConnect() && !authHelper.isSaslAuthRequired()) { + LOG.warn("Maintaining client connection despite SASL authentication failure."); + } else { + int error; + if (authHelper.isSaslAuthRequired()) { + LOG.warn( + "Closing client connection due to server requires client SASL authenticaiton," + + "but client SASL authentication has failed, or client is not configured with SASL " + + "authentication."); + error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue(); + } else { + LOG.warn("Closing client connection due to SASL authentication failure."); + error = Code.AUTHFAILED.intValue(); + } + + ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error); + cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response"); + cnxn.sendCloseSession(); + cnxn.disableRecv(); + return; + } + } + } catch (NullPointerException e) { + LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly."); + } + if (responseToken != null) { + LOG.debug("Size of server SASL response: {}", responseToken.length); + } + + ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue()); + Record record = new SetSASLResponse(responseToken); + cnxn.sendResponse(replyHeader, record, "response"); + } + + // entry point for 
quorum/Learner.java + public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { + processTxnForSessionEvents(null, hdr, txn); + return processTxnInDB(hdr, txn, null); + } + + // entry point for FinalRequestProcessor.java + public ProcessTxnResult processTxn(Request request) { + TxnHeader hdr = request.getHdr(); + processTxnForSessionEvents(request, hdr, request.getTxn()); + + final boolean writeRequest = (hdr != null); + final boolean quorumRequest = request.isQuorum(); + + // return fast w/o synchronization when we get a read + if (!writeRequest && !quorumRequest) { + return new ProcessTxnResult(); + } + synchronized (outstandingChanges) { + ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); + + // request.hdr is set for write requests, which are the only ones + // that add to outstandingChanges. + if (writeRequest) { + long zxid = hdr.getZxid(); + while (!outstandingChanges.isEmpty() + && outstandingChanges.peek().zxid <= zxid) { + ChangeRecord cr = outstandingChanges.remove(); + ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1); + if (cr.zxid < zxid) { + LOG.warn( + "Zxid outstanding 0x{} is less than current 0x{}", + Long.toHexString(cr.zxid), + Long.toHexString(zxid)); + } + if (outstandingChangesForPath.get(cr.path) == cr) { + outstandingChangesForPath.remove(cr.path); + } + } + } + + // do not add non quorum packets to the queue. + if (quorumRequest) { + getZKDatabase().addCommittedProposal(request); + } + return rc; + } + } + + private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) { + int opCode = (request == null) ? hdr.getType() : request.type; + long sessionId = (request == null) ? 
hdr.getClientId() : request.sessionId; + + if (opCode == OpCode.createSession) { + if (hdr != null && txn instanceof CreateSessionTxn) { + CreateSessionTxn cst = (CreateSessionTxn) txn; + sessionTracker.commitSession(sessionId, cst.getTimeOut()); + } else if (request == null || !request.isLocalSession()) { + LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString()); + } + } else if (opCode == OpCode.closeSession) { + sessionTracker.removeSession(sessionId); + } + } + + private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) { + if (hdr == null) { + return new ProcessTxnResult(); + } else { + return getZKDatabase().processTxn(hdr, txn, digest); + } + } + + public Map> getSessionExpiryMap() { + return sessionTracker.getSessionExpiryMap(); + } + + /** + * This method is used to register the ZooKeeperServerShutdownHandler to get + * server's error or shutdown state change notifications. + * {@link ZooKeeperServerShutdownHandler#handle(State)} will be called for + * every server state changes {@link #setState(State)}. + * + * @param zkShutdownHandler shutdown handler + */ + void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) { + this.zkShutdownHandler = zkShutdownHandler; + } + + public boolean isResponseCachingEnabled() { + return isResponseCachingEnabled; + } + + public void setResponseCachingEnabled(boolean isEnabled) { + isResponseCachingEnabled = isEnabled; + } + + public ResponseCache getReadResponseCache() { + return isResponseCachingEnabled ? readResponseCache : null; + } + + public ResponseCache getGetChildrenResponseCache() { + return isResponseCachingEnabled ? 
getChildrenResponseCache : null; + } + + protected void registerMetrics() { + MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); + + final ZKDatabase zkdb = this.getZKDatabase(); + final ServerStats stats = this.serverStats(); + + rootContext.registerGauge("avg_latency", stats::getAvgLatency); + + rootContext.registerGauge("max_latency", stats::getMaxLatency); + rootContext.registerGauge("min_latency", stats::getMinLatency); + + rootContext.registerGauge("packets_received", stats::getPacketsReceived); + rootContext.registerGauge("packets_sent", stats::getPacketsSent); + rootContext.registerGauge("num_alive_connections", stats::getNumAliveClientConnections); + + rootContext.registerGauge("outstanding_requests", stats::getOutstandingRequests); + rootContext.registerGauge("uptime", stats::getUptime); + + rootContext.registerGauge("znode_count", zkdb::getNodeCount); + + rootContext.registerGauge("watch_count", zkdb.getDataTree()::getWatchCount); + rootContext.registerGauge("ephemerals_count", zkdb.getDataTree()::getEphemeralsCount); + + rootContext.registerGauge("approximate_data_size", zkdb.getDataTree()::cachedApproximateDataSize); + + rootContext.registerGauge("global_sessions", zkdb::getSessionCount); + rootContext.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount); + + OSMXBean osMbean = new OSMXBean(); + rootContext.registerGauge("open_file_descriptor_count", osMbean::getOpenFileDescriptorCount); + rootContext.registerGauge("max_file_descriptor_count", osMbean::getMaxFileDescriptorCount); + rootContext.registerGauge("connection_drop_probability", this::getConnectionDropChance); + + rootContext.registerGauge("last_client_response_size", stats.getClientResponseStats()::getLastBufferSize); + rootContext.registerGauge("max_client_response_size", stats.getClientResponseStats()::getMaxBufferSize); + rootContext.registerGauge("min_client_response_size", 
stats.getClientResponseStats()::getMinBufferSize); + + rootContext.registerGauge("outstanding_tls_handshake", this::getOutstandingHandshakeNum); + rootContext.registerGauge("auth_failed_count", stats::getAuthFailedCount); + rootContext.registerGauge("non_mtls_remote_conn_count", stats::getNonMTLSRemoteConnCount); + rootContext.registerGauge("non_mtls_local_conn_count", stats::getNonMTLSLocalConnCount); + + rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE, + () -> QuotaMetricsUtils.getQuotaCountLimit(zkDb.getDataTree())); + rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE, + () -> QuotaMetricsUtils.getQuotaBytesLimit(zkDb.getDataTree())); + rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE, + () -> QuotaMetricsUtils.getQuotaCountUsage(zkDb.getDataTree())); + rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE, + () -> QuotaMetricsUtils.getQuotaBytesUsage(zkDb.getDataTree())); + } + + protected void unregisterMetrics() { + + MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); + + rootContext.unregisterGauge("avg_latency"); + + rootContext.unregisterGauge("max_latency"); + rootContext.unregisterGauge("min_latency"); + + rootContext.unregisterGauge("packets_received"); + rootContext.unregisterGauge("packets_sent"); + rootContext.unregisterGauge("num_alive_connections"); + + rootContext.unregisterGauge("outstanding_requests"); + rootContext.unregisterGauge("uptime"); + + rootContext.unregisterGauge("znode_count"); + + rootContext.unregisterGauge("watch_count"); + rootContext.unregisterGauge("ephemerals_count"); + rootContext.unregisterGauge("approximate_data_size"); + + rootContext.unregisterGauge("global_sessions"); + rootContext.unregisterGauge("local_sessions"); + + rootContext.unregisterGauge("open_file_descriptor_count"); + rootContext.unregisterGauge("max_file_descriptor_count"); + 
rootContext.unregisterGauge("connection_drop_probability"); + + rootContext.unregisterGauge("last_client_response_size"); + rootContext.unregisterGauge("max_client_response_size"); + rootContext.unregisterGauge("min_client_response_size"); + + rootContext.unregisterGauge("auth_failed_count"); + rootContext.unregisterGauge("non_mtls_remote_conn_count"); + rootContext.unregisterGauge("non_mtls_local_conn_count"); + + rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE); + rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE); + rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE); + rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE); + } + + /** + * Hook into admin server, useful to expose additional data + * that do not represent metrics. + * + * @param response a sink which collects the data. + */ + public void dumpMonitorValues(BiConsumer response) { + ServerStats stats = serverStats(); + response.accept("version", Version.getFullVersion()); + response.accept("server_state", stats.getServerState()); + } + + /** + * Grant or deny authorization to an operation on a node as a function of: + * @param cnxn : the server connection + * @param acl : set of ACLs for the node + * @param perm : the permission that the client is requesting + * @param ids : the credentials supplied by the client + * @param path : the ZNode path + * @param setAcls : for set ACL operations, the list of ACLs being set. Otherwise null. 
+ */ + public void checkACL(ServerCnxn cnxn, List acl, int perm, List ids, String path, List setAcls) throws KeeperException.NoAuthException { + if (skipACL) { + return; + } + + LOG.debug("Permission requested: {} ", perm); + LOG.debug("ACLs for node: {}", acl); + LOG.debug("Client credentials: {}", ids); + + if (acl == null || acl.size() == 0) { + return; + } + for (Id authId : ids) { + if (authId.getScheme().equals("super")) { + return; + } + } + for (ACL a : acl) { + Id id = a.getId(); + if ((a.getPerms() & perm) != 0) { + if (id.getScheme().equals("world") && id.getId().equals("anyone")) { + return; + } + ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(id.getScheme()); + if (ap != null) { + for (Id authId : ids) { + if (authId.getScheme().equals(id.getScheme()) + && ap.matches( + new ServerAuthenticationProvider.ServerObjs(this, cnxn), + new ServerAuthenticationProvider.MatchValues(path, authId.getId(), id.getId(), perm, setAcls))) { + return; + } + } + } + } + } + throw new KeeperException.NoAuthException(); + } + + /** + * check a path whether exceeded the quota. + * + * @param path + * the path of the node, used for the quota prefix check + * @param lastData + * the current node data, {@code null} for none + * @param data + * the data to be set, or {@code null} for none + * @param type + * currently, create and setData need to check quota + */ + public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException { + if (!enforceQuota) { + return; + } + long dataBytes = (data == null) ? 
0 : data.length; + ZKDatabase zkDatabase = getZKDatabase(); + String lastPrefix = zkDatabase.getDataTree().getMaxPrefixWithQuota(path); + if (StringUtils.isEmpty(lastPrefix)) { + return; + } + + final String namespace = PathUtils.getTopNamespace(path); + switch (type) { + case OpCode.create: + checkQuota(lastPrefix, dataBytes, 1, namespace); + break; + case OpCode.setData: + checkQuota(lastPrefix, dataBytes - (lastData == null ? 0 : lastData.length), 0, namespace); + break; + default: + throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type); + } + } + + /** + * check a path whether exceeded the quota. + * + * @param lastPrefix + the path of the node which has a quota. + * @param bytesDiff + * the diff to be added to number of bytes + * @param countDiff + * the diff to be added to the count + * @param namespace + * the namespace for collecting quota exceeded errors + */ + private void checkQuota(String lastPrefix, long bytesDiff, long countDiff, String namespace) + throws KeeperException.QuotaExceededException { + LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff); + + // now check the quota we set + String limitNode = Quotas.limitPath(lastPrefix); + DataNode node = getZKDatabase().getNode(limitNode); + StatsTrack limitStats; + if (node == null) { + // should not happen + LOG.error("Missing limit node for quota {}", limitNode); + return; + } + synchronized (node) { + limitStats = new StatsTrack(node.data); + } + //check the quota + boolean checkCountQuota = countDiff != 0 && (limitStats.getCount() > -1 || limitStats.getCountHardLimit() > -1); + boolean checkByteQuota = bytesDiff != 0 && (limitStats.getBytes() > -1 || limitStats.getByteHardLimit() > -1); + + if (!checkCountQuota && !checkByteQuota) { + return; + } + + //check the statPath quota + String statNode = Quotas.statPath(lastPrefix); + node = getZKDatabase().getNode(statNode); + + StatsTrack currentStats; + if (node == null) { + // 
should not happen + LOG.error("Missing node for stat {}", statNode); + return; + } + synchronized (node) { + currentStats = new StatsTrack(node.data); + } + + //check the Count Quota + if (checkCountQuota) { + long newCount = currentStats.getCount() + countDiff; + boolean isCountHardLimit = limitStats.getCountHardLimit() > -1; + long countLimit = isCountHardLimit ? limitStats.getCountHardLimit() : limitStats.getCount(); + + if (newCount > countLimit) { + String msg = "Quota exceeded: " + lastPrefix + " [current count=" + newCount + ", " + (isCountHardLimit ? "hard" : "soft") + "CountLimit=" + countLimit + "]"; + RATE_LOGGER.rateLimitLog(msg); + if (isCountHardLimit) { + updateQuotaExceededMetrics(namespace); + throw new KeeperException.QuotaExceededException(lastPrefix); + } + } + } + + //check the Byte Quota + if (checkByteQuota) { + long newBytes = currentStats.getBytes() + bytesDiff; + boolean isByteHardLimit = limitStats.getByteHardLimit() > -1; + long byteLimit = isByteHardLimit ? limitStats.getByteHardLimit() : limitStats.getBytes(); + if (newBytes > byteLimit) { + String msg = "Quota exceeded: " + lastPrefix + " [current bytes=" + newBytes + ", " + (isByteHardLimit ? "hard" : "soft") + "ByteLimit=" + byteLimit + "]"; + RATE_LOGGER.rateLimitLog(msg); + if (isByteHardLimit) { + updateQuotaExceededMetrics(namespace); + throw new KeeperException.QuotaExceededException(lastPrefix); + } + } + } + } + + public static boolean isDigestEnabled() { + return digestEnabled; + } + + public static void setDigestEnabled(boolean digestEnabled) { + LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); + ZooKeeperServer.digestEnabled = digestEnabled; + } + + /** + * Trim a path to get the immediate predecessor. 
+ * + * @param path + * @return + * @throws KeeperException.BadArgumentsException + */ + private String parentPath(String path) throws KeeperException.BadArgumentsException { + int lastSlash = path.lastIndexOf('/'); + if (lastSlash == -1 || path.indexOf('\0') != -1 || getZKDatabase().isSpecialPath(path)) { + throw new KeeperException.BadArgumentsException(path); + } + return lastSlash == 0 ? "/" : path.substring(0, lastSlash); + } + + private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException { + boolean mustCheckACL = false; + String path = null; + List acl = null; + + switch (request.type) { + case OpCode.create: + case OpCode.create2: { + CreateRequest req = new CreateRequest(); + if (buffer2Record(request.request, req)) { + mustCheckACL = true; + acl = req.getAcl(); + path = parentPath(req.getPath()); + } + break; + } + case OpCode.delete: { + DeleteRequest req = new DeleteRequest(); + if (buffer2Record(request.request, req)) { + path = parentPath(req.getPath()); + } + break; + } + case OpCode.setData: { + SetDataRequest req = new SetDataRequest(); + if (buffer2Record(request.request, req)) { + path = req.getPath(); + } + break; + } + case OpCode.setACL: { + SetACLRequest req = new SetACLRequest(); + if (buffer2Record(request.request, req)) { + mustCheckACL = true; + acl = req.getAcl(); + path = req.getPath(); + } + break; + } + } + + if (mustCheckACL) { + /* we ignore the extrapolated ACL returned by fixupACL because + * we only care about it being well-formed (and if it isn't, an + * exception will be raised). 
+ */ + PrepRequestProcessor.fixupACL(path, request.authInfo, acl); + } + + return path; + } + + private int effectiveACLPerms(Request request) { + switch (request.type) { + case OpCode.create: + case OpCode.create2: + return ZooDefs.Perms.CREATE; + case OpCode.delete: + return ZooDefs.Perms.DELETE; + case OpCode.setData: + return ZooDefs.Perms.WRITE; + case OpCode.setACL: + return ZooDefs.Perms.ADMIN; + default: + return ZooDefs.Perms.ALL; + } + } + + /** + * Check Write Requests for Potential Access Restrictions + *

+ * Before a request is proposed to the quorum, let's check it + * against local ACLs. Non-write requests (read, session, etc.) + * are passed along. Requests that fail the check are sent an error response. + *

+ * While we are at it, if the request will set an ACL: make sure it's + * a valid one. + * + * @param request + * @return true if request is permitted, false if not. + */ + public boolean authWriteRequest(Request request) { + int err; + String pathToCheck; + + if (!enableEagerACLCheck) { + return true; + } + + err = Code.OK.intValue(); + + try { + pathToCheck = effectiveACLPath(request); + if (pathToCheck != null) { + checkACL(request.cnxn, zkDb.getACL(pathToCheck, null), effectiveACLPerms(request), request.authInfo, pathToCheck, null); + } + } catch (KeeperException.NoAuthException e) { + LOG.debug("Request failed ACL check", e); + err = e.code().intValue(); + } catch (KeeperException.InvalidACLException e) { + LOG.debug("Request has an invalid ACL check", e); + err = e.code().intValue(); + } catch (KeeperException.NoNodeException e) { + LOG.debug("ACL check against non-existent node: {}", e.getMessage()); + } catch (KeeperException.BadArgumentsException e) { + LOG.debug("ACL check against illegal node path: {}", e.getMessage()); + } catch (Throwable t) { + LOG.error("Uncaught exception in authWriteRequest with: ", t); + throw t; + } finally { + if (err != Code.OK.intValue()) { + /* This request has a bad ACL, so we are dismissing it early. 
*/ + decInProcess(); + ReplyHeader rh = new ReplyHeader(request.cxid, 0, err); + try { + request.cnxn.sendResponse(rh, null, null); + } catch (IOException e) { + LOG.error("IOException : {}", e); + } + } + } + + return err == Code.OK.intValue(); + } + + private boolean buffer2Record(ByteBuffer request, Record record) { + boolean rv = false; + try { + ByteBufferInputStream.byteBuffer2Record(request, record); + request.rewind(); + rv = true; + } catch (IOException ex) { + } + + return rv; + } + + public int getOutstandingHandshakeNum() { + if (serverCnxnFactory instanceof NettyServerCnxnFactory) { + return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum(); + } else { + return 0; + } + } + + public boolean isReconfigEnabled() { + return this.reconfigEnabled; + } + + public ZooKeeperServerShutdownHandler getZkShutdownHandler() { + return zkShutdownHandler; + } + + static void updateQuotaExceededMetrics(final String namespace) { + if (namespace == null) { + return; + } + ServerMetrics.getMetrics().QUOTA_EXCEEDED_ERROR_PER_NAMESPACE.add(namespace, 1); + } +} + diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java new file mode 100644 index 00000000000..fd2ea277a40 --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.metrics.MetricsContext; +import org.apache.zookeeper.server.ContainerManager; +import org.apache.zookeeper.server.DataTreeBean; +import org.apache.zookeeper.server.FinalRequestProcessor; +import org.apache.zookeeper.server.PrepRequestProcessor; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; + +import javax.management.JMException; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +/** + * + * Just like the standard ZooKeeperServer. 
We just replace the request + * processors: PrepRequestProcessor -> ProposalRequestProcessor -> + * CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> + * FinalRequestProcessor + */ +public class LeaderZooKeeperServer extends QuorumZooKeeperServer { + + private ContainerManager containerManager; // guarded by sync + + CommitProcessor commitProcessor; + + PrepRequestProcessor prepRequestProcessor; + + /** + * @throws IOException + */ + public LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { + super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self); + } + + public Leader getLeader() { + return self.leader; + } + + @Override + protected void setupRequestProcessors() { + RequestProcessor finalProcessor = new FinalRequestProcessor(this); + RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); + commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); + commitProcessor.start(); + ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); + proposalProcessor.initialize(); + prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); + prepRequestProcessor.start(); + firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor); + + setupContainerManager(); + } + + private synchronized void setupContainerManager() { + containerManager = new ContainerManager( + getZKDatabase(), + prepRequestProcessor, + Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)), + Integer.getInteger("znode.container.maxPerMinute", 10000), + Long.getLong("znode.container.maxNeverUsedIntervalMs", 0) + ); + } + + @Override + public synchronized void startup() { + super.startup(); + if (containerManager != null) { + containerManager.start(); + } + } + + 
@Override + protected void registerMetrics() { + super.registerMetrics(); + + MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); + rootContext.registerGauge("learners", gaugeWithLeader( + (leader) -> leader.getLearners().size()) + ); + rootContext.registerGauge("synced_followers", gaugeWithLeader( + (leader) -> leader.getForwardingFollowers().size() + )); + rootContext.registerGauge("synced_non_voting_followers", gaugeWithLeader( + (leader) -> leader.getNonVotingFollowers().size() + )); + rootContext.registerGauge("synced_observers", self::getSynced_observers_metric); + rootContext.registerGauge("pending_syncs", gaugeWithLeader( + (leader) -> leader.getNumPendingSyncs() + )); + rootContext.registerGauge("leader_uptime", gaugeWithLeader( + (leader) -> leader.getUptime() + )); + rootContext.registerGauge("last_proposal_size", gaugeWithLeader( + (leader) -> leader.getProposalStats().getLastBufferSize() + )); + rootContext.registerGauge("max_proposal_size", gaugeWithLeader( + (leader) -> leader.getProposalStats().getMaxBufferSize() + )); + rootContext.registerGauge("min_proposal_size", gaugeWithLeader( + (leader) -> leader.getProposalStats().getMinBufferSize() + )); + } + + private org.apache.zookeeper.metrics.Gauge gaugeWithLeader(Function supplier) { + return () -> { + final Leader leader = getLeader(); + if (leader == null) { + return null; + } + return supplier.apply(leader); + }; + } + + @Override + protected void unregisterMetrics() { + super.unregisterMetrics(); + + MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); + rootContext.unregisterGauge("learners"); + rootContext.unregisterGauge("synced_followers"); + rootContext.unregisterGauge("synced_non_voting_followers"); + rootContext.unregisterGauge("synced_observers"); + rootContext.unregisterGauge("pending_syncs"); + rootContext.unregisterGauge("leader_uptime"); + + rootContext.unregisterGauge("last_proposal_size"); + 
rootContext.unregisterGauge("max_proposal_size"); + rootContext.unregisterGauge("min_proposal_size"); + } + + @Override + public synchronized void shutdown(boolean fullyShutDown) { + if (containerManager != null) { + containerManager.stop(); + } + super.shutdown(fullyShutDown); + } + + @Override + public int getGlobalOutstandingLimit() { + int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1; + int globalOutstandingLimit = super.getGlobalOutstandingLimit() / divisor; + return globalOutstandingLimit; + } + + @Override + public void createSessionTracker() { + sessionTracker = new LeaderSessionTracker( + this, + getZKDatabase().getSessionWithTimeOuts(), + tickTime, + self.getId(), + self.areLocalSessionsEnabled(), + getZooKeeperServerListener()); + } + + public boolean touch(long sess, int to) { + return sessionTracker.touchSession(sess, to); + } + + public boolean checkIfValidGlobalSession(long sess, int to) { + if (self.areLocalSessionsEnabled() && !upgradeableSessionTracker.isGlobalSession(sess)) { + return false; + } + return sessionTracker.touchSession(sess, to); + } + + /** + * Requests coming from the learner should go directly to + * PrepRequestProcessor + * + * @param request + */ + public void submitLearnerRequest(Request request) { + /* + * Requests coming from the learner should have gone through + * submitRequest() on each server which already perform some request + * validation, so we don't need to do it again. + * + * Additionally, LearnerHandler should start submitting requests into + * the leader's pipeline only when the leader's server is started, so we + * can submit the request directly into PrepRequestProcessor. + * + * This is done so that requests from learners won't go through + * LeaderRequestProcessor which perform local session upgrade. 
+ */ + prepRequestProcessor.processRequest(request); + } + + @Override + protected void registerJMX() { + // register with JMX + try { + jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree()); + MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxDataTreeBean = null; + } + } + + public void registerJMX(LeaderBean leaderBean, LocalPeerBean localPeerBean) { + // register with JMX + if (self.jmxLeaderElectionBean != null) { + try { + MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + } + self.jmxLeaderElectionBean = null; + } + + try { + jmxServerBean = leaderBean; + MBeanRegistry.getInstance().register(leaderBean, localPeerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxServerBean = null; + } + } + + boolean registerJMX(LearnerHandlerBean handlerBean) { + try { + MBeanRegistry.getInstance().register(handlerBean, jmxServerBean); + return true; + } catch (JMException e) { + LOG.warn("Could not register connection", e); + } + return false; + } + + @Override + protected void unregisterJMX() { + // unregister from JMX + try { + if (jmxDataTreeBean != null) { + MBeanRegistry.getInstance().unregister(jmxDataTreeBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + jmxDataTreeBean = null; + } + + protected void unregisterJMX(Leader leader) { + // unregister from JMX + try { + if (jmxServerBean != null) { + MBeanRegistry.getInstance().unregister(jmxServerBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + jmxServerBean = null; + } + + @Override + public String getState() { + return "leader"; + } + + /** + * Returns the id of the associated QuorumPeer, which will do for a unique + * id of this server. 
+ */ + @Override + public long getServerId() { + return self.getId(); + } + + @Override + protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { + super.revalidateSession(cnxn, sessionId, sessionTimeout); + try { + // setowner as the leader itself, unless updated + // via the follower handlers + setOwner(sessionId, ServerCnxn.me); + } catch (SessionExpiredException e) { + // this is ok, it just means that the session revalidation failed. + } + } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java new file mode 100644 index 00000000000..8e80fae57dc --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -0,0 +1,920 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import static java.nio.charset.StandardCharsets.UTF_8; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.net.ssl.SSLSocket; +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.InputArchive; +import org.apache.jute.OutputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.server.ExitCode; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.TxnLogEntry; +import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.ConfigUtils; +import org.apache.zookeeper.server.util.MessageTracker; +import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.server.util.ZxidUtils; +import org.apache.zookeeper.txn.SetDataTxn; +import org.apache.zookeeper.txn.TxnDigest; 
+import org.apache.zookeeper.txn.TxnHeader;
+import org.apache.zookeeper.util.ServiceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the superclass of two of the three main actors in a ZK
+ * ensemble: Followers and Observers. Both Followers and Observers share
+ * a good deal of code which is moved into Learner to avoid duplication.
+ */
+public class Learner {
+
+    /** A transaction received from the leader during sync that has not been applied or logged yet. */
+    static class PacketInFlight {
+
+        TxnHeader hdr;
+        Record rec;
+        TxnDigest digest;
+
+    }
+
+    QuorumPeer self;
+    LearnerZooKeeperServer zk;
+
+    protected BufferedOutputStream bufferedOutput;
+
+    protected Socket sock;
+    protected MultipleAddresses leaderAddr;
+    protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
+
+    /**
+     * Returns the socket currently used to talk to the leader (may be null).
+     */
+    public Socket getSocket() {
+        return sock;
+    }
+
+    LearnerSender sender = null;
+    protected InputArchive leaderIs;
+    protected OutputArchive leaderOs;
+    /** the protocol version of the leader */
+    protected int leaderProtocolVersion = 0x01;
+
+    private static final int BUFFERED_MESSAGE_SIZE = 10;
+    protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
+
+    protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);
+
+    /**
+     * Time to wait after connection attempt with the Leader or LearnerMaster before this
+     * Learner tries to connect again.
+     */
+    private static final int leaderConnectDelayDuringRetryMs = Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100);
+
+    private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
+
+    public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending";
+    private static boolean asyncSending =
+        Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING));
+    public static final String LEARNER_CLOSE_SOCKET_ASYNC = "zookeeper.learner.closeSocketAsync";
+    public static final boolean closeSocketAsync = Boolean
+        .parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_CLOSE_SOCKET_ASYNC));
+
+    static {
+        LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
+        LOG.info("TCP NoDelay set to: {}", nodelay);
+        LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
+        LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync);
+    }
+
+    /** Connections waiting for a REVALIDATE answer from the leader, keyed by session id. */
+    final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
+
+    /**
+     * Returns the number of sessions still waiting for a revalidation answer.
+     */
+    public int getPendingRevalidationsCount() {
+        return pendingRevalidations.size();
+    }
+
+    // for testing
+    protected static void setAsyncSending(boolean newMode) {
+        asyncSending = newMode;
+        LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
+
+    }
+    protected static boolean getAsyncSending() {
+        return asyncSending;
+    }
+    /**
+     * validate a session for a client
+     *
+     * @param cnxn
+     *                the connection to notify once the leader has answered
+     * @param clientId
+     *                the client to be revalidated
+     * @param timeout
+     *                the timeout for which the session is valid
+     * @throws IOException
+     */
+    void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException {
+        LOG.info("Revalidating client: 0x{}", Long.toHexString(clientId));
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        dos.writeLong(clientId);
+        dos.writeInt(timeout);
+        dos.close();
+        QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null);
+        pendingRevalidations.put(clientId, cnxn);
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logTraceMessage(
+                LOG,
+                ZooTrace.SESSION_TRACE_MASK,
+                "To validate session 0x" + Long.toHexString(clientId));
+        }
+        writePacket(qp, true);
+    }
+
+    /**
+     * write a packet to the leader.
+     *
+     * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time.
+     * When packets are sent synchronously, writing is done within a synchronization block.
+     * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe.
+     * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only.
+     * So we have only one thread writing to leaderOs at a time in either case.
+     *
+     * @param pp
+     *                the proposal packet to be sent to the leader
+     * @param flush
+     *                whether to flush the buffered output after writing; only
+     *                passed on when packets are sent synchronously
+     * @throws IOException
+     */
+    void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+        if (asyncSending) {
+            sender.queuePacket(pp);
+        } else {
+            writePacketNow(pp, flush);
+        }
+    }
+
+    /**
+     * Writes the packet (if not null) to leaderOs and optionally flushes the
+     * buffered output, all while holding the lock on leaderOs.
+     *
+     * @param pp
+     *                the packet to write, or null to only flush
+     * @param flush
+     *                whether to flush the buffered output afterwards
+     * @throws IOException
+     */
+    void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
+        synchronized (leaderOs) {
+            if (pp != null) {
+                messageTracker.trackSent(pp.getType());
+                leaderOs.writeRecord(pp, "packet");
+            }
+            if (flush) {
+                bufferedOutput.flush();
+            }
+        }
+    }
+
+    /**
+     * Start thread that will forward any packet in the queue to the leader
+     */
+    protected void startSendingThread() {
+        sender = new LearnerSender(this);
+        sender.start();
+    }
+
+    /**
+     * read a packet from the leader
+     *
+     * @param pp
+     *                the packet to be instantiated
+     * @throws IOException
+     */
+    void readPacket(QuorumPacket pp) throws IOException {
+        synchronized (leaderIs) {
+            leaderIs.readRecord(pp, "packet");
+            messageTracker.trackReceived(pp.getType());
+        }
+        if (LOG.isTraceEnabled()) {
+            final long traceMask =
+                (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
+                    : ZooTrace.SERVER_PACKET_TRACE_MASK;
+
+            ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
+        }
+    }
+
+    /**
+     * send a request packet to the leader
+     *
+     * @param request
+     *                the request from the client
+     * @throws IOException
+     */
+    void request(Request request) throws IOException {
+        if (request.isThrottled()) {
+            LOG.error("Throttled request sent to leader: {}. Exiting", request);
+            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream oa = new DataOutputStream(baos);
+        oa.writeLong(request.sessionId);
+        oa.writeInt(request.cxid);
+        oa.writeInt(request.type);
+        if (request.request != null) {
+            request.request.rewind();
+            int len = request.request.remaining();
+            byte[] b = new byte[len];
+            request.request.get(b);
+            request.request.rewind();
+            oa.write(b);
+        }
+        oa.close();
+        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
+        writePacket(qp, true);
+    }
+
+    /**
+     * Returns the address of the node we think is the leader.
+     */
+    protected QuorumServer findLeader() {
+        QuorumServer leaderServer = null;
+        // Find the leader by id
+        Vote current = self.getCurrentVote();
+        for (QuorumServer s : self.getView().values()) {
+            if (s.id == current.getId()) {
+                // Ensure we have the leader's correct IP address before
+                // attempting to connect.
+                s.recreateSocketAddresses();
+                leaderServer = s;
+                break;
+            }
+        }
+        if (leaderServer == null) {
+            LOG.warn("Couldn't find the leader with id = {}", current.getId());
+        }
+        return leaderServer;
+    }
+
+    /**
+     * Overridable helper method to return the System.nanoTime().
+     * This method behaves identical to System.nanoTime().
+     */
+    protected long nanoTime() {
+        return System.nanoTime();
+    }
+
+    /**
+     * Overridable helper method to simply call sock.connect(). This can be
+     * overriden in tests to fake connection success/failure for connectToLeader.
+     */
+    protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {
+        sock.connect(addr, timeout);
+    }
+
+    /**
+     * Establish a connection with the LearnerMaster found by findLearnerMaster.
+     * Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
+     * Retries until either initLimit time has elapsed or 5 tries have happened.
+     * @param multiAddr - the address of the Peer to connect to.
+     * @param hostname - passed to the learner authentication step for the new socket
+     * @throws IOException - if the socket connection fails on the 5th attempt
+     * if there is an authentication failure while connecting to leader
+     */
+    protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
+
+        this.leaderAddr = multiAddr;
+        Set<InetSocketAddress> addresses;
+        if (self.isMultiAddressReachabilityCheckEnabled()) {
+            // even if none of the addresses are reachable, we want to try to establish connection
+            // see ZOOKEEPER-3758
+            addresses = multiAddr.getAllReachableAddressesOrAll();
+        } else {
+            addresses = multiAddr.getAllAddresses();
+        }
+        // one connector per candidate address; the first one to connect wins
+        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
+        CountDownLatch latch = new CountDownLatch(addresses.size());
+        AtomicReference<Socket> socket = new AtomicReference<>(null);
+        addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);
+
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while trying to connect to Leader", e);
+        } finally {
+            executor.shutdown();
+            try {
+                if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+                    LOG.error("not all the LeaderConnector terminated properly");
+                }
+            } catch (InterruptedException ie) {
+                LOG.error("Interrupted while terminating LeaderConnector executor.", ie);
+            }
+        }
+
+        if (socket.get() == null) {
+            throw new IOException("Failed connect to " + multiAddr);
+        } else {
+            sock = socket.get();
+            sockBeingClosed.set(false);
+        }
+
+        self.authLearner.authenticate(sock, hostname);
+
+        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
+        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
+        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
+        if (asyncSending) {
+            startSendingThread();
+        }
+    }
+
+    /**
+     * Tries to connect to a single leader address and publishes the socket
+     * through the shared reference if it is the first connector to succeed.
+     */
+    class LeaderConnector implements Runnable {
+
+        private AtomicReference<Socket> socket;
+        private InetSocketAddress address;
+        private CountDownLatch latch;
+
+        LeaderConnector(InetSocketAddress address, AtomicReference<Socket> socket, CountDownLatch latch) {
+            this.address = address;
+            this.socket = socket;
+            this.latch = latch;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Thread.currentThread().setName("LeaderConnector-" + address);
+                Socket sock = connectToLeader();
+
+                if (sock != null && sock.isConnected()) {
+                    if (socket.compareAndSet(null, sock)) {
+                        LOG.info("Successfully connected to leader, using address: {}", address);
+                    } else {
+                        LOG.info("Connection to the leader is already established, close the redundant connection");
+                        sock.close();
+                    }
+                }
+
+            } catch (Exception e) {
+                LOG.error("Failed connect to {}", address, e);
+            } finally {
+                latch.countDown();
+            }
+        }
+
+        private Socket connectToLeader() throws IOException, X509Exception, InterruptedException {
+            Socket sock = createSocket();
+
+            // leader connection timeout defaults to tickTime * initLimit
+            int connectTimeout = self.tickTime * self.initLimit;
+
+            // but if connectToLearnerMasterLimit is specified, use that value to calculate
+            // timeout instead of using the initLimit value
+            if (self.connectToLearnerMasterLimit > 0) {
+                connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
+            }
+
+            int remainingTimeout;
+            long startNanoTime = nanoTime();
+
+            // stop retrying as soon as another connector has published a socket
+            for (int tries = 0; tries < 5 && socket.get() == null; tries++) {
+                try {
+                    // recalculate the remaining time because retries sleep for leaderConnectDelayDuringRetryMs
+                    remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
+                    if (remainingTimeout <= 0) {
+                        LOG.error("connectToLeader exceeded on retries.");
+                        throw new IOException("connectToLeader exceeded on retries.");
+                    }
+
+                    sockConnect(sock, address, Math.min(connectTimeout, remainingTimeout));
+                    if (self.isSslQuorum()) {
+                        ((SSLSocket) sock).startHandshake();
+                    }
+                    sock.setTcpNoDelay(nodelay);
+                    break;
+                } catch (IOException e) {
+                    remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
+
+                    if (remainingTimeout <= leaderConnectDelayDuringRetryMs) {
+                        LOG.error(
+                            "Unexpected exception, connectToLeader exceeded. tries={}, remaining init limit={}, connecting to {}",
+                            tries,
+                            remainingTimeout,
+                            address,
+                            e);
+                        throw e;
+                    } else if (tries >= 4) {
+                        LOG.error(
+                            "Unexpected exception, retries exceeded. tries={}, remaining init limit={}, connecting to {}",
+                            tries,
+                            remainingTimeout,
+                            address,
+                            e);
+                        throw e;
+                    } else {
+                        LOG.warn(
+                            "Unexpected exception, tries={}, remaining init limit={}, connecting to {}",
+                            tries,
+                            remainingTimeout,
+                            address,
+                            e);
+                        sock = createSocket();
+                    }
+                }
+                Thread.sleep(leaderConnectDelayDuringRetryMs);
+            }
+
+            return sock;
+        }
+    }
+
+    /**
+     * Creates a plain or an SSL socket.
+     * This can be overridden in tests to fake already connected sockets for connectToLeader.
+     */
+    protected Socket createSocket() throws X509Exception, IOException {
+        Socket sock;
+        if (self.isSslQuorum()) {
+            sock = self.getX509Util().createSSLSocket();
+        } else {
+            sock = new Socket();
+        }
+        sock.setSoTimeout(self.tickTime * self.initLimit);
+        return sock;
+    }
+
+    /**
+     * Once connected to the leader or learner master, perform the handshake
+     * protocol to establish a following / observing connection.
+     * @param pktType the packet type used for the initial packet sent to the leader
+     * @return the zxid the Leader sends for synchronization purposes.
+     * @throws IOException
+     */
+    protected long registerWithLeader(int pktType) throws IOException {
+        /*
+         * Send follower info, including last zxid and sid
+         */
+        long lastLoggedZxid = self.getLastLoggedZxid();
+        QuorumPacket qp = new QuorumPacket();
+        qp.setType(pktType);
+        qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
+
+        /*
+         * Add sid to payload
+         */
+        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
+        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
+        boa.writeRecord(li, "LearnerInfo");
+        qp.setData(bsid.toByteArray());
+
+        writePacket(qp, true);
+        readPacket(qp);
+        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
+        if (qp.getType() == Leader.LEADERINFO) {
+            // we are connected to a 1.0 server so accept the new epoch and read the next packet
+            leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
+            byte[] epochBytes = new byte[4];
+            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
+            if (newEpoch > self.getAcceptedEpoch()) {
+                wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
+                self.setAcceptedEpoch(newEpoch);
+            } else if (newEpoch == self.getAcceptedEpoch()) {
+                // since we have already acked an epoch equal to the leaders, we cannot ack
+                // again, but we still need to send our lastZxid to the leader so that we can
+                // sync with it if it does assume leadership of the epoch.
+                // the -1 indicates that this reply should not count as an ack for the new epoch
+                wrappedEpochBytes.putInt(-1);
+            } else {
+                throw new IOException("Leaders epoch, "
+                                      + newEpoch
+                                      + " is less than accepted epoch, "
+                                      + self.getAcceptedEpoch());
+            }
+            QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
+            writePacket(ackNewEpoch, true);
+            return ZxidUtils.makeZxid(newEpoch, 0);
+        } else {
+            if (newEpoch > self.getAcceptedEpoch()) {
+                self.setAcceptedEpoch(newEpoch);
+            }
+            if (qp.getType() != Leader.NEWLEADER) {
+                LOG.error("First packet should have been NEWLEADER");
+                throw new IOException("First packet should have been NEWLEADER");
+            }
+            return qp.getZxid();
+        }
+    }
+
+    /**
+     * Finally, synchronize our history with the Leader (if Follower)
+     * or the LearnerMaster (if Observer).
+     * @param newLeaderZxid the zxid returned by registerWithLeader
+     * @throws Exception on I/O failure, or when a reconfig received during
+     *                   sync is reported as a major change
+     */
+    protected void syncWithLeader(long newLeaderZxid) throws Exception {
+        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
+        QuorumPacket qp = new QuorumPacket();
+        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
+
+        QuorumVerifier newLeaderQV = null;
+
+        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
+        // For SNAP and TRUNC the snapshot is needed to save that history
+        boolean snapshotNeeded = true;
+        boolean syncSnapshot = false;
+        readPacket(qp);
+        Deque<Long> packetsCommitted = new ArrayDeque<>();
+        Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
+        Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
+        synchronized (zk) {
+            if (qp.getType() == Leader.DIFF) {
+                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
+                self.setSyncMode(QuorumPeer.SyncMode.DIFF);
+                if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
+                    LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
+                    snapshotNeeded = true;
+                    syncSnapshot = true;
+                } else {
+                    snapshotNeeded = false;
+                }
+            } else if (qp.getType() == Leader.SNAP) {
+                self.setSyncMode(QuorumPeer.SyncMode.SNAP);
+                LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
+                // The leader is going to dump the database
+                // db is clear as part of deserializeSnapshot()
+                zk.getZKDatabase().deserializeSnapshot(leaderIs);
+                // ZOOKEEPER-2819: overwrite config node content extracted
+                // from leader snapshot with local config, to avoid potential
+                // inconsistency of config node content during rolling restart.
+                if (!self.isReconfigEnabled()) {
+                    LOG.debug("Reset config node content from local config after deserialization of snapshot.");
+                    zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
+                }
+                String signature = leaderIs.readString("signature");
+                if (!signature.equals("BenWasHere")) {
+                    LOG.error("Missing signature. Got {}", signature);
+                    throw new IOException("Missing signature");
+                }
+                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+
+                // immediately persist the latest snapshot when there is txn log gap
+                syncSnapshot = true;
+            } else if (qp.getType() == Leader.TRUNC) {
+                //we need to truncate the log to the lastzxid of the leader
+                self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
+                LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
+                boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
+                if (!truncated) {
+                    // not able to truncate the log
+                    LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
+                    ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
+                }
+                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+
+            } else {
+                LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
+                ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
+            }
+            zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
+            zk.createSessionTracker();
+
+            long lastQueued = 0;
+
+            // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
+            // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
+            // we need to make sure that we don't take the snapshot twice.
+            boolean isPreZAB1_0 = true;
+            //If we are not going to take the snapshot be sure the transactions are not applied in memory
+            // but written out to the transaction log
+            boolean writeToTxnLog = !snapshotNeeded;
+            TxnLogEntry logEntry;
+            // we are now going to start getting transactions to apply followed by an UPTODATE
+            outerLoop:
+            while (self.isRunning()) {
+                readPacket(qp);
+                switch (qp.getType()) {
+                case Leader.PROPOSAL:
+                    PacketInFlight pif = new PacketInFlight();
+                    logEntry = SerializeUtils.deserializeTxn(qp.getData());
+                    pif.hdr = logEntry.getHeader();
+                    pif.rec = logEntry.getTxn();
+                    pif.digest = logEntry.getDigest();
+                    if (pif.hdr.getZxid() != lastQueued + 1) {
+                        LOG.warn(
+                            "Got zxid 0x{} expected 0x{}",
+                            Long.toHexString(pif.hdr.getZxid()),
+                            Long.toHexString(lastQueued + 1));
+                    }
+                    lastQueued = pif.hdr.getZxid();
+
+                    if (pif.hdr.getType() == OpCode.reconfig) {
+                        SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
+                        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
+                        self.setLastSeenQuorumVerifier(qv, true);
+                    }
+
+                    packetsNotLogged.add(pif);
+                    packetsNotCommitted.add(pif);
+                    break;
+                case Leader.COMMIT:
+                case Leader.COMMITANDACTIVATE:
+                    pif = packetsNotCommitted.peekFirst();
+                    if (pif == null) {
+                        // peekFirst() returns null on an empty deque; a commit without an
+                        // outstanding proposal must not be dereferenced (ZOOKEEPER-4394)
+                        LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(qp.getZxid()));
+                    } else if (pif.hdr.getZxid() != qp.getZxid()) {
+                        LOG.warn(
+                            "Committing 0x{}, but next proposal is 0x{}",
+                            Long.toHexString(qp.getZxid()),
+                            Long.toHexString(pif.hdr.getZxid()));
+                    } else {
+                        if (qp.getType() == Leader.COMMITANDACTIVATE) {
+                            QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
+                            boolean majorChange = self.processReconfig(
+                                qv,
+                                ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
+                                true);
+                            if (majorChange) {
+                                throw new Exception("changes proposed in reconfig");
+                            }
+                        }
+                        if (!writeToTxnLog) {
+                            zk.processTxn(pif.hdr, pif.rec);
+                            packetsNotLogged.remove();
+                            packetsNotCommitted.remove();
+                        } else {
+                            packetsNotCommitted.remove();
+                            packetsCommitted.add(qp.getZxid());
+                        }
+                    }
+                    break;
+                case Leader.INFORM:
+                case Leader.INFORMANDACTIVATE:
+                    PacketInFlight packet = new PacketInFlight();
+
+                    if (qp.getType() == Leader.INFORMANDACTIVATE) {
+                        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
+                        long suggestedLeaderId = buffer.getLong();
+                        byte[] remainingdata = new byte[buffer.remaining()];
+                        buffer.get(remainingdata);
+                        logEntry = SerializeUtils.deserializeTxn(remainingdata);
+                        packet.hdr = logEntry.getHeader();
+                        packet.rec = logEntry.getTxn();
+                        packet.digest = logEntry.getDigest();
+                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8));
+                        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
+                        if (majorChange) {
+                            throw new Exception("changes proposed in reconfig");
+                        }
+                    } else {
+                        logEntry = SerializeUtils.deserializeTxn(qp.getData());
+                        packet.rec = logEntry.getTxn();
+                        packet.hdr = logEntry.getHeader();
+                        packet.digest = logEntry.getDigest();
+                        // Log warning message if txn comes out-of-order
+                        if (packet.hdr.getZxid() != lastQueued + 1) {
+                            LOG.warn(
+                                "Got zxid 0x{} expected 0x{}",
+                                Long.toHexString(packet.hdr.getZxid()),
+                                Long.toHexString(lastQueued + 1));
+                        }
+                        lastQueued = packet.hdr.getZxid();
+                    }
+                    if (!writeToTxnLog) {
+                        // Apply to db directly if we haven't taken the snapshot
+                        zk.processTxn(packet.hdr, packet.rec);
+                    } else {
+                        packetsNotLogged.add(packet);
+                        packetsCommitted.add(qp.getZxid());
+                    }
+
+                    break;
+                case Leader.UPTODATE:
+                    LOG.info("Learner received UPTODATE message");
+                    if (newLeaderQV != null) {
+                        boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
+                        if (majorChange) {
+                            throw new Exception("changes proposed in reconfig");
+                        }
+                    }
+                    if (isPreZAB1_0) {
+                        zk.takeSnapshot(syncSnapshot);
+                        self.setCurrentEpoch(newEpoch);
+                    }
+                    self.setZooKeeperServer(zk);
+                    self.adminServer.setZooKeeperServer(zk);
+                    break outerLoop;
+                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
+                    // means this is Zab 1.0
+                    LOG.info("Learner received NEWLEADER message");
+                    if (qp.getData() != null && qp.getData().length > 1) {
+                        try {
+                            QuorumVerifier qv = self.configFromString(new String(qp.getData(), UTF_8));
+                            self.setLastSeenQuorumVerifier(qv, true);
+                            newLeaderQV = qv;
+                        } catch (Exception e) {
+                            // best effort: keep syncing, but report through the logger rather than stderr
+                            LOG.warn("Failed to process quorum configuration from NEWLEADER packet", e);
+                        }
+                    }
+
+                    if (snapshotNeeded) {
+                        zk.takeSnapshot(syncSnapshot);
+                    }
+
+                    self.setCurrentEpoch(newEpoch);
+                    writeToTxnLog = true;
+                    //Anything after this needs to go to the transaction log, not applied directly in memory
+                    isPreZAB1_0 = false;
+
+                    // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
+                    sock.setSoTimeout(self.tickTime * self.syncLimit);
+                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
+                    zk.startupWithoutServing();
+                    if (zk instanceof FollowerZooKeeperServer) {
+                        FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+                        fzk.syncProcessor.setDelayForwarding(true);
+                        for (PacketInFlight p : packetsNotLogged) {
+                            fzk.logRequest(p.hdr, p.rec, p.digest);
+                        }
+                        packetsNotLogged.clear();
+                        fzk.syncProcessor.syncFlush();
+                    }
+
+                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
+
+                    if (zk instanceof FollowerZooKeeperServer) {
+                        FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+                        fzk.syncProcessor.setDelayForwarding(false);
+                        fzk.syncProcessor.syncFlush();
+                    }
+                    break;
+                }
+            }
+        }
+        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
+        writePacket(ack, true);
+        zk.startServing();
+        /*
+         * Update the election vote here to ensure that all members of the
+         * ensemble report the same vote to new servers that start up and
+         * send leader election notifications to the ensemble.
+         *
+         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
+         */
+        self.updateElectionVote(newEpoch);
+
+        // We need to log the stuff that came in between the snapshot and the uptodate
+        if (zk instanceof FollowerZooKeeperServer) {
+            FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+            for (PacketInFlight p : packetsNotLogged) {
+                fzk.logRequest(p.hdr, p.rec, p.digest);
+            }
+            for (Long zxid : packetsCommitted) {
+                fzk.commit(zxid);
+            }
+        } else if (zk instanceof ObserverZooKeeperServer) {
+            // Similar to follower, we need to log requests between the snapshot
+            // and UPTODATE
+            ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
+            for (PacketInFlight p : packetsNotLogged) {
+                Long zxid = packetsCommitted.peekFirst();
+                if (zxid == null) {
+                    // no commit left to pair with this proposal; comparing against a
+                    // null Long below would throw on unboxing
+                    LOG.warn("No pending commit for proposal 0x{}", Long.toHexString(p.hdr.getZxid()));
+                    continue;
+                }
+                if (p.hdr.getZxid() != zxid) {
+                    // log warning message if there is no matching commit
+                    // old leader send outstanding proposal to observer
+                    LOG.warn(
+                        "Committing 0x{}, but next proposal is 0x{}",
+                        Long.toHexString(zxid),
+                        Long.toHexString(p.hdr.getZxid()));
+                    continue;
+                }
+                packetsCommitted.remove();
+                Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null);
+                request.setTxn(p.rec);
+                request.setHdr(p.hdr);
+                request.setTxnDigest(p.digest);
+                ozk.commitRequest(request);
+            }
+        } else {
+            // New server type need to handle in-flight packets
+            throw new UnsupportedOperationException("Unknown server type");
+        }
+    }
+
+    /**
+     * Handles the leader's answer to a REVALIDATE request: reads the session id
+     * and validity flag from the packet and finishes session initialisation on
+     * the connection that was waiting for it, if any.
+     *
+     * @param qp the packet carrying the session id and the validity flag
+     * @throws IOException
+     */
+    protected void revalidate(QuorumPacket qp) throws IOException {
+        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
+        DataInputStream dis = new DataInputStream(bis);
+        long sessionId = dis.readLong();
+        boolean valid = dis.readBoolean();
+        ServerCnxn cnxn = pendingRevalidations.remove(sessionId);
+        if (cnxn == null) {
+            LOG.warn("Missing session 0x{} for validation", Long.toHexString(sessionId));
+        } else {
+            zk.finishSessionInit(cnxn, valid);
+        }
+        if (LOG.isTraceEnabled()) {
+            ZooTrace.logTraceMessage(
+                LOG,
+                ZooTrace.SESSION_TRACE_MASK,
+                "Session 0x" + Long.toHexString(sessionId) + " is valid: " + valid);
+        }
+    }
+
+    /**
+     * Answers a ping with a packet of the same type, zxid and auth info whose
+     * payload is the (session id, timeout) pairs from the touch snapshot.
+     *
+     * @param qp the ping packet received
+     * @throws IOException
+     */
+    protected void ping(QuorumPacket qp) throws IOException {
+        // Send back the ping with our session data
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        Map<Long, Integer> touchTable = zk.getTouchSnapshot();
+        for (Entry<Long, Integer> entry : touchTable.entrySet()) {
+            dos.writeLong(entry.getKey());
+            dos.writeInt(entry.getValue());
+        }
+
+        QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo());
+        writePacket(pingReply, true);
+    }
+
+    /**
+     * Shutdown the Peer
+     */
+    public void shutdown() {
+        self.setZooKeeperServer(null);
+        self.closeAllConnections();
+        self.adminServer.setZooKeeperServer(null);
+
+        if (sender != null) {
+            sender.shutdown();
+        }
+
+        closeSocket();
+        // shutdown previous zookeeper
+        if (zk != null) {
+            // If we haven't finished SNAP sync, force fully shutdown
+            // to avoid potential inconsistency
+            zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
+        }
+    }
+
+    /**
+     * Returns true while both the quorum peer and the ZooKeeper server are running.
+     */
+    boolean isRunning() {
+        return self.isRunning() && zk.isRunning();
+    }
+
+    /**
+     * Closes the socket to the leader at most once per connection, either on a
+     * daemon thread (when closeSocketAsync is set) or on the calling thread.
+     */
+    void closeSocket() {
+        if (sock != null) {
+            if (sockBeingClosed.compareAndSet(false, true)) {
+                if (closeSocketAsync) {
+                    final Thread closingThread = new Thread(() -> closeSockSync(), "CloseSocketThread(sid:" + zk.getServerId() + ")");
+                    closingThread.setDaemon(true);
+                    closingThread.start();
+                } else {
+                    closeSockSync();
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes the socket on the calling thread, records how long closing took,
+     * and logs (but otherwise ignores) I/O errors.
+     */
+    void closeSockSync() {
+        try {
+            long startTime = Time.currentElapsedTime();
+            if (sock != null) {
+                sock.close();
+                sock = null;
+            }
+            ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - startTime);
+        } catch (IOException e) {
+            LOG.warn("Ignoring error closing connection to leader", e);
+        }
+    }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
new file mode 100644
index 00000000000..990f75bee9e
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.DataTreeBean;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServerBean;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Parent class for all ZooKeeperServers for Learners
+ */
+public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
+
+    /*
+     * Request processors
+     */
+    protected CommitProcessor commitProcessor;
+    protected SyncRequestProcessor syncProcessor;
+
+    public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int listenBacklog, ZKDatabase zkDb, QuorumPeer self) throws IOException {
+        super(logFactory, tickTime, minSessionTimeout, maxSessionTimeout, listenBacklog, zkDb, self);
+    }
+
+    /**
+     * Abstract method to return the learner associated with this server.
+     * Since the Learner may change under our feet (when QuorumPeer reassigns
+     * it) we can't simply take a reference here. Instead, we need the
+     * subclasses to implement this.
+     */
+    public abstract Learner getLearner();
+
+    /**
+     * Returns the current state of the session tracker. This is only currently
+     * used by a Learner to build a ping response packet.
+     * Returns an empty map when no session tracker has been created yet.
+     */
+    protected Map<Long, Integer> getTouchSnapshot() {
+        if (sessionTracker != null) {
+            return ((LearnerSessionTracker) sessionTracker).snapshot();
+        }
+        Map<Long, Integer> map = Collections.emptyMap();
+        return map;
+    }
+
+    /**
+     * Returns the id of the associated QuorumPeer, which will do for a unique
+     * id of this server.
+     */
+    @Override
+    public long getServerId() {
+        return self.getId();
+    }
+
+    @Override
+    public void createSessionTracker() {
+        sessionTracker = new LearnerSessionTracker(
+            this,
+            getZKDatabase().getSessionWithTimeOuts(),
+            this.tickTime,
+            self.getId(),
+            self.areLocalSessionsEnabled(),
+            getZooKeeperServerListener());
+    }
+
+    @Override
+    protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
+        if (upgradeableSessionTracker.isLocalSession(sessionId)) {
+            super.revalidateSession(cnxn, sessionId, sessionTimeout);
+        } else {
+            getLearner().validateSession(cnxn, sessionId, sessionTimeout);
+        }
+    }
+
+    @Override
+    protected void registerJMX() {
+        // register the data tree bean with JMX, below the server bean
+        try {
+            jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
+            MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
+        } catch (Exception e) {
+            LOG.warn("Failed to register with JMX", e);
+            jmxDataTreeBean = null;
+        }
+    }
+
+    public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) {
+        // drop the leader election bean (if any), then register the server bean with JMX
+        if (self.jmxLeaderElectionBean != null) {
+            try {
+                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
+            } catch (Exception e) {
+                LOG.warn("Failed to unregister with JMX", e);
+            }
+            self.jmxLeaderElectionBean = null;
+        }
+
+        try {
+            jmxServerBean = serverBean;
+            MBeanRegistry.getInstance().register(serverBean, localPeerBean);
+        } catch (Exception e) {
+            LOG.warn("Failed to register with JMX", e);
+            jmxServerBean = null;
+        }
+    }
+
+    @Override
+    protected void unregisterJMX() {
+        // unregister the data tree bean from JMX
+        try {
+            if (jmxDataTreeBean != null) {
+                MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to unregister with JMX", e);
+        }
+        jmxDataTreeBean = null;
+    }
+
+    protected void unregisterJMX(Learner peer) {
+        // unregister the server bean from JMX; the peer argument is not used
+        try {
+            if (jmxServerBean != null) {
+                MBeanRegistry.getInstance().unregister(jmxServerBean);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to unregister with JMX", e);
+        }
+        jmxServerBean = null;
+    }
+
+    @Override
+    public synchronized void shutdown(boolean fullyShutDown) {
+        if (!canShutdown()) {
+            LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
+        }
+        else {
+            LOG.info("Shutting down");
+            try {
+                if (syncProcessor != null) {
+                    // Shutting down the syncProcessor here, first, ensures queued transactions here are written to
+                    // permanent storage, which ensures that crash recovery data is consistent with what is used for a
+                    // leader election immediately following shutdown, because of the old leader going down; and also
+                    // that any state on its way to being written is also loaded in the potential call to
+                    // fast-forward-from-edits, in super.shutdown(...), so we avoid getting a DIFF from the new leader
+                    // that contains entries we have already written to our transaction log.
+                    syncProcessor.shutdown();
+                }
+            }
+            catch (Exception e) {
+                LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e);
+            }
+        }
+        try {
+            super.shutdown(fullyShutDown);
+        } catch (Exception e) {
+            LOG.warn("Ignoring unexpected exception during shutdown", e);
+        }
+    }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
new file mode 100644
index 00000000000..b71720aec89
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiConsumer;
+
+/**
+ * A ZooKeeperServer for the Observer node type. Not much is different, but
+ * we anticipate specializing the request processors in the future.
+ *
+ */
+public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ObserverZooKeeperServer.class);
+
+    /**
+     * Enable sync request processor for writing txnlog to disk and
+     * take periodic snapshot. Default is ON.
+     */
+
+    private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
+
+    /*
+     * Pending sync requests
+     */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<Request>();
+
+    ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
+        super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
+        LOG.info("syncEnabled ={}", syncRequestProcessorEnabled);
+    }
+
+    public Observer getObserver() {
+        return self.observer;
+    }
+
+    @Override
+    public Learner getLearner() {
+        return self.observer;
+    }
+
+    /**
+     * Unlike a Follower, which sees a full request only during the PROPOSAL
+     * phase, Observers get all the data required with the INFORM packet.
+     * This method commits a request that has been unpacked from an INFORM
+     * received from the Leader.
+     *
+     * @param request the request to log (when sync is enabled) and commit
+     */
+    public void commitRequest(Request request) {
+        if (syncProcessor != null) {
+            // Write to txnlog and take periodic snapshot
+            syncProcessor.processRequest(request);
+        }
+        commitProcessor.commit(request);
+    }
+
+    /**
+     * Set up the request processors for an Observer:
+     * firstProcessor->commitProcessor->finalProcessor
+     */
+    @Override
+    protected void setupRequestProcessors() {
+        // We might consider changing the processor behaviour of
+        // Observers to, for example, remove the disk sync requirements.
+        // Currently, they behave almost exactly the same as followers.
+        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+        commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
+        commitProcessor.start();
+        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
+        ((ObserverRequestProcessor) firstProcessor).start();
+
+        /*
+         * Observer should write to disk, so that it won't request
+         * too old txn from the leader which may lead to getting an entire
+         * snapshot.
+         *
+         * However, this may degrade performance as it has to write to disk
+         * and do periodic snapshot which may double the memory requirements
+         */
+        if (syncRequestProcessorEnabled) {
+            syncProcessor = new SyncRequestProcessor(this, null);
+            syncProcessor.start();
+        }
+        else {
+            syncProcessor = null;
+        }
+    }
+
+    /*
+     * Process a sync request: commit the oldest pending sync, if there is one
+     */
+    public synchronized void sync() {
+        if (pendingSyncs.isEmpty()) {
+            LOG.warn("Not expecting a sync.");
+            return;
+        }
+
+        Request r = pendingSyncs.remove();
+        commitProcessor.commit(r);
+    }
+
+    @Override
+    public String getState() {
+        return "observer";
+    }
+
+    @Override
+    public void dumpMonitorValues(BiConsumer<String, Object> response) {
+        super.dumpMonitorValues(response);
+        response.accept("observer_master_id", getObserver().getLearnerMasterId());
+    }
+
+}
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
new file mode 100644
index 00000000000..b1403ecd59b
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.server.DataTreeBean; +import org.apache.zookeeper.server.FinalRequestProcessor; +import org.apache.zookeeper.server.PrepRequestProcessor; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServerBean; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * A ZooKeeperServer which comes into play when peer is partitioned from the + * majority. Handles read-only clients, but drops connections from not-read-only + * ones. + *

+ * The very first processor in the chain of request processors is a + * ReadOnlyRequestProcessor which drops state-changing requests. + */ +public class ReadOnlyZooKeeperServer extends ZooKeeperServer { + + protected final QuorumPeer self; + private volatile boolean shutdown = false; + + ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) { + super( + logFactory, + self.tickTime, + self.minSessionTimeout, + self.maxSessionTimeout, + self.clientPortListenBacklog, + zkDb, + self.getInitialConfig(), + self.isReconfigEnabled()); + this.self = self; + } + + @Override + protected void setupRequestProcessors() { + RequestProcessor finalProcessor = new FinalRequestProcessor(this); + RequestProcessor prepProcessor = new PrepRequestProcessor(this, finalProcessor); + ((PrepRequestProcessor) prepProcessor).start(); + firstProcessor = new ReadOnlyRequestProcessor(this, prepProcessor); + ((ReadOnlyRequestProcessor) firstProcessor).start(); + } + + @Override + public synchronized void startup() { + // check to avoid startup follows shutdown + if (shutdown) { + LOG.warn("Not starting Read-only server as startup follows shutdown!"); + return; + } + registerJMX(new ReadOnlyBean(this), self.jmxLocalPeerBean); + super.startup(); + self.setZooKeeperServer(this); + self.adminServer.setZooKeeperServer(this); + LOG.info("Read-only server started"); + } + + @Override + public void createSessionTracker() { + sessionTracker = new LearnerSessionTracker( + this, getZKDatabase().getSessionWithTimeOuts(), + this.tickTime, self.getId(), self.areLocalSessionsEnabled(), + getZooKeeperServerListener()); + } + + @Override + protected void startSessionTracker() { + ((LearnerSessionTracker) sessionTracker).start(); + } + + @Override + protected void setLocalSessionFlag(Request si) { + switch (si.type) { + case OpCode.createSession: + if (self.areLocalSessionsEnabled()) { + si.setLocalSession(true); + } + break; + case OpCode.closeSession: + if 
(((UpgradeableSessionTracker) sessionTracker).isLocalSession(si.sessionId)) { + si.setLocalSession(true); + } else { + LOG.warn("Submitting global closeSession request for session 0x{} in ReadOnly mode", + Long.toHexString(si.sessionId)); + } + break; + default: + break; + } + } + + @Override + protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException { + if (((LearnerSessionTracker) sessionTracker).isGlobalSession(sessionId)) { + String msg = "Refusing global session reconnection in RO mode " + cnxn.getRemoteSocketAddress(); + LOG.info(msg); + throw new ServerCnxn.CloseRequestException(msg, ServerCnxn.DisconnectReason.RENEW_GLOBAL_SESSION_IN_RO_MODE); + } + } + + @Override + protected void registerJMX() { + // register with JMX + try { + jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree()); + MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxDataTreeBean = null; + } + } + + public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) { + // register with JMX + try { + jmxServerBean = serverBean; + MBeanRegistry.getInstance().register(serverBean, localPeerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxServerBean = null; + } + } + + @Override + protected void unregisterJMX() { + // unregister from JMX + try { + if (jmxDataTreeBean != null) { + MBeanRegistry.getInstance().unregister(jmxDataTreeBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + jmxDataTreeBean = null; + } + + protected void unregisterJMX(ZooKeeperServer zks) { + // unregister from JMX + try { + if (jmxServerBean != null) { + MBeanRegistry.getInstance().unregister(jmxServerBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + jmxServerBean = null; + } + + @Override + public String getState() { + return "read-only"; + } + + /** + * 
Returns the id of the associated QuorumPeer, which will do for a unique + * id of this server. + */ + @Override + public long getServerId() { + return self.getId(); + } + + @Override + public synchronized void shutdown(boolean fullyShutDown) { + if (!canShutdown()) { + super.shutdown(fullyShutDown); + LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); + } + else { + shutdown = true; + unregisterJMX(this); + + // set peer's server to null + self.setZooKeeperServer(null); + // clear all the connections + self.closeAllConnections(); + + self.adminServer.setZooKeeperServer(null); + } + // shutdown the server itself + super.shutdown(fullyShutDown); + } + + @Override + public void dumpConf(PrintWriter pwriter) { + super.dumpConf(pwriter); + + pwriter.print("initLimit="); + pwriter.println(self.getInitLimit()); + pwriter.print("syncLimit="); + pwriter.println(self.getSyncLimit()); + pwriter.print("electionAlg="); + pwriter.println(self.getElectionType()); + pwriter.print("electionPort="); + pwriter.println(self.getElectionAddress().getAllPorts() + .stream().map(Objects::toString).collect(Collectors.joining("|"))); + pwriter.print("quorumPort="); + pwriter.println(self.getQuorumAddress().getAllPorts() + .stream().map(Objects::toString).collect(Collectors.joining("|"))); + pwriter.print("peerType="); + pwriter.println(self.getLearnerType().ordinal()); + } + + @Override + protected void setState(State state) { + this.state = state; + } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java new file mode 100644 index 00000000000..3b7a9dfc331 --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.ServerMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Flushable; +import java.io.IOException; +import java.net.Socket; + +public class SendAckRequestProcessor implements RequestProcessor, Flushable { + + private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class); + + Learner learner; + + SendAckRequestProcessor(Learner peer) { + this.learner = peer; + } + + public void processRequest(Request si) { + if (si.type != OpCode.sync) { + QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null); + try { + si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY); + + learner.writePacket(qp, false); + } catch (IOException e) { + LOG.warn("Closing connection to leader, exception during packet send", e); + try { + if (!learner.sock.isClosed()) { + learner.sock.close(); + } + } catch (IOException e1) { + // Nothing to do, we are shutting things down, so an exception here is irrelevant + 
LOG.debug("Ignoring error closing the connection", e1); + } + } + } + } + + public void flush() throws IOException { + try { + learner.writePacket(null, true); + } catch (IOException e) { + LOG.warn("Closing connection to leader, exception during packet send", e); + try { + Socket socket = learner.sock; + if (socket != null && !socket.isClosed()) { + learner.sock.close(); + } + } catch (IOException e1) { + // Nothing to do, we are shutting things down, so an exception here is irrelevant + LOG.debug("Ignoring error closing the connection", e1); + } + } + } + + public void shutdown() { + // Nothing needed + } + +} -- cgit v1.2.3