summaryrefslogtreecommitdiffstats
path: root/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
diff options
context:
space:
mode:
Diffstat (limited to 'zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java')
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java2711
1 files changed, 2711 insertions, 0 deletions
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
new file mode 100644
index 00000000000..f6fc87d7716
--- /dev/null
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -0,0 +1,2711 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.security.sasl.SaslException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException.BadArgumentsException;
+import org.apache.zookeeper.common.AtomicFileOutputStream;
+import org.apache.zookeeper.common.AtomicFileWritingIdiom;
+import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement;
+import org.apache.zookeeper.common.QuorumX509Util;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperThread;
+import org.apache.zookeeper.server.admin.AdminServer;
+import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
+import org.apache.zookeeper.server.admin.AdminServerFactory;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthServer;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
+import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner;
+import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
+import org.apache.zookeeper.server.util.ConfigUtils;
+import org.apache.zookeeper.server.util.JvmPauseMonitor;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages the quorum protocol. There are three states this server
+ * can be in:
+ * <ol>
+ * <li>Leader election - each server will elect a leader (proposing itself as a
+ * leader initially).</li>
+ * <li>Follower - the server will synchronize with the leader and replicate any
+ * transactions.</li>
+ * <li>Leader - the server will process requests and forward them to followers.
+ * A majority of followers must log the request before it can be accepted.
+ * </ol>
+ *
+ * This class will setup a datagram socket that will always respond with its
+ * view of the current leader. The response will take the form of:
+ *
+ * <pre>
+ * int xid;
+ *
+ * long myid;
+ *
+ * long leader_id;
+ *
+ * long leader_zxid;
+ * </pre>
+ *
+ * The request for the current leader will consist solely of an xid: int xid;
+ */
+public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QuorumPeer.class);
+
+ public static final String CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES = "zookeeper.kerberos.canonicalizeHostNames";
+ public static final String CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES = "false";
+
+ private QuorumBean jmxQuorumBean;
+ LocalPeerBean jmxLocalPeerBean;
+ private Map<Long, RemotePeerBean> jmxRemotePeerBean;
+ LeaderElectionBean jmxLeaderElectionBean;
+
+ // The QuorumCnxManager is held through an AtomicReference to ensure cross-thread visibility
+ // of updates; see the implementation comment at setLastSeenQuorumVerifier().
+ private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();
+
+ QuorumAuthServer authServer;
+ QuorumAuthLearner authLearner;
+
+ /**
+ * ZKDatabase is a top level member of quorumpeer
+ * which will be used in all the zookeeperservers
+ * instantiated later. Also, it is created once on
+ * bootup and only thrown away in case of a truncate
+ * message from the leader
+ */
+ private ZKDatabase zkDb;
+
+ private JvmPauseMonitor jvmPauseMonitor;
+
+ private final AtomicBoolean suspended = new AtomicBoolean(false);
+
+ public static final class AddressTuple {
+
+ public final MultipleAddresses quorumAddr;
+ public final MultipleAddresses electionAddr;
+ public final InetSocketAddress clientAddr;
+
+ public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
+ this.quorumAddr = quorumAddr;
+ this.electionAddr = electionAddr;
+ this.clientAddr = clientAddr;
+ }
+
+ }
+
+ private int observerMasterPort;
+
+ public int getObserverMasterPort() {
+ return observerMasterPort;
+ }
+
+ public void setObserverMasterPort(int observerMasterPort) {
+ this.observerMasterPort = observerMasterPort;
+ }
+
+ public static final String CONFIG_KEY_MULTI_ADDRESS_ENABLED = "zookeeper.multiAddress.enabled";
+ public static final String CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED = "false";
+
+ private boolean multiAddressEnabled = true;
+ public boolean isMultiAddressEnabled() {
+ return multiAddressEnabled;
+ }
+
+ public void setMultiAddressEnabled(boolean multiAddressEnabled) {
+ this.multiAddressEnabled = multiAddressEnabled;
+ LOG.info("multiAddress.enabled set to {}", multiAddressEnabled);
+ }
+
+ public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS = "zookeeper.multiAddress.reachabilityCheckTimeoutMs";
+
+ private int multiAddressReachabilityCheckTimeoutMs = (int) MultipleAddresses.DEFAULT_TIMEOUT.toMillis();
+ public int getMultiAddressReachabilityCheckTimeoutMs() {
+ return multiAddressReachabilityCheckTimeoutMs;
+ }
+
+ public void setMultiAddressReachabilityCheckTimeoutMs(int multiAddressReachabilityCheckTimeoutMs) {
+ this.multiAddressReachabilityCheckTimeoutMs = multiAddressReachabilityCheckTimeoutMs;
+ LOG.info("multiAddress.reachabilityCheckTimeoutMs set to {}", multiAddressReachabilityCheckTimeoutMs);
+ }
+
+ public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED = "zookeeper.multiAddress.reachabilityCheckEnabled";
+
+ private boolean multiAddressReachabilityCheckEnabled = true;
+
+ public boolean isMultiAddressReachabilityCheckEnabled() {
+ return multiAddressReachabilityCheckEnabled;
+ }
+
+ public void setMultiAddressReachabilityCheckEnabled(boolean multiAddressReachabilityCheckEnabled) {
+ this.multiAddressReachabilityCheckEnabled = multiAddressReachabilityCheckEnabled;
+ LOG.info("multiAddress.reachabilityCheckEnabled set to {}", multiAddressReachabilityCheckEnabled);
+ }
+
+ public static class QuorumServer {
+
+ public MultipleAddresses addr = new MultipleAddresses();
+
+ public MultipleAddresses electionAddr = new MultipleAddresses();
+
+ public InetSocketAddress clientAddr = null;
+
+ public long id;
+
+ public String hostname;
+
+ public LearnerType type = LearnerType.PARTICIPANT;
+
+ public boolean isClientAddrFromStatic = false;
+
+ private List<InetSocketAddress> myAddrs;
+
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
+ this(id, addr, electionAddr, clientAddr, LearnerType.PARTICIPANT);
+ }
+
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr) {
+ this(id, addr, electionAddr, null, LearnerType.PARTICIPANT);
+ }
+
+ // VisibleForTesting
+ public QuorumServer(long id, InetSocketAddress addr) {
+ this(id, addr, null, null, LearnerType.PARTICIPANT);
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ /**
+ * Performs a DNS lookup for server address and election address.
+ *
+ * If the DNS lookup fails, this.addr and electionAddr remain
+ * unmodified.
+ */
+ public void recreateSocketAddresses() {
+ if (this.addr.isEmpty()) {
+ LOG.warn("Server address has not been initialized");
+ return;
+ }
+ if (this.electionAddr.isEmpty()) {
+ LOG.warn("Election address has not been initialized");
+ return;
+ }
+ this.addr.recreateSocketAddresses();
+ this.electionAddr.recreateSocketAddresses();
+ }
+
+ private LearnerType getType(String s) throws ConfigException {
+ switch (s.trim().toLowerCase()) {
+ case "observer":
+ return LearnerType.OBSERVER;
+ case "participant":
+ return LearnerType.PARTICIPANT;
+ default:
+ throw new ConfigException("Unrecognised peertype: " + s);
+ }
+ }
+
+ public QuorumServer(long sid, String addressStr) throws ConfigException {
+ this(sid, addressStr, QuorumServer::getInetAddress);
+ }
+
+ QuorumServer(long sid, String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
+ this.id = sid;
+ initializeWithAddressString(addressStr, getInetAddress);
+ }
+
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, LearnerType type) {
+ this(id, addr, electionAddr, null, type);
+ }
+
+ public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
+ this.id = id;
+ if (addr != null) {
+ this.addr.addAddress(addr);
+ }
+ if (electionAddr != null) {
+ this.electionAddr.addAddress(electionAddr);
+ }
+ this.type = type;
+ this.clientAddr = clientAddr;
+
+ setMyAddrs();
+ }
+
+ private static final String wrongFormat =
+ " does not have the form server_config or server_config;client_config"
+ + " where server_config is the pipe separated list of host:port:port or host:port:port:type"
+ + " and client_config is port or host:port";
+
+ private void initializeWithAddressString(String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
+ LearnerType newType = null;
+ String[] serverClientParts = addressStr.split(";");
+ String[] serverAddresses = serverClientParts[0].split("\\|");
+
+ if (serverClientParts.length == 2) {
+ String[] clientParts = ConfigUtils.getHostAndPort(serverClientParts[1]);
+ if (clientParts.length > 2) {
+ throw new ConfigException(addressStr + wrongFormat);
+ }
+
+ // is client_config a host:port or just a port
+ String clientHostName = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
+ try {
+ clientAddr = new InetSocketAddress(clientHostName, Integer.parseInt(clientParts[clientParts.length - 1]));
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
+ }
+ }
+
+ boolean multiAddressEnabled = Boolean.parseBoolean(
+ System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, QuorumPeer.CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED));
+ if (!multiAddressEnabled && serverAddresses.length > 1) {
+ throw new ConfigException("Multiple address feature is disabled, but multiple addresses were specified for sid " + this.id);
+ }
+
+ boolean canonicalize = Boolean.parseBoolean(
+ System.getProperty(
+ CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES,
+ CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES));
+
+ for (String serverAddress : serverAddresses) {
+ String serverParts[] = ConfigUtils.getHostAndPort(serverAddress);
+ if ((serverClientParts.length > 2) || (serverParts.length < 3)
+ || (serverParts.length > 4)) {
+ throw new ConfigException(addressStr + wrongFormat);
+ }
+
+ String serverHostName = serverParts[0];
+
+ // server_config should be either host:port:port or host:port:port:type
+ InetSocketAddress tempAddress;
+ InetSocketAddress tempElectionAddress;
+ try {
+ tempAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[1]));
+ addr.addAddress(tempAddress);
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[1]);
+ }
+ try {
+ tempElectionAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[2]));
+ electionAddr.addAddress(tempElectionAddress);
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[2]);
+ }
+
+ if (tempAddress.getPort() == tempElectionAddress.getPort()) {
+ throw new ConfigException("Client and election port must be different! Please update the "
+ + "configuration file on server." + this.id);
+ }
+
+ if (canonicalize) {
+ InetAddress ia = getInetAddress.apply(tempAddress);
+ if (ia == null) {
+ throw new ConfigException("Unable to canonicalize address " + serverHostName + " because it's not resolvable");
+ }
+
+ String canonicalHostName = ia.getCanonicalHostName();
+
+ if (!canonicalHostName.equals(serverHostName)
+ // Avoid using literal IP address when
+ // security check fails
+ && !canonicalHostName.equals(ia.getHostAddress())) {
+ LOG.info("Host name for quorum server {} "
+ + "canonicalized from {} to {}",
+ this.id, serverHostName, canonicalHostName);
+ serverHostName = canonicalHostName;
+ }
+ }
+
+ if (serverParts.length == 4) {
+ LearnerType tempType = getType(serverParts[3]);
+ if (newType == null) {
+ newType = tempType;
+ }
+
+ if (newType != tempType) {
+ throw new ConfigException("Multiple addresses should have similar roles: " + type + " vs " + tempType);
+ }
+ }
+
+ this.hostname = serverHostName;
+ }
+
+ if (newType != null) {
+ type = newType;
+ }
+
+ setMyAddrs();
+ }
+
+ private static InetAddress getInetAddress(InetSocketAddress addr) {
+ return addr.getAddress();
+ }
+
+ private void setMyAddrs() {
+ this.myAddrs = new ArrayList<>();
+ this.myAddrs.addAll(this.addr.getAllAddresses());
+ this.myAddrs.add(this.clientAddr);
+ this.myAddrs.addAll(this.electionAddr.getAllAddresses());
+ this.myAddrs = excludedSpecialAddresses(this.myAddrs);
+ }
+
+ public static String delimitedHostString(InetSocketAddress addr) {
+ String host = addr.getHostString();
+ if (host.contains(":")) {
+ return "[" + host + "]";
+ } else {
+ return host;
+ }
+ }
+
+ public String toString() {
+ StringWriter sw = new StringWriter();
+
+ List<InetSocketAddress> addrList = new LinkedList<>(addr.getAllAddresses());
+ List<InetSocketAddress> electionAddrList = new LinkedList<>(electionAddr.getAllAddresses());
+
+ if (addrList.size() > 0 && electionAddrList.size() > 0) {
+ addrList.sort(Comparator.comparing(InetSocketAddress::getHostString));
+ electionAddrList.sort(Comparator.comparing(InetSocketAddress::getHostString));
+ sw.append(IntStream.range(0, addrList.size()).mapToObj(i -> String.format("%s:%d:%d",
+ delimitedHostString(addrList.get(i)), addrList.get(i).getPort(), electionAddrList.get(i).getPort()))
+ .collect(Collectors.joining("|")));
+ }
+
+ if (type == LearnerType.OBSERVER) {
+ sw.append(":observer");
+ } else if (type == LearnerType.PARTICIPANT) {
+ sw.append(":participant");
+ }
+
+ if (clientAddr != null && !isClientAddrFromStatic) {
+ sw.append(";");
+ sw.append(delimitedHostString(clientAddr));
+ sw.append(":");
+ sw.append(String.valueOf(clientAddr.getPort()));
+ }
+
+ return sw.toString();
+ }
+
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return 42; // any arbitrary constant will do
+ }
+
+ private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2) {
+ return (addr1 != null || addr2 == null)
+ && (addr1 == null || addr2 != null)
+ && (addr1 == null || addr2 == null || addr1.equals(addr2));
+ }
+
+ public boolean equals(Object o) {
+ if (!(o instanceof QuorumServer)) {
+ return false;
+ }
+ QuorumServer qs = (QuorumServer) o;
+ if ((qs.id != id) || (qs.type != type)) {
+ return false;
+ }
+ if (!addr.equals(qs.addr)) {
+ return false;
+ }
+ if (!electionAddr.equals(qs.electionAddr)) {
+ return false;
+ }
+ return checkAddressesEqual(clientAddr, qs.clientAddr);
+ }
+
+ public void checkAddressDuplicate(QuorumServer s) throws BadArgumentsException {
+ List<InetSocketAddress> otherAddrs = new ArrayList<>(s.addr.getAllAddresses());
+ otherAddrs.add(s.clientAddr);
+ otherAddrs.addAll(s.electionAddr.getAllAddresses());
+ otherAddrs = excludedSpecialAddresses(otherAddrs);
+
+ for (InetSocketAddress my : this.myAddrs) {
+
+ for (InetSocketAddress other : otherAddrs) {
+ if (my.equals(other)) {
+ String error = String.format("%s of server.%d conflicts %s of server.%d", my, this.id, other, s.id);
+ throw new BadArgumentsException(error);
+ }
+ }
+ }
+ }
+
+ private List<InetSocketAddress> excludedSpecialAddresses(List<InetSocketAddress> addrs) {
+ List<InetSocketAddress> included = new ArrayList<>();
+
+ for (InetSocketAddress addr : addrs) {
+ if (addr == null) {
+ continue;
+ }
+ InetAddress inetaddr = addr.getAddress();
+
+ if (inetaddr == null || inetaddr.isAnyLocalAddress() // wildCard addresses (0.0.0.0 or [::])
+ || inetaddr.isLoopbackAddress()) { // loopback address(localhost/127.0.0.1)
+ continue;
+ }
+ included.add(addr);
+ }
+ return included;
+ }
+
+ }
+
+ public enum ServerState {
+ LOOKING,
+ FOLLOWING,
+ LEADING,
+ OBSERVING
+ }
+
+ /**
+ * (Used for monitoring) shows the current phase of
+ * Zab protocol that peer is running.
+ */
+ public enum ZabState {
+ ELECTION,
+ DISCOVERY,
+ SYNCHRONIZATION,
+ BROADCAST
+ }
+
+ /**
+ * (Used for monitoring) When peer is in synchronization phase, this shows
+ * which synchronization mechanism is being used
+ */
+ public enum SyncMode {
+ NONE,
+ DIFF,
+ SNAP,
+ TRUNC
+ }
+
+ /*
+ * A peer can either be participating, which implies that it is willing to
+ * both vote in instances of consensus and to elect or become a Leader, or
+ * it may be observing in which case it isn't.
+ *
+ * We need this distinction to decide which ServerState to move to when
+ * conditions change (e.g. which state to become after LOOKING).
+ */
+ public enum LearnerType {
+ PARTICIPANT,
+ OBSERVER
+ }
+
+ /*
+ * To enable observers to have no identifier, we need a generic identifier
+ * at least for QuorumCnxManager. We use the following constant to as the
+ * value of such a generic identifier.
+ */
+
+ static final long OBSERVER_ID = Long.MAX_VALUE;
+
+ /*
+ * Record leader election time
+ */
+ public long start_fle, end_fle; // fle = fast leader election
+ public static final String FLE_TIME_UNIT = "MS";
+ private long unavailableStartTime;
+
+ /*
+ * Default value of peer is participant
+ */
+ private LearnerType learnerType = LearnerType.PARTICIPANT;
+
+ public LearnerType getLearnerType() {
+ return learnerType;
+ }
+
+ /**
+ * Sets the LearnerType
+ */
+ public void setLearnerType(LearnerType p) {
+ learnerType = p;
+ }
+
+ protected synchronized void setConfigFileName(String s) {
+ configFilename = s;
+ }
+
+ private String configFilename = null;
+
+ public int getQuorumSize() {
+ return getVotingView().size();
+ }
+
+ public void setJvmPauseMonitor(JvmPauseMonitor jvmPauseMonitor) {
+ this.jvmPauseMonitor = jvmPauseMonitor;
+ }
+
+ /**
+ * QuorumVerifier implementation; default (majority).
+ */
+
+ //last committed quorum verifier
+ private QuorumVerifier quorumVerifier;
+
+ //last proposed quorum verifier
+ private QuorumVerifier lastSeenQuorumVerifier = null;
+
+ // Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier.
+ final Object QV_LOCK = new Object();
+
+ /**
+ * My id
+ */
+ private long myid;
+
+ /**
+ * get the id of this quorum peer.
+ */
+ public long getMyId() {
+ return myid;
+ }
+
+ // VisibleForTesting
+ void setId(long id) {
+ this.myid = id;
+ }
+
+ private boolean sslQuorum;
+ private boolean shouldUsePortUnification;
+
+ public boolean isSslQuorum() {
+ return sslQuorum;
+ }
+
+ public boolean shouldUsePortUnification() {
+ return shouldUsePortUnification;
+ }
+
+ private final QuorumX509Util x509Util;
+
+ QuorumX509Util getX509Util() {
+ return x509Util;
+ }
+
+ /**
+ * This is who I think the leader currently is.
+ */
+ private volatile Vote currentVote;
+
+ public synchronized Vote getCurrentVote() {
+ return currentVote;
+ }
+
+ public synchronized void setCurrentVote(Vote v) {
+ currentVote = v;
+ }
+
+ private volatile boolean running = true;
+
+ private String initialConfig;
+
+ /**
+ * The number of milliseconds of each tick
+ */
+ protected int tickTime;
+
+ /**
+ * Whether learners in this quorum should create new sessions as local.
+ * False by default to preserve existing behavior.
+ */
+ protected boolean localSessionsEnabled = false;
+
+ /**
+ * Whether learners in this quorum should upgrade local sessions to
+ * global. Only matters if local sessions are enabled.
+ */
+ protected boolean localSessionsUpgradingEnabled = true;
+
+ /**
+ * Minimum number of milliseconds to allow for session timeout.
+ * A value of -1 indicates unset, use default.
+ */
+ protected int minSessionTimeout = -1;
+
+ /**
+ * Maximum number of milliseconds to allow for session timeout.
+ * A value of -1 indicates unset, use default.
+ */
+ protected int maxSessionTimeout = -1;
+
+ /**
+ * The ZooKeeper server's socket backlog length. The number of connections
+ * that will be queued to be read before new connections are dropped. A
+ * value of one indicates the default backlog will be used.
+ */
+ protected int clientPortListenBacklog = -1;
+
+ /**
+ * The number of ticks that the initial synchronization phase can take
+ */
+ protected volatile int initLimit;
+
+ /**
+ * The number of ticks that can pass between sending a request and getting
+ * an acknowledgment
+ */
+ protected volatile int syncLimit;
+
+ /**
+ * The number of ticks that can pass before retrying to connect to learner master
+ */
+ protected volatile int connectToLearnerMasterLimit;
+
+ /**
+ * Enables/Disables sync request processor. This option is enabled
+ * by default and is to be used with observers.
+ */
+ protected boolean syncEnabled = true;
+
+ /**
+ * The current tick
+ */
+ protected AtomicInteger tick = new AtomicInteger();
+
+ /**
+ * Whether or not to listen on all IPs for the two quorum ports
+ * (broadcast and fast leader election).
+ */
+ protected boolean quorumListenOnAllIPs = false;
+
+ /**
+ * Keeps time taken for leader election in milliseconds. Sets the value to
+ * this variable only after the completion of leader election.
+ */
+ private long electionTimeTaken = -1;
+
+ /**
+ * Enable/Disables quorum authentication using sasl. Defaulting to false.
+ */
+ protected boolean quorumSaslEnableAuth;
+
+ /**
+ * If this is false, quorum peer server will accept another quorum peer client
+ * connection even if the authentication did not succeed. This can be used while
+ * upgrading ZooKeeper server. Defaulting to false (required).
+ */
+ protected boolean quorumServerSaslAuthRequired;
+
+ /**
+ * If this is false, quorum peer learner will talk to quorum peer server
+ * without authentication. This can be used while upgrading ZooKeeper
+ * server. Defaulting to false (required).
+ */
+ protected boolean quorumLearnerSaslAuthRequired;
+
+ /**
+ * Kerberos quorum service principal. Defaulting to 'zkquorum/localhost'.
+ */
+ protected String quorumServicePrincipal;
+
+ /**
+ * Quorum learner login context name in jaas-conf file to read the kerberos
+ * security details. Defaulting to 'QuorumLearner'.
+ */
+ protected String quorumLearnerLoginContext;
+
+ /**
+ * Quorum server login context name in jaas-conf file to read the kerberos
+ * security details. Defaulting to 'QuorumServer'.
+ */
+ protected String quorumServerLoginContext;
+
+ // TODO: need to tune the default value of thread size
+ private static final int QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE = 20;
+ /**
+ * The maximum number of threads to allow in the connectionExecutors thread
+ * pool which will be used to initiate quorum server connections.
+ */
+ protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;
+
+ public static final String QUORUM_CNXN_TIMEOUT_MS = "zookeeper.quorumCnxnTimeoutMs";
+ private static int quorumCnxnTimeoutMs;
+
+ static {
+ quorumCnxnTimeoutMs = Integer.getInteger(QUORUM_CNXN_TIMEOUT_MS, -1);
+ LOG.info("{}={}", QUORUM_CNXN_TIMEOUT_MS, quorumCnxnTimeoutMs);
+ }
+
+ /**
+ * @deprecated As of release 3.4.0, this class has been deprecated, since
+ * it is used with one of the udp-based versions of leader election, which
+ * we are also deprecating.
+ *
+ * This class simply responds to requests for the current leader of this
+ * node.
+ * <p>
+ * The request contains just an xid generated by the requestor.
+ * <p>
+ * The response has the xid, the id of this server, the id of the leader,
+ * and the zxid of the leader.
+ *
+ *
+ */
+ @Deprecated
+ class ResponderThread extends ZooKeeperThread {
+
+ ResponderThread() {
+ super("ResponderThread");
+ }
+
+ volatile boolean running = true;
+
+ @Override
+ public void run() {
+ try {
+ byte[] b = new byte[36];
+ ByteBuffer responseBuffer = ByteBuffer.wrap(b);
+ DatagramPacket packet = new DatagramPacket(b, b.length);
+ while (running) {
+ udpSocket.receive(packet);
+ if (packet.getLength() != 4) {
+ LOG.warn("Got more than just an xid! Len = {}", packet.getLength());
+ } else {
+ responseBuffer.clear();
+ responseBuffer.getInt(); // Skip the xid
+ responseBuffer.putLong(myid);
+ Vote current = getCurrentVote();
+ switch (getPeerState()) {
+ case LOOKING:
+ responseBuffer.putLong(current.getId());
+ responseBuffer.putLong(current.getZxid());
+ break;
+ case LEADING:
+ responseBuffer.putLong(myid);
+ try {
+ long proposed;
+ synchronized (leader) {
+ proposed = leader.lastProposed;
+ }
+ responseBuffer.putLong(proposed);
+ } catch (NullPointerException npe) {
+ // This can happen in state transitions,
+ // just ignore the request
+ }
+ break;
+ case FOLLOWING:
+ responseBuffer.putLong(current.getId());
+ try {
+ responseBuffer.putLong(follower.getZxid());
+ } catch (NullPointerException npe) {
+ // This can happen in state transitions,
+ // just ignore the request
+ }
+ break;
+ case OBSERVING:
+ // Do nothing, Observers keep themselves to
+ // themselves.
+ break;
+ }
+ packet.setData(b);
+ udpSocket.send(packet);
+ }
+ packet.setLength(b.length);
+ }
+ } catch (RuntimeException e) {
+ LOG.warn("Unexpected runtime exception in ResponderThread", e);
+ } catch (IOException e) {
+ LOG.warn("Unexpected IO exception in ResponderThread", e);
+ } finally {
+ LOG.warn("QuorumPeer responder thread exited");
+ }
+ }
+
+ }
+
+ private ServerState state = ServerState.LOOKING;
+
+ private AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION);
+ private AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE);
+ private AtomicReference<String> leaderAddress = new AtomicReference<>("");
+ private AtomicLong leaderId = new AtomicLong(-1);
+
+ private boolean reconfigFlag = false; // indicates that a reconfig just committed
+
+ public synchronized void setPeerState(ServerState newState) {
+ state = newState;
+ if (newState == ServerState.LOOKING) {
+ setLeaderAddressAndId(null, -1);
+ setZabState(ZabState.ELECTION);
+ } else {
+ LOG.info("Peer state changed: {}", getDetailedPeerState());
+ }
+ }
+
+ public void setZabState(ZabState zabState) {
+ if ((zabState == ZabState.BROADCAST) && (unavailableStartTime != 0)) {
+ long unavailableTime = Time.currentElapsedTime() - unavailableStartTime;
+ ServerMetrics.getMetrics().UNAVAILABLE_TIME.add(unavailableTime);
+ if (getPeerState() == ServerState.LEADING) {
+ ServerMetrics.getMetrics().LEADER_UNAVAILABLE_TIME.add(unavailableTime);
+ }
+ unavailableStartTime = 0;
+ }
+ this.zabState.set(zabState);
+ LOG.info("Peer state changed: {}", getDetailedPeerState());
+ }
+
+ public void setSyncMode(SyncMode syncMode) {
+ this.syncMode.set(syncMode);
+ LOG.info("Peer state changed: {}", getDetailedPeerState());
+ }
+
+ public ZabState getZabState() {
+ return zabState.get();
+ }
+
+ public SyncMode getSyncMode() {
+ return syncMode.get();
+ }
+
+ public void setLeaderAddressAndId(MultipleAddresses addr, long newId) {
+ if (addr != null) {
+ leaderAddress.set(String.join("|", addr.getAllHostStrings()));
+ } else {
+ leaderAddress.set(null);
+ }
+ leaderId.set(newId);
+ }
+
+ public String getLeaderAddress() {
+ return leaderAddress.get();
+ }
+
+ public long getLeaderId() {
+ return leaderId.get();
+ }
+
+ public String getDetailedPeerState() {
+ final StringBuilder sb = new StringBuilder(getPeerState().toString().toLowerCase());
+ final ZabState zabState = getZabState();
+ if (!ZabState.ELECTION.equals(zabState)) {
+ sb.append(" - ").append(zabState.toString().toLowerCase());
+ }
+ final SyncMode syncMode = getSyncMode();
+ if (!SyncMode.NONE.equals(syncMode)) {
+ sb.append(" - ").append(syncMode.toString().toLowerCase());
+ }
+ return sb.toString();
+ }
+
+ public synchronized void reconfigFlagSet() {
+ reconfigFlag = true;
+ }
+ public synchronized void reconfigFlagClear() {
+ reconfigFlag = false;
+ }
+ public synchronized boolean isReconfigStateChange() {
+ return reconfigFlag;
+ }
+ public synchronized ServerState getPeerState() {
+ return state;
+ }
+
+ DatagramSocket udpSocket;
+
+ private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>();
+
+ /**
+ * Resolves hostname for a given server ID.
+ *
+ * This method resolves hostname for a given server ID in both quorumVerifer
+ * and lastSeenQuorumVerifier. If the server ID matches the local server ID,
+ * it also updates myAddrs.
+ */
+ public void recreateSocketAddresses(long id) {
+ QuorumVerifier qv = getQuorumVerifier();
+ if (qv != null) {
+ QuorumServer qs = qv.getAllMembers().get(id);
+ if (qs != null) {
+ qs.recreateSocketAddresses();
+ if (id == getMyId()) {
+ setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
+ }
+ }
+ }
+ qv = getLastSeenQuorumVerifier();
+ if (qv != null) {
+ QuorumServer qs = qv.getAllMembers().get(id);
+ if (qs != null) {
+ qs.recreateSocketAddresses();
+ }
+ }
+ }
+
+ private AddressTuple getAddrs() {
+ AddressTuple addrs = myAddrs.get();
+ if (addrs != null) {
+ return addrs;
+ }
+ try {
+ synchronized (QV_LOCK) {
+ addrs = myAddrs.get();
+ while (addrs == null) {
+ QV_LOCK.wait();
+ addrs = myAddrs.get();
+ }
+ return addrs;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public MultipleAddresses getQuorumAddress() {
+ return getAddrs().quorumAddr;
+ }
+
+ public MultipleAddresses getElectionAddress() {
+ return getAddrs().electionAddr;
+ }
+
+ public InetSocketAddress getClientAddress() {
+ final AddressTuple addrs = myAddrs.get();
+ return (addrs == null) ? null : addrs.clientAddr;
+ }
+
+ private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
+ synchronized (QV_LOCK) {
+ myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr));
+ QV_LOCK.notifyAll();
+ }
+ }
+
+ private int electionType;
+
+ Election electionAlg;
+
+ ServerCnxnFactory cnxnFactory;
+ ServerCnxnFactory secureCnxnFactory;
+
+ private FileTxnSnapLog logFactory = null;
+
+ private final QuorumStats quorumStats;
+
+ AdminServer adminServer;
+
+ private final boolean reconfigEnabled;
+
+ public static QuorumPeer testingQuorumPeer() throws SaslException {
+ return new QuorumPeer();
+ }
+
+ public QuorumPeer() throws SaslException {
+ super("QuorumPeer");
+ quorumStats = new QuorumStats(this);
+ jmxRemotePeerBean = new HashMap<>();
+ adminServer = AdminServerFactory.createAdminServer();
+ x509Util = createX509Util();
+ initialize();
+ reconfigEnabled = QuorumPeerConfig.isReconfigEnabled();
+ }
+
+ // VisibleForTesting
+ QuorumX509Util createX509Util() {
+ return new QuorumX509Util();
+ }
+
+ /**
+ * For backward compatibility purposes, we instantiate QuorumMaj by default.
+ */
+
+ public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, ServerCnxnFactory cnxnFactory) throws IOException {
+ this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false, cnxnFactory, new QuorumMaj(quorumPeers));
+ }
+
+ public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException {
+ this();
+ this.cnxnFactory = cnxnFactory;
+ this.electionType = electionType;
+ this.myid = myid;
+ this.tickTime = tickTime;
+ this.initLimit = initLimit;
+ this.syncLimit = syncLimit;
+ this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
+ this.quorumListenOnAllIPs = quorumListenOnAllIPs;
+ this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
+ this.zkDb = new ZKDatabase(this.logFactory);
+ if (quorumConfig == null) {
+ quorumConfig = new QuorumMaj(quorumPeers);
+ }
+ setQuorumVerifier(quorumConfig, false);
+ adminServer = AdminServerFactory.createAdminServer();
+ }
+
+ public void initialize() throws SaslException {
+ // init quorum auth server & learner
+ if (isQuorumSaslAuthEnabled()) {
+ Set<String> authzHosts = new HashSet<>();
+ for (QuorumServer qs : getView().values()) {
+ authzHosts.add(qs.hostname);
+ }
+ authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(), quorumServerLoginContext, authzHosts);
+ authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(), quorumServicePrincipal, quorumLearnerLoginContext);
+ } else {
+ authServer = new NullQuorumAuthServer();
+ authLearner = new NullQuorumAuthLearner();
+ }
+ }
+
+ QuorumStats quorumStats() {
+ return quorumStats;
+ }
+
+ @Override
+ public synchronized void start() {
+ if (!getView().containsKey(myid)) {
+ throw new RuntimeException("My id " + myid + " not in the peer list");
+ }
+ loadDataBase();
+ startServerCnxnFactory();
+ try {
+ adminServer.start();
+ } catch (AdminServerException e) {
+ LOG.warn("Problem starting AdminServer", e);
+ }
+ startLeaderElection();
+ startJvmPauseMonitor();
+ super.start();
+ }
+
+ private void loadDataBase() {
+ try {
+ zkDb.loadDataBase();
+
+ // load the epochs
+ long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
+ long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
+ try {
+ currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
+ } catch (FileNotFoundException e) {
+ // pick a reasonable epoch number
+ // this should only happen once when moving to a
+ // new code version
+ currentEpoch = epochOfZxid;
+ LOG.info(
+ "{} not found! Creating with a reasonable default of {}. "
+ + "This should only happen when you are upgrading your installation",
+ CURRENT_EPOCH_FILENAME,
+ currentEpoch);
+ writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
+ }
+ if (epochOfZxid > currentEpoch) {
+ // acceptedEpoch.tmp file in snapshot directory
+ File currentTmp = new File(getTxnFactory().getSnapDir(),
+ CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION);
+ if (currentTmp.exists()) {
+ long epochOfTmp = readLongFromFile(currentTmp.getName());
+ LOG.info("{} found. Setting current epoch to {}.", currentTmp, epochOfTmp);
+ setCurrentEpoch(epochOfTmp);
+ } else {
+ throw new IOException(
+ "The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
+ + ", is older than the last zxid, " + lastProcessedZxid);
+ }
+ }
+ try {
+ acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
+ } catch (FileNotFoundException e) {
+ // pick a reasonable epoch number
+ // this should only happen once when moving to a
+ // new code version
+ acceptedEpoch = epochOfZxid;
+ LOG.info(
+ "{} not found! Creating with a reasonable default of {}. "
+ + "This should only happen when you are upgrading your installation",
+ ACCEPTED_EPOCH_FILENAME,
+ acceptedEpoch);
+ writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
+ }
+ if (acceptedEpoch < currentEpoch) {
+ throw new IOException("The accepted epoch, "
+ + ZxidUtils.zxidToString(acceptedEpoch)
+ + " is less than the current epoch, "
+ + ZxidUtils.zxidToString(currentEpoch));
+ }
+ } catch (IOException ie) {
+ LOG.error("Unable to load database on disk", ie);
+ throw new RuntimeException("Unable to run quorum server ", ie);
+ }
+ }
+
+ ResponderThread responder;
+
+ public synchronized void stopLeaderElection() {
+ responder.running = false;
+ responder.interrupt();
+ }
+ public synchronized void startLeaderElection() {
+ try {
+ if (getPeerState() == ServerState.LOOKING) {
+ currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
+ }
+ } catch (IOException e) {
+ RuntimeException re = new RuntimeException(e.getMessage());
+ re.setStackTrace(e.getStackTrace());
+ throw re;
+ }
+
+ this.electionAlg = createElectionAlgorithm(electionType);
+ }
+
+ private void startJvmPauseMonitor() {
+ if (this.jvmPauseMonitor != null) {
+ this.jvmPauseMonitor.serviceStart();
+ }
+ }
+
+ /**
+ * Count the number of nodes in the map that could be followers.
+ * @param peers
+ * @return The number of followers in the map
+ */
+ protected static int countParticipants(Map<Long, QuorumServer> peers) {
+ int count = 0;
+ for (QuorumServer q : peers.values()) {
+ if (q.type == LearnerType.PARTICIPANT) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
+ * This constructor is only used by the existing unit test code.
+ * It defaults to FileLogProvider persistence provider.
+ */
+ public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit) throws IOException {
+ this(
+ quorumPeers,
+ snapDir,
+ logDir,
+ electionAlg,
+ myid,
+ tickTime,
+ initLimit,
+ syncLimit,
+ connectToLearnerMasterLimit,
+ false,
+ ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
+ new QuorumMaj(quorumPeers));
+ }
+
+ public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, String oraclePath) throws IOException {
+ this(
+ quorumPeers,
+ snapDir,
+ logDir,
+ electionAlg,
+ myid,
+ tickTime,
+ initLimit,
+ syncLimit,
+ connectToLearnerMasterLimit,
+ false,
+ ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
+ new QuorumOracleMaj(quorumPeers, oraclePath));
+ }
+
+ /**
+ * This constructor is only used by the existing unit test code.
+ * It defaults to FileLogProvider persistence provider.
+ */
+ public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, QuorumVerifier quorumConfig) throws IOException {
+ this(
+ quorumPeers,
+ snapDir,
+ logDir,
+ electionAlg,
+ myid,
+ tickTime,
+ initLimit,
+ syncLimit,
+ connectToLearnerMasterLimit,
+ false,
+ ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
+ quorumConfig);
+ }
+
+ private static InetSocketAddress getClientAddress(Map<Long, QuorumServer> quorumPeers, long myid, int clientPort) throws IOException {
+ QuorumServer quorumServer = quorumPeers.get(myid);
+ if (null == quorumServer) {
+ throw new IOException("No QuorumServer correspoding to myid " + myid);
+ }
+ if (null == quorumServer.clientAddr) {
+ return new InetSocketAddress(clientPort);
+ }
+ if (quorumServer.clientAddr.getPort() != clientPort) {
+ throw new IOException("QuorumServer port "
+ + quorumServer.clientAddr.getPort()
+ + " does not match with given port "
+ + clientPort);
+ }
+ return quorumServer.clientAddr;
+ }
+
+ /**
+ * returns the highest zxid that this host has seen
+ *
+ * @return the highest zxid for this host
+ */
+ public long getLastLoggedZxid() {
+ if (!zkDb.isInitialized()) {
+ loadDataBase();
+ }
+ return zkDb.getDataTreeLastProcessedZxid();
+ }
+
+ public Follower follower;
+ public Leader leader;
+ public Observer observer;
+
+ protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
+ return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
+ }
+
+ protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
+ return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
+ }
+
+ protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
+ return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
+ }
+
+ @SuppressWarnings("deprecation")
+ protected Election createElectionAlgorithm(int electionAlgorithm) {
+ Election le = null;
+
+ //TODO: use a factory rather than a switch
+ switch (electionAlgorithm) {
+ case 1:
+ throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
+ case 2:
+ throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
+ case 3:
+ QuorumCnxManager qcm = createCnxnManager();
+ QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
+ if (oldQcm != null) {
+ LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
+ oldQcm.halt();
+ }
+ QuorumCnxManager.Listener listener = qcm.listener;
+ if (listener != null) {
+ listener.start();
+ FastLeaderElection fle = new FastLeaderElection(this, qcm);
+ fle.start();
+ le = fle;
+ } else {
+ LOG.error("Null listener when initializing cnx manager");
+ }
+ break;
+ default:
+ assert false;
+ }
+ return le;
+ }
+
+ @SuppressWarnings("deprecation")
+ protected Election makeLEStrategy() {
+ LOG.debug("Initializing leader election protocol...");
+ return electionAlg;
+ }
+
+ protected synchronized void setLeader(Leader newLeader) {
+ leader = newLeader;
+ }
+
+ protected synchronized void setFollower(Follower newFollower) {
+ follower = newFollower;
+ }
+
+ protected synchronized void setObserver(Observer newObserver) {
+ observer = newObserver;
+ }
+
+ public synchronized ZooKeeperServer getActiveServer() {
+ if (leader != null) {
+ return leader.zk;
+ } else if (follower != null) {
+ return follower.zk;
+ } else if (observer != null) {
+ return observer.zk;
+ }
+ return null;
+ }
+
+ boolean shuttingDownLE = false;
+
+ public void setSuspended(boolean suspended) {
+ this.suspended.set(suspended);
+ }
+ private void checkSuspended() {
+ try {
+ while (suspended.get()) {
+ Thread.sleep(10);
+ }
+ } catch (InterruptedException err) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void run() {
+ updateThreadName();
+
+ LOG.debug("Starting quorum peer");
+ try {
+ jmxQuorumBean = new QuorumBean(this);
+ MBeanRegistry.getInstance().register(jmxQuorumBean, null);
+ for (QuorumServer s : getView().values()) {
+ ZKMBeanInfo p;
+ if (getMyId() == s.id) {
+ p = jmxLocalPeerBean = new LocalPeerBean(this);
+ try {
+ MBeanRegistry.getInstance().register(p, jmxQuorumBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ jmxLocalPeerBean = null;
+ }
+ } else {
+ RemotePeerBean rBean = new RemotePeerBean(this, s);
+ try {
+ MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
+ jmxRemotePeerBean.put(s.id, rBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ jmxQuorumBean = null;
+ }
+
+ try {
+ /*
+ * Main loop
+ */
+ while (running) {
+ if (unavailableStartTime == 0) {
+ unavailableStartTime = Time.currentElapsedTime();
+ }
+
+ switch (getPeerState()) {
+ case LOOKING:
+ LOG.info("LOOKING");
+ ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
+
+ if (Boolean.getBoolean("readonlymode.enabled")) {
+ LOG.info("Attempting to start ReadOnlyZooKeeperServer");
+
+ // Create read-only server but don't start it immediately
+ final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
+
+ // Instead of starting roZk immediately, wait some grace
+ // period before we decide we're partitioned.
+ //
+ // Thread is used here because otherwise it would require
+ // changes in each of election strategy classes which is
+ // unnecessary code coupling.
+ Thread roZkMgr = new Thread() {
+ public void run() {
+ try {
+ // lower-bound grace period to 2 secs
+ sleep(Math.max(2000, tickTime));
+ if (ServerState.LOOKING.equals(getPeerState())) {
+ roZk.startup();
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
+ } catch (Exception e) {
+ LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
+ }
+ }
+ };
+ try {
+ roZkMgr.start();
+ reconfigFlagClear();
+ if (shuttingDownLE) {
+ shuttingDownLE = false;
+ startLeaderElection();
+ }
+ setCurrentVote(makeLEStrategy().lookForLeader());
+ checkSuspended();
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception", e);
+ setPeerState(ServerState.LOOKING);
+ } finally {
+ // If the thread is in the the grace period, interrupt
+ // to come out of waiting.
+ roZkMgr.interrupt();
+ roZk.shutdown();
+ }
+ } else {
+ try {
+ reconfigFlagClear();
+ if (shuttingDownLE) {
+ shuttingDownLE = false;
+ startLeaderElection();
+ }
+ setCurrentVote(makeLEStrategy().lookForLeader());
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception", e);
+ setPeerState(ServerState.LOOKING);
+ }
+ }
+ break;
+ case OBSERVING:
+ try {
+ LOG.info("OBSERVING");
+ setObserver(makeObserver(logFactory));
+ observer.observeLeader();
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception", e);
+ } finally {
+ observer.shutdown();
+ setObserver(null);
+ updateServerState();
+
+ // Add delay jitter before we switch to LOOKING
+ // state to reduce the load of ObserverMaster
+ if (isRunning()) {
+ Observer.waitForObserverElectionDelay();
+ }
+ }
+ break;
+ case FOLLOWING:
+ try {
+ LOG.info("FOLLOWING");
+ setFollower(makeFollower(logFactory));
+ follower.followLeader();
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception", e);
+ } finally {
+ follower.shutdown();
+ setFollower(null);
+ updateServerState();
+ }
+ break;
+ case LEADING:
+ LOG.info("LEADING");
+ try {
+ setLeader(makeLeader(logFactory));
+ leader.lead();
+ setLeader(null);
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception", e);
+ } finally {
+ if (leader != null) {
+ leader.shutdown("Forcing shutdown");
+ setLeader(null);
+ }
+ updateServerState();
+ }
+ break;
+ }
+ }
+ } finally {
+ LOG.warn("QuorumPeer main thread exited");
+ MBeanRegistry instance = MBeanRegistry.getInstance();
+ instance.unregister(jmxQuorumBean);
+ instance.unregister(jmxLocalPeerBean);
+
+ for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
+ instance.unregister(remotePeerBean);
+ }
+
+ jmxQuorumBean = null;
+ jmxLocalPeerBean = null;
+ jmxRemotePeerBean = null;
+ }
+ }
+
+ private synchronized void updateServerState() {
+ if (!reconfigFlag) {
+ setPeerState(ServerState.LOOKING);
+ LOG.warn("PeerState set to LOOKING");
+ return;
+ }
+
+ if (getMyId() == getCurrentVote().getId()) {
+ setPeerState(ServerState.LEADING);
+ LOG.debug("PeerState set to LEADING");
+ } else if (getLearnerType() == LearnerType.PARTICIPANT) {
+ setPeerState(ServerState.FOLLOWING);
+ LOG.debug("PeerState set to FOLLOWING");
+ } else if (getLearnerType() == LearnerType.OBSERVER) {
+ setPeerState(ServerState.OBSERVING);
+ LOG.debug("PeerState set to OBSERVER");
+ } else { // currently shouldn't happen since there are only 2 learner types
+ setPeerState(ServerState.LOOKING);
+ LOG.debug("Should not be here");
+ }
+ reconfigFlag = false;
+ }
+
+ public void shutdown() {
+ running = false;
+ x509Util.close();
+ if (leader != null) {
+ leader.shutdown("quorum Peer shutdown");
+ }
+ if (follower != null) {
+ follower.shutdown();
+ }
+ shutdownServerCnxnFactory();
+ if (udpSocket != null) {
+ udpSocket.close();
+ }
+ if (jvmPauseMonitor != null) {
+ jvmPauseMonitor.serviceStop();
+ }
+
+ try {
+ adminServer.shutdown();
+ } catch (AdminServerException e) {
+ LOG.warn("Problem stopping AdminServer", e);
+ }
+
+ if (getElectionAlg() != null) {
+ this.interrupt();
+ getElectionAlg().shutdown();
+ }
+ try {
+ zkDb.close();
+ } catch (IOException ie) {
+ LOG.warn("Error closing logs ", ie);
+ }
+ }
+
+ /**
+ * A 'view' is a node's current opinion of the membership of the entire
+ * ensemble.
+ */
+ public Map<Long, QuorumPeer.QuorumServer> getView() {
+ return Collections.unmodifiableMap(getQuorumVerifier().getAllMembers());
+ }
+
+ /**
+ * Observers are not contained in this view, only nodes with
+ * PeerType=PARTICIPANT.
+ */
+ public Map<Long, QuorumPeer.QuorumServer> getVotingView() {
+ return getQuorumVerifier().getVotingMembers();
+ }
+
+ /**
+ * Returns only observers, no followers.
+ */
+ public Map<Long, QuorumPeer.QuorumServer> getObservingView() {
+ return getQuorumVerifier().getObservingMembers();
+ }
+
+ public synchronized Set<Long> getCurrentAndNextConfigVoters() {
+ Set<Long> voterIds = new HashSet<>(getQuorumVerifier().getVotingMembers().keySet());
+ if (getLastSeenQuorumVerifier() != null) {
+ voterIds.addAll(getLastSeenQuorumVerifier().getVotingMembers().keySet());
+ }
+ return voterIds;
+ }
+
+ /**
+ * Check if a node is in the current view. With static membership, the
+ * result of this check will never change; only when dynamic membership
+ * is introduced will this be more useful.
+ */
+ public boolean viewContains(Long sid) {
+ return this.getView().containsKey(sid);
+ }
+
+ /**
+ * Only used by QuorumStats at the moment
+ */
+ public String[] getQuorumPeers() {
+ List<String> l = new ArrayList<>();
+ synchronized (this) {
+ if (leader != null) {
+ for (LearnerHandler fh : leader.getLearners()) {
+ if (fh.getSocket() != null) {
+ String s = formatInetAddr((InetSocketAddress) fh.getSocket().getRemoteSocketAddress());
+ if (leader.isLearnerSynced(fh)) {
+ s += "*";
+ }
+ l.add(s);
+ }
+ }
+ } else if (follower != null) {
+ l.add(formatInetAddr((InetSocketAddress) follower.sock.getRemoteSocketAddress()));
+ }
+ }
+ return l.toArray(new String[0]);
+ }
+
+ public String getServerState() {
+ switch (getPeerState()) {
+ case LOOKING:
+ return QuorumStats.Provider.LOOKING_STATE;
+ case LEADING:
+ return QuorumStats.Provider.LEADING_STATE;
+ case FOLLOWING:
+ return QuorumStats.Provider.FOLLOWING_STATE;
+ case OBSERVING:
+ return QuorumStats.Provider.OBSERVING_STATE;
+ }
+ return QuorumStats.Provider.UNKNOWN_STATE;
+ }
+
+ /**
+ * set the id of this quorum peer.
+ */
+ public void setMyid(long myid) {
+ this.myid = myid;
+ }
+
+ public void setInitialConfig(String initialConfig) {
+ this.initialConfig = initialConfig;
+ }
+
+ public String getInitialConfig() {
+ return initialConfig;
+ }
+
+ /**
+ * Get the number of milliseconds of each tick
+ */
+ public int getTickTime() {
+ return tickTime;
+ }
+
+ /**
+ * Set the number of milliseconds of each tick
+ */
+ public void setTickTime(int tickTime) {
+ LOG.info("tickTime set to {}", tickTime);
+ this.tickTime = tickTime;
+ }
+
+ /** Maximum number of connections allowed from particular host (ip) */
+ public int getMaxClientCnxnsPerHost() {
+ if (cnxnFactory != null) {
+ return cnxnFactory.getMaxClientCnxnsPerHost();
+ }
+ if (secureCnxnFactory != null) {
+ return secureCnxnFactory.getMaxClientCnxnsPerHost();
+ }
+ return -1;
+ }
+
+ /** Whether local sessions are enabled */
+ public boolean areLocalSessionsEnabled() {
+ return localSessionsEnabled;
+ }
+
+ /** Whether to enable local sessions */
+ public void enableLocalSessions(boolean flag) {
+ LOG.info("Local sessions {}", (flag ? "enabled" : "disabled"));
+ localSessionsEnabled = flag;
+ }
+
+ /** Whether local sessions are allowed to upgrade to global sessions */
+ public boolean isLocalSessionsUpgradingEnabled() {
+ return localSessionsUpgradingEnabled;
+ }
+
+ /** Whether to allow local sessions to upgrade to global sessions */
+ public void enableLocalSessionsUpgrading(boolean flag) {
+ LOG.info("Local session upgrading {}", (flag ? "enabled" : "disabled"));
+ localSessionsUpgradingEnabled = flag;
+ }
+
+ /** minimum session timeout in milliseconds */
+ public int getMinSessionTimeout() {
+ return minSessionTimeout;
+ }
+
+ /** minimum session timeout in milliseconds */
+ public void setMinSessionTimeout(int min) {
+ LOG.info("minSessionTimeout set to {}", min);
+ this.minSessionTimeout = min;
+ }
+
+ /** maximum session timeout in milliseconds */
+ public int getMaxSessionTimeout() {
+ return maxSessionTimeout;
+ }
+
+ /** maximum session timeout in milliseconds */
+ public void setMaxSessionTimeout(int max) {
+ LOG.info("maxSessionTimeout set to {}", max);
+ this.maxSessionTimeout = max;
+ }
+
+ /** The server socket's listen backlog length */
+ public int getClientPortListenBacklog() {
+ return this.clientPortListenBacklog;
+ }
+
+ /** Sets the server socket's listen backlog length. */
+ public void setClientPortListenBacklog(int backlog) {
+ this.clientPortListenBacklog = backlog;
+ }
+
+ /**
+ * Get the number of ticks that the initial synchronization phase can take
+ */
+ public int getInitLimit() {
+ return initLimit;
+ }
+
+ /**
+ * Set the number of ticks that the initial synchronization phase can take
+ */
+ public void setInitLimit(int initLimit) {
+ LOG.info("initLimit set to {}", initLimit);
+ this.initLimit = initLimit;
+ }
+
+ /**
+ * Get the current tick
+ */
+ public int getTick() {
+ return tick.get();
+ }
+
+ public QuorumVerifier configFromString(String s) throws IOException, ConfigException {
+ Properties props = new Properties();
+ props.load(new StringReader(s));
+ return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false, getQuorumVerifier().getOraclePath());
+ }
+
+ /**
+ * Return QuorumVerifier object for the last committed configuration.
+ */
+ public QuorumVerifier getQuorumVerifier() {
+ synchronized (QV_LOCK) {
+ return quorumVerifier;
+ }
+ }
+
+ /**
+ * Return QuorumVerifier object for the last proposed configuration.
+ */
+ public QuorumVerifier getLastSeenQuorumVerifier() {
+ synchronized (QV_LOCK) {
+ return lastSeenQuorumVerifier;
+ }
+ }
+
+ public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW) {
+ if (qvOLD == null || !qvOLD.equals(qvNEW)) {
+ LOG.warn("Restarting Leader Election");
+ getElectionAlg().shutdown();
+ shuttingDownLE = false;
+ startLeaderElection();
+ }
+ }
+
+ public String getNextDynamicConfigFilename() {
+ if (configFilename == null) {
+ LOG.warn("configFilename is null! This should only happen in tests.");
+ return null;
+ }
+ return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix;
+ }
+
+ // On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK
+ // must be held. We don't want quorumVerifier/lastSeenQuorumVerifier to change out from
+ // under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take
+ // the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken
+ // qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne().
+ private void connectNewPeers(QuorumCnxManager qcm) {
+ if (quorumVerifier != null && lastSeenQuorumVerifier != null) {
+ Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
+ for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
+ if (e.getKey() != getMyId() && !committedView.containsKey(e.getKey())) {
+ qcm.connectOne(e.getKey());
+ }
+ }
+ }
+ }
+
+ public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
+ if (!isReconfigEnabled()) {
+ LOG.info("Dynamic reconfig is disabled, we don't store the last seen config.");
+ return;
+ }
+
+ // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm
+ // and then take QV_LOCK. Take the locks in the same order to ensure that we don't
+ // deadlock against other callers of connectOne(). If qcmRef gets set in another
+ // thread while we're inside the synchronized block, that does no harm; if we didn't
+ // take a lock on qcm (because it was null when we sampled it), we won't call
+ // connectOne() on it. (Use of an AtomicReference is enough to guarantee visibility
+ // of updates that provably happen in another thread before entering this method.)
+ QuorumCnxManager qcm = qcmRef.get();
+ Object outerLockObject = (qcm != null) ? qcm : QV_LOCK;
+ synchronized (outerLockObject) {
+ synchronized (QV_LOCK) {
+ if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
+ LOG.error("setLastSeenQuorumVerifier called with stale config "
+ + qv.getVersion()
+ + ". Current version: "
+ + quorumVerifier.getVersion());
+ }
+ // assuming that a version uniquely identifies a configuration, so if
+ // version is the same, nothing to do here.
+ if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
+ return;
+ }
+ lastSeenQuorumVerifier = qv;
+ if (qcm != null) {
+ connectNewPeers(qcm);
+ }
+
+ if (writeToDisk) {
+ try {
+ String fileName = getNextDynamicConfigFilename();
+ if (fileName != null) {
+ QuorumPeerConfig.writeDynamicConfig(fileName, qv, true);
+ }
+ } catch (IOException e) {
+ LOG.error("Error writing next dynamic config file to disk", e);
+ }
+ }
+ }
+ }
+ }
+
+ public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
+ synchronized (QV_LOCK) {
+ if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
+ // this is normal. For example - server found out about new config through FastLeaderElection gossiping
+ // and then got the same config in UPTODATE message so its already known
+ LOG.debug(
+ "{} setQuorumVerifier called with known or old config {}. Current version: {}",
+ getMyId(),
+ qv.getVersion(),
+ quorumVerifier.getVersion());
+ return quorumVerifier;
+ }
+ QuorumVerifier prevQV = quorumVerifier;
+ quorumVerifier = qv;
+ if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion())) {
+ lastSeenQuorumVerifier = qv;
+ }
+
+ if (writeToDisk) {
+ // some tests initialize QuorumPeer without a static config file
+ if (configFilename != null) {
+ try {
+ String dynamicConfigFilename = makeDynamicConfigFilename(qv.getVersion());
+ QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, qv, false);
+ QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig());
+ } catch (IOException e) {
+ LOG.error("Error closing file", e);
+ }
+ } else {
+ LOG.info("writeToDisk == true but configFilename == null");
+ }
+ }
+
+ if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()) {
+ QuorumPeerConfig.deleteFile(getNextDynamicConfigFilename());
+ }
+ QuorumServer qs = qv.getAllMembers().get(getMyId());
+ if (qs != null) {
+ setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
+ }
+ updateObserverMasterList();
+ return prevQV;
+ }
+ }
+
+ private String makeDynamicConfigFilename(long version) {
+ return configFilename + ".dynamic." + Long.toHexString(version);
+ }
+
+ private boolean needEraseClientInfoFromStaticConfig() {
+ QuorumServer server = quorumVerifier.getAllMembers().get(getMyId());
+ return (server != null && server.clientAddr != null && !server.isClientAddrFromStatic);
+ }
+
+ /**
+ * Get an instance of LeaderElection
+ */
+ public Election getElectionAlg() {
+ return electionAlg;
+ }
+
+ /**
+ * Get the synclimit
+ */
+ public int getSyncLimit() {
+ return syncLimit;
+ }
+
+ /**
+ * Set the synclimit
+ */
+ public void setSyncLimit(int syncLimit) {
+ LOG.info("syncLimit set to {}", syncLimit);
+ this.syncLimit = syncLimit;
+ }
+
+ /**
+ * Get the connectToLearnerMasterLimit
+ */
+ public int getConnectToLearnerMasterLimit() {
+ return connectToLearnerMasterLimit;
+ }
+
+ /**
+ * Set the connectToLearnerMasterLimit
+ */
+ public void setConnectToLearnerMasterLimit(int connectToLearnerMasterLimit) {
+ LOG.info("connectToLearnerMasterLimit set to {}", connectToLearnerMasterLimit);
+ this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
+ }
+
+ /**
+ * The syncEnabled can also be set via a system property.
+ */
+ public static final String SYNC_ENABLED = "zookeeper.observer.syncEnabled";
+
+ /**
+ * Return syncEnabled.
+ */
+ public boolean getSyncEnabled() {
+ if (System.getProperty(SYNC_ENABLED) != null) {
+ LOG.info("{}={}", SYNC_ENABLED, Boolean.getBoolean(SYNC_ENABLED));
+ return Boolean.getBoolean(SYNC_ENABLED);
+ } else {
+ return syncEnabled;
+ }
+ }
+
+ /**
+ * Set syncEnabled.
+ *
+ * @param syncEnabled
+ */
+ public void setSyncEnabled(boolean syncEnabled) {
+ this.syncEnabled = syncEnabled;
+ }
+
+ /**
+ * Gets the election type
+ */
+ public int getElectionType() {
+ return electionType;
+ }
+
+ /**
+ * Sets the election type
+ */
+ public void setElectionType(int electionType) {
+ this.electionType = electionType;
+ }
+
+ public boolean getQuorumListenOnAllIPs() {
+ return quorumListenOnAllIPs;
+ }
+
+ public void setQuorumListenOnAllIPs(boolean quorumListenOnAllIPs) {
+ this.quorumListenOnAllIPs = quorumListenOnAllIPs;
+ }
+
+ public void setCnxnFactory(ServerCnxnFactory cnxnFactory) {
+ this.cnxnFactory = cnxnFactory;
+ }
+
+ public void setSecureCnxnFactory(ServerCnxnFactory secureCnxnFactory) {
+ this.secureCnxnFactory = secureCnxnFactory;
+ }
+
+ public void setSslQuorum(boolean sslQuorum) {
+ if (sslQuorum) {
+ LOG.info("Using TLS encrypted quorum communication");
+ } else {
+ LOG.info("Using insecure (non-TLS) quorum communication");
+ }
+ this.sslQuorum = sslQuorum;
+ }
+
+ public void setUsePortUnification(boolean shouldUsePortUnification) {
+ LOG.info("Port unification {}", shouldUsePortUnification ? "enabled" : "disabled");
+ this.shouldUsePortUnification = shouldUsePortUnification;
+ }
+
+ private void startServerCnxnFactory() {
+ if (cnxnFactory != null) {
+ cnxnFactory.start();
+ }
+ if (secureCnxnFactory != null) {
+ secureCnxnFactory.start();
+ }
+ }
+
+ private void shutdownServerCnxnFactory() {
+ if (cnxnFactory != null) {
+ cnxnFactory.shutdown();
+ }
+ if (secureCnxnFactory != null) {
+ secureCnxnFactory.shutdown();
+ }
+ }
+
+ // Leader and learner will control the zookeeper server and pass it into QuorumPeer.
+ public void setZooKeeperServer(ZooKeeperServer zks) {
+ if (cnxnFactory != null) {
+ cnxnFactory.setZooKeeperServer(zks);
+ }
+ if (secureCnxnFactory != null) {
+ secureCnxnFactory.setZooKeeperServer(zks);
+ }
+ }
+
+ public void closeAllConnections() {
+ if (cnxnFactory != null) {
+ cnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
+ }
+ if (secureCnxnFactory != null) {
+ secureCnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
+ }
+ }
+
+ public int getClientPort() {
+ if (cnxnFactory != null) {
+ return cnxnFactory.getLocalPort();
+ }
+ return -1;
+ }
+
+ public int getSecureClientPort() {
+ if (secureCnxnFactory != null) {
+ return secureCnxnFactory.getLocalPort();
+ }
+ return -1;
+ }
+
+ public void setTxnFactory(FileTxnSnapLog factory) {
+ this.logFactory = factory;
+ }
+
+ public FileTxnSnapLog getTxnFactory() {
+ return this.logFactory;
+ }
+
+ /**
+ * set zk database for this node
+ * @param database
+ */
+ public void setZKDatabase(ZKDatabase database) {
+ this.zkDb = database;
+ }
+
+ protected ZKDatabase getZkDb() {
+ return zkDb;
+ }
+
+ public synchronized void initConfigInZKDatabase() {
+ if (zkDb != null) {
+ zkDb.initConfigInZKDatabase(getQuorumVerifier());
+ }
+ }
+
+ public boolean isRunning() {
+ return running;
+ }
+
+ /**
+ * get reference to QuorumCnxManager
+ */
+ public QuorumCnxManager getQuorumCnxManager() {
+ return qcmRef.get();
+ }
+ private long readLongFromFile(String name) throws IOException {
+ File file = new File(logFactory.getSnapDir(), name);
+ BufferedReader br = new BufferedReader(new FileReader(file));
+ String line = "";
+ try {
+ line = br.readLine();
+ return Long.parseLong(line);
+ } catch (NumberFormatException e) {
+ throw new IOException("Found " + line + " in " + file);
+ } finally {
+ br.close();
+ }
+ }
+
+ private long acceptedEpoch = -1;
+ private long currentEpoch = -1;
+
+ public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";
+
+ public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
+
+ /**
+ * Write a long value to disk atomically. Either succeeds or an exception
+ * is thrown.
+ * @param name file name to write the long to
+ * @param value the long value to write to the named file
+ * @throws IOException if the file cannot be written atomically
+ */
+ // visibleForTest
+ void writeLongToFile(String name, final long value) throws IOException {
+ File file = new File(logFactory.getSnapDir(), name);
+ new AtomicFileWritingIdiom(file, new WriterStatement() {
+ @Override
+ public void write(Writer bw) throws IOException {
+ bw.write(Long.toString(value));
+ }
+ });
+ }
+
+ public long getCurrentEpoch() throws IOException {
+ if (currentEpoch == -1) {
+ currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
+ }
+ return currentEpoch;
+ }
+
+ public long getAcceptedEpoch() throws IOException {
+ if (acceptedEpoch == -1) {
+ acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
+ }
+ return acceptedEpoch;
+ }
+
+ public void setCurrentEpoch(long e) throws IOException {
+ writeLongToFile(CURRENT_EPOCH_FILENAME, e);
+ currentEpoch = e;
+ }
+
+ public void setAcceptedEpoch(long e) throws IOException {
+ writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
+ acceptedEpoch = e;
+ }
+
+ public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) {
+ if (!isReconfigEnabled()) {
+ LOG.debug("Reconfig feature is disabled, skip reconfig processing.");
+ return false;
+ }
+
+ InetSocketAddress oldClientAddr = getClientAddress();
+
+ // update last committed quorum verifier, write the new config to disk
+ // and restart leader election if config changed.
+ QuorumVerifier prevQV = setQuorumVerifier(qv, true);
+
+ // There is no log record for the initial config, thus after syncing
+ // with leader
+ // /zookeeper/config is empty! it is also possible that last committed
+ // config is propagated during leader election
+ // without the propagation the corresponding log records.
+ // so we should explicitly do this (this is not necessary when we're
+ // already a Follower/Observer, only
+ // for Learner):
+ initConfigInZKDatabase();
+
+ if (prevQV.getVersion() < qv.getVersion() && !prevQV.equals(qv)) {
+ Map<Long, QuorumServer> newMembers = qv.getAllMembers();
+ updateRemotePeerMXBeans(newMembers);
+ if (restartLE) {
+ restartLeaderElection(prevQV, qv);
+ }
+
+ QuorumServer myNewQS = newMembers.get(getMyId());
+ if (myNewQS != null && myNewQS.clientAddr != null && !myNewQS.clientAddr.equals(oldClientAddr)) {
+ cnxnFactory.reconfigure(myNewQS.clientAddr);
+ updateThreadName();
+ }
+
+ boolean roleChange = updateLearnerType(qv);
+ boolean leaderChange = false;
+ if (suggestedLeaderId != null) {
+ // zxid should be non-null too
+ leaderChange = updateVote(suggestedLeaderId, zxid);
+ } else {
+ long currentLeaderId = getCurrentVote().getId();
+ QuorumServer myleaderInCurQV = prevQV.getVotingMembers().get(currentLeaderId);
+ QuorumServer myleaderInNewQV = qv.getVotingMembers().get(currentLeaderId);
+ leaderChange = (myleaderInCurQV == null
+ || myleaderInCurQV.addr == null
+ || myleaderInNewQV == null
+ || !myleaderInCurQV.addr.equals(myleaderInNewQV.addr));
+ // we don't have a designated leader - need to go into leader
+ // election
+ reconfigFlagClear();
+ }
+
+ return roleChange || leaderChange;
+ }
+ return false;
+
+ }
+
+ private void updateRemotePeerMXBeans(Map<Long, QuorumServer> newMembers) {
+ Set<Long> existingMembers = new HashSet<>(newMembers.keySet());
+ existingMembers.retainAll(jmxRemotePeerBean.keySet());
+ for (Long id : existingMembers) {
+ RemotePeerBean rBean = jmxRemotePeerBean.get(id);
+ rBean.setQuorumServer(newMembers.get(id));
+ }
+
+ Set<Long> joiningMembers = new HashSet<>(newMembers.keySet());
+ joiningMembers.removeAll(jmxRemotePeerBean.keySet());
+ joiningMembers.remove(getMyId()); // remove self as it is local bean
+ for (Long id : joiningMembers) {
+ QuorumServer qs = newMembers.get(id);
+ RemotePeerBean rBean = new RemotePeerBean(this, qs);
+ try {
+ MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
+ jmxRemotePeerBean.put(qs.id, rBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ }
+ }
+
+ Set<Long> leavingMembers = new HashSet<>(jmxRemotePeerBean.keySet());
+ leavingMembers.removeAll(newMembers.keySet());
+ for (Long id : leavingMembers) {
+ RemotePeerBean rBean = jmxRemotePeerBean.remove(id);
+ try {
+ MBeanRegistry.getInstance().unregister(rBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
+ }
+ }
+ }
+
+ private ArrayList<QuorumServer> observerMasters = new ArrayList<>();
+ private void updateObserverMasterList() {
+ if (observerMasterPort <= 0) {
+ return; // observer masters not enabled
+ }
+ observerMasters.clear();
+ StringBuilder sb = new StringBuilder();
+ for (QuorumServer server : quorumVerifier.getVotingMembers().values()) {
+ InetAddress address = server.addr.getReachableOrOne().getAddress();
+ InetSocketAddress addr = new InetSocketAddress(address, observerMasterPort);
+ observerMasters.add(new QuorumServer(server.id, addr));
+ sb.append(addr).append(",");
+ }
+ LOG.info("Updated learner master list to be {}", sb.toString());
+ Collections.shuffle(observerMasters);
+ // Reset the internal index of the observerMaster when
+ // the observerMaster List is refreshed
+ nextObserverMaster = 0;
+ }
+
+ private boolean useObserverMasters() {
+ return getLearnerType() == LearnerType.OBSERVER && observerMasters.size() > 0;
+ }
+
+ private int nextObserverMaster = 0;
+ private QuorumServer nextObserverMaster() {
+ if (nextObserverMaster >= observerMasters.size()) {
+ nextObserverMaster = 0;
+ // Add a reconnect delay only after the observer
+ // has exhausted trying to connect to all the masters
+ // from the observerMasterList
+ if (isRunning()) {
+ Observer.waitForReconnectDelay();
+ }
+ }
+ return observerMasters.get(nextObserverMaster++);
+ }
+
+ QuorumServer findLearnerMaster(QuorumServer leader) {
+ if (useObserverMasters()) {
+ return nextObserverMaster();
+ } else {
+ // Add delay jitter to reduce the load on the leader
+ if (isRunning()) {
+ Observer.waitForReconnectDelay();
+ }
+ return leader;
+ }
+ }
+
+ /**
+ * Vet a given learner master's information.
+ * Allows specification by server id, ip only, or ip and port
+ */
+ QuorumServer validateLearnerMaster(String desiredMaster) {
+ if (useObserverMasters()) {
+ Long sid;
+ try {
+ sid = Long.parseLong(desiredMaster);
+ } catch (NumberFormatException e) {
+ sid = null;
+ }
+ for (QuorumServer server : observerMasters) {
+ if (sid == null) {
+ for (InetSocketAddress address : server.addr.getAllAddresses()) {
+ String serverAddr = address.getAddress().getHostAddress() + ':' + address.getPort();
+ if (serverAddr.startsWith(desiredMaster)) {
+ return server;
+ }
+ }
+ } else {
+ if (sid.equals(server.id)) {
+ return server;
+ }
+ }
+ }
+ if (sid == null) {
+ LOG.info("could not find learner master address={}", desiredMaster);
+ } else {
+ LOG.warn("could not find learner master sid={}", sid);
+ }
+ } else {
+ LOG.info("cannot validate request, observer masters not enabled");
+ }
+ return null;
+ }
+
+ private boolean updateLearnerType(QuorumVerifier newQV) {
+ //check if I'm an observer in new config
+ if (newQV.getObservingMembers().containsKey(getMyId())) {
+ if (getLearnerType() != LearnerType.OBSERVER) {
+ setLearnerType(LearnerType.OBSERVER);
+ LOG.info("Becoming an observer");
+ reconfigFlagSet();
+ return true;
+ } else {
+ return false;
+ }
+ } else if (newQV.getVotingMembers().containsKey(getMyId())) {
+ if (getLearnerType() != LearnerType.PARTICIPANT) {
+ setLearnerType(LearnerType.PARTICIPANT);
+ LOG.info("Becoming a voting participant");
+ reconfigFlagSet();
+ return true;
+ } else {
+ return false;
+ }
+ }
+ // I'm not in the view
+ if (getLearnerType() != LearnerType.PARTICIPANT) {
+ setLearnerType(LearnerType.PARTICIPANT);
+ LOG.info("Becoming a non-voting participant");
+ reconfigFlagSet();
+ return true;
+ }
+ return false;
+ }
+
+ private boolean updateVote(long designatedLeader, long zxid) {
+ Vote currentVote = getCurrentVote();
+ if (currentVote != null && designatedLeader != currentVote.getId()) {
+ setCurrentVote(new Vote(designatedLeader, zxid));
+ reconfigFlagSet();
+ LOG.warn("Suggested leader: {}", designatedLeader);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Updates leader election info to avoid inconsistencies when
+ * a new server tries to join the ensemble.
+ *
+ * Here is the inconsistency scenario we try to solve by updating the peer
+ * epoch after following leader:
+ *
+ * Let's say we have an ensemble with 3 servers z1, z2 and z3.
+ *
+ * 1. z1, z2 were following z3 with peerEpoch to be 0xb8, the new epoch is
+ * 0xb9, aka current accepted epoch on disk.
+ * 2. z2 get restarted, which will use 0xb9 as it's peer epoch when loading
+ * the current accept epoch from disk.
+ * 3. z2 received notification from z1 and z3, which is following z3 with
+ * epoch 0xb8, so it started following z3 again with peer epoch 0xb8.
+ * 4. before z2 successfully connected to z3, z3 get restarted with new
+ * epoch 0xb9.
+ * 5. z2 will retry around a few round (default 5s) before giving up,
+ * meanwhile it will report z3 as leader.
+ * 6. z1 restarted, and looking with peer epoch 0xb9.
+ * 7. z1 voted z3, and z3 was elected as leader again with peer epoch 0xb9.
+ * 8. z2 successfully connected to z3 before giving up, but with peer
+ * epoch 0xb8.
+ * 9. z1 get restarted, looking for leader with peer epoch 0xba, but cannot
+ * join, because z2 is reporting peer epoch 0xb8, while z3 is reporting
+ * 0xb9.
+ *
+ * By updating the election vote after actually following leader, we can
+ * avoid this kind of stuck happened.
+ *
+ * Btw, the zxid and electionEpoch could be inconsistent because of the same
+ * reason, it's better to update these as well after syncing with leader, but
+ * that required protocol change which is non trivial. This problem is worked
+ * around by skipping comparing the zxid and electionEpoch when counting for
+ * votes for out of election servers during looking for leader.
+ *
+ * See https://issues.apache.org/jira/browse/ZOOKEEPER-1732
+ */
+ protected void updateElectionVote(long newEpoch) {
+ Vote currentVote = getCurrentVote();
+ if (currentVote != null) {
+ setCurrentVote(new Vote(currentVote.getId(), currentVote.getZxid(), currentVote.getElectionEpoch(), newEpoch, currentVote
+ .getState()));
+ }
+ }
+
+ private void updateThreadName() {
+ String plain = cnxnFactory != null
+ ? cnxnFactory.getLocalAddress() != null
+ ? formatInetAddr(cnxnFactory.getLocalAddress())
+ : "disabled"
+ : "disabled";
+ String secure = secureCnxnFactory != null ? formatInetAddr(secureCnxnFactory.getLocalAddress()) : "disabled";
+ setName(String.format("QuorumPeer[myid=%d](plain=%s)(secure=%s)", getMyId(), plain, secure));
+ }
+
+ /**
+ * Sets the time taken for leader election in milliseconds.
+ *
+ * @param electionTimeTaken time taken for leader election
+ */
+ void setElectionTimeTaken(long electionTimeTaken) {
+ this.electionTimeTaken = electionTimeTaken;
+ }
+
+ /**
+ * @return the time taken for leader election in milliseconds.
+ */
+ long getElectionTimeTaken() {
+ return electionTimeTaken;
+ }
+
+ void setQuorumServerSaslRequired(boolean serverSaslRequired) {
+ quorumServerSaslAuthRequired = serverSaslRequired;
+ LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, serverSaslRequired);
+ }
+
+ void setQuorumLearnerSaslRequired(boolean learnerSaslRequired) {
+ quorumLearnerSaslAuthRequired = learnerSaslRequired;
+ LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, learnerSaslRequired);
+ }
+
+ void setQuorumSaslEnabled(boolean enableAuth) {
+ quorumSaslEnableAuth = enableAuth;
+ if (!quorumSaslEnableAuth) {
+ LOG.info("QuorumPeer communication is not secured! (SASL auth disabled)");
+ } else {
+ LOG.info("{} set to {}", QuorumAuth.QUORUM_SASL_AUTH_ENABLED, enableAuth);
+ }
+ }
+
+ void setQuorumServicePrincipal(String servicePrincipal) {
+ quorumServicePrincipal = servicePrincipal;
+ LOG.info("{} set to {}", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL, quorumServicePrincipal);
+ }
+
+ void setQuorumLearnerLoginContext(String learnerContext) {
+ quorumLearnerLoginContext = learnerContext;
+ LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT, quorumLearnerLoginContext);
+ }
+
+ void setQuorumServerLoginContext(String serverContext) {
+ quorumServerLoginContext = serverContext;
+ LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT, quorumServerLoginContext);
+ }
+
+ void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) {
+ if (qCnxnThreadsSize > QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE) {
+ quorumCnxnThreadsSize = qCnxnThreadsSize;
+ }
+ LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize);
+ }
+
+ boolean isQuorumSaslAuthEnabled() {
+ return quorumSaslEnableAuth;
+ }
+
+ private boolean isQuorumServerSaslAuthRequired() {
+ return quorumServerSaslAuthRequired;
+ }
+
+ private boolean isQuorumLearnerSaslAuthRequired() {
+ return quorumLearnerSaslAuthRequired;
+ }
+
+ public QuorumCnxManager createCnxnManager() {
+ int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
+ LOG.info("Using {}ms as the quorum cnxn socket timeout", timeout);
+ return new QuorumCnxManager(
+ this,
+ this.getMyId(),
+ this.getView(),
+ this.authServer,
+ this.authLearner,
+ timeout,
+ this.getQuorumListenOnAllIPs(),
+ this.quorumCnxnThreadsSize,
+ this.isQuorumSaslAuthEnabled());
+ }
+
+ boolean isLeader(long id) {
+ Vote vote = getCurrentVote();
+ return vote != null && id == vote.getId();
+ }
+
+ public boolean isReconfigEnabled() {
+ return reconfigEnabled;
+ }
+
+ @InterfaceAudience.Private
+ /**
+ * This is a metric that depends on the status of the peer.
+ */ public Integer getSynced_observers_metric() {
+ if (leader != null) {
+ return leader.getObservingLearners().size();
+ } else if (follower != null) {
+ return follower.getSyncedObserverSize();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Create a new QuorumPeer and apply all the values per the already-parsed config.
+ *
+ * @param config The appertained quorum peer config.
+ * @return A QuorumPeer instantiated with specified peer config. Note this peer
+ * is not fully initialized; caller should finish initialization through
+ * additional configurations (connection factory settings, etc).
+ *
+ * @throws IOException
+ */
+ public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOException {
+ QuorumPeer quorumPeer = new QuorumPeer();
+ quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
+ quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
+ quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
+ quorumPeer.setElectionType(config.getElectionAlg());
+ quorumPeer.setMyid(config.getServerId());
+ quorumPeer.setTickTime(config.getTickTime());
+ quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
+ quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
+ quorumPeer.setInitLimit(config.getInitLimit());
+ quorumPeer.setSyncLimit(config.getSyncLimit());
+ quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
+ quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
+ quorumPeer.setConfigFileName(config.getConfigFilename());
+ quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
+ quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
+ quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
+ if (config.getLastSeenQuorumVerifier() != null) {
+ quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
+ }
+ quorumPeer.initConfigInZKDatabase();
+ quorumPeer.setSslQuorum(config.isSslQuorum());
+ quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
+ quorumPeer.setLearnerType(config.getPeerType());
+ quorumPeer.setSyncEnabled(config.getSyncEnabled());
+ quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
+ if (config.sslQuorumReloadCertFiles) {
+ quorumPeer.getX509Util().enableCertFileReloading();
+ }
+ quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
+ quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
+ quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
+
+ // sets quorum sasl authentication configurations
+ quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
+ if (quorumPeer.isQuorumSaslAuthEnabled()) {
+ quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
+ quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
+ quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
+ quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
+ quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
+ }
+ quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
+
+ if (config.jvmPauseMonitorToRun) {
+ quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
+ }
+
+ return quorumPeer;
+ }
+
+}