diff options
Diffstat (limited to 'zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java')
-rw-r--r-- | zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java | 2711 |
1 files changed, 2711 insertions, 0 deletions
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java new file mode 100644 index 00000000000..f6fc87d7716 --- /dev/null +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -0,0 +1,2711 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import static org.apache.zookeeper.common.NetUtils.formatInetAddr; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.io.Writer; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.security.sasl.SaslException; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException.BadArgumentsException; +import org.apache.zookeeper.common.AtomicFileOutputStream; +import org.apache.zookeeper.common.AtomicFileWritingIdiom; +import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement; +import org.apache.zookeeper.common.QuorumX509Util; +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.jmx.ZKMBeanInfo; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.ZooKeeperServer; +import 
org.apache.zookeeper.server.ZooKeeperThread; +import org.apache.zookeeper.server.admin.AdminServer; +import org.apache.zookeeper.server.admin.AdminServer.AdminServerException; +import org.apache.zookeeper.server.admin.AdminServerFactory; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; +import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthLearner; +import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthServer; +import org.apache.zookeeper.server.quorum.auth.QuorumAuth; +import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner; +import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; +import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner; +import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer; +import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; +import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.ConfigUtils; +import org.apache.zookeeper.server.util.JvmPauseMonitor; +import org.apache.zookeeper.server.util.ZxidUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class manages the quorum protocol. There are three states this server + * can be in: + * <ol> + * <li>Leader election - each server will elect a leader (proposing itself as a + * leader initially).</li> + * <li>Follower - the server will synchronize with the leader and replicate any + * transactions.</li> + * <li>Leader - the server will process requests and forward them to followers. + * A majority of followers must log the request before it can be accepted. + * </ol> + * + * This class will setup a datagram socket that will always respond with its + * view of the current leader. 
The response will take the form of: + * + * <pre> + * int xid; + * + * long myid; + * + * long leader_id; + * + * long leader_zxid; + * </pre> + * + * The request for the current leader will consist solely of an xid: int xid; + */ +public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider { + + private static final Logger LOG = LoggerFactory.getLogger(QuorumPeer.class); + + public static final String CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES = "zookeeper.kerberos.canonicalizeHostNames"; + public static final String CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES = "false"; + + private QuorumBean jmxQuorumBean; + LocalPeerBean jmxLocalPeerBean; + private Map<Long, RemotePeerBean> jmxRemotePeerBean; + LeaderElectionBean jmxLeaderElectionBean; + + // The QuorumCnxManager is held through an AtomicReference to ensure cross-thread visibility + // of updates; see the implementation comment at setLastSeenQuorumVerifier(). + private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>(); + + QuorumAuthServer authServer; + QuorumAuthLearner authLearner; + + /** + * ZKDatabase is a top level member of quorumpeer + * which will be used in all the zookeeperservers + * instantiated later. 
Also, it is created once on + * bootup and only thrown away in case of a truncate + * message from the leader + */ + private ZKDatabase zkDb; + + private JvmPauseMonitor jvmPauseMonitor; + + private final AtomicBoolean suspended = new AtomicBoolean(false); + + public static final class AddressTuple { + + public final MultipleAddresses quorumAddr; + public final MultipleAddresses electionAddr; + public final InetSocketAddress clientAddr; + + public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) { + this.quorumAddr = quorumAddr; + this.electionAddr = electionAddr; + this.clientAddr = clientAddr; + } + + } + + private int observerMasterPort; + + public int getObserverMasterPort() { + return observerMasterPort; + } + + public void setObserverMasterPort(int observerMasterPort) { + this.observerMasterPort = observerMasterPort; + } + + public static final String CONFIG_KEY_MULTI_ADDRESS_ENABLED = "zookeeper.multiAddress.enabled"; + public static final String CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED = "false"; + + private boolean multiAddressEnabled = true; + public boolean isMultiAddressEnabled() { + return multiAddressEnabled; + } + + public void setMultiAddressEnabled(boolean multiAddressEnabled) { + this.multiAddressEnabled = multiAddressEnabled; + LOG.info("multiAddress.enabled set to {}", multiAddressEnabled); + } + + public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS = "zookeeper.multiAddress.reachabilityCheckTimeoutMs"; + + private int multiAddressReachabilityCheckTimeoutMs = (int) MultipleAddresses.DEFAULT_TIMEOUT.toMillis(); + public int getMultiAddressReachabilityCheckTimeoutMs() { + return multiAddressReachabilityCheckTimeoutMs; + } + + public void setMultiAddressReachabilityCheckTimeoutMs(int multiAddressReachabilityCheckTimeoutMs) { + this.multiAddressReachabilityCheckTimeoutMs = multiAddressReachabilityCheckTimeoutMs; + LOG.info("multiAddress.reachabilityCheckTimeoutMs set 
to {}", multiAddressReachabilityCheckTimeoutMs); + } + + public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED = "zookeeper.multiAddress.reachabilityCheckEnabled"; + + private boolean multiAddressReachabilityCheckEnabled = true; + + public boolean isMultiAddressReachabilityCheckEnabled() { + return multiAddressReachabilityCheckEnabled; + } + + public void setMultiAddressReachabilityCheckEnabled(boolean multiAddressReachabilityCheckEnabled) { + this.multiAddressReachabilityCheckEnabled = multiAddressReachabilityCheckEnabled; + LOG.info("multiAddress.reachabilityCheckEnabled set to {}", multiAddressReachabilityCheckEnabled); + } + + public static class QuorumServer { + + public MultipleAddresses addr = new MultipleAddresses(); + + public MultipleAddresses electionAddr = new MultipleAddresses(); + + public InetSocketAddress clientAddr = null; + + public long id; + + public String hostname; + + public LearnerType type = LearnerType.PARTICIPANT; + + public boolean isClientAddrFromStatic = false; + + private List<InetSocketAddress> myAddrs; + + public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) { + this(id, addr, electionAddr, clientAddr, LearnerType.PARTICIPANT); + } + + public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr) { + this(id, addr, electionAddr, null, LearnerType.PARTICIPANT); + } + + // VisibleForTesting + public QuorumServer(long id, InetSocketAddress addr) { + this(id, addr, null, null, LearnerType.PARTICIPANT); + } + + public long getId() { + return id; + } + + /** + * Performs a DNS lookup for server address and election address. + * + * If the DNS lookup fails, this.addr and electionAddr remain + * unmodified. 
+ */ + public void recreateSocketAddresses() { + if (this.addr.isEmpty()) { + LOG.warn("Server address has not been initialized"); + return; + } + if (this.electionAddr.isEmpty()) { + LOG.warn("Election address has not been initialized"); + return; + } + this.addr.recreateSocketAddresses(); + this.electionAddr.recreateSocketAddresses(); + } + + private LearnerType getType(String s) throws ConfigException { + switch (s.trim().toLowerCase()) { + case "observer": + return LearnerType.OBSERVER; + case "participant": + return LearnerType.PARTICIPANT; + default: + throw new ConfigException("Unrecognised peertype: " + s); + } + } + + public QuorumServer(long sid, String addressStr) throws ConfigException { + this(sid, addressStr, QuorumServer::getInetAddress); + } + + QuorumServer(long sid, String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException { + this.id = sid; + initializeWithAddressString(addressStr, getInetAddress); + } + + public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, LearnerType type) { + this(id, addr, electionAddr, null, type); + } + + public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) { + this.id = id; + if (addr != null) { + this.addr.addAddress(addr); + } + if (electionAddr != null) { + this.electionAddr.addAddress(electionAddr); + } + this.type = type; + this.clientAddr = clientAddr; + + setMyAddrs(); + } + + private static final String wrongFormat = + " does not have the form server_config or server_config;client_config" + + " where server_config is the pipe separated list of host:port:port or host:port:port:type" + + " and client_config is port or host:port"; + + private void initializeWithAddressString(String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException { + LearnerType newType = null; + String[] serverClientParts = addressStr.split(";"); + String[] 
serverAddresses = serverClientParts[0].split("\\|"); + + if (serverClientParts.length == 2) { + String[] clientParts = ConfigUtils.getHostAndPort(serverClientParts[1]); + if (clientParts.length > 2) { + throw new ConfigException(addressStr + wrongFormat); + } + + // is client_config a host:port or just a port + String clientHostName = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0"; + try { + clientAddr = new InetSocketAddress(clientHostName, Integer.parseInt(clientParts[clientParts.length - 1])); + } catch (NumberFormatException e) { + throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]); + } + } + + boolean multiAddressEnabled = Boolean.parseBoolean( + System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, QuorumPeer.CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED)); + if (!multiAddressEnabled && serverAddresses.length > 1) { + throw new ConfigException("Multiple address feature is disabled, but multiple addresses were specified for sid " + this.id); + } + + boolean canonicalize = Boolean.parseBoolean( + System.getProperty( + CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES, + CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES)); + + for (String serverAddress : serverAddresses) { + String serverParts[] = ConfigUtils.getHostAndPort(serverAddress); + if ((serverClientParts.length > 2) || (serverParts.length < 3) + || (serverParts.length > 4)) { + throw new ConfigException(addressStr + wrongFormat); + } + + String serverHostName = serverParts[0]; + + // server_config should be either host:port:port or host:port:port:type + InetSocketAddress tempAddress; + InetSocketAddress tempElectionAddress; + try { + tempAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[1])); + addr.addAddress(tempAddress); + } catch (NumberFormatException e) { + throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[1]); + } + try { + tempElectionAddress = new 
InetSocketAddress(serverHostName, Integer.parseInt(serverParts[2])); + electionAddr.addAddress(tempElectionAddress); + } catch (NumberFormatException e) { + throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[2]); + } + + if (tempAddress.getPort() == tempElectionAddress.getPort()) { + throw new ConfigException("Client and election port must be different! Please update the " + + "configuration file on server." + this.id); + } + + if (canonicalize) { + InetAddress ia = getInetAddress.apply(tempAddress); + if (ia == null) { + throw new ConfigException("Unable to canonicalize address " + serverHostName + " because it's not resolvable"); + } + + String canonicalHostName = ia.getCanonicalHostName(); + + if (!canonicalHostName.equals(serverHostName) + // Avoid using literal IP address when + // security check fails + && !canonicalHostName.equals(ia.getHostAddress())) { + LOG.info("Host name for quorum server {} " + + "canonicalized from {} to {}", + this.id, serverHostName, canonicalHostName); + serverHostName = canonicalHostName; + } + } + + if (serverParts.length == 4) { + LearnerType tempType = getType(serverParts[3]); + if (newType == null) { + newType = tempType; + } + + if (newType != tempType) { + throw new ConfigException("Multiple addresses should have similar roles: " + type + " vs " + tempType); + } + } + + this.hostname = serverHostName; + } + + if (newType != null) { + type = newType; + } + + setMyAddrs(); + } + + private static InetAddress getInetAddress(InetSocketAddress addr) { + return addr.getAddress(); + } + + private void setMyAddrs() { + this.myAddrs = new ArrayList<>(); + this.myAddrs.addAll(this.addr.getAllAddresses()); + this.myAddrs.add(this.clientAddr); + this.myAddrs.addAll(this.electionAddr.getAllAddresses()); + this.myAddrs = excludedSpecialAddresses(this.myAddrs); + } + + public static String delimitedHostString(InetSocketAddress addr) { + String host = addr.getHostString(); + if (host.contains(":")) { + 
return "[" + host + "]"; + } else { + return host; + } + } + + public String toString() { + StringWriter sw = new StringWriter(); + + List<InetSocketAddress> addrList = new LinkedList<>(addr.getAllAddresses()); + List<InetSocketAddress> electionAddrList = new LinkedList<>(electionAddr.getAllAddresses()); + + if (addrList.size() > 0 && electionAddrList.size() > 0) { + addrList.sort(Comparator.comparing(InetSocketAddress::getHostString)); + electionAddrList.sort(Comparator.comparing(InetSocketAddress::getHostString)); + sw.append(IntStream.range(0, addrList.size()).mapToObj(i -> String.format("%s:%d:%d", + delimitedHostString(addrList.get(i)), addrList.get(i).getPort(), electionAddrList.get(i).getPort())) + .collect(Collectors.joining("|"))); + } + + if (type == LearnerType.OBSERVER) { + sw.append(":observer"); + } else if (type == LearnerType.PARTICIPANT) { + sw.append(":participant"); + } + + if (clientAddr != null && !isClientAddrFromStatic) { + sw.append(";"); + sw.append(delimitedHostString(clientAddr)); + sw.append(":"); + sw.append(String.valueOf(clientAddr.getPort())); + } + + return sw.toString(); + } + + public int hashCode() { + assert false : "hashCode not designed"; + return 42; // any arbitrary constant will do + } + + private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2) { + return (addr1 != null || addr2 == null) + && (addr1 == null || addr2 != null) + && (addr1 == null || addr2 == null || addr1.equals(addr2)); + } + + public boolean equals(Object o) { + if (!(o instanceof QuorumServer)) { + return false; + } + QuorumServer qs = (QuorumServer) o; + if ((qs.id != id) || (qs.type != type)) { + return false; + } + if (!addr.equals(qs.addr)) { + return false; + } + if (!electionAddr.equals(qs.electionAddr)) { + return false; + } + return checkAddressesEqual(clientAddr, qs.clientAddr); + } + + public void checkAddressDuplicate(QuorumServer s) throws BadArgumentsException { + List<InetSocketAddress> otherAddrs = new 
ArrayList<>(s.addr.getAllAddresses()); + otherAddrs.add(s.clientAddr); + otherAddrs.addAll(s.electionAddr.getAllAddresses()); + otherAddrs = excludedSpecialAddresses(otherAddrs); + + for (InetSocketAddress my : this.myAddrs) { + + for (InetSocketAddress other : otherAddrs) { + if (my.equals(other)) { + String error = String.format("%s of server.%d conflicts %s of server.%d", my, this.id, other, s.id); + throw new BadArgumentsException(error); + } + } + } + } + + private List<InetSocketAddress> excludedSpecialAddresses(List<InetSocketAddress> addrs) { + List<InetSocketAddress> included = new ArrayList<>(); + + for (InetSocketAddress addr : addrs) { + if (addr == null) { + continue; + } + InetAddress inetaddr = addr.getAddress(); + + if (inetaddr == null || inetaddr.isAnyLocalAddress() // wildCard addresses (0.0.0.0 or [::]) + || inetaddr.isLoopbackAddress()) { // loopback address(localhost/127.0.0.1) + continue; + } + included.add(addr); + } + return included; + } + + } + + public enum ServerState { + LOOKING, + FOLLOWING, + LEADING, + OBSERVING + } + + /** + * (Used for monitoring) shows the current phase of + * Zab protocol that peer is running. + */ + public enum ZabState { + ELECTION, + DISCOVERY, + SYNCHRONIZATION, + BROADCAST + } + + /** + * (Used for monitoring) When peer is in synchronization phase, this shows + * which synchronization mechanism is being used + */ + public enum SyncMode { + NONE, + DIFF, + SNAP, + TRUNC + } + + /* + * A peer can either be participating, which implies that it is willing to + * both vote in instances of consensus and to elect or become a Leader, or + * it may be observing in which case it isn't. + * + * We need this distinction to decide which ServerState to move to when + * conditions change (e.g. which state to become after LOOKING). + */ + public enum LearnerType { + PARTICIPANT, + OBSERVER + } + + /* + * To enable observers to have no identifier, we need a generic identifier + * at least for QuorumCnxManager. 
We use the following constant as the
     * value of such a generic identifier.
     */

    static final long OBSERVER_ID = Long.MAX_VALUE;

    /*
     * Record leader election time
     */
    public long start_fle, end_fle; // fle = fast leader election
    public static final String FLE_TIME_UNIT = "MS";
    // Start of the current unavailability period; 0 means no period is being measured
    private long unavailableStartTime;

    /*
     * Default value of peer is participant
     */
    private LearnerType learnerType = LearnerType.PARTICIPANT;

    public LearnerType getLearnerType() {
        return learnerType;
    }

    /**
     * Sets the LearnerType
     */
    public void setLearnerType(LearnerType p) {
        learnerType = p;
    }

    protected synchronized void setConfigFileName(String s) {
        configFilename = s;
    }

    private String configFilename = null;

    /**
     * Returns the number of servers in the voting view.
     */
    public int getQuorumSize() {
        return getVotingView().size();
    }

    public void setJvmPauseMonitor(JvmPauseMonitor jvmPauseMonitor) {
        this.jvmPauseMonitor = jvmPauseMonitor;
    }

    /**
     * QuorumVerifier implementation; default (majority).
     */

    // last committed quorum verifier
    private QuorumVerifier quorumVerifier;

    // last proposed quorum verifier
    private QuorumVerifier lastSeenQuorumVerifier = null;

    // Lock object that guards access to quorumVerifier and lastSeenQuorumVerifier.
    final Object QV_LOCK = new Object();

    /**
     * My id
     */
    private long myid;

    /**
     * get the id of this quorum peer.
     */
    public long getMyId() {
        return myid;
    }

    // VisibleForTesting
    void setId(long id) {
        this.myid = id;
    }

    private boolean sslQuorum;
    private boolean shouldUsePortUnification;

    public boolean isSslQuorum() {
        return sslQuorum;
    }

    public boolean shouldUsePortUnification() {
        return shouldUsePortUnification;
    }

    private final QuorumX509Util x509Util;

    QuorumX509Util getX509Util() {
        return x509Util;
    }

    /**
     * This is who I think the leader currently is.
     */
    private volatile Vote currentVote;

    public synchronized Vote getCurrentVote() {
        return currentVote;
    }

    public synchronized void setCurrentVote(Vote v) {
        currentVote = v;
    }

    private volatile boolean running = true;

    private String initialConfig;

    /**
     * The number of milliseconds of each tick
     */
    protected int tickTime;

    /**
     * Whether learners in this quorum should create new sessions as local.
     * False by default to preserve existing behavior.
     */
    protected boolean localSessionsEnabled = false;

    /**
     * Whether learners in this quorum should upgrade local sessions to
     * global. Only matters if local sessions are enabled.
     */
    protected boolean localSessionsUpgradingEnabled = true;

    /**
     * Minimum number of milliseconds to allow for session timeout.
     * A value of -1 indicates unset, use default.
     */
    protected int minSessionTimeout = -1;

    /**
     * Maximum number of milliseconds to allow for session timeout.
     * A value of -1 indicates unset, use default.
     */
    protected int maxSessionTimeout = -1;

    /**
     * The ZooKeeper server's socket backlog length. The number of connections
     * that will be queued to be read before new connections are dropped. A
     * value of -1 (the initial value set here) indicates the default backlog
     * will be used.
     */
    protected int clientPortListenBacklog = -1;

    /**
     * The number of ticks that the initial synchronization phase can take
     */
    protected volatile int initLimit;

    /**
     * The number of ticks that can pass between sending a request and getting
     * an acknowledgment
     */
    protected volatile int syncLimit;

    /**
     * The number of ticks that can pass before retrying to connect to learner master
     */
    protected volatile int connectToLearnerMasterLimit;

    /**
     * Enables/Disables sync request processor. This option is enabled
     * by default and is to be used with observers.
     */
    protected boolean syncEnabled = true;

    /**
     * The current tick
     */
    protected AtomicInteger tick = new AtomicInteger();

    /**
     * Whether or not to listen on all IPs for the two quorum ports
     * (broadcast and fast leader election).
     */
    protected boolean quorumListenOnAllIPs = false;

    /**
     * Keeps time taken for leader election in milliseconds. Sets the value to
     * this variable only after the completion of leader election.
     */
    private long electionTimeTaken = -1;

    /**
     * Enable/Disables quorum authentication using sasl. Defaulting to false.
     */
    protected boolean quorumSaslEnableAuth;

    /**
     * If this is false, quorum peer server will accept another quorum peer client
     * connection even if the authentication did not succeed. This can be used while
     * upgrading ZooKeeper server. Defaulting to false (required).
     */
    protected boolean quorumServerSaslAuthRequired;

    /**
     * If this is false, quorum peer learner will talk to quorum peer server
     * without authentication. This can be used while upgrading ZooKeeper
     * server. Defaulting to false (required).
     */
    protected boolean quorumLearnerSaslAuthRequired;

    /**
     * Kerberos quorum service principal. Defaulting to 'zkquorum/localhost'.
     */
    protected String quorumServicePrincipal;

    /**
     * Quorum learner login context name in jaas-conf file to read the kerberos
     * security details. Defaulting to 'QuorumLearner'.
     */
    protected String quorumLearnerLoginContext;

    /**
     * Quorum server login context name in jaas-conf file to read the kerberos
     * security details. Defaulting to 'QuorumServer'.
     */
    protected String quorumServerLoginContext;

    // TODO: need to tune the default value of thread size
    private static final int QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE = 20;
    /**
     * The maximum number of threads to allow in the connectionExecutors thread
     * pool which will be used to initiate quorum server connections.
*/
    protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;

    public static final String QUORUM_CNXN_TIMEOUT_MS = "zookeeper.quorumCnxnTimeoutMs";
    private static int quorumCnxnTimeoutMs;

    static {
        // read once at class-load time; -1 when the system property is absent
        quorumCnxnTimeoutMs = Integer.getInteger(QUORUM_CNXN_TIMEOUT_MS, -1);
        LOG.info("{}={}", QUORUM_CNXN_TIMEOUT_MS, quorumCnxnTimeoutMs);
    }

    /**
     * @deprecated As of release 3.4.0, this class has been deprecated, since
     * it is used with one of the udp-based versions of leader election, which
     * we are also deprecating.
     *
     * This class simply responds to requests for the current leader of this
     * node.
     * <p>
     * The request contains just an xid generated by the requestor.
     * <p>
     * The response has the xid, the id of this server, the id of the leader,
     * and the zxid of the leader.
     *
     *
     */
    @Deprecated
    class ResponderThread extends ZooKeeperThread {

        ResponderThread() {
            super("ResponderThread");
        }

        volatile boolean running = true;

        /**
         * Receives datagrams on {@code udpSocket} until {@code running} is cleared
         * or an exception ends the loop. A 4-byte request is answered in the same
         * buffer (the xid bytes are left in place); any other length is only logged.
         */
        @Override
        public void run() {
            try {
                byte[] b = new byte[36];
                ByteBuffer responseBuffer = ByteBuffer.wrap(b);
                DatagramPacket packet = new DatagramPacket(b, b.length);
                while (running) {
                    udpSocket.receive(packet);
                    if (packet.getLength() != 4) {
                        LOG.warn("Got more than just an xid! Len = {}", packet.getLength());
                    } else {
                        responseBuffer.clear();
                        responseBuffer.getInt(); // Skip the xid
                        responseBuffer.putLong(myid);
                        Vote current = getCurrentVote();
                        switch (getPeerState()) {
                        case LOOKING:
                            responseBuffer.putLong(current.getId());
                            responseBuffer.putLong(current.getZxid());
                            break;
                        case LEADING:
                            responseBuffer.putLong(myid);
                            try {
                                long proposed;
                                synchronized (leader) {
                                    proposed = leader.lastProposed;
                                }
                                responseBuffer.putLong(proposed);
                            } catch (NullPointerException npe) {
                                // This can happen in state transitions,
                                // just ignore the request
                            }
                            break;
                        case FOLLOWING:
                            responseBuffer.putLong(current.getId());
                            try {
                                responseBuffer.putLong(follower.getZxid());
                            } catch (NullPointerException npe) {
                                // This can happen in state transitions,
                                // just ignore the request
                            }
                            break;
                        case OBSERVING:
                            // Do nothing, Observers keep themselves to
                            // themselves.
                            break;
                        }
                        packet.setData(b);
                        udpSocket.send(packet);
                    }
                    // restore the full receive length for the next datagram
                    packet.setLength(b.length);
                }
            } catch (RuntimeException e) {
                LOG.warn("Unexpected runtime exception in ResponderThread", e);
            } catch (IOException e) {
                LOG.warn("Unexpected IO exception in ResponderThread", e);
            } finally {
                LOG.warn("QuorumPeer responder thread exited");
            }
        }

    }

    private ServerState state = ServerState.LOOKING;

    private AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION);
    private AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE);
    private AtomicReference<String> leaderAddress = new AtomicReference<>("");
    private AtomicLong leaderId = new AtomicLong(-1);

    private boolean reconfigFlag = false; // indicates that a reconfig just committed

    /**
     * Sets the server state. Entering LOOKING also clears the recorded leader
     * address and id and resets the Zab state to ELECTION; any other state is
     * logged together with the current Zab state and sync mode.
     */
    public synchronized void setPeerState(ServerState newState) {
        state = newState;
        if (newState == ServerState.LOOKING) {
            setLeaderAddressAndId(null, -1);
            setZabState(ZabState.ELECTION);
        } else {
            LOG.info("Peer state changed: {}", getDetailedPeerState());
        }
    }

    /**
     * Sets the Zab phase. On entering BROADCAST while an unavailability period is
     * being measured, records its length in the server metrics (and in the leader
     * metric when this peer is LEADING) and resets the start time.
     */
    public void setZabState(ZabState zabState) {
        if ((zabState == ZabState.BROADCAST) && (unavailableStartTime != 0)) {
            long unavailableTime = Time.currentElapsedTime() - unavailableStartTime;
            ServerMetrics.getMetrics().UNAVAILABLE_TIME.add(unavailableTime);
            if (getPeerState() == ServerState.LEADING) {
                ServerMetrics.getMetrics().LEADER_UNAVAILABLE_TIME.add(unavailableTime);
            }
            unavailableStartTime = 0;
        }
        this.zabState.set(zabState);
        LOG.info("Peer state changed: {}", getDetailedPeerState());
    }

    public void setSyncMode(SyncMode syncMode) {
        this.syncMode.set(syncMode);
        LOG.info("Peer state changed: {}", getDetailedPeerState());
    }

    public ZabState getZabState() {
        return zabState.get();
    }

    public SyncMode getSyncMode() {
        return syncMode.get();
    }

    /**
     * Records the current leader.
     *
     * @param addr  leader addresses; when null the stored address string becomes null
     * @param newId server id of the leader
     */
    public void setLeaderAddressAndId(MultipleAddresses addr, long newId) {
        if (addr != null) {
            leaderAddress.set(String.join("|", addr.getAllHostStrings()));
        } else {
            leaderAddress.set(null);
        }
        leaderId.set(newId);
    }

    public String getLeaderAddress() {
        return leaderAddress.get();
    }

    public long getLeaderId() {
        return leaderId.get();
    }

    /**
     * Builds a lower-case description of the peer: the server state, followed by
     * " - " and the Zab state unless it is ELECTION, followed by " - " and the
     * sync mode unless it is NONE.
     */
    public String getDetailedPeerState() {
        // Locale.ROOT keeps the enum names stable regardless of the JVM default
        // locale (a Turkish locale would otherwise turn 'I' into a dotless i).
        final java.util.Locale root = java.util.Locale.ROOT;
        final StringBuilder sb = new StringBuilder(getPeerState().toString().toLowerCase(root));
        final ZabState zabState = getZabState();
        if (!ZabState.ELECTION.equals(zabState)) {
            sb.append(" - ").append(zabState.toString().toLowerCase(root));
        }
        final SyncMode syncMode = getSyncMode();
        if (!SyncMode.NONE.equals(syncMode)) {
            sb.append(" - ").append(syncMode.toString().toLowerCase(root));
        }
        return sb.toString();
    }

    public synchronized void reconfigFlagSet() {
        reconfigFlag = true;
    }

    public synchronized void reconfigFlagClear() {
        reconfigFlag = false;
    }

    public synchronized boolean isReconfigStateChange() {
        return reconfigFlag;
    }

    public synchronized ServerState getPeerState() {
        return state;
    }

    DatagramSocket udpSocket;

private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>();

/**
 * Re-resolves the socket addresses of the server with the given ID.
 *
 * The member is looked up in both the committed quorum verifier and the
 * last seen quorum verifier, and recreateSocketAddresses() is invoked on
 * every entry found. When the entry found in the committed verifier
 * belongs to the local server, myAddrs is refreshed from it as well.
 */
public void recreateSocketAddresses(long id) {
    final QuorumVerifier committed = getQuorumVerifier();
    final QuorumServer committedEntry = (committed == null) ? null : committed.getAllMembers().get(id);
    if (committedEntry != null) {
        committedEntry.recreateSocketAddresses();
        if (getMyId() == id) {
            setAddrs(committedEntry.addr, committedEntry.electionAddr, committedEntry.clientAddr);
        }
    }

    final QuorumVerifier lastSeen = getLastSeenQuorumVerifier();
    final QuorumServer lastSeenEntry = (lastSeen == null) ? null : lastSeen.getAllMembers().get(id);
    if (lastSeenEntry != null) {
        lastSeenEntry.recreateSocketAddresses();
    }
}

/**
 * Returns the local address tuple, blocking on QV_LOCK until setAddrs()
 * has published one. An interrupt while waiting restores the interrupt
 * flag and is rethrown wrapped in a RuntimeException.
 */
private AddressTuple getAddrs() {
    AddressTuple current = myAddrs.get();
    if (current == null) {
        try {
            synchronized (QV_LOCK) {
                // Re-read under the lock; setAddrs() publishes and notifies under the same lock.
                for (current = myAddrs.get(); current == null; current = myAddrs.get()) {
                    QV_LOCK.wait();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
    return current;
}

/** Returns the quorum address of this peer; waits until addresses are set. */
public MultipleAddresses getQuorumAddress() {
    final AddressTuple tuple = getAddrs();
    return tuple.quorumAddr;
}

/** Returns the election address of this peer; waits until addresses are set. */
public MultipleAddresses getElectionAddress() {
    final AddressTuple tuple = getAddrs();
    return tuple.electionAddr;
}

/** Returns the client address of this peer, or null when no addresses have been set yet (never blocks). */
public InetSocketAddress getClientAddress() {
    final AddressTuple tuple = myAddrs.get();
    if (tuple == null) {
        return null;
    }
    return tuple.clientAddr;
}

/** Publishes a new address tuple and wakes every thread waiting in getAddrs(). */
private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) {
    final AddressTuple published = new AddressTuple(quorumAddr, electionAddr, clientAddr);
    synchronized (QV_LOCK) {
        myAddrs.set(published);
        QV_LOCK.notifyAll();
    }
}

private int electionType;

Election electionAlg;

ServerCnxnFactory cnxnFactory;
ServerCnxnFactory secureCnxnFactory;

private FileTxnSnapLog logFactory = null;

private final QuorumStats quorumStats;

AdminServer adminServer;

private final boolean reconfigEnabled;

/** Factory used by tests; simply invokes the no-argument constructor. */
public static QuorumPeer testingQuorumPeer() throws SaslException {
    return new QuorumPeer();
}

/**
 * Creates a peer with default collaborators: quorum stats, an empty remote
 * JMX bean map, an admin server, the X509 utility and the quorum auth
 * server/learner set up by initialize().
 */
public QuorumPeer() throws SaslException {
    super("QuorumPeer");
    quorumStats = new QuorumStats(this);
    jmxRemotePeerBean = new HashMap<>();
    adminServer = AdminServerFactory.createAdminServer();
    x509Util = createX509Util();
    initialize();
    reconfigEnabled = QuorumPeerConfig.isReconfigEnabled();
}

// VisibleForTesting
QuorumX509Util createX509Util() {
    return new QuorumX509Util();
}

/**
 * For backward compatibility purposes, we instantiate QuorumMaj by default.
 */

public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, ServerCnxnFactory cnxnFactory) throws IOException {
    this(
        quorumPeers,
        dataDir,
        dataLogDir,
        electionType,
        myid,
        tickTime,
        initLimit,
        syncLimit,
        connectToLearnerMasterLimit,
        false,
        cnxnFactory,
        new QuorumMaj(quorumPeers));
}

/**
 * Full constructor: copies the given settings into the peer, opens the
 * transaction/snapshot log and database on the given directories and
 * installs the quorum verifier. A null quorumConfig falls back to a
 * QuorumMaj built from quorumPeers.
 */
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException {
    this();
    this.cnxnFactory = cnxnFactory;
    this.electionType = electionType;
    this.myid = myid;
    this.tickTime = tickTime;
    this.initLimit = initLimit;
    this.syncLimit = syncLimit;
    this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
    this.quorumListenOnAllIPs = quorumListenOnAllIPs;
    this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
    this.zkDb = new ZKDatabase(this.logFactory);
    final QuorumVerifier effectiveConfig = (quorumConfig != null) ? quorumConfig : new QuorumMaj(quorumPeers);
    setQuorumVerifier(effectiveConfig, false);
    adminServer = AdminServerFactory.createAdminServer();
}

/**
 * Sets up the quorum auth server and learner: SASL-backed implementations
 * when quorum SASL auth is enabled, no-op implementations otherwise.
 */
public void initialize() throws SaslException {
    if (!isQuorumSaslAuthEnabled()) {
        authServer = new NullQuorumAuthServer();
        authLearner = new NullQuorumAuthLearner();
        return;
    }

    // Host names of every member of the current view are passed to the SASL auth server.
    final Set<String> authzHosts = new HashSet<>();
    for (QuorumServer member : getView().values()) {
        authzHosts.add(member.hostname);
    }
    authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(), quorumServerLoginContext, authzHosts);
    authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(), quorumServicePrincipal, quorumLearnerLoginContext);
}

QuorumStats quorumStats() {
    return quorumStats;
}

@Override + public synchronized void start() { + if (!getView().containsKey(myid)) { + throw new RuntimeException("My id " + myid + " not in the peer list"); + } + loadDataBase(); + startServerCnxnFactory(); + try { + adminServer.start(); + } catch (AdminServerException e) { + LOG.warn("Problem starting AdminServer", e); + } + startLeaderElection(); + startJvmPauseMonitor(); + super.start(); + } + + private void loadDataBase() { + try { + zkDb.loadDataBase(); + + // load the epochs + long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; + long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); + try { + currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); + } catch (FileNotFoundException e) { + // pick a reasonable epoch number + // this should only happen once when moving to a + // new code version + currentEpoch = epochOfZxid; + LOG.info( + "{} not found! Creating with a reasonable default of {}. " + + "This should only happen when you are upgrading your installation", + CURRENT_EPOCH_FILENAME, + currentEpoch); + writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch); + } + if (epochOfZxid > currentEpoch) { + // acceptedEpoch.tmp file in snapshot directory + File currentTmp = new File(getTxnFactory().getSnapDir(), + CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION); + if (currentTmp.exists()) { + long epochOfTmp = readLongFromFile(currentTmp.getName()); + LOG.info("{} found. Setting current epoch to {}.", currentTmp, epochOfTmp); + setCurrentEpoch(epochOfTmp); + } else { + throw new IOException( + "The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + + ", is older than the last zxid, " + lastProcessedZxid); + } + } + try { + acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME); + } catch (FileNotFoundException e) { + // pick a reasonable epoch number + // this should only happen once when moving to a + // new code version + acceptedEpoch = epochOfZxid; + LOG.info( + "{} not found! 
Creating with a reasonable default of {}. " + + "This should only happen when you are upgrading your installation", + ACCEPTED_EPOCH_FILENAME, + acceptedEpoch); + writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch); + } + if (acceptedEpoch < currentEpoch) { + throw new IOException("The accepted epoch, " + + ZxidUtils.zxidToString(acceptedEpoch) + + " is less than the current epoch, " + + ZxidUtils.zxidToString(currentEpoch)); + } + } catch (IOException ie) { + LOG.error("Unable to load database on disk", ie); + throw new RuntimeException("Unable to run quorum server ", ie); + } + } + + ResponderThread responder; + + public synchronized void stopLeaderElection() { + responder.running = false; + responder.interrupt(); + } + public synchronized void startLeaderElection() { + try { + if (getPeerState() == ServerState.LOOKING) { + currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); + } + } catch (IOException e) { + RuntimeException re = new RuntimeException(e.getMessage()); + re.setStackTrace(e.getStackTrace()); + throw re; + } + + this.electionAlg = createElectionAlgorithm(electionType); + } + + private void startJvmPauseMonitor() { + if (this.jvmPauseMonitor != null) { + this.jvmPauseMonitor.serviceStart(); + } + } + + /** + * Count the number of nodes in the map that could be followers. + * @param peers + * @return The number of followers in the map + */ + protected static int countParticipants(Map<Long, QuorumServer> peers) { + int count = 0; + for (QuorumServer q : peers.values()) { + if (q.type == LearnerType.PARTICIPANT) { + count++; + } + } + return count; + } + + /** + * This constructor is only used by the existing unit test code. + * It defaults to FileLogProvider persistence provider. 
+ */ + public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit) throws IOException { + this( + quorumPeers, + snapDir, + logDir, + electionAlg, + myid, + tickTime, + initLimit, + syncLimit, + connectToLearnerMasterLimit, + false, + ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1), + new QuorumMaj(quorumPeers)); + } + + public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, String oraclePath) throws IOException { + this( + quorumPeers, + snapDir, + logDir, + electionAlg, + myid, + tickTime, + initLimit, + syncLimit, + connectToLearnerMasterLimit, + false, + ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1), + new QuorumOracleMaj(quorumPeers, oraclePath)); + } + + /** + * This constructor is only used by the existing unit test code. + * It defaults to FileLogProvider persistence provider. 
+ */ + public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, QuorumVerifier quorumConfig) throws IOException { + this( + quorumPeers, + snapDir, + logDir, + electionAlg, + myid, + tickTime, + initLimit, + syncLimit, + connectToLearnerMasterLimit, + false, + ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1), + quorumConfig); + } + + private static InetSocketAddress getClientAddress(Map<Long, QuorumServer> quorumPeers, long myid, int clientPort) throws IOException { + QuorumServer quorumServer = quorumPeers.get(myid); + if (null == quorumServer) { + throw new IOException("No QuorumServer correspoding to myid " + myid); + } + if (null == quorumServer.clientAddr) { + return new InetSocketAddress(clientPort); + } + if (quorumServer.clientAddr.getPort() != clientPort) { + throw new IOException("QuorumServer port " + + quorumServer.clientAddr.getPort() + + " does not match with given port " + + clientPort); + } + return quorumServer.clientAddr; + } + + /** + * returns the highest zxid that this host has seen + * + * @return the highest zxid for this host + */ + public long getLastLoggedZxid() { + if (!zkDb.isInitialized()) { + loadDataBase(); + } + return zkDb.getDataTreeLastProcessedZxid(); + } + + public Follower follower; + public Leader leader; + public Observer observer; + + protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { + return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb)); + } + + protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception { + return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb)); + } + + protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException { + return new Observer(this, new ObserverZooKeeperServer(logFactory, this, 
this.zkDb)); + } + + @SuppressWarnings("deprecation") + protected Election createElectionAlgorithm(int electionAlgorithm) { + Election le = null; + + //TODO: use a factory rather than a switch + switch (electionAlgorithm) { + case 1: + throw new UnsupportedOperationException("Election Algorithm 1 is not supported."); + case 2: + throw new UnsupportedOperationException("Election Algorithm 2 is not supported."); + case 3: + QuorumCnxManager qcm = createCnxnManager(); + QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm); + if (oldQcm != null) { + LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)"); + oldQcm.halt(); + } + QuorumCnxManager.Listener listener = qcm.listener; + if (listener != null) { + listener.start(); + FastLeaderElection fle = new FastLeaderElection(this, qcm); + fle.start(); + le = fle; + } else { + LOG.error("Null listener when initializing cnx manager"); + } + break; + default: + assert false; + } + return le; + } + + @SuppressWarnings("deprecation") + protected Election makeLEStrategy() { + LOG.debug("Initializing leader election protocol..."); + return electionAlg; + } + + protected synchronized void setLeader(Leader newLeader) { + leader = newLeader; + } + + protected synchronized void setFollower(Follower newFollower) { + follower = newFollower; + } + + protected synchronized void setObserver(Observer newObserver) { + observer = newObserver; + } + + public synchronized ZooKeeperServer getActiveServer() { + if (leader != null) { + return leader.zk; + } else if (follower != null) { + return follower.zk; + } else if (observer != null) { + return observer.zk; + } + return null; + } + + boolean shuttingDownLE = false; + + public void setSuspended(boolean suspended) { + this.suspended.set(suspended); + } + private void checkSuspended() { + try { + while (suspended.get()) { + Thread.sleep(10); + } + } catch (InterruptedException err) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void run() { + 
updateThreadName(); + + LOG.debug("Starting quorum peer"); + try { + jmxQuorumBean = new QuorumBean(this); + MBeanRegistry.getInstance().register(jmxQuorumBean, null); + for (QuorumServer s : getView().values()) { + ZKMBeanInfo p; + if (getMyId() == s.id) { + p = jmxLocalPeerBean = new LocalPeerBean(this); + try { + MBeanRegistry.getInstance().register(p, jmxQuorumBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxLocalPeerBean = null; + } + } else { + RemotePeerBean rBean = new RemotePeerBean(this, s); + try { + MBeanRegistry.getInstance().register(rBean, jmxQuorumBean); + jmxRemotePeerBean.put(s.id, rBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + } + } + } + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxQuorumBean = null; + } + + try { + /* + * Main loop + */ + while (running) { + if (unavailableStartTime == 0) { + unavailableStartTime = Time.currentElapsedTime(); + } + + switch (getPeerState()) { + case LOOKING: + LOG.info("LOOKING"); + ServerMetrics.getMetrics().LOOKING_COUNT.add(1); + + if (Boolean.getBoolean("readonlymode.enabled")) { + LOG.info("Attempting to start ReadOnlyZooKeeperServer"); + + // Create read-only server but don't start it immediately + final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb); + + // Instead of starting roZk immediately, wait some grace + // period before we decide we're partitioned. + // + // Thread is used here because otherwise it would require + // changes in each of election strategy classes which is + // unnecessary code coupling. 
+ Thread roZkMgr = new Thread() { + public void run() { + try { + // lower-bound grace period to 2 secs + sleep(Math.max(2000, tickTime)); + if (ServerState.LOOKING.equals(getPeerState())) { + roZk.startup(); + } + } catch (InterruptedException e) { + LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); + } catch (Exception e) { + LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); + } + } + }; + try { + roZkMgr.start(); + reconfigFlagClear(); + if (shuttingDownLE) { + shuttingDownLE = false; + startLeaderElection(); + } + setCurrentVote(makeLEStrategy().lookForLeader()); + checkSuspended(); + } catch (Exception e) { + LOG.warn("Unexpected exception", e); + setPeerState(ServerState.LOOKING); + } finally { + // If the thread is in the the grace period, interrupt + // to come out of waiting. + roZkMgr.interrupt(); + roZk.shutdown(); + } + } else { + try { + reconfigFlagClear(); + if (shuttingDownLE) { + shuttingDownLE = false; + startLeaderElection(); + } + setCurrentVote(makeLEStrategy().lookForLeader()); + } catch (Exception e) { + LOG.warn("Unexpected exception", e); + setPeerState(ServerState.LOOKING); + } + } + break; + case OBSERVING: + try { + LOG.info("OBSERVING"); + setObserver(makeObserver(logFactory)); + observer.observeLeader(); + } catch (Exception e) { + LOG.warn("Unexpected exception", e); + } finally { + observer.shutdown(); + setObserver(null); + updateServerState(); + + // Add delay jitter before we switch to LOOKING + // state to reduce the load of ObserverMaster + if (isRunning()) { + Observer.waitForObserverElectionDelay(); + } + } + break; + case FOLLOWING: + try { + LOG.info("FOLLOWING"); + setFollower(makeFollower(logFactory)); + follower.followLeader(); + } catch (Exception e) { + LOG.warn("Unexpected exception", e); + } finally { + follower.shutdown(); + setFollower(null); + updateServerState(); + } + break; + case LEADING: + LOG.info("LEADING"); + try { + setLeader(makeLeader(logFactory)); + 
leader.lead(); + setLeader(null); + } catch (Exception e) { + LOG.warn("Unexpected exception", e); + } finally { + if (leader != null) { + leader.shutdown("Forcing shutdown"); + setLeader(null); + } + updateServerState(); + } + break; + } + } + } finally { + LOG.warn("QuorumPeer main thread exited"); + MBeanRegistry instance = MBeanRegistry.getInstance(); + instance.unregister(jmxQuorumBean); + instance.unregister(jmxLocalPeerBean); + + for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) { + instance.unregister(remotePeerBean); + } + + jmxQuorumBean = null; + jmxLocalPeerBean = null; + jmxRemotePeerBean = null; + } + } + + private synchronized void updateServerState() { + if (!reconfigFlag) { + setPeerState(ServerState.LOOKING); + LOG.warn("PeerState set to LOOKING"); + return; + } + + if (getMyId() == getCurrentVote().getId()) { + setPeerState(ServerState.LEADING); + LOG.debug("PeerState set to LEADING"); + } else if (getLearnerType() == LearnerType.PARTICIPANT) { + setPeerState(ServerState.FOLLOWING); + LOG.debug("PeerState set to FOLLOWING"); + } else if (getLearnerType() == LearnerType.OBSERVER) { + setPeerState(ServerState.OBSERVING); + LOG.debug("PeerState set to OBSERVER"); + } else { // currently shouldn't happen since there are only 2 learner types + setPeerState(ServerState.LOOKING); + LOG.debug("Should not be here"); + } + reconfigFlag = false; + } + + public void shutdown() { + running = false; + x509Util.close(); + if (leader != null) { + leader.shutdown("quorum Peer shutdown"); + } + if (follower != null) { + follower.shutdown(); + } + shutdownServerCnxnFactory(); + if (udpSocket != null) { + udpSocket.close(); + } + if (jvmPauseMonitor != null) { + jvmPauseMonitor.serviceStop(); + } + + try { + adminServer.shutdown(); + } catch (AdminServerException e) { + LOG.warn("Problem stopping AdminServer", e); + } + + if (getElectionAlg() != null) { + this.interrupt(); + getElectionAlg().shutdown(); + } + try { + zkDb.close(); + } catch 
(IOException ie) { + LOG.warn("Error closing logs ", ie); + } + } + + /** + * A 'view' is a node's current opinion of the membership of the entire + * ensemble. + */ + public Map<Long, QuorumPeer.QuorumServer> getView() { + return Collections.unmodifiableMap(getQuorumVerifier().getAllMembers()); + } + + /** + * Observers are not contained in this view, only nodes with + * PeerType=PARTICIPANT. + */ + public Map<Long, QuorumPeer.QuorumServer> getVotingView() { + return getQuorumVerifier().getVotingMembers(); + } + + /** + * Returns only observers, no followers. + */ + public Map<Long, QuorumPeer.QuorumServer> getObservingView() { + return getQuorumVerifier().getObservingMembers(); + } + + public synchronized Set<Long> getCurrentAndNextConfigVoters() { + Set<Long> voterIds = new HashSet<>(getQuorumVerifier().getVotingMembers().keySet()); + if (getLastSeenQuorumVerifier() != null) { + voterIds.addAll(getLastSeenQuorumVerifier().getVotingMembers().keySet()); + } + return voterIds; + } + + /** + * Check if a node is in the current view. With static membership, the + * result of this check will never change; only when dynamic membership + * is introduced will this be more useful. 
+ */ + public boolean viewContains(Long sid) { + return this.getView().containsKey(sid); + } + + /** + * Only used by QuorumStats at the moment + */ + public String[] getQuorumPeers() { + List<String> l = new ArrayList<>(); + synchronized (this) { + if (leader != null) { + for (LearnerHandler fh : leader.getLearners()) { + if (fh.getSocket() != null) { + String s = formatInetAddr((InetSocketAddress) fh.getSocket().getRemoteSocketAddress()); + if (leader.isLearnerSynced(fh)) { + s += "*"; + } + l.add(s); + } + } + } else if (follower != null) { + l.add(formatInetAddr((InetSocketAddress) follower.sock.getRemoteSocketAddress())); + } + } + return l.toArray(new String[0]); + } + + public String getServerState() { + switch (getPeerState()) { + case LOOKING: + return QuorumStats.Provider.LOOKING_STATE; + case LEADING: + return QuorumStats.Provider.LEADING_STATE; + case FOLLOWING: + return QuorumStats.Provider.FOLLOWING_STATE; + case OBSERVING: + return QuorumStats.Provider.OBSERVING_STATE; + } + return QuorumStats.Provider.UNKNOWN_STATE; + } + + /** + * set the id of this quorum peer. 
+ */ + public void setMyid(long myid) { + this.myid = myid; + } + + public void setInitialConfig(String initialConfig) { + this.initialConfig = initialConfig; + } + + public String getInitialConfig() { + return initialConfig; + } + + /** + * Get the number of milliseconds of each tick + */ + public int getTickTime() { + return tickTime; + } + + /** + * Set the number of milliseconds of each tick + */ + public void setTickTime(int tickTime) { + LOG.info("tickTime set to {}", tickTime); + this.tickTime = tickTime; + } + + /** Maximum number of connections allowed from particular host (ip) */ + public int getMaxClientCnxnsPerHost() { + if (cnxnFactory != null) { + return cnxnFactory.getMaxClientCnxnsPerHost(); + } + if (secureCnxnFactory != null) { + return secureCnxnFactory.getMaxClientCnxnsPerHost(); + } + return -1; + } + + /** Whether local sessions are enabled */ + public boolean areLocalSessionsEnabled() { + return localSessionsEnabled; + } + + /** Whether to enable local sessions */ + public void enableLocalSessions(boolean flag) { + LOG.info("Local sessions {}", (flag ? "enabled" : "disabled")); + localSessionsEnabled = flag; + } + + /** Whether local sessions are allowed to upgrade to global sessions */ + public boolean isLocalSessionsUpgradingEnabled() { + return localSessionsUpgradingEnabled; + } + + /** Whether to allow local sessions to upgrade to global sessions */ + public void enableLocalSessionsUpgrading(boolean flag) { + LOG.info("Local session upgrading {}", (flag ? 
"enabled" : "disabled")); + localSessionsUpgradingEnabled = flag; + } + + /** minimum session timeout in milliseconds */ + public int getMinSessionTimeout() { + return minSessionTimeout; + } + + /** minimum session timeout in milliseconds */ + public void setMinSessionTimeout(int min) { + LOG.info("minSessionTimeout set to {}", min); + this.minSessionTimeout = min; + } + + /** maximum session timeout in milliseconds */ + public int getMaxSessionTimeout() { + return maxSessionTimeout; + } + + /** maximum session timeout in milliseconds */ + public void setMaxSessionTimeout(int max) { + LOG.info("maxSessionTimeout set to {}", max); + this.maxSessionTimeout = max; + } + + /** The server socket's listen backlog length */ + public int getClientPortListenBacklog() { + return this.clientPortListenBacklog; + } + + /** Sets the server socket's listen backlog length. */ + public void setClientPortListenBacklog(int backlog) { + this.clientPortListenBacklog = backlog; + } + + /** + * Get the number of ticks that the initial synchronization phase can take + */ + public int getInitLimit() { + return initLimit; + } + + /** + * Set the number of ticks that the initial synchronization phase can take + */ + public void setInitLimit(int initLimit) { + LOG.info("initLimit set to {}", initLimit); + this.initLimit = initLimit; + } + + /** + * Get the current tick + */ + public int getTick() { + return tick.get(); + } + + public QuorumVerifier configFromString(String s) throws IOException, ConfigException { + Properties props = new Properties(); + props.load(new StringReader(s)); + return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false, getQuorumVerifier().getOraclePath()); + } + + /** + * Return QuorumVerifier object for the last committed configuration. + */ + public QuorumVerifier getQuorumVerifier() { + synchronized (QV_LOCK) { + return quorumVerifier; + } + } + + /** + * Return QuorumVerifier object for the last proposed configuration. 
+ */ + public QuorumVerifier getLastSeenQuorumVerifier() { + synchronized (QV_LOCK) { + return lastSeenQuorumVerifier; + } + } + + public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW) { + if (qvOLD == null || !qvOLD.equals(qvNEW)) { + LOG.warn("Restarting Leader Election"); + getElectionAlg().shutdown(); + shuttingDownLE = false; + startLeaderElection(); + } + } + + public String getNextDynamicConfigFilename() { + if (configFilename == null) { + LOG.warn("configFilename is null! This should only happen in tests."); + return null; + } + return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix; + } + + // On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK + // must be held. We don't want quorumVerifier/lastSeenQuorumVerifier to change out from + // under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take + // the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken + // qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne(). + private void connectNewPeers(QuorumCnxManager qcm) { + if (quorumVerifier != null && lastSeenQuorumVerifier != null) { + Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers(); + for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) { + if (e.getKey() != getMyId() && !committedView.containsKey(e.getKey())) { + qcm.connectOne(e.getKey()); + } + } + } + } + + public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) { + if (!isReconfigEnabled()) { + LOG.info("Dynamic reconfig is disabled, we don't store the last seen config."); + return; + } + + // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm + // and then take QV_LOCK. Take the locks in the same order to ensure that we don't + // deadlock against other callers of connectOne(). 
If qcmRef gets set in another + // thread while we're inside the synchronized block, that does no harm; if we didn't + // take a lock on qcm (because it was null when we sampled it), we won't call + // connectOne() on it. (Use of an AtomicReference is enough to guarantee visibility + // of updates that provably happen in another thread before entering this method.) + QuorumCnxManager qcm = qcmRef.get(); + Object outerLockObject = (qcm != null) ? qcm : QV_LOCK; + synchronized (outerLockObject) { + synchronized (QV_LOCK) { + if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) { + LOG.error("setLastSeenQuorumVerifier called with stale config " + + qv.getVersion() + + ". Current version: " + + quorumVerifier.getVersion()); + } + // assuming that a version uniquely identifies a configuration, so if + // version is the same, nothing to do here. + if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() == qv.getVersion()) { + return; + } + lastSeenQuorumVerifier = qv; + if (qcm != null) { + connectNewPeers(qcm); + } + + if (writeToDisk) { + try { + String fileName = getNextDynamicConfigFilename(); + if (fileName != null) { + QuorumPeerConfig.writeDynamicConfig(fileName, qv, true); + } + } catch (IOException e) { + LOG.error("Error writing next dynamic config file to disk", e); + } + } + } + } + } + + public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) { + synchronized (QV_LOCK) { + if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) { + // this is normal. For example - server found out about new config through FastLeaderElection gossiping + // and then got the same config in UPTODATE message so its already known + LOG.debug( + "{} setQuorumVerifier called with known or old config {}. 
Current version: {}", + getMyId(), + qv.getVersion(), + quorumVerifier.getVersion()); + return quorumVerifier; + } + QuorumVerifier prevQV = quorumVerifier; + quorumVerifier = qv; + if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion())) { + lastSeenQuorumVerifier = qv; + } + + if (writeToDisk) { + // some tests initialize QuorumPeer without a static config file + if (configFilename != null) { + try { + String dynamicConfigFilename = makeDynamicConfigFilename(qv.getVersion()); + QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, qv, false); + QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig()); + } catch (IOException e) { + LOG.error("Error closing file", e); + } + } else { + LOG.info("writeToDisk == true but configFilename == null"); + } + } + + if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()) { + QuorumPeerConfig.deleteFile(getNextDynamicConfigFilename()); + } + QuorumServer qs = qv.getAllMembers().get(getMyId()); + if (qs != null) { + setAddrs(qs.addr, qs.electionAddr, qs.clientAddr); + } + updateObserverMasterList(); + return prevQV; + } + } + + private String makeDynamicConfigFilename(long version) { + return configFilename + ".dynamic." 
+ Long.toHexString(version); + } + + private boolean needEraseClientInfoFromStaticConfig() { + QuorumServer server = quorumVerifier.getAllMembers().get(getMyId()); + return (server != null && server.clientAddr != null && !server.isClientAddrFromStatic); + } + + /** + * Get an instance of LeaderElection + */ + public Election getElectionAlg() { + return electionAlg; + } + + /** + * Get the synclimit + */ + public int getSyncLimit() { + return syncLimit; + } + + /** + * Set the synclimit + */ + public void setSyncLimit(int syncLimit) { + LOG.info("syncLimit set to {}", syncLimit); + this.syncLimit = syncLimit; + } + + /** + * Get the connectToLearnerMasterLimit + */ + public int getConnectToLearnerMasterLimit() { + return connectToLearnerMasterLimit; + } + + /** + * Set the connectToLearnerMasterLimit + */ + public void setConnectToLearnerMasterLimit(int connectToLearnerMasterLimit) { + LOG.info("connectToLearnerMasterLimit set to {}", connectToLearnerMasterLimit); + this.connectToLearnerMasterLimit = connectToLearnerMasterLimit; + } + + /** + * The syncEnabled can also be set via a system property. + */ + public static final String SYNC_ENABLED = "zookeeper.observer.syncEnabled"; + + /** + * Return syncEnabled. + */ + public boolean getSyncEnabled() { + if (System.getProperty(SYNC_ENABLED) != null) { + LOG.info("{}={}", SYNC_ENABLED, Boolean.getBoolean(SYNC_ENABLED)); + return Boolean.getBoolean(SYNC_ENABLED); + } else { + return syncEnabled; + } + } + + /** + * Set syncEnabled. 
+ * + * @param syncEnabled + */ + public void setSyncEnabled(boolean syncEnabled) { + this.syncEnabled = syncEnabled; + } + + /** + * Gets the election type + */ + public int getElectionType() { + return electionType; + } + + /** + * Sets the election type + */ + public void setElectionType(int electionType) { + this.electionType = electionType; + } + + public boolean getQuorumListenOnAllIPs() { + return quorumListenOnAllIPs; + } + + public void setQuorumListenOnAllIPs(boolean quorumListenOnAllIPs) { + this.quorumListenOnAllIPs = quorumListenOnAllIPs; + } + + public void setCnxnFactory(ServerCnxnFactory cnxnFactory) { + this.cnxnFactory = cnxnFactory; + } + + public void setSecureCnxnFactory(ServerCnxnFactory secureCnxnFactory) { + this.secureCnxnFactory = secureCnxnFactory; + } + + public void setSslQuorum(boolean sslQuorum) { + if (sslQuorum) { + LOG.info("Using TLS encrypted quorum communication"); + } else { + LOG.info("Using insecure (non-TLS) quorum communication"); + } + this.sslQuorum = sslQuorum; + } + + public void setUsePortUnification(boolean shouldUsePortUnification) { + LOG.info("Port unification {}", shouldUsePortUnification ? "enabled" : "disabled"); + this.shouldUsePortUnification = shouldUsePortUnification; + } + + private void startServerCnxnFactory() { + if (cnxnFactory != null) { + cnxnFactory.start(); + } + if (secureCnxnFactory != null) { + secureCnxnFactory.start(); + } + } + + private void shutdownServerCnxnFactory() { + if (cnxnFactory != null) { + cnxnFactory.shutdown(); + } + if (secureCnxnFactory != null) { + secureCnxnFactory.shutdown(); + } + } + + // Leader and learner will control the zookeeper server and pass it into QuorumPeer. 
+ public void setZooKeeperServer(ZooKeeperServer zks) { + if (cnxnFactory != null) { + cnxnFactory.setZooKeeperServer(zks); + } + if (secureCnxnFactory != null) { + secureCnxnFactory.setZooKeeperServer(zks); + } + } + + public void closeAllConnections() { + if (cnxnFactory != null) { + cnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); + } + if (secureCnxnFactory != null) { + secureCnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); + } + } + + public int getClientPort() { + if (cnxnFactory != null) { + return cnxnFactory.getLocalPort(); + } + return -1; + } + + public int getSecureClientPort() { + if (secureCnxnFactory != null) { + return secureCnxnFactory.getLocalPort(); + } + return -1; + } + + public void setTxnFactory(FileTxnSnapLog factory) { + this.logFactory = factory; + } + + public FileTxnSnapLog getTxnFactory() { + return this.logFactory; + } + + /** + * set zk database for this node + * @param database + */ + public void setZKDatabase(ZKDatabase database) { + this.zkDb = database; + } + + protected ZKDatabase getZkDb() { + return zkDb; + } + + public synchronized void initConfigInZKDatabase() { + if (zkDb != null) { + zkDb.initConfigInZKDatabase(getQuorumVerifier()); + } + } + + public boolean isRunning() { + return running; + } + + /** + * get reference to QuorumCnxManager + */ + public QuorumCnxManager getQuorumCnxManager() { + return qcmRef.get(); + } + private long readLongFromFile(String name) throws IOException { + File file = new File(logFactory.getSnapDir(), name); + BufferedReader br = new BufferedReader(new FileReader(file)); + String line = ""; + try { + line = br.readLine(); + return Long.parseLong(line); + } catch (NumberFormatException e) { + throw new IOException("Found " + line + " in " + file); + } finally { + br.close(); + } + } + + private long acceptedEpoch = -1; + private long currentEpoch = -1; + + public static final String CURRENT_EPOCH_FILENAME = "currentEpoch"; + + public static final String 
ACCEPTED_EPOCH_FILENAME = "acceptedEpoch"; + + /** + * Write a long value to disk atomically. Either succeeds or an exception + * is thrown. + * @param name file name to write the long to + * @param value the long value to write to the named file + * @throws IOException if the file cannot be written atomically + */ + // visibleForTest + void writeLongToFile(String name, final long value) throws IOException { + File file = new File(logFactory.getSnapDir(), name); + new AtomicFileWritingIdiom(file, new WriterStatement() { + @Override + public void write(Writer bw) throws IOException { + bw.write(Long.toString(value)); + } + }); + } + + public long getCurrentEpoch() throws IOException { + if (currentEpoch == -1) { + currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); + } + return currentEpoch; + } + + public long getAcceptedEpoch() throws IOException { + if (acceptedEpoch == -1) { + acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME); + } + return acceptedEpoch; + } + + public void setCurrentEpoch(long e) throws IOException { + writeLongToFile(CURRENT_EPOCH_FILENAME, e); + currentEpoch = e; + } + + public void setAcceptedEpoch(long e) throws IOException { + writeLongToFile(ACCEPTED_EPOCH_FILENAME, e); + acceptedEpoch = e; + } + + public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) { + if (!isReconfigEnabled()) { + LOG.debug("Reconfig feature is disabled, skip reconfig processing."); + return false; + } + + InetSocketAddress oldClientAddr = getClientAddress(); + + // update last committed quorum verifier, write the new config to disk + // and restart leader election if config changed. + QuorumVerifier prevQV = setQuorumVerifier(qv, true); + + // There is no log record for the initial config, thus after syncing + // with leader + // /zookeeper/config is empty! it is also possible that last committed + // config is propagated during leader election + // without the propagation the corresponding log records. 
+ // so we should explicitly do this (this is not necessary when we're + // already a Follower/Observer, only + // for Learner): + initConfigInZKDatabase(); + + if (prevQV.getVersion() < qv.getVersion() && !prevQV.equals(qv)) { + Map<Long, QuorumServer> newMembers = qv.getAllMembers(); + updateRemotePeerMXBeans(newMembers); + if (restartLE) { + restartLeaderElection(prevQV, qv); + } + + QuorumServer myNewQS = newMembers.get(getMyId()); + if (myNewQS != null && myNewQS.clientAddr != null && !myNewQS.clientAddr.equals(oldClientAddr)) { + cnxnFactory.reconfigure(myNewQS.clientAddr); + updateThreadName(); + } + + boolean roleChange = updateLearnerType(qv); + boolean leaderChange = false; + if (suggestedLeaderId != null) { + // zxid should be non-null too + leaderChange = updateVote(suggestedLeaderId, zxid); + } else { + long currentLeaderId = getCurrentVote().getId(); + QuorumServer myleaderInCurQV = prevQV.getVotingMembers().get(currentLeaderId); + QuorumServer myleaderInNewQV = qv.getVotingMembers().get(currentLeaderId); + leaderChange = (myleaderInCurQV == null + || myleaderInCurQV.addr == null + || myleaderInNewQV == null + || !myleaderInCurQV.addr.equals(myleaderInNewQV.addr)); + // we don't have a designated leader - need to go into leader + // election + reconfigFlagClear(); + } + + return roleChange || leaderChange; + } + return false; + + } + + private void updateRemotePeerMXBeans(Map<Long, QuorumServer> newMembers) { + Set<Long> existingMembers = new HashSet<>(newMembers.keySet()); + existingMembers.retainAll(jmxRemotePeerBean.keySet()); + for (Long id : existingMembers) { + RemotePeerBean rBean = jmxRemotePeerBean.get(id); + rBean.setQuorumServer(newMembers.get(id)); + } + + Set<Long> joiningMembers = new HashSet<>(newMembers.keySet()); + joiningMembers.removeAll(jmxRemotePeerBean.keySet()); + joiningMembers.remove(getMyId()); // remove self as it is local bean + for (Long id : joiningMembers) { + QuorumServer qs = newMembers.get(id); + RemotePeerBean rBean 
= new RemotePeerBean(this, qs); + try { + MBeanRegistry.getInstance().register(rBean, jmxQuorumBean); + jmxRemotePeerBean.put(qs.id, rBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + } + } + + Set<Long> leavingMembers = new HashSet<>(jmxRemotePeerBean.keySet()); + leavingMembers.removeAll(newMembers.keySet()); + for (Long id : leavingMembers) { + RemotePeerBean rBean = jmxRemotePeerBean.remove(id); + try { + MBeanRegistry.getInstance().unregister(rBean); + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + } + } + + private ArrayList<QuorumServer> observerMasters = new ArrayList<>(); + private void updateObserverMasterList() { + if (observerMasterPort <= 0) { + return; // observer masters not enabled + } + observerMasters.clear(); + StringBuilder sb = new StringBuilder(); + for (QuorumServer server : quorumVerifier.getVotingMembers().values()) { + InetAddress address = server.addr.getReachableOrOne().getAddress(); + InetSocketAddress addr = new InetSocketAddress(address, observerMasterPort); + observerMasters.add(new QuorumServer(server.id, addr)); + sb.append(addr).append(","); + } + LOG.info("Updated learner master list to be {}", sb.toString()); + Collections.shuffle(observerMasters); + // Reset the internal index of the observerMaster when + // the observerMaster List is refreshed + nextObserverMaster = 0; + } + + private boolean useObserverMasters() { + return getLearnerType() == LearnerType.OBSERVER && observerMasters.size() > 0; + } + + private int nextObserverMaster = 0; + private QuorumServer nextObserverMaster() { + if (nextObserverMaster >= observerMasters.size()) { + nextObserverMaster = 0; + // Add a reconnect delay only after the observer + // has exhausted trying to connect to all the masters + // from the observerMasterList + if (isRunning()) { + Observer.waitForReconnectDelay(); + } + } + return observerMasters.get(nextObserverMaster++); + } + + QuorumServer findLearnerMaster(QuorumServer 
leader) { + if (useObserverMasters()) { + return nextObserverMaster(); + } else { + // Add delay jitter to reduce the load on the leader + if (isRunning()) { + Observer.waitForReconnectDelay(); + } + return leader; + } + } + + /** + * Vet a given learner master's information. + * Allows specification by server id, ip only, or ip and port + */ + QuorumServer validateLearnerMaster(String desiredMaster) { + if (useObserverMasters()) { + Long sid; + try { + sid = Long.parseLong(desiredMaster); + } catch (NumberFormatException e) { + sid = null; + } + for (QuorumServer server : observerMasters) { + if (sid == null) { + for (InetSocketAddress address : server.addr.getAllAddresses()) { + String serverAddr = address.getAddress().getHostAddress() + ':' + address.getPort(); + if (serverAddr.startsWith(desiredMaster)) { + return server; + } + } + } else { + if (sid.equals(server.id)) { + return server; + } + } + } + if (sid == null) { + LOG.info("could not find learner master address={}", desiredMaster); + } else { + LOG.warn("could not find learner master sid={}", sid); + } + } else { + LOG.info("cannot validate request, observer masters not enabled"); + } + return null; + } + + private boolean updateLearnerType(QuorumVerifier newQV) { + //check if I'm an observer in new config + if (newQV.getObservingMembers().containsKey(getMyId())) { + if (getLearnerType() != LearnerType.OBSERVER) { + setLearnerType(LearnerType.OBSERVER); + LOG.info("Becoming an observer"); + reconfigFlagSet(); + return true; + } else { + return false; + } + } else if (newQV.getVotingMembers().containsKey(getMyId())) { + if (getLearnerType() != LearnerType.PARTICIPANT) { + setLearnerType(LearnerType.PARTICIPANT); + LOG.info("Becoming a voting participant"); + reconfigFlagSet(); + return true; + } else { + return false; + } + } + // I'm not in the view + if (getLearnerType() != LearnerType.PARTICIPANT) { + setLearnerType(LearnerType.PARTICIPANT); + LOG.info("Becoming a non-voting participant"); + 
reconfigFlagSet(); + return true; + } + return false; + } + + private boolean updateVote(long designatedLeader, long zxid) { + Vote currentVote = getCurrentVote(); + if (currentVote != null && designatedLeader != currentVote.getId()) { + setCurrentVote(new Vote(designatedLeader, zxid)); + reconfigFlagSet(); + LOG.warn("Suggested leader: {}", designatedLeader); + return true; + } + return false; + } + + /** + * Updates leader election info to avoid inconsistencies when + * a new server tries to join the ensemble. + * + * Here is the inconsistency scenario we try to solve by updating the peer + * epoch after following leader: + * + * Let's say we have an ensemble with 3 servers z1, z2 and z3. + * + * 1. z1, z2 were following z3 with peerEpoch to be 0xb8, the new epoch is + * 0xb9, aka current accepted epoch on disk. + * 2. z2 get restarted, which will use 0xb9 as it's peer epoch when loading + * the current accept epoch from disk. + * 3. z2 received notification from z1 and z3, which is following z3 with + * epoch 0xb8, so it started following z3 again with peer epoch 0xb8. + * 4. before z2 successfully connected to z3, z3 get restarted with new + * epoch 0xb9. + * 5. z2 will retry around a few round (default 5s) before giving up, + * meanwhile it will report z3 as leader. + * 6. z1 restarted, and looking with peer epoch 0xb9. + * 7. z1 voted z3, and z3 was elected as leader again with peer epoch 0xb9. + * 8. z2 successfully connected to z3 before giving up, but with peer + * epoch 0xb8. + * 9. z1 get restarted, looking for leader with peer epoch 0xba, but cannot + * join, because z2 is reporting peer epoch 0xb8, while z3 is reporting + * 0xb9. + * + * By updating the election vote after actually following leader, we can + * avoid this kind of stuck happened. 
+ * + * Btw, the zxid and electionEpoch could be inconsistent because of the same + * reason, it's better to update these as well after syncing with leader, but + * that required protocol change which is non trivial. This problem is worked + * around by skipping comparing the zxid and electionEpoch when counting for + * votes for out of election servers during looking for leader. + * + * See https://issues.apache.org/jira/browse/ZOOKEEPER-1732 + */ + protected void updateElectionVote(long newEpoch) { + Vote currentVote = getCurrentVote(); + if (currentVote != null) { + setCurrentVote(new Vote(currentVote.getId(), currentVote.getZxid(), currentVote.getElectionEpoch(), newEpoch, currentVote + .getState())); + } + } + + private void updateThreadName() { + String plain = cnxnFactory != null + ? cnxnFactory.getLocalAddress() != null + ? formatInetAddr(cnxnFactory.getLocalAddress()) + : "disabled" + : "disabled"; + String secure = secureCnxnFactory != null ? formatInetAddr(secureCnxnFactory.getLocalAddress()) : "disabled"; + setName(String.format("QuorumPeer[myid=%d](plain=%s)(secure=%s)", getMyId(), plain, secure)); + } + + /** + * Sets the time taken for leader election in milliseconds. + * + * @param electionTimeTaken time taken for leader election + */ + void setElectionTimeTaken(long electionTimeTaken) { + this.electionTimeTaken = electionTimeTaken; + } + + /** + * @return the time taken for leader election in milliseconds. 
+ */ + long getElectionTimeTaken() { + return electionTimeTaken; + } + + void setQuorumServerSaslRequired(boolean serverSaslRequired) { + quorumServerSaslAuthRequired = serverSaslRequired; + LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, serverSaslRequired); + } + + void setQuorumLearnerSaslRequired(boolean learnerSaslRequired) { + quorumLearnerSaslAuthRequired = learnerSaslRequired; + LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, learnerSaslRequired); + } + + void setQuorumSaslEnabled(boolean enableAuth) { + quorumSaslEnableAuth = enableAuth; + if (!quorumSaslEnableAuth) { + LOG.info("QuorumPeer communication is not secured! (SASL auth disabled)"); + } else { + LOG.info("{} set to {}", QuorumAuth.QUORUM_SASL_AUTH_ENABLED, enableAuth); + } + } + + void setQuorumServicePrincipal(String servicePrincipal) { + quorumServicePrincipal = servicePrincipal; + LOG.info("{} set to {}", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL, quorumServicePrincipal); + } + + void setQuorumLearnerLoginContext(String learnerContext) { + quorumLearnerLoginContext = learnerContext; + LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT, quorumLearnerLoginContext); + } + + void setQuorumServerLoginContext(String serverContext) { + quorumServerLoginContext = serverContext; + LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT, quorumServerLoginContext); + } + + void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) { + if (qCnxnThreadsSize > QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE) { + quorumCnxnThreadsSize = qCnxnThreadsSize; + } + LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize); + } + + boolean isQuorumSaslAuthEnabled() { + return quorumSaslEnableAuth; + } + + private boolean isQuorumServerSaslAuthRequired() { + return quorumServerSaslAuthRequired; + } + + private boolean isQuorumLearnerSaslAuthRequired() { + return quorumLearnerSaslAuthRequired; + } + + public QuorumCnxManager 
createCnxnManager() { + int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit; + LOG.info("Using {}ms as the quorum cnxn socket timeout", timeout); + return new QuorumCnxManager( + this, + this.getMyId(), + this.getView(), + this.authServer, + this.authLearner, + timeout, + this.getQuorumListenOnAllIPs(), + this.quorumCnxnThreadsSize, + this.isQuorumSaslAuthEnabled()); + } + + boolean isLeader(long id) { + Vote vote = getCurrentVote(); + return vote != null && id == vote.getId(); + } + + public boolean isReconfigEnabled() { + return reconfigEnabled; + } + + @InterfaceAudience.Private + /** + * This is a metric that depends on the status of the peer. + */ public Integer getSynced_observers_metric() { + if (leader != null) { + return leader.getObservingLearners().size(); + } else if (follower != null) { + return follower.getSyncedObserverSize(); + } else { + return null; + } + } + + /** + * Create a new QuorumPeer and apply all the values per the already-parsed config. + * + * @param config The appertained quorum peer config. + * @return A QuorumPeer instantiated with specified peer config. Note this peer + * is not fully initialized; caller should finish initialization through + * additional configurations (connection factory settings, etc). 
+ * + * @throws IOException + */ + public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOException { + QuorumPeer quorumPeer = new QuorumPeer(); + quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir())); + quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); + quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled()); + quorumPeer.setElectionType(config.getElectionAlg()); + quorumPeer.setMyid(config.getServerId()); + quorumPeer.setTickTime(config.getTickTime()); + quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); + quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); + quorumPeer.setInitLimit(config.getInitLimit()); + quorumPeer.setSyncLimit(config.getSyncLimit()); + quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit()); + quorumPeer.setObserverMasterPort(config.getObserverMasterPort()); + quorumPeer.setConfigFileName(config.getConfigFilename()); + quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog()); + quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); + quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); + if (config.getLastSeenQuorumVerifier() != null) { + quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false); + } + quorumPeer.initConfigInZKDatabase(); + quorumPeer.setSslQuorum(config.isSslQuorum()); + quorumPeer.setUsePortUnification(config.shouldUsePortUnification()); + quorumPeer.setLearnerType(config.getPeerType()); + quorumPeer.setSyncEnabled(config.getSyncEnabled()); + quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); + if (config.sslQuorumReloadCertFiles) { + quorumPeer.getX509Util().enableCertFileReloading(); + } + quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled()); + quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled()); + 
quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs()); + + // sets quorum sasl authentication configurations + quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); + if (quorumPeer.isQuorumSaslAuthEnabled()) { + quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); + quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); + quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal); + quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); + quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); + } + quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); + + if (config.jvmPauseMonitorToRun) { + quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config)); + } + + return quorumPeer; + } + +} |