aboutsummaryrefslogtreecommitdiffstats
path: root/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
diff options
context:
space:
mode:
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java')
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java2711
1 files changed, 0 insertions, 2711 deletions
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
deleted file mode 100644
index f6fc87d7716..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ /dev/null
@@ -1,2711 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import javax.security.sasl.SaslException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException.BadArgumentsException;
-import org.apache.zookeeper.common.AtomicFileOutputStream;
-import org.apache.zookeeper.common.AtomicFileWritingIdiom;
-import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement;
-import org.apache.zookeeper.common.QuorumX509Util;
-import org.apache.zookeeper.common.Time;
-import org.apache.zookeeper.common.X509Exception;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.jmx.ZKMBeanInfo;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ServerMetrics;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperThread;
-import org.apache.zookeeper.server.admin.AdminServer;
-import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
-import org.apache.zookeeper.server.admin.AdminServerFactory;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
-import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthLearner;
-import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthServer;
-import org.apache.zookeeper.server.quorum.auth.QuorumAuth;
-import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
-import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
-import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner;
-import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer;
-import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
-import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.apache.zookeeper.server.util.ConfigUtils;
-import org.apache.zookeeper.server.util.JvmPauseMonitor;
-import org.apache.zookeeper.server.util.ZxidUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class manages the quorum protocol. There are three states this server
- * can be in:
- * <ol>
- * <li>Leader election - each server will elect a leader (proposing itself as a
- * leader initially).</li>
- * <li>Follower - the server will synchronize with the leader and replicate any
- * transactions.</li>
- * <li>Leader - the server will process requests and forward them to followers.
- * A majority of followers must log the request before it can be accepted.
- * </ol>
- *
- * This class will setup a datagram socket that will always respond with its
- * view of the current leader. The response will take the form of:
- *
- * <pre>
- * int xid;
- *
- * long myid;
- *
- * long leader_id;
- *
- * long leader_zxid;
- * </pre>
- *
- * The request for the current leader will consist solely of an xid: int xid;
- */
-public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
-
- private static final Logger LOG = LoggerFactory.getLogger(QuorumPeer.class);
-
- public static final String CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES = "zookeeper.kerberos.canonicalizeHostNames";
- public static final String CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES = "false";
-
- private QuorumBean jmxQuorumBean;
- LocalPeerBean jmxLocalPeerBean;
- private Map<Long, RemotePeerBean> jmxRemotePeerBean;
- LeaderElectionBean jmxLeaderElectionBean;
-
- // The QuorumCnxManager is held through an AtomicReference to ensure cross-thread visibility
- // of updates; see the implementation comment at setLastSeenQuorumVerifier().
- private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();
-
- QuorumAuthServer authServer;
- QuorumAuthLearner authLearner;
-
- /**
- * ZKDatabase is a top level member of quorumpeer
- * which will be used in all the zookeeperservers
- * instantiated later. Also, it is created once on
- * bootup and only thrown away in case of a truncate
- * message from the leader
- */
- private ZKDatabase zkDb;
-
- private JvmPauseMonitor jvmPauseMonitor;
-
- private final AtomicBoolean suspended = new AtomicBoolean(false);
-
- public static final class AddressTuple {
-
- public final MultipleAddresses quorumAddr;
- public final MultipleAddresses electionAddr;
- public final InetSocketAddress clientAddr;
-
- public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
- this.quorumAddr = quorumAddr;
- this.electionAddr = electionAddr;
- this.clientAddr = clientAddr;
- }
-
- }
-
- private int observerMasterPort;
-
- public int getObserverMasterPort() {
- return observerMasterPort;
- }
-
- public void setObserverMasterPort(int observerMasterPort) {
- this.observerMasterPort = observerMasterPort;
- }
-
- public static final String CONFIG_KEY_MULTI_ADDRESS_ENABLED = "zookeeper.multiAddress.enabled";
- public static final String CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED = "false";
-
- private boolean multiAddressEnabled = true;
- public boolean isMultiAddressEnabled() {
- return multiAddressEnabled;
- }
-
- public void setMultiAddressEnabled(boolean multiAddressEnabled) {
- this.multiAddressEnabled = multiAddressEnabled;
- LOG.info("multiAddress.enabled set to {}", multiAddressEnabled);
- }
-
- public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS = "zookeeper.multiAddress.reachabilityCheckTimeoutMs";
-
- private int multiAddressReachabilityCheckTimeoutMs = (int) MultipleAddresses.DEFAULT_TIMEOUT.toMillis();
- public int getMultiAddressReachabilityCheckTimeoutMs() {
- return multiAddressReachabilityCheckTimeoutMs;
- }
-
- public void setMultiAddressReachabilityCheckTimeoutMs(int multiAddressReachabilityCheckTimeoutMs) {
- this.multiAddressReachabilityCheckTimeoutMs = multiAddressReachabilityCheckTimeoutMs;
- LOG.info("multiAddress.reachabilityCheckTimeoutMs set to {}", multiAddressReachabilityCheckTimeoutMs);
- }
-
- public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED = "zookeeper.multiAddress.reachabilityCheckEnabled";
-
- private boolean multiAddressReachabilityCheckEnabled = true;
-
- public boolean isMultiAddressReachabilityCheckEnabled() {
- return multiAddressReachabilityCheckEnabled;
- }
-
- public void setMultiAddressReachabilityCheckEnabled(boolean multiAddressReachabilityCheckEnabled) {
- this.multiAddressReachabilityCheckEnabled = multiAddressReachabilityCheckEnabled;
- LOG.info("multiAddress.reachabilityCheckEnabled set to {}", multiAddressReachabilityCheckEnabled);
- }
-
- public static class QuorumServer {
-
- public MultipleAddresses addr = new MultipleAddresses();
-
- public MultipleAddresses electionAddr = new MultipleAddresses();
-
- public InetSocketAddress clientAddr = null;
-
- public long id;
-
- public String hostname;
-
- public LearnerType type = LearnerType.PARTICIPANT;
-
- public boolean isClientAddrFromStatic = false;
-
- private List<InetSocketAddress> myAddrs;
-
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
- this(id, addr, electionAddr, clientAddr, LearnerType.PARTICIPANT);
- }
-
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr) {
- this(id, addr, electionAddr, null, LearnerType.PARTICIPANT);
- }
-
- // VisibleForTesting
- public QuorumServer(long id, InetSocketAddress addr) {
- this(id, addr, null, null, LearnerType.PARTICIPANT);
- }
-
- public long getId() {
- return id;
- }
-
- /**
- * Performs a DNS lookup for server address and election address.
- *
- * If the DNS lookup fails, this.addr and electionAddr remain
- * unmodified.
- */
- public void recreateSocketAddresses() {
- if (this.addr.isEmpty()) {
- LOG.warn("Server address has not been initialized");
- return;
- }
- if (this.electionAddr.isEmpty()) {
- LOG.warn("Election address has not been initialized");
- return;
- }
- this.addr.recreateSocketAddresses();
- this.electionAddr.recreateSocketAddresses();
- }
-
- private LearnerType getType(String s) throws ConfigException {
- switch (s.trim().toLowerCase()) {
- case "observer":
- return LearnerType.OBSERVER;
- case "participant":
- return LearnerType.PARTICIPANT;
- default:
- throw new ConfigException("Unrecognised peertype: " + s);
- }
- }
-
- public QuorumServer(long sid, String addressStr) throws ConfigException {
- this(sid, addressStr, QuorumServer::getInetAddress);
- }
-
- QuorumServer(long sid, String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
- this.id = sid;
- initializeWithAddressString(addressStr, getInetAddress);
- }
-
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, LearnerType type) {
- this(id, addr, electionAddr, null, type);
- }
-
- public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
- this.id = id;
- if (addr != null) {
- this.addr.addAddress(addr);
- }
- if (electionAddr != null) {
- this.electionAddr.addAddress(electionAddr);
- }
- this.type = type;
- this.clientAddr = clientAddr;
-
- setMyAddrs();
- }
-
- private static final String wrongFormat =
- " does not have the form server_config or server_config;client_config"
- + " where server_config is the pipe separated list of host:port:port or host:port:port:type"
- + " and client_config is port or host:port";
-
- private void initializeWithAddressString(String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException {
- LearnerType newType = null;
- String[] serverClientParts = addressStr.split(";");
- String[] serverAddresses = serverClientParts[0].split("\\|");
-
- if (serverClientParts.length == 2) {
- String[] clientParts = ConfigUtils.getHostAndPort(serverClientParts[1]);
- if (clientParts.length > 2) {
- throw new ConfigException(addressStr + wrongFormat);
- }
-
- // is client_config a host:port or just a port
- String clientHostName = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
- try {
- clientAddr = new InetSocketAddress(clientHostName, Integer.parseInt(clientParts[clientParts.length - 1]));
- } catch (NumberFormatException e) {
- throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
- }
- }
-
- boolean multiAddressEnabled = Boolean.parseBoolean(
- System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, QuorumPeer.CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED));
- if (!multiAddressEnabled && serverAddresses.length > 1) {
- throw new ConfigException("Multiple address feature is disabled, but multiple addresses were specified for sid " + this.id);
- }
-
- boolean canonicalize = Boolean.parseBoolean(
- System.getProperty(
- CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES,
- CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES));
-
- for (String serverAddress : serverAddresses) {
- String serverParts[] = ConfigUtils.getHostAndPort(serverAddress);
- if ((serverClientParts.length > 2) || (serverParts.length < 3)
- || (serverParts.length > 4)) {
- throw new ConfigException(addressStr + wrongFormat);
- }
-
- String serverHostName = serverParts[0];
-
- // server_config should be either host:port:port or host:port:port:type
- InetSocketAddress tempAddress;
- InetSocketAddress tempElectionAddress;
- try {
- tempAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[1]));
- addr.addAddress(tempAddress);
- } catch (NumberFormatException e) {
- throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[1]);
- }
- try {
- tempElectionAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[2]));
- electionAddr.addAddress(tempElectionAddress);
- } catch (NumberFormatException e) {
- throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[2]);
- }
-
- if (tempAddress.getPort() == tempElectionAddress.getPort()) {
- throw new ConfigException("Client and election port must be different! Please update the "
- + "configuration file on server." + this.id);
- }
-
- if (canonicalize) {
- InetAddress ia = getInetAddress.apply(tempAddress);
- if (ia == null) {
- throw new ConfigException("Unable to canonicalize address " + serverHostName + " because it's not resolvable");
- }
-
- String canonicalHostName = ia.getCanonicalHostName();
-
- if (!canonicalHostName.equals(serverHostName)
- // Avoid using literal IP address when
- // security check fails
- && !canonicalHostName.equals(ia.getHostAddress())) {
- LOG.info("Host name for quorum server {} "
- + "canonicalized from {} to {}",
- this.id, serverHostName, canonicalHostName);
- serverHostName = canonicalHostName;
- }
- }
-
- if (serverParts.length == 4) {
- LearnerType tempType = getType(serverParts[3]);
- if (newType == null) {
- newType = tempType;
- }
-
- if (newType != tempType) {
- throw new ConfigException("Multiple addresses should have similar roles: " + type + " vs " + tempType);
- }
- }
-
- this.hostname = serverHostName;
- }
-
- if (newType != null) {
- type = newType;
- }
-
- setMyAddrs();
- }
-
- private static InetAddress getInetAddress(InetSocketAddress addr) {
- return addr.getAddress();
- }
-
- private void setMyAddrs() {
- this.myAddrs = new ArrayList<>();
- this.myAddrs.addAll(this.addr.getAllAddresses());
- this.myAddrs.add(this.clientAddr);
- this.myAddrs.addAll(this.electionAddr.getAllAddresses());
- this.myAddrs = excludedSpecialAddresses(this.myAddrs);
- }
-
- public static String delimitedHostString(InetSocketAddress addr) {
- String host = addr.getHostString();
- if (host.contains(":")) {
- return "[" + host + "]";
- } else {
- return host;
- }
- }
-
- public String toString() {
- StringWriter sw = new StringWriter();
-
- List<InetSocketAddress> addrList = new LinkedList<>(addr.getAllAddresses());
- List<InetSocketAddress> electionAddrList = new LinkedList<>(electionAddr.getAllAddresses());
-
- if (addrList.size() > 0 && electionAddrList.size() > 0) {
- addrList.sort(Comparator.comparing(InetSocketAddress::getHostString));
- electionAddrList.sort(Comparator.comparing(InetSocketAddress::getHostString));
- sw.append(IntStream.range(0, addrList.size()).mapToObj(i -> String.format("%s:%d:%d",
- delimitedHostString(addrList.get(i)), addrList.get(i).getPort(), electionAddrList.get(i).getPort()))
- .collect(Collectors.joining("|")));
- }
-
- if (type == LearnerType.OBSERVER) {
- sw.append(":observer");
- } else if (type == LearnerType.PARTICIPANT) {
- sw.append(":participant");
- }
-
- if (clientAddr != null && !isClientAddrFromStatic) {
- sw.append(";");
- sw.append(delimitedHostString(clientAddr));
- sw.append(":");
- sw.append(String.valueOf(clientAddr.getPort()));
- }
-
- return sw.toString();
- }
-
- public int hashCode() {
- assert false : "hashCode not designed";
- return 42; // any arbitrary constant will do
- }
-
- private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2) {
- return (addr1 != null || addr2 == null)
- && (addr1 == null || addr2 != null)
- && (addr1 == null || addr2 == null || addr1.equals(addr2));
- }
-
- public boolean equals(Object o) {
- if (!(o instanceof QuorumServer)) {
- return false;
- }
- QuorumServer qs = (QuorumServer) o;
- if ((qs.id != id) || (qs.type != type)) {
- return false;
- }
- if (!addr.equals(qs.addr)) {
- return false;
- }
- if (!electionAddr.equals(qs.electionAddr)) {
- return false;
- }
- return checkAddressesEqual(clientAddr, qs.clientAddr);
- }
-
- public void checkAddressDuplicate(QuorumServer s) throws BadArgumentsException {
- List<InetSocketAddress> otherAddrs = new ArrayList<>(s.addr.getAllAddresses());
- otherAddrs.add(s.clientAddr);
- otherAddrs.addAll(s.electionAddr.getAllAddresses());
- otherAddrs = excludedSpecialAddresses(otherAddrs);
-
- for (InetSocketAddress my : this.myAddrs) {
-
- for (InetSocketAddress other : otherAddrs) {
- if (my.equals(other)) {
- String error = String.format("%s of server.%d conflicts %s of server.%d", my, this.id, other, s.id);
- throw new BadArgumentsException(error);
- }
- }
- }
- }
-
- private List<InetSocketAddress> excludedSpecialAddresses(List<InetSocketAddress> addrs) {
- List<InetSocketAddress> included = new ArrayList<>();
-
- for (InetSocketAddress addr : addrs) {
- if (addr == null) {
- continue;
- }
- InetAddress inetaddr = addr.getAddress();
-
- if (inetaddr == null || inetaddr.isAnyLocalAddress() // wildCard addresses (0.0.0.0 or [::])
- || inetaddr.isLoopbackAddress()) { // loopback address(localhost/127.0.0.1)
- continue;
- }
- included.add(addr);
- }
- return included;
- }
-
- }
-
- public enum ServerState {
- LOOKING,
- FOLLOWING,
- LEADING,
- OBSERVING
- }
-
- /**
- * (Used for monitoring) shows the current phase of
- * Zab protocol that peer is running.
- */
- public enum ZabState {
- ELECTION,
- DISCOVERY,
- SYNCHRONIZATION,
- BROADCAST
- }
-
- /**
- * (Used for monitoring) When peer is in synchronization phase, this shows
- * which synchronization mechanism is being used
- */
- public enum SyncMode {
- NONE,
- DIFF,
- SNAP,
- TRUNC
- }
-
- /*
- * A peer can either be participating, which implies that it is willing to
- * both vote in instances of consensus and to elect or become a Leader, or
- * it may be observing in which case it isn't.
- *
- * We need this distinction to decide which ServerState to move to when
- * conditions change (e.g. which state to become after LOOKING).
- */
- public enum LearnerType {
- PARTICIPANT,
- OBSERVER
- }
-
- /*
- * To enable observers to have no identifier, we need a generic identifier
- * at least for QuorumCnxManager. We use the following constant to as the
- * value of such a generic identifier.
- */
-
- static final long OBSERVER_ID = Long.MAX_VALUE;
-
- /*
- * Record leader election time
- */
- public long start_fle, end_fle; // fle = fast leader election
- public static final String FLE_TIME_UNIT = "MS";
- private long unavailableStartTime;
-
- /*
- * Default value of peer is participant
- */
- private LearnerType learnerType = LearnerType.PARTICIPANT;
-
- public LearnerType getLearnerType() {
- return learnerType;
- }
-
- /**
- * Sets the LearnerType
- */
- public void setLearnerType(LearnerType p) {
- learnerType = p;
- }
-
- protected synchronized void setConfigFileName(String s) {
- configFilename = s;
- }
-
- private String configFilename = null;
-
- public int getQuorumSize() {
- return getVotingView().size();
- }
-
- public void setJvmPauseMonitor(JvmPauseMonitor jvmPauseMonitor) {
- this.jvmPauseMonitor = jvmPauseMonitor;
- }
-
- /**
- * QuorumVerifier implementation; default (majority).
- */
-
- //last committed quorum verifier
- private QuorumVerifier quorumVerifier;
-
- //last proposed quorum verifier
- private QuorumVerifier lastSeenQuorumVerifier = null;
-
- // Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier.
- final Object QV_LOCK = new Object();
-
- /**
- * My id
- */
- private long myid;
-
- /**
- * get the id of this quorum peer.
- */
- public long getMyId() {
- return myid;
- }
-
- // VisibleForTesting
- void setId(long id) {
- this.myid = id;
- }
-
- private boolean sslQuorum;
- private boolean shouldUsePortUnification;
-
- public boolean isSslQuorum() {
- return sslQuorum;
- }
-
- public boolean shouldUsePortUnification() {
- return shouldUsePortUnification;
- }
-
- private final QuorumX509Util x509Util;
-
- QuorumX509Util getX509Util() {
- return x509Util;
- }
-
- /**
- * This is who I think the leader currently is.
- */
- private volatile Vote currentVote;
-
- public synchronized Vote getCurrentVote() {
- return currentVote;
- }
-
- public synchronized void setCurrentVote(Vote v) {
- currentVote = v;
- }
-
- private volatile boolean running = true;
-
- private String initialConfig;
-
- /**
- * The number of milliseconds of each tick
- */
- protected int tickTime;
-
- /**
- * Whether learners in this quorum should create new sessions as local.
- * False by default to preserve existing behavior.
- */
- protected boolean localSessionsEnabled = false;
-
- /**
- * Whether learners in this quorum should upgrade local sessions to
- * global. Only matters if local sessions are enabled.
- */
- protected boolean localSessionsUpgradingEnabled = true;
-
- /**
- * Minimum number of milliseconds to allow for session timeout.
- * A value of -1 indicates unset, use default.
- */
- protected int minSessionTimeout = -1;
-
- /**
- * Maximum number of milliseconds to allow for session timeout.
- * A value of -1 indicates unset, use default.
- */
- protected int maxSessionTimeout = -1;
-
- /**
- * The ZooKeeper server's socket backlog length. The number of connections
- * that will be queued to be read before new connections are dropped. A
- * value of one indicates the default backlog will be used.
- */
- protected int clientPortListenBacklog = -1;
-
- /**
- * The number of ticks that the initial synchronization phase can take
- */
- protected volatile int initLimit;
-
- /**
- * The number of ticks that can pass between sending a request and getting
- * an acknowledgment
- */
- protected volatile int syncLimit;
-
- /**
- * The number of ticks that can pass before retrying to connect to learner master
- */
- protected volatile int connectToLearnerMasterLimit;
-
- /**
- * Enables/Disables sync request processor. This option is enabled
- * by default and is to be used with observers.
- */
- protected boolean syncEnabled = true;
-
- /**
- * The current tick
- */
- protected AtomicInteger tick = new AtomicInteger();
-
- /**
- * Whether or not to listen on all IPs for the two quorum ports
- * (broadcast and fast leader election).
- */
- protected boolean quorumListenOnAllIPs = false;
-
- /**
- * Keeps time taken for leader election in milliseconds. Sets the value to
- * this variable only after the completion of leader election.
- */
- private long electionTimeTaken = -1;
-
- /**
- * Enable/Disables quorum authentication using sasl. Defaulting to false.
- */
- protected boolean quorumSaslEnableAuth;
-
- /**
- * If this is false, quorum peer server will accept another quorum peer client
- * connection even if the authentication did not succeed. This can be used while
- * upgrading ZooKeeper server. Defaulting to false (required).
- */
- protected boolean quorumServerSaslAuthRequired;
-
- /**
- * If this is false, quorum peer learner will talk to quorum peer server
- * without authentication. This can be used while upgrading ZooKeeper
- * server. Defaulting to false (required).
- */
- protected boolean quorumLearnerSaslAuthRequired;
-
- /**
- * Kerberos quorum service principal. Defaulting to 'zkquorum/localhost'.
- */
- protected String quorumServicePrincipal;
-
- /**
- * Quorum learner login context name in jaas-conf file to read the kerberos
- * security details. Defaulting to 'QuorumLearner'.
- */
- protected String quorumLearnerLoginContext;
-
- /**
- * Quorum server login context name in jaas-conf file to read the kerberos
- * security details. Defaulting to 'QuorumServer'.
- */
- protected String quorumServerLoginContext;
-
- // TODO: need to tune the default value of thread size
- private static final int QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE = 20;
- /**
- * The maximum number of threads to allow in the connectionExecutors thread
- * pool which will be used to initiate quorum server connections.
- */
- protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;
-
- public static final String QUORUM_CNXN_TIMEOUT_MS = "zookeeper.quorumCnxnTimeoutMs";
- private static int quorumCnxnTimeoutMs;
-
- static {
- quorumCnxnTimeoutMs = Integer.getInteger(QUORUM_CNXN_TIMEOUT_MS, -1);
- LOG.info("{}={}", QUORUM_CNXN_TIMEOUT_MS, quorumCnxnTimeoutMs);
- }
-
- /**
- * @deprecated As of release 3.4.0, this class has been deprecated, since
- * it is used with one of the udp-based versions of leader election, which
- * we are also deprecating.
- *
- * This class simply responds to requests for the current leader of this
- * node.
- * <p>
- * The request contains just an xid generated by the requestor.
- * <p>
- * The response has the xid, the id of this server, the id of the leader,
- * and the zxid of the leader.
- *
- *
- */
- @Deprecated
- class ResponderThread extends ZooKeeperThread {
-
- ResponderThread() {
- super("ResponderThread");
- }
-
- volatile boolean running = true;
-
- @Override
- public void run() {
- try {
- byte[] b = new byte[36];
- ByteBuffer responseBuffer = ByteBuffer.wrap(b);
- DatagramPacket packet = new DatagramPacket(b, b.length);
- while (running) {
- udpSocket.receive(packet);
- if (packet.getLength() != 4) {
- LOG.warn("Got more than just an xid! Len = {}", packet.getLength());
- } else {
- responseBuffer.clear();
- responseBuffer.getInt(); // Skip the xid
- responseBuffer.putLong(myid);
- Vote current = getCurrentVote();
- switch (getPeerState()) {
- case LOOKING:
- responseBuffer.putLong(current.getId());
- responseBuffer.putLong(current.getZxid());
- break;
- case LEADING:
- responseBuffer.putLong(myid);
- try {
- long proposed;
- synchronized (leader) {
- proposed = leader.lastProposed;
- }
- responseBuffer.putLong(proposed);
- } catch (NullPointerException npe) {
- // This can happen in state transitions,
- // just ignore the request
- }
- break;
- case FOLLOWING:
- responseBuffer.putLong(current.getId());
- try {
- responseBuffer.putLong(follower.getZxid());
- } catch (NullPointerException npe) {
- // This can happen in state transitions,
- // just ignore the request
- }
- break;
- case OBSERVING:
- // Do nothing, Observers keep themselves to
- // themselves.
- break;
- }
- packet.setData(b);
- udpSocket.send(packet);
- }
- packet.setLength(b.length);
- }
- } catch (RuntimeException e) {
- LOG.warn("Unexpected runtime exception in ResponderThread", e);
- } catch (IOException e) {
- LOG.warn("Unexpected IO exception in ResponderThread", e);
- } finally {
- LOG.warn("QuorumPeer responder thread exited");
- }
- }
-
- }
-
- private ServerState state = ServerState.LOOKING;
-
- private AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION);
- private AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE);
- private AtomicReference<String> leaderAddress = new AtomicReference<>("");
- private AtomicLong leaderId = new AtomicLong(-1);
-
- private boolean reconfigFlag = false; // indicates that a reconfig just committed
-
- public synchronized void setPeerState(ServerState newState) {
- state = newState;
- if (newState == ServerState.LOOKING) {
- setLeaderAddressAndId(null, -1);
- setZabState(ZabState.ELECTION);
- } else {
- LOG.info("Peer state changed: {}", getDetailedPeerState());
- }
- }
-
- public void setZabState(ZabState zabState) {
- if ((zabState == ZabState.BROADCAST) && (unavailableStartTime != 0)) {
- long unavailableTime = Time.currentElapsedTime() - unavailableStartTime;
- ServerMetrics.getMetrics().UNAVAILABLE_TIME.add(unavailableTime);
- if (getPeerState() == ServerState.LEADING) {
- ServerMetrics.getMetrics().LEADER_UNAVAILABLE_TIME.add(unavailableTime);
- }
- unavailableStartTime = 0;
- }
- this.zabState.set(zabState);
- LOG.info("Peer state changed: {}", getDetailedPeerState());
- }
-
- public void setSyncMode(SyncMode syncMode) {
- this.syncMode.set(syncMode);
- LOG.info("Peer state changed: {}", getDetailedPeerState());
- }
-
- public ZabState getZabState() {
- return zabState.get();
- }
-
- public SyncMode getSyncMode() {
- return syncMode.get();
- }
-
- public void setLeaderAddressAndId(MultipleAddresses addr, long newId) {
- if (addr != null) {
- leaderAddress.set(String.join("|", addr.getAllHostStrings()));
- } else {
- leaderAddress.set(null);
- }
- leaderId.set(newId);
- }
-
- public String getLeaderAddress() {
- return leaderAddress.get();
- }
-
- public long getLeaderId() {
- return leaderId.get();
- }
-
- public String getDetailedPeerState() {
- final StringBuilder sb = new StringBuilder(getPeerState().toString().toLowerCase());
- final ZabState zabState = getZabState();
- if (!ZabState.ELECTION.equals(zabState)) {
- sb.append(" - ").append(zabState.toString().toLowerCase());
- }
- final SyncMode syncMode = getSyncMode();
- if (!SyncMode.NONE.equals(syncMode)) {
- sb.append(" - ").append(syncMode.toString().toLowerCase());
- }
- return sb.toString();
- }
-
- public synchronized void reconfigFlagSet() {
- reconfigFlag = true;
- }
- public synchronized void reconfigFlagClear() {
- reconfigFlag = false;
- }
- public synchronized boolean isReconfigStateChange() {
- return reconfigFlag;
- }
- public synchronized ServerState getPeerState() {
- return state;
- }
-
- DatagramSocket udpSocket;
-
- private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>();
-
- /**
- * Resolves hostname for a given server ID.
- *
- * This method resolves hostname for a given server ID in both quorumVerifer
- * and lastSeenQuorumVerifier. If the server ID matches the local server ID,
- * it also updates myAddrs.
- */
- public void recreateSocketAddresses(long id) {
- QuorumVerifier qv = getQuorumVerifier();
- if (qv != null) {
- QuorumServer qs = qv.getAllMembers().get(id);
- if (qs != null) {
- qs.recreateSocketAddresses();
- if (id == getMyId()) {
- setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
- }
- }
- }
- qv = getLastSeenQuorumVerifier();
- if (qv != null) {
- QuorumServer qs = qv.getAllMembers().get(id);
- if (qs != null) {
- qs.recreateSocketAddresses();
- }
- }
- }
-
- private AddressTuple getAddrs() {
- AddressTuple addrs = myAddrs.get();
- if (addrs != null) {
- return addrs;
- }
- try {
- synchronized (QV_LOCK) {
- addrs = myAddrs.get();
- while (addrs == null) {
- QV_LOCK.wait();
- addrs = myAddrs.get();
- }
- return addrs;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
-
- public MultipleAddresses getQuorumAddress() {
- return getAddrs().quorumAddr;
- }
-
- public MultipleAddresses getElectionAddress() {
- return getAddrs().electionAddr;
- }
-
- public InetSocketAddress getClientAddress() {
- final AddressTuple addrs = myAddrs.get();
- return (addrs == null) ? null : addrs.clientAddr;
- }
-
- private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
- synchronized (QV_LOCK) {
- myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr));
- QV_LOCK.notifyAll();
- }
- }
-
- private int electionType;
-
- Election electionAlg;
-
- ServerCnxnFactory cnxnFactory;
- ServerCnxnFactory secureCnxnFactory;
-
- private FileTxnSnapLog logFactory = null;
-
- private final QuorumStats quorumStats;
-
- AdminServer adminServer;
-
- private final boolean reconfigEnabled;
-
- public static QuorumPeer testingQuorumPeer() throws SaslException {
- return new QuorumPeer();
- }
-
- public QuorumPeer() throws SaslException {
- super("QuorumPeer");
- quorumStats = new QuorumStats(this);
- jmxRemotePeerBean = new HashMap<>();
- adminServer = AdminServerFactory.createAdminServer();
- x509Util = createX509Util();
- initialize();
- reconfigEnabled = QuorumPeerConfig.isReconfigEnabled();
- }
-
- // VisibleForTesting
- QuorumX509Util createX509Util() {
- return new QuorumX509Util();
- }
-
- /**
- * For backward compatibility purposes, we instantiate QuorumMaj by default.
- */
-
- public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, ServerCnxnFactory cnxnFactory) throws IOException {
- this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false, cnxnFactory, new QuorumMaj(quorumPeers));
- }
-
- public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException {
- this();
- this.cnxnFactory = cnxnFactory;
- this.electionType = electionType;
- this.myid = myid;
- this.tickTime = tickTime;
- this.initLimit = initLimit;
- this.syncLimit = syncLimit;
- this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
- this.quorumListenOnAllIPs = quorumListenOnAllIPs;
- this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
- this.zkDb = new ZKDatabase(this.logFactory);
- if (quorumConfig == null) {
- quorumConfig = new QuorumMaj(quorumPeers);
- }
- setQuorumVerifier(quorumConfig, false);
- adminServer = AdminServerFactory.createAdminServer();
- }
-
- public void initialize() throws SaslException {
- // init quorum auth server & learner
- if (isQuorumSaslAuthEnabled()) {
- Set<String> authzHosts = new HashSet<>();
- for (QuorumServer qs : getView().values()) {
- authzHosts.add(qs.hostname);
- }
- authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(), quorumServerLoginContext, authzHosts);
- authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(), quorumServicePrincipal, quorumLearnerLoginContext);
- } else {
- authServer = new NullQuorumAuthServer();
- authLearner = new NullQuorumAuthLearner();
- }
- }
-
- QuorumStats quorumStats() {
- return quorumStats;
- }
-
- @Override
- public synchronized void start() {
- if (!getView().containsKey(myid)) {
- throw new RuntimeException("My id " + myid + " not in the peer list");
- }
- loadDataBase();
- startServerCnxnFactory();
- try {
- adminServer.start();
- } catch (AdminServerException e) {
- LOG.warn("Problem starting AdminServer", e);
- }
- startLeaderElection();
- startJvmPauseMonitor();
- super.start();
- }
-
- private void loadDataBase() {
- try {
- zkDb.loadDataBase();
-
- // load the epochs
- long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
- long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
- try {
- currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
- } catch (FileNotFoundException e) {
- // pick a reasonable epoch number
- // this should only happen once when moving to a
- // new code version
- currentEpoch = epochOfZxid;
- LOG.info(
- "{} not found! Creating with a reasonable default of {}. "
- + "This should only happen when you are upgrading your installation",
- CURRENT_EPOCH_FILENAME,
- currentEpoch);
- writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
- }
- if (epochOfZxid > currentEpoch) {
- // acceptedEpoch.tmp file in snapshot directory
- File currentTmp = new File(getTxnFactory().getSnapDir(),
- CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION);
- if (currentTmp.exists()) {
- long epochOfTmp = readLongFromFile(currentTmp.getName());
- LOG.info("{} found. Setting current epoch to {}.", currentTmp, epochOfTmp);
- setCurrentEpoch(epochOfTmp);
- } else {
- throw new IOException(
- "The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
- + ", is older than the last zxid, " + lastProcessedZxid);
- }
- }
- try {
- acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
- } catch (FileNotFoundException e) {
- // pick a reasonable epoch number
- // this should only happen once when moving to a
- // new code version
- acceptedEpoch = epochOfZxid;
- LOG.info(
- "{} not found! Creating with a reasonable default of {}. "
- + "This should only happen when you are upgrading your installation",
- ACCEPTED_EPOCH_FILENAME,
- acceptedEpoch);
- writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
- }
- if (acceptedEpoch < currentEpoch) {
- throw new IOException("The accepted epoch, "
- + ZxidUtils.zxidToString(acceptedEpoch)
- + " is less than the current epoch, "
- + ZxidUtils.zxidToString(currentEpoch));
- }
- } catch (IOException ie) {
- LOG.error("Unable to load database on disk", ie);
- throw new RuntimeException("Unable to run quorum server ", ie);
- }
- }
-
- ResponderThread responder;
-
- public synchronized void stopLeaderElection() {
- responder.running = false;
- responder.interrupt();
- }
- public synchronized void startLeaderElection() {
- try {
- if (getPeerState() == ServerState.LOOKING) {
- currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
- }
- } catch (IOException e) {
- RuntimeException re = new RuntimeException(e.getMessage());
- re.setStackTrace(e.getStackTrace());
- throw re;
- }
-
- this.electionAlg = createElectionAlgorithm(electionType);
- }
-
- private void startJvmPauseMonitor() {
- if (this.jvmPauseMonitor != null) {
- this.jvmPauseMonitor.serviceStart();
- }
- }
-
- /**
- * Count the number of nodes in the map that could be followers.
- * @param peers
- * @return The number of followers in the map
- */
- protected static int countParticipants(Map<Long, QuorumServer> peers) {
- int count = 0;
- for (QuorumServer q : peers.values()) {
- if (q.type == LearnerType.PARTICIPANT) {
- count++;
- }
- }
- return count;
- }
-
- /**
- * This constructor is only used by the existing unit test code.
- * It defaults to FileLogProvider persistence provider.
- */
- public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit) throws IOException {
- this(
- quorumPeers,
- snapDir,
- logDir,
- electionAlg,
- myid,
- tickTime,
- initLimit,
- syncLimit,
- connectToLearnerMasterLimit,
- false,
- ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
- new QuorumMaj(quorumPeers));
- }
-
- public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, String oraclePath) throws IOException {
- this(
- quorumPeers,
- snapDir,
- logDir,
- electionAlg,
- myid,
- tickTime,
- initLimit,
- syncLimit,
- connectToLearnerMasterLimit,
- false,
- ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
- new QuorumOracleMaj(quorumPeers, oraclePath));
- }
-
- /**
- * This constructor is only used by the existing unit test code.
- * It defaults to FileLogProvider persistence provider.
- */
- public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, QuorumVerifier quorumConfig) throws IOException {
- this(
- quorumPeers,
- snapDir,
- logDir,
- electionAlg,
- myid,
- tickTime,
- initLimit,
- syncLimit,
- connectToLearnerMasterLimit,
- false,
- ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
- quorumConfig);
- }
-
- private static InetSocketAddress getClientAddress(Map<Long, QuorumServer> quorumPeers, long myid, int clientPort) throws IOException {
- QuorumServer quorumServer = quorumPeers.get(myid);
- if (null == quorumServer) {
- throw new IOException("No QuorumServer correspoding to myid " + myid);
- }
- if (null == quorumServer.clientAddr) {
- return new InetSocketAddress(clientPort);
- }
- if (quorumServer.clientAddr.getPort() != clientPort) {
- throw new IOException("QuorumServer port "
- + quorumServer.clientAddr.getPort()
- + " does not match with given port "
- + clientPort);
- }
- return quorumServer.clientAddr;
- }
-
- /**
- * returns the highest zxid that this host has seen
- *
- * @return the highest zxid for this host
- */
- public long getLastLoggedZxid() {
- if (!zkDb.isInitialized()) {
- loadDataBase();
- }
- return zkDb.getDataTreeLastProcessedZxid();
- }
-
- public Follower follower;
- public Leader leader;
- public Observer observer;
-
- protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
- return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
- }
-
- protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
- return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
- }
-
- protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
- return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
- }
-
- @SuppressWarnings("deprecation")
- protected Election createElectionAlgorithm(int electionAlgorithm) {
- Election le = null;
-
- //TODO: use a factory rather than a switch
- switch (electionAlgorithm) {
- case 1:
- throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
- case 2:
- throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
- case 3:
- QuorumCnxManager qcm = createCnxnManager();
- QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
- if (oldQcm != null) {
- LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
- oldQcm.halt();
- }
- QuorumCnxManager.Listener listener = qcm.listener;
- if (listener != null) {
- listener.start();
- FastLeaderElection fle = new FastLeaderElection(this, qcm);
- fle.start();
- le = fle;
- } else {
- LOG.error("Null listener when initializing cnx manager");
- }
- break;
- default:
- assert false;
- }
- return le;
- }
-
- @SuppressWarnings("deprecation")
- protected Election makeLEStrategy() {
- LOG.debug("Initializing leader election protocol...");
- return electionAlg;
- }
-
- protected synchronized void setLeader(Leader newLeader) {
- leader = newLeader;
- }
-
- protected synchronized void setFollower(Follower newFollower) {
- follower = newFollower;
- }
-
- protected synchronized void setObserver(Observer newObserver) {
- observer = newObserver;
- }
-
- public synchronized ZooKeeperServer getActiveServer() {
- if (leader != null) {
- return leader.zk;
- } else if (follower != null) {
- return follower.zk;
- } else if (observer != null) {
- return observer.zk;
- }
- return null;
- }
-
- boolean shuttingDownLE = false;
-
- public void setSuspended(boolean suspended) {
- this.suspended.set(suspended);
- }
- private void checkSuspended() {
- try {
- while (suspended.get()) {
- Thread.sleep(10);
- }
- } catch (InterruptedException err) {
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public void run() {
- updateThreadName();
-
- LOG.debug("Starting quorum peer");
- try {
- jmxQuorumBean = new QuorumBean(this);
- MBeanRegistry.getInstance().register(jmxQuorumBean, null);
- for (QuorumServer s : getView().values()) {
- ZKMBeanInfo p;
- if (getMyId() == s.id) {
- p = jmxLocalPeerBean = new LocalPeerBean(this);
- try {
- MBeanRegistry.getInstance().register(p, jmxQuorumBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxLocalPeerBean = null;
- }
- } else {
- RemotePeerBean rBean = new RemotePeerBean(this, s);
- try {
- MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
- jmxRemotePeerBean.put(s.id, rBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- }
- }
- }
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxQuorumBean = null;
- }
-
- try {
- /*
- * Main loop
- */
- while (running) {
- if (unavailableStartTime == 0) {
- unavailableStartTime = Time.currentElapsedTime();
- }
-
- switch (getPeerState()) {
- case LOOKING:
- LOG.info("LOOKING");
- ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
-
- if (Boolean.getBoolean("readonlymode.enabled")) {
- LOG.info("Attempting to start ReadOnlyZooKeeperServer");
-
- // Create read-only server but don't start it immediately
- final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
-
- // Instead of starting roZk immediately, wait some grace
- // period before we decide we're partitioned.
- //
- // Thread is used here because otherwise it would require
- // changes in each of election strategy classes which is
- // unnecessary code coupling.
- Thread roZkMgr = new Thread() {
- public void run() {
- try {
- // lower-bound grace period to 2 secs
- sleep(Math.max(2000, tickTime));
- if (ServerState.LOOKING.equals(getPeerState())) {
- roZk.startup();
- }
- } catch (InterruptedException e) {
- LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
- } catch (Exception e) {
- LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
- }
- }
- };
- try {
- roZkMgr.start();
- reconfigFlagClear();
- if (shuttingDownLE) {
- shuttingDownLE = false;
- startLeaderElection();
- }
- setCurrentVote(makeLEStrategy().lookForLeader());
- checkSuspended();
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- setPeerState(ServerState.LOOKING);
- } finally {
- // If the thread is in the the grace period, interrupt
- // to come out of waiting.
- roZkMgr.interrupt();
- roZk.shutdown();
- }
- } else {
- try {
- reconfigFlagClear();
- if (shuttingDownLE) {
- shuttingDownLE = false;
- startLeaderElection();
- }
- setCurrentVote(makeLEStrategy().lookForLeader());
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- setPeerState(ServerState.LOOKING);
- }
- }
- break;
- case OBSERVING:
- try {
- LOG.info("OBSERVING");
- setObserver(makeObserver(logFactory));
- observer.observeLeader();
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- } finally {
- observer.shutdown();
- setObserver(null);
- updateServerState();
-
- // Add delay jitter before we switch to LOOKING
- // state to reduce the load of ObserverMaster
- if (isRunning()) {
- Observer.waitForObserverElectionDelay();
- }
- }
- break;
- case FOLLOWING:
- try {
- LOG.info("FOLLOWING");
- setFollower(makeFollower(logFactory));
- follower.followLeader();
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- } finally {
- follower.shutdown();
- setFollower(null);
- updateServerState();
- }
- break;
- case LEADING:
- LOG.info("LEADING");
- try {
- setLeader(makeLeader(logFactory));
- leader.lead();
- setLeader(null);
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- } finally {
- if (leader != null) {
- leader.shutdown("Forcing shutdown");
- setLeader(null);
- }
- updateServerState();
- }
- break;
- }
- }
- } finally {
- LOG.warn("QuorumPeer main thread exited");
- MBeanRegistry instance = MBeanRegistry.getInstance();
- instance.unregister(jmxQuorumBean);
- instance.unregister(jmxLocalPeerBean);
-
- for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
- instance.unregister(remotePeerBean);
- }
-
- jmxQuorumBean = null;
- jmxLocalPeerBean = null;
- jmxRemotePeerBean = null;
- }
- }
-
- private synchronized void updateServerState() {
- if (!reconfigFlag) {
- setPeerState(ServerState.LOOKING);
- LOG.warn("PeerState set to LOOKING");
- return;
- }
-
- if (getMyId() == getCurrentVote().getId()) {
- setPeerState(ServerState.LEADING);
- LOG.debug("PeerState set to LEADING");
- } else if (getLearnerType() == LearnerType.PARTICIPANT) {
- setPeerState(ServerState.FOLLOWING);
- LOG.debug("PeerState set to FOLLOWING");
- } else if (getLearnerType() == LearnerType.OBSERVER) {
- setPeerState(ServerState.OBSERVING);
- LOG.debug("PeerState set to OBSERVER");
- } else { // currently shouldn't happen since there are only 2 learner types
- setPeerState(ServerState.LOOKING);
- LOG.debug("Should not be here");
- }
- reconfigFlag = false;
- }
-
- public void shutdown() {
- running = false;
- x509Util.close();
- if (leader != null) {
- leader.shutdown("quorum Peer shutdown");
- }
- if (follower != null) {
- follower.shutdown();
- }
- shutdownServerCnxnFactory();
- if (udpSocket != null) {
- udpSocket.close();
- }
- if (jvmPauseMonitor != null) {
- jvmPauseMonitor.serviceStop();
- }
-
- try {
- adminServer.shutdown();
- } catch (AdminServerException e) {
- LOG.warn("Problem stopping AdminServer", e);
- }
-
- if (getElectionAlg() != null) {
- this.interrupt();
- getElectionAlg().shutdown();
- }
- try {
- zkDb.close();
- } catch (IOException ie) {
- LOG.warn("Error closing logs ", ie);
- }
- }
-
- /**
- * A 'view' is a node's current opinion of the membership of the entire
- * ensemble.
- */
- public Map<Long, QuorumPeer.QuorumServer> getView() {
- return Collections.unmodifiableMap(getQuorumVerifier().getAllMembers());
- }
-
- /**
- * Observers are not contained in this view, only nodes with
- * PeerType=PARTICIPANT.
- */
- public Map<Long, QuorumPeer.QuorumServer> getVotingView() {
- return getQuorumVerifier().getVotingMembers();
- }
-
- /**
- * Returns only observers, no followers.
- */
- public Map<Long, QuorumPeer.QuorumServer> getObservingView() {
- return getQuorumVerifier().getObservingMembers();
- }
-
- public synchronized Set<Long> getCurrentAndNextConfigVoters() {
- Set<Long> voterIds = new HashSet<>(getQuorumVerifier().getVotingMembers().keySet());
- if (getLastSeenQuorumVerifier() != null) {
- voterIds.addAll(getLastSeenQuorumVerifier().getVotingMembers().keySet());
- }
- return voterIds;
- }
-
- /**
- * Check if a node is in the current view. With static membership, the
- * result of this check will never change; only when dynamic membership
- * is introduced will this be more useful.
- */
- public boolean viewContains(Long sid) {
- return this.getView().containsKey(sid);
- }
-
- /**
- * Only used by QuorumStats at the moment
- */
- public String[] getQuorumPeers() {
- List<String> l = new ArrayList<>();
- synchronized (this) {
- if (leader != null) {
- for (LearnerHandler fh : leader.getLearners()) {
- if (fh.getSocket() != null) {
- String s = formatInetAddr((InetSocketAddress) fh.getSocket().getRemoteSocketAddress());
- if (leader.isLearnerSynced(fh)) {
- s += "*";
- }
- l.add(s);
- }
- }
- } else if (follower != null) {
- l.add(formatInetAddr((InetSocketAddress) follower.sock.getRemoteSocketAddress()));
- }
- }
- return l.toArray(new String[0]);
- }
-
- public String getServerState() {
- switch (getPeerState()) {
- case LOOKING:
- return QuorumStats.Provider.LOOKING_STATE;
- case LEADING:
- return QuorumStats.Provider.LEADING_STATE;
- case FOLLOWING:
- return QuorumStats.Provider.FOLLOWING_STATE;
- case OBSERVING:
- return QuorumStats.Provider.OBSERVING_STATE;
- }
- return QuorumStats.Provider.UNKNOWN_STATE;
- }
-
- /**
- * set the id of this quorum peer.
- */
- public void setMyid(long myid) {
- this.myid = myid;
- }
-
- public void setInitialConfig(String initialConfig) {
- this.initialConfig = initialConfig;
- }
-
- public String getInitialConfig() {
- return initialConfig;
- }
-
- /**
- * Get the number of milliseconds of each tick
- */
- public int getTickTime() {
- return tickTime;
- }
-
- /**
- * Set the number of milliseconds of each tick
- */
- public void setTickTime(int tickTime) {
- LOG.info("tickTime set to {}", tickTime);
- this.tickTime = tickTime;
- }
-
- /** Maximum number of connections allowed from particular host (ip) */
- public int getMaxClientCnxnsPerHost() {
- if (cnxnFactory != null) {
- return cnxnFactory.getMaxClientCnxnsPerHost();
- }
- if (secureCnxnFactory != null) {
- return secureCnxnFactory.getMaxClientCnxnsPerHost();
- }
- return -1;
- }
-
- /** Whether local sessions are enabled */
- public boolean areLocalSessionsEnabled() {
- return localSessionsEnabled;
- }
-
- /** Whether to enable local sessions */
- public void enableLocalSessions(boolean flag) {
- LOG.info("Local sessions {}", (flag ? "enabled" : "disabled"));
- localSessionsEnabled = flag;
- }
-
- /** Whether local sessions are allowed to upgrade to global sessions */
- public boolean isLocalSessionsUpgradingEnabled() {
- return localSessionsUpgradingEnabled;
- }
-
- /** Whether to allow local sessions to upgrade to global sessions */
- public void enableLocalSessionsUpgrading(boolean flag) {
- LOG.info("Local session upgrading {}", (flag ? "enabled" : "disabled"));
- localSessionsUpgradingEnabled = flag;
- }
-
- /** minimum session timeout in milliseconds */
- public int getMinSessionTimeout() {
- return minSessionTimeout;
- }
-
- /** minimum session timeout in milliseconds */
- public void setMinSessionTimeout(int min) {
- LOG.info("minSessionTimeout set to {}", min);
- this.minSessionTimeout = min;
- }
-
- /** maximum session timeout in milliseconds */
- public int getMaxSessionTimeout() {
- return maxSessionTimeout;
- }
-
- /** maximum session timeout in milliseconds */
- public void setMaxSessionTimeout(int max) {
- LOG.info("maxSessionTimeout set to {}", max);
- this.maxSessionTimeout = max;
- }
-
- /** The server socket's listen backlog length */
- public int getClientPortListenBacklog() {
- return this.clientPortListenBacklog;
- }
-
- /** Sets the server socket's listen backlog length. */
- public void setClientPortListenBacklog(int backlog) {
- this.clientPortListenBacklog = backlog;
- }
-
- /**
- * Get the number of ticks that the initial synchronization phase can take
- */
- public int getInitLimit() {
- return initLimit;
- }
-
- /**
- * Set the number of ticks that the initial synchronization phase can take
- */
- public void setInitLimit(int initLimit) {
- LOG.info("initLimit set to {}", initLimit);
- this.initLimit = initLimit;
- }
-
- /**
- * Get the current tick
- */
- public int getTick() {
- return tick.get();
- }
-
- public QuorumVerifier configFromString(String s) throws IOException, ConfigException {
- Properties props = new Properties();
- props.load(new StringReader(s));
- return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false, getQuorumVerifier().getOraclePath());
- }
-
- /**
- * Return QuorumVerifier object for the last committed configuration.
- */
- public QuorumVerifier getQuorumVerifier() {
- synchronized (QV_LOCK) {
- return quorumVerifier;
- }
- }
-
- /**
- * Return QuorumVerifier object for the last proposed configuration.
- */
- public QuorumVerifier getLastSeenQuorumVerifier() {
- synchronized (QV_LOCK) {
- return lastSeenQuorumVerifier;
- }
- }
-
- public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW) {
- if (qvOLD == null || !qvOLD.equals(qvNEW)) {
- LOG.warn("Restarting Leader Election");
- getElectionAlg().shutdown();
- shuttingDownLE = false;
- startLeaderElection();
- }
- }
-
- public String getNextDynamicConfigFilename() {
- if (configFilename == null) {
- LOG.warn("configFilename is null! This should only happen in tests.");
- return null;
- }
- return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix;
- }
-
- // On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK
- // must be held. We don't want quorumVerifier/lastSeenQuorumVerifier to change out from
- // under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take
- // the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken
- // qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne().
- private void connectNewPeers(QuorumCnxManager qcm) {
- if (quorumVerifier != null && lastSeenQuorumVerifier != null) {
- Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers();
- for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) {
- if (e.getKey() != getMyId() && !committedView.containsKey(e.getKey())) {
- qcm.connectOne(e.getKey());
- }
- }
- }
- }
-
- public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
- if (!isReconfigEnabled()) {
- LOG.info("Dynamic reconfig is disabled, we don't store the last seen config.");
- return;
- }
-
- // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm
- // and then take QV_LOCK. Take the locks in the same order to ensure that we don't
- // deadlock against other callers of connectOne(). If qcmRef gets set in another
- // thread while we're inside the synchronized block, that does no harm; if we didn't
- // take a lock on qcm (because it was null when we sampled it), we won't call
- // connectOne() on it. (Use of an AtomicReference is enough to guarantee visibility
- // of updates that provably happen in another thread before entering this method.)
- QuorumCnxManager qcm = qcmRef.get();
- Object outerLockObject = (qcm != null) ? qcm : QV_LOCK;
- synchronized (outerLockObject) {
- synchronized (QV_LOCK) {
- if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) {
- LOG.error("setLastSeenQuorumVerifier called with stale config "
- + qv.getVersion()
- + ". Current version: "
- + quorumVerifier.getVersion());
- }
- // assuming that a version uniquely identifies a configuration, so if
- // version is the same, nothing to do here.
- if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() == qv.getVersion()) {
- return;
- }
- lastSeenQuorumVerifier = qv;
- if (qcm != null) {
- connectNewPeers(qcm);
- }
-
- if (writeToDisk) {
- try {
- String fileName = getNextDynamicConfigFilename();
- if (fileName != null) {
- QuorumPeerConfig.writeDynamicConfig(fileName, qv, true);
- }
- } catch (IOException e) {
- LOG.error("Error writing next dynamic config file to disk", e);
- }
- }
- }
- }
- }
-
- public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
- synchronized (QV_LOCK) {
- if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
- // this is normal. For example - server found out about new config through FastLeaderElection gossiping
- // and then got the same config in UPTODATE message so its already known
- LOG.debug(
- "{} setQuorumVerifier called with known or old config {}. Current version: {}",
- getMyId(),
- qv.getVersion(),
- quorumVerifier.getVersion());
- return quorumVerifier;
- }
- QuorumVerifier prevQV = quorumVerifier;
- quorumVerifier = qv;
- if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion())) {
- lastSeenQuorumVerifier = qv;
- }
-
- if (writeToDisk) {
- // some tests initialize QuorumPeer without a static config file
- if (configFilename != null) {
- try {
- String dynamicConfigFilename = makeDynamicConfigFilename(qv.getVersion());
- QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, qv, false);
- QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig());
- } catch (IOException e) {
- LOG.error("Error closing file", e);
- }
- } else {
- LOG.info("writeToDisk == true but configFilename == null");
- }
- }
-
- if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()) {
- QuorumPeerConfig.deleteFile(getNextDynamicConfigFilename());
- }
- QuorumServer qs = qv.getAllMembers().get(getMyId());
- if (qs != null) {
- setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
- }
- updateObserverMasterList();
- return prevQV;
- }
- }
-
- private String makeDynamicConfigFilename(long version) {
- return configFilename + ".dynamic." + Long.toHexString(version);
- }
-
- private boolean needEraseClientInfoFromStaticConfig() {
- QuorumServer server = quorumVerifier.getAllMembers().get(getMyId());
- return (server != null && server.clientAddr != null && !server.isClientAddrFromStatic);
- }
-
- /**
- * Get an instance of LeaderElection
- */
- public Election getElectionAlg() {
- return electionAlg;
- }
-
- /**
- * Get the synclimit
- */
- public int getSyncLimit() {
- return syncLimit;
- }
-
- /**
- * Set the synclimit
- */
- public void setSyncLimit(int syncLimit) {
- LOG.info("syncLimit set to {}", syncLimit);
- this.syncLimit = syncLimit;
- }
-
- /**
- * Get the connectToLearnerMasterLimit
- */
- public int getConnectToLearnerMasterLimit() {
- return connectToLearnerMasterLimit;
- }
-
- /**
- * Set the connectToLearnerMasterLimit
- */
- public void setConnectToLearnerMasterLimit(int connectToLearnerMasterLimit) {
- LOG.info("connectToLearnerMasterLimit set to {}", connectToLearnerMasterLimit);
- this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
- }
-
- /**
- * The syncEnabled can also be set via a system property.
- */
- public static final String SYNC_ENABLED = "zookeeper.observer.syncEnabled";
-
- /**
- * Return syncEnabled.
- */
- public boolean getSyncEnabled() {
- if (System.getProperty(SYNC_ENABLED) != null) {
- LOG.info("{}={}", SYNC_ENABLED, Boolean.getBoolean(SYNC_ENABLED));
- return Boolean.getBoolean(SYNC_ENABLED);
- } else {
- return syncEnabled;
- }
- }
-
- /**
- * Set syncEnabled.
- *
- * @param syncEnabled
- */
- public void setSyncEnabled(boolean syncEnabled) {
- this.syncEnabled = syncEnabled;
- }
-
- /**
- * Gets the election type
- */
- public int getElectionType() {
- return electionType;
- }
-
- /**
- * Sets the election type
- */
- public void setElectionType(int electionType) {
- this.electionType = electionType;
- }
-
- public boolean getQuorumListenOnAllIPs() {
- return quorumListenOnAllIPs;
- }
-
- public void setQuorumListenOnAllIPs(boolean quorumListenOnAllIPs) {
- this.quorumListenOnAllIPs = quorumListenOnAllIPs;
- }
-
- public void setCnxnFactory(ServerCnxnFactory cnxnFactory) {
- this.cnxnFactory = cnxnFactory;
- }
-
- public void setSecureCnxnFactory(ServerCnxnFactory secureCnxnFactory) {
- this.secureCnxnFactory = secureCnxnFactory;
- }
-
- public void setSslQuorum(boolean sslQuorum) {
- if (sslQuorum) {
- LOG.info("Using TLS encrypted quorum communication");
- } else {
- LOG.info("Using insecure (non-TLS) quorum communication");
- }
- this.sslQuorum = sslQuorum;
- }
-
- public void setUsePortUnification(boolean shouldUsePortUnification) {
- LOG.info("Port unification {}", shouldUsePortUnification ? "enabled" : "disabled");
- this.shouldUsePortUnification = shouldUsePortUnification;
- }
-
- private void startServerCnxnFactory() {
- if (cnxnFactory != null) {
- cnxnFactory.start();
- }
- if (secureCnxnFactory != null) {
- secureCnxnFactory.start();
- }
- }
-
- private void shutdownServerCnxnFactory() {
- if (cnxnFactory != null) {
- cnxnFactory.shutdown();
- }
- if (secureCnxnFactory != null) {
- secureCnxnFactory.shutdown();
- }
- }
-
- // Leader and learner will control the zookeeper server and pass it into QuorumPeer.
- public void setZooKeeperServer(ZooKeeperServer zks) {
- if (cnxnFactory != null) {
- cnxnFactory.setZooKeeperServer(zks);
- }
- if (secureCnxnFactory != null) {
- secureCnxnFactory.setZooKeeperServer(zks);
- }
- }
-
- public void closeAllConnections() {
- if (cnxnFactory != null) {
- cnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
- }
- if (secureCnxnFactory != null) {
- secureCnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
- }
- }
-
- public int getClientPort() {
- if (cnxnFactory != null) {
- return cnxnFactory.getLocalPort();
- }
- return -1;
- }
-
- public int getSecureClientPort() {
- if (secureCnxnFactory != null) {
- return secureCnxnFactory.getLocalPort();
- }
- return -1;
- }
-
- public void setTxnFactory(FileTxnSnapLog factory) {
- this.logFactory = factory;
- }
-
- public FileTxnSnapLog getTxnFactory() {
- return this.logFactory;
- }
-
- /**
- * set zk database for this node
- * @param database
- */
- public void setZKDatabase(ZKDatabase database) {
- this.zkDb = database;
- }
-
- protected ZKDatabase getZkDb() {
- return zkDb;
- }
-
- public synchronized void initConfigInZKDatabase() {
- if (zkDb != null) {
- zkDb.initConfigInZKDatabase(getQuorumVerifier());
- }
- }
-
- public boolean isRunning() {
- return running;
- }
-
- /**
- * get reference to QuorumCnxManager
- */
- public QuorumCnxManager getQuorumCnxManager() {
- return qcmRef.get();
- }
- private long readLongFromFile(String name) throws IOException {
- File file = new File(logFactory.getSnapDir(), name);
- BufferedReader br = new BufferedReader(new FileReader(file));
- String line = "";
- try {
- line = br.readLine();
- return Long.parseLong(line);
- } catch (NumberFormatException e) {
- throw new IOException("Found " + line + " in " + file);
- } finally {
- br.close();
- }
- }
-
- private long acceptedEpoch = -1;
- private long currentEpoch = -1;
-
- public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";
-
- public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";
-
- /**
- * Write a long value to disk atomically. Either succeeds or an exception
- * is thrown.
- * @param name file name to write the long to
- * @param value the long value to write to the named file
- * @throws IOException if the file cannot be written atomically
- */
- // visibleForTest
- void writeLongToFile(String name, final long value) throws IOException {
- File file = new File(logFactory.getSnapDir(), name);
- new AtomicFileWritingIdiom(file, new WriterStatement() {
- @Override
- public void write(Writer bw) throws IOException {
- bw.write(Long.toString(value));
- }
- });
- }
-
- public long getCurrentEpoch() throws IOException {
- if (currentEpoch == -1) {
- currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
- }
- return currentEpoch;
- }
-
- public long getAcceptedEpoch() throws IOException {
- if (acceptedEpoch == -1) {
- acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
- }
- return acceptedEpoch;
- }
-
- public void setCurrentEpoch(long e) throws IOException {
- writeLongToFile(CURRENT_EPOCH_FILENAME, e);
- currentEpoch = e;
- }
-
- public void setAcceptedEpoch(long e) throws IOException {
- writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
- acceptedEpoch = e;
- }
-
- public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) {
- if (!isReconfigEnabled()) {
- LOG.debug("Reconfig feature is disabled, skip reconfig processing.");
- return false;
- }
-
- InetSocketAddress oldClientAddr = getClientAddress();
-
- // update last committed quorum verifier, write the new config to disk
- // and restart leader election if config changed.
- QuorumVerifier prevQV = setQuorumVerifier(qv, true);
-
- // There is no log record for the initial config, thus after syncing
- // with leader
- // /zookeeper/config is empty! it is also possible that last committed
- // config is propagated during leader election
- // without the propagation the corresponding log records.
- // so we should explicitly do this (this is not necessary when we're
- // already a Follower/Observer, only
- // for Learner):
- initConfigInZKDatabase();
-
- if (prevQV.getVersion() < qv.getVersion() && !prevQV.equals(qv)) {
- Map<Long, QuorumServer> newMembers = qv.getAllMembers();
- updateRemotePeerMXBeans(newMembers);
- if (restartLE) {
- restartLeaderElection(prevQV, qv);
- }
-
- QuorumServer myNewQS = newMembers.get(getMyId());
- if (myNewQS != null && myNewQS.clientAddr != null && !myNewQS.clientAddr.equals(oldClientAddr)) {
- cnxnFactory.reconfigure(myNewQS.clientAddr);
- updateThreadName();
- }
-
- boolean roleChange = updateLearnerType(qv);
- boolean leaderChange = false;
- if (suggestedLeaderId != null) {
- // zxid should be non-null too
- leaderChange = updateVote(suggestedLeaderId, zxid);
- } else {
- long currentLeaderId = getCurrentVote().getId();
- QuorumServer myleaderInCurQV = prevQV.getVotingMembers().get(currentLeaderId);
- QuorumServer myleaderInNewQV = qv.getVotingMembers().get(currentLeaderId);
- leaderChange = (myleaderInCurQV == null
- || myleaderInCurQV.addr == null
- || myleaderInNewQV == null
- || !myleaderInCurQV.addr.equals(myleaderInNewQV.addr));
- // we don't have a designated leader - need to go into leader
- // election
- reconfigFlagClear();
- }
-
- return roleChange || leaderChange;
- }
- return false;
-
- }
-
- private void updateRemotePeerMXBeans(Map<Long, QuorumServer> newMembers) {
- Set<Long> existingMembers = new HashSet<>(newMembers.keySet());
- existingMembers.retainAll(jmxRemotePeerBean.keySet());
- for (Long id : existingMembers) {
- RemotePeerBean rBean = jmxRemotePeerBean.get(id);
- rBean.setQuorumServer(newMembers.get(id));
- }
-
- Set<Long> joiningMembers = new HashSet<>(newMembers.keySet());
- joiningMembers.removeAll(jmxRemotePeerBean.keySet());
- joiningMembers.remove(getMyId()); // remove self as it is local bean
- for (Long id : joiningMembers) {
- QuorumServer qs = newMembers.get(id);
- RemotePeerBean rBean = new RemotePeerBean(this, qs);
- try {
- MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
- jmxRemotePeerBean.put(qs.id, rBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- }
- }
-
- Set<Long> leavingMembers = new HashSet<>(jmxRemotePeerBean.keySet());
- leavingMembers.removeAll(newMembers.keySet());
- for (Long id : leavingMembers) {
- RemotePeerBean rBean = jmxRemotePeerBean.remove(id);
- try {
- MBeanRegistry.getInstance().unregister(rBean);
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- }
- }
-
- private ArrayList<QuorumServer> observerMasters = new ArrayList<>();
- private void updateObserverMasterList() {
- if (observerMasterPort <= 0) {
- return; // observer masters not enabled
- }
- observerMasters.clear();
- StringBuilder sb = new StringBuilder();
- for (QuorumServer server : quorumVerifier.getVotingMembers().values()) {
- InetAddress address = server.addr.getReachableOrOne().getAddress();
- InetSocketAddress addr = new InetSocketAddress(address, observerMasterPort);
- observerMasters.add(new QuorumServer(server.id, addr));
- sb.append(addr).append(",");
- }
- LOG.info("Updated learner master list to be {}", sb.toString());
- Collections.shuffle(observerMasters);
- // Reset the internal index of the observerMaster when
- // the observerMaster List is refreshed
- nextObserverMaster = 0;
- }
-
- private boolean useObserverMasters() {
- return getLearnerType() == LearnerType.OBSERVER && observerMasters.size() > 0;
- }
-
- private int nextObserverMaster = 0;
- private QuorumServer nextObserverMaster() {
- if (nextObserverMaster >= observerMasters.size()) {
- nextObserverMaster = 0;
- // Add a reconnect delay only after the observer
- // has exhausted trying to connect to all the masters
- // from the observerMasterList
- if (isRunning()) {
- Observer.waitForReconnectDelay();
- }
- }
- return observerMasters.get(nextObserverMaster++);
- }
-
- QuorumServer findLearnerMaster(QuorumServer leader) {
- if (useObserverMasters()) {
- return nextObserverMaster();
- } else {
- // Add delay jitter to reduce the load on the leader
- if (isRunning()) {
- Observer.waitForReconnectDelay();
- }
- return leader;
- }
- }
-
- /**
- * Vet a given learner master's information.
- * Allows specification by server id, ip only, or ip and port
- */
- QuorumServer validateLearnerMaster(String desiredMaster) {
- if (useObserverMasters()) {
- Long sid;
- try {
- sid = Long.parseLong(desiredMaster);
- } catch (NumberFormatException e) {
- sid = null;
- }
- for (QuorumServer server : observerMasters) {
- if (sid == null) {
- for (InetSocketAddress address : server.addr.getAllAddresses()) {
- String serverAddr = address.getAddress().getHostAddress() + ':' + address.getPort();
- if (serverAddr.startsWith(desiredMaster)) {
- return server;
- }
- }
- } else {
- if (sid.equals(server.id)) {
- return server;
- }
- }
- }
- if (sid == null) {
- LOG.info("could not find learner master address={}", desiredMaster);
- } else {
- LOG.warn("could not find learner master sid={}", sid);
- }
- } else {
- LOG.info("cannot validate request, observer masters not enabled");
- }
- return null;
- }
-
- private boolean updateLearnerType(QuorumVerifier newQV) {
- //check if I'm an observer in new config
- if (newQV.getObservingMembers().containsKey(getMyId())) {
- if (getLearnerType() != LearnerType.OBSERVER) {
- setLearnerType(LearnerType.OBSERVER);
- LOG.info("Becoming an observer");
- reconfigFlagSet();
- return true;
- } else {
- return false;
- }
- } else if (newQV.getVotingMembers().containsKey(getMyId())) {
- if (getLearnerType() != LearnerType.PARTICIPANT) {
- setLearnerType(LearnerType.PARTICIPANT);
- LOG.info("Becoming a voting participant");
- reconfigFlagSet();
- return true;
- } else {
- return false;
- }
- }
- // I'm not in the view
- if (getLearnerType() != LearnerType.PARTICIPANT) {
- setLearnerType(LearnerType.PARTICIPANT);
- LOG.info("Becoming a non-voting participant");
- reconfigFlagSet();
- return true;
- }
- return false;
- }
-
- private boolean updateVote(long designatedLeader, long zxid) {
- Vote currentVote = getCurrentVote();
- if (currentVote != null && designatedLeader != currentVote.getId()) {
- setCurrentVote(new Vote(designatedLeader, zxid));
- reconfigFlagSet();
- LOG.warn("Suggested leader: {}", designatedLeader);
- return true;
- }
- return false;
- }
-
- /**
- * Updates leader election info to avoid inconsistencies when
- * a new server tries to join the ensemble.
- *
- * Here is the inconsistency scenario we try to solve by updating the peer
- * epoch after following leader:
- *
- * Let's say we have an ensemble with 3 servers z1, z2 and z3.
- *
- * 1. z1, z2 were following z3 with peerEpoch to be 0xb8, the new epoch is
- * 0xb9, aka current accepted epoch on disk.
- * 2. z2 get restarted, which will use 0xb9 as it's peer epoch when loading
- * the current accept epoch from disk.
- * 3. z2 received notification from z1 and z3, which is following z3 with
- * epoch 0xb8, so it started following z3 again with peer epoch 0xb8.
- * 4. before z2 successfully connected to z3, z3 get restarted with new
- * epoch 0xb9.
- * 5. z2 will retry around a few round (default 5s) before giving up,
- * meanwhile it will report z3 as leader.
- * 6. z1 restarted, and looking with peer epoch 0xb9.
- * 7. z1 voted z3, and z3 was elected as leader again with peer epoch 0xb9.
- * 8. z2 successfully connected to z3 before giving up, but with peer
- * epoch 0xb8.
- * 9. z1 get restarted, looking for leader with peer epoch 0xba, but cannot
- * join, because z2 is reporting peer epoch 0xb8, while z3 is reporting
- * 0xb9.
- *
- * By updating the election vote after actually following leader, we can
- * avoid this kind of stuck happened.
- *
- * Btw, the zxid and electionEpoch could be inconsistent because of the same
- * reason, it's better to update these as well after syncing with leader, but
- * that required protocol change which is non trivial. This problem is worked
- * around by skipping comparing the zxid and electionEpoch when counting for
- * votes for out of election servers during looking for leader.
- *
- * See https://issues.apache.org/jira/browse/ZOOKEEPER-1732
- */
- protected void updateElectionVote(long newEpoch) {
- Vote currentVote = getCurrentVote();
- if (currentVote != null) {
- setCurrentVote(new Vote(currentVote.getId(), currentVote.getZxid(), currentVote.getElectionEpoch(), newEpoch, currentVote
- .getState()));
- }
- }
-
- private void updateThreadName() {
- String plain = cnxnFactory != null
- ? cnxnFactory.getLocalAddress() != null
- ? formatInetAddr(cnxnFactory.getLocalAddress())
- : "disabled"
- : "disabled";
- String secure = secureCnxnFactory != null ? formatInetAddr(secureCnxnFactory.getLocalAddress()) : "disabled";
- setName(String.format("QuorumPeer[myid=%d](plain=%s)(secure=%s)", getMyId(), plain, secure));
- }
-
- /**
- * Sets the time taken for leader election in milliseconds.
- *
- * @param electionTimeTaken time taken for leader election
- */
- void setElectionTimeTaken(long electionTimeTaken) {
- this.electionTimeTaken = electionTimeTaken;
- }
-
- /**
- * @return the time taken for leader election in milliseconds.
- */
- long getElectionTimeTaken() {
- return electionTimeTaken;
- }
-
- void setQuorumServerSaslRequired(boolean serverSaslRequired) {
- quorumServerSaslAuthRequired = serverSaslRequired;
- LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, serverSaslRequired);
- }
-
- void setQuorumLearnerSaslRequired(boolean learnerSaslRequired) {
- quorumLearnerSaslAuthRequired = learnerSaslRequired;
- LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, learnerSaslRequired);
- }
-
- void setQuorumSaslEnabled(boolean enableAuth) {
- quorumSaslEnableAuth = enableAuth;
- if (!quorumSaslEnableAuth) {
- LOG.info("QuorumPeer communication is not secured! (SASL auth disabled)");
- } else {
- LOG.info("{} set to {}", QuorumAuth.QUORUM_SASL_AUTH_ENABLED, enableAuth);
- }
- }
-
- void setQuorumServicePrincipal(String servicePrincipal) {
- quorumServicePrincipal = servicePrincipal;
- LOG.info("{} set to {}", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL, quorumServicePrincipal);
- }
-
- void setQuorumLearnerLoginContext(String learnerContext) {
- quorumLearnerLoginContext = learnerContext;
- LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT, quorumLearnerLoginContext);
- }
-
- void setQuorumServerLoginContext(String serverContext) {
- quorumServerLoginContext = serverContext;
- LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT, quorumServerLoginContext);
- }
-
- void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) {
- if (qCnxnThreadsSize > QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE) {
- quorumCnxnThreadsSize = qCnxnThreadsSize;
- }
- LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize);
- }
-
- boolean isQuorumSaslAuthEnabled() {
- return quorumSaslEnableAuth;
- }
-
- private boolean isQuorumServerSaslAuthRequired() {
- return quorumServerSaslAuthRequired;
- }
-
- private boolean isQuorumLearnerSaslAuthRequired() {
- return quorumLearnerSaslAuthRequired;
- }
-
- public QuorumCnxManager createCnxnManager() {
- int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit;
- LOG.info("Using {}ms as the quorum cnxn socket timeout", timeout);
- return new QuorumCnxManager(
- this,
- this.getMyId(),
- this.getView(),
- this.authServer,
- this.authLearner,
- timeout,
- this.getQuorumListenOnAllIPs(),
- this.quorumCnxnThreadsSize,
- this.isQuorumSaslAuthEnabled());
- }
-
- boolean isLeader(long id) {
- Vote vote = getCurrentVote();
- return vote != null && id == vote.getId();
- }
-
- public boolean isReconfigEnabled() {
- return reconfigEnabled;
- }
-
- @InterfaceAudience.Private
- /**
- * This is a metric that depends on the status of the peer.
- */ public Integer getSynced_observers_metric() {
- if (leader != null) {
- return leader.getObservingLearners().size();
- } else if (follower != null) {
- return follower.getSyncedObserverSize();
- } else {
- return null;
- }
- }
-
- /**
- * Create a new QuorumPeer and apply all the values per the already-parsed config.
- *
- * @param config The appertained quorum peer config.
- * @return A QuorumPeer instantiated with specified peer config. Note this peer
- * is not fully initialized; caller should finish initialization through
- * additional configurations (connection factory settings, etc).
- *
- * @throws IOException
- */
- public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOException {
- QuorumPeer quorumPeer = new QuorumPeer();
- quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
- quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
- quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
- quorumPeer.setElectionType(config.getElectionAlg());
- quorumPeer.setMyid(config.getServerId());
- quorumPeer.setTickTime(config.getTickTime());
- quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
- quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
- quorumPeer.setInitLimit(config.getInitLimit());
- quorumPeer.setSyncLimit(config.getSyncLimit());
- quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
- quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
- quorumPeer.setConfigFileName(config.getConfigFilename());
- quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
- quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
- quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
- if (config.getLastSeenQuorumVerifier() != null) {
- quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
- }
- quorumPeer.initConfigInZKDatabase();
- quorumPeer.setSslQuorum(config.isSslQuorum());
- quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
- quorumPeer.setLearnerType(config.getPeerType());
- quorumPeer.setSyncEnabled(config.getSyncEnabled());
- quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
- if (config.sslQuorumReloadCertFiles) {
- quorumPeer.getX509Util().enableCertFileReloading();
- }
- quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
- quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
- quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
-
- // sets quorum sasl authentication configurations
- quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
- if (quorumPeer.isQuorumSaslAuthEnabled()) {
- quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
- quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
- quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
- quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
- quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
- }
- quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
-
- if (config.jvmPauseMonitorToRun) {
- quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
- }
-
- return quorumPeer;
- }
-
-}