diff options
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java')
-rw-r--r-- | zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java | 2711 |
1 files changed, 0 insertions, 2711 deletions
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java deleted file mode 100644 index f6fc87d7716..00000000000 --- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ /dev/null @@ -1,2711 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.quorum; - -import static org.apache.zookeeper.common.NetUtils.formatInetAddr; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.io.StringReader; -import java.io.StringWriter; -import java.io.Writer; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import javax.security.sasl.SaslException; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException.BadArgumentsException; -import org.apache.zookeeper.common.AtomicFileOutputStream; -import org.apache.zookeeper.common.AtomicFileWritingIdiom; -import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement; -import org.apache.zookeeper.common.QuorumX509Util; -import org.apache.zookeeper.common.Time; -import org.apache.zookeeper.common.X509Exception; -import org.apache.zookeeper.jmx.MBeanRegistry; -import org.apache.zookeeper.jmx.ZKMBeanInfo; -import org.apache.zookeeper.server.ServerCnxn; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ServerMetrics; -import org.apache.zookeeper.server.ZKDatabase; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.ZooKeeperThread; -import org.apache.zookeeper.server.admin.AdminServer; -import org.apache.zookeeper.server.admin.AdminServer.AdminServerException; -import org.apache.zookeeper.server.admin.AdminServerFactory; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; -import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthLearner; -import org.apache.zookeeper.server.quorum.auth.NullQuorumAuthServer; -import org.apache.zookeeper.server.quorum.auth.QuorumAuth; -import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner; -import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer; -import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthLearner; -import org.apache.zookeeper.server.quorum.auth.SaslQuorumAuthServer; -import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; -import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj; -import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; -import org.apache.zookeeper.server.util.ConfigUtils; -import org.apache.zookeeper.server.util.JvmPauseMonitor; -import org.apache.zookeeper.server.util.ZxidUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class manages the quorum protocol. There are three states this server - * can be in: - * <ol> - * <li>Leader election - each server will elect a leader (proposing itself as a - * leader initially).</li> - * <li>Follower - the server will synchronize with the leader and replicate any - * transactions.</li> - * <li>Leader - the server will process requests and forward them to followers. - * A majority of followers must log the request before it can be accepted. - * </ol> - * - * This class will setup a datagram socket that will always respond with its - * view of the current leader. The response will take the form of: - * - * <pre> - * int xid; - * - * long myid; - * - * long leader_id; - * - * long leader_zxid; - * </pre> - * - * The request for the current leader will consist solely of an xid: int xid; - */ -public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider { - - private static final Logger LOG = LoggerFactory.getLogger(QuorumPeer.class); - - public static final String CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES = "zookeeper.kerberos.canonicalizeHostNames"; - public static final String CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES = "false"; - - private QuorumBean jmxQuorumBean; - LocalPeerBean jmxLocalPeerBean; - private Map<Long, RemotePeerBean> jmxRemotePeerBean; - LeaderElectionBean jmxLeaderElectionBean; - - // The QuorumCnxManager is held through an AtomicReference to ensure cross-thread visibility - // of updates; see the implementation comment at setLastSeenQuorumVerifier(). - private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>(); - - QuorumAuthServer authServer; - QuorumAuthLearner authLearner; - - /** - * ZKDatabase is a top level member of quorumpeer - * which will be used in all the zookeeperservers - * instantiated later. Also, it is created once on - * bootup and only thrown away in case of a truncate - * message from the leader - */ - private ZKDatabase zkDb; - - private JvmPauseMonitor jvmPauseMonitor; - - private final AtomicBoolean suspended = new AtomicBoolean(false); - - public static final class AddressTuple { - - public final MultipleAddresses quorumAddr; - public final MultipleAddresses electionAddr; - public final InetSocketAddress clientAddr; - - public AddressTuple(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) { - this.quorumAddr = quorumAddr; - this.electionAddr = electionAddr; - this.clientAddr = clientAddr; - } - - } - - private int observerMasterPort; - - public int getObserverMasterPort() { - return observerMasterPort; - } - - public void setObserverMasterPort(int observerMasterPort) { - this.observerMasterPort = observerMasterPort; - } - - public static final String CONFIG_KEY_MULTI_ADDRESS_ENABLED = "zookeeper.multiAddress.enabled"; - public static final String CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED = "false"; - - private boolean multiAddressEnabled = true; - public boolean isMultiAddressEnabled() { - return multiAddressEnabled; - } - - public void setMultiAddressEnabled(boolean multiAddressEnabled) { - this.multiAddressEnabled = multiAddressEnabled; - LOG.info("multiAddress.enabled set to {}", multiAddressEnabled); - } - - public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS = "zookeeper.multiAddress.reachabilityCheckTimeoutMs"; - - private int multiAddressReachabilityCheckTimeoutMs = (int) MultipleAddresses.DEFAULT_TIMEOUT.toMillis(); - public int getMultiAddressReachabilityCheckTimeoutMs() { - return multiAddressReachabilityCheckTimeoutMs; - } - - public void setMultiAddressReachabilityCheckTimeoutMs(int multiAddressReachabilityCheckTimeoutMs) { - this.multiAddressReachabilityCheckTimeoutMs = multiAddressReachabilityCheckTimeoutMs; - LOG.info("multiAddress.reachabilityCheckTimeoutMs set to {}", multiAddressReachabilityCheckTimeoutMs); - } - - public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED = "zookeeper.multiAddress.reachabilityCheckEnabled"; - - private boolean multiAddressReachabilityCheckEnabled = true; - - public boolean isMultiAddressReachabilityCheckEnabled() { - return multiAddressReachabilityCheckEnabled; - } - - public void setMultiAddressReachabilityCheckEnabled(boolean multiAddressReachabilityCheckEnabled) { - this.multiAddressReachabilityCheckEnabled = multiAddressReachabilityCheckEnabled; - LOG.info("multiAddress.reachabilityCheckEnabled set to {}", multiAddressReachabilityCheckEnabled); - } - - public static class QuorumServer { - - public MultipleAddresses addr = new MultipleAddresses(); - - public MultipleAddresses electionAddr = new MultipleAddresses(); - - public InetSocketAddress clientAddr = null; - - public long id; - - public String hostname; - - public LearnerType type = LearnerType.PARTICIPANT; - - public boolean isClientAddrFromStatic = false; - - private List<InetSocketAddress> myAddrs; - - public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr) { - this(id, addr, electionAddr, clientAddr, LearnerType.PARTICIPANT); - } - - public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr) { - this(id, addr, electionAddr, null, LearnerType.PARTICIPANT); - } - - // VisibleForTesting - public QuorumServer(long id, InetSocketAddress addr) { - this(id, addr, null, null, LearnerType.PARTICIPANT); - } - - public long getId() { - return id; - } - - /** - * Performs a DNS lookup for server address and election address. - * - * If the DNS lookup fails, this.addr and electionAddr remain - * unmodified. - */ - public void recreateSocketAddresses() { - if (this.addr.isEmpty()) { - LOG.warn("Server address has not been initialized"); - return; - } - if (this.electionAddr.isEmpty()) { - LOG.warn("Election address has not been initialized"); - return; - } - this.addr.recreateSocketAddresses(); - this.electionAddr.recreateSocketAddresses(); - } - - private LearnerType getType(String s) throws ConfigException { - switch (s.trim().toLowerCase()) { - case "observer": - return LearnerType.OBSERVER; - case "participant": - return LearnerType.PARTICIPANT; - default: - throw new ConfigException("Unrecognised peertype: " + s); - } - } - - public QuorumServer(long sid, String addressStr) throws ConfigException { - this(sid, addressStr, QuorumServer::getInetAddress); - } - - QuorumServer(long sid, String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException { - this.id = sid; - initializeWithAddressString(addressStr, getInetAddress); - } - - public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, LearnerType type) { - this(id, addr, electionAddr, null, type); - } - - public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) { - this.id = id; - if (addr != null) { - this.addr.addAddress(addr); - } - if (electionAddr != null) { - this.electionAddr.addAddress(electionAddr); - } - this.type = type; - this.clientAddr = clientAddr; - - setMyAddrs(); - } - - private static final String wrongFormat = - " does not have the form server_config or server_config;client_config" - + " where server_config is the pipe separated list of host:port:port or host:port:port:type" - + " and client_config is port or host:port"; - - private void initializeWithAddressString(String addressStr, Function<InetSocketAddress, InetAddress> getInetAddress) throws ConfigException { - LearnerType newType = null; - String[] serverClientParts = addressStr.split(";"); - String[] serverAddresses = serverClientParts[0].split("\\|"); - - if (serverClientParts.length == 2) { - String[] clientParts = ConfigUtils.getHostAndPort(serverClientParts[1]); - if (clientParts.length > 2) { - throw new ConfigException(addressStr + wrongFormat); - } - - // is client_config a host:port or just a port - String clientHostName = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0"; - try { - clientAddr = new InetSocketAddress(clientHostName, Integer.parseInt(clientParts[clientParts.length - 1])); - } catch (NumberFormatException e) { - throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]); - } - } - - boolean multiAddressEnabled = Boolean.parseBoolean( - System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, QuorumPeer.CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED)); - if (!multiAddressEnabled && serverAddresses.length > 1) { - throw new ConfigException("Multiple address feature is disabled, but multiple addresses were specified for sid " + this.id); - } - - boolean canonicalize = Boolean.parseBoolean( - System.getProperty( - CONFIG_KEY_KERBEROS_CANONICALIZE_HOST_NAMES, - CONFIG_DEFAULT_KERBEROS_CANONICALIZE_HOST_NAMES)); - - for (String serverAddress : serverAddresses) { - String serverParts[] = ConfigUtils.getHostAndPort(serverAddress); - if ((serverClientParts.length > 2) || (serverParts.length < 3) - || (serverParts.length > 4)) { - throw new ConfigException(addressStr + wrongFormat); - } - - String serverHostName = serverParts[0]; - - // server_config should be either host:port:port or host:port:port:type - InetSocketAddress tempAddress; - InetSocketAddress tempElectionAddress; - try { - tempAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[1])); - addr.addAddress(tempAddress); - } catch (NumberFormatException e) { - throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[1]); - } - try { - tempElectionAddress = new InetSocketAddress(serverHostName, Integer.parseInt(serverParts[2])); - electionAddr.addAddress(tempElectionAddress); - } catch (NumberFormatException e) { - throw new ConfigException("Address unresolved: " + serverHostName + ":" + serverParts[2]); - } - - if (tempAddress.getPort() == tempElectionAddress.getPort()) { - throw new ConfigException("Client and election port must be different! Please update the " - + "configuration file on server." + this.id); - } - - if (canonicalize) { - InetAddress ia = getInetAddress.apply(tempAddress); - if (ia == null) { - throw new ConfigException("Unable to canonicalize address " + serverHostName + " because it's not resolvable"); - } - - String canonicalHostName = ia.getCanonicalHostName(); - - if (!canonicalHostName.equals(serverHostName) - // Avoid using literal IP address when - // security check fails - && !canonicalHostName.equals(ia.getHostAddress())) { - LOG.info("Host name for quorum server {} " - + "canonicalized from {} to {}", - this.id, serverHostName, canonicalHostName); - serverHostName = canonicalHostName; - } - } - - if (serverParts.length == 4) { - LearnerType tempType = getType(serverParts[3]); - if (newType == null) { - newType = tempType; - } - - if (newType != tempType) { - throw new ConfigException("Multiple addresses should have similar roles: " + type + " vs " + tempType); - } - } - - this.hostname = serverHostName; - } - - if (newType != null) { - type = newType; - } - - setMyAddrs(); - } - - private static InetAddress getInetAddress(InetSocketAddress addr) { - return addr.getAddress(); - } - - private void setMyAddrs() { - this.myAddrs = new ArrayList<>(); - this.myAddrs.addAll(this.addr.getAllAddresses()); - this.myAddrs.add(this.clientAddr); - this.myAddrs.addAll(this.electionAddr.getAllAddresses()); - this.myAddrs = excludedSpecialAddresses(this.myAddrs); - } - - public static String delimitedHostString(InetSocketAddress addr) { - String host = addr.getHostString(); - if (host.contains(":")) { - return "[" + host + "]"; - } else { - return host; - } - } - - public String toString() { - StringWriter sw = new StringWriter(); - - List<InetSocketAddress> addrList = new LinkedList<>(addr.getAllAddresses()); - List<InetSocketAddress> electionAddrList = new LinkedList<>(electionAddr.getAllAddresses()); - - if (addrList.size() > 0 && electionAddrList.size() > 0) { - addrList.sort(Comparator.comparing(InetSocketAddress::getHostString)); - electionAddrList.sort(Comparator.comparing(InetSocketAddress::getHostString)); - sw.append(IntStream.range(0, addrList.size()).mapToObj(i -> String.format("%s:%d:%d", - delimitedHostString(addrList.get(i)), addrList.get(i).getPort(), electionAddrList.get(i).getPort())) - .collect(Collectors.joining("|"))); - } - - if (type == LearnerType.OBSERVER) { - sw.append(":observer"); - } else if (type == LearnerType.PARTICIPANT) { - sw.append(":participant"); - } - - if (clientAddr != null && !isClientAddrFromStatic) { - sw.append(";"); - sw.append(delimitedHostString(clientAddr)); - sw.append(":"); - sw.append(String.valueOf(clientAddr.getPort())); - } - - return sw.toString(); - } - - public int hashCode() { - assert false : "hashCode not designed"; - return 42; // any arbitrary constant will do - } - - private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2) { - return (addr1 != null || addr2 == null) - && (addr1 == null || addr2 != null) - && (addr1 == null || addr2 == null || addr1.equals(addr2)); - } - - public boolean equals(Object o) { - if (!(o instanceof QuorumServer)) { - return false; - } - QuorumServer qs = (QuorumServer) o; - if ((qs.id != id) || (qs.type != type)) { - return false; - } - if (!addr.equals(qs.addr)) { - return false; - } - if (!electionAddr.equals(qs.electionAddr)) { - return false; - } - return checkAddressesEqual(clientAddr, qs.clientAddr); - } - - public void checkAddressDuplicate(QuorumServer s) throws BadArgumentsException { - List<InetSocketAddress> otherAddrs = new ArrayList<>(s.addr.getAllAddresses()); - otherAddrs.add(s.clientAddr); - otherAddrs.addAll(s.electionAddr.getAllAddresses()); - otherAddrs = excludedSpecialAddresses(otherAddrs); - - for (InetSocketAddress my : this.myAddrs) { - - for (InetSocketAddress other : otherAddrs) { - if (my.equals(other)) { - String error = String.format("%s of server.%d conflicts %s of server.%d", my, this.id, other, s.id); - throw new BadArgumentsException(error); - } - } - } - } - - private List<InetSocketAddress> excludedSpecialAddresses(List<InetSocketAddress> addrs) { - List<InetSocketAddress> included = new ArrayList<>(); - - for (InetSocketAddress addr : addrs) { - if (addr == null) { - continue; - } - InetAddress inetaddr = addr.getAddress(); - - if (inetaddr == null || inetaddr.isAnyLocalAddress() // wildCard addresses (0.0.0.0 or [::]) - || inetaddr.isLoopbackAddress()) { // loopback address(localhost/127.0.0.1) - continue; - } - included.add(addr); - } - return included; - } - - } - - public enum ServerState { - LOOKING, - FOLLOWING, - LEADING, - OBSERVING - } - - /** - * (Used for monitoring) shows the current phase of - * Zab protocol that peer is running. - */ - public enum ZabState { - ELECTION, - DISCOVERY, - SYNCHRONIZATION, - BROADCAST - } - - /** - * (Used for monitoring) When peer is in synchronization phase, this shows - * which synchronization mechanism is being used - */ - public enum SyncMode { - NONE, - DIFF, - SNAP, - TRUNC - } - - /* - * A peer can either be participating, which implies that it is willing to - * both vote in instances of consensus and to elect or become a Leader, or - * it may be observing in which case it isn't. - * - * We need this distinction to decide which ServerState to move to when - * conditions change (e.g. which state to become after LOOKING). - */ - public enum LearnerType { - PARTICIPANT, - OBSERVER - } - - /* - * To enable observers to have no identifier, we need a generic identifier - * at least for QuorumCnxManager. We use the following constant to as the - * value of such a generic identifier. - */ - - static final long OBSERVER_ID = Long.MAX_VALUE; - - /* - * Record leader election time - */ - public long start_fle, end_fle; // fle = fast leader election - public static final String FLE_TIME_UNIT = "MS"; - private long unavailableStartTime; - - /* - * Default value of peer is participant - */ - private LearnerType learnerType = LearnerType.PARTICIPANT; - - public LearnerType getLearnerType() { - return learnerType; - } - - /** - * Sets the LearnerType - */ - public void setLearnerType(LearnerType p) { - learnerType = p; - } - - protected synchronized void setConfigFileName(String s) { - configFilename = s; - } - - private String configFilename = null; - - public int getQuorumSize() { - return getVotingView().size(); - } - - public void setJvmPauseMonitor(JvmPauseMonitor jvmPauseMonitor) { - this.jvmPauseMonitor = jvmPauseMonitor; - } - - /** - * QuorumVerifier implementation; default (majority). - */ - - //last committed quorum verifier - private QuorumVerifier quorumVerifier; - - //last proposed quorum verifier - private QuorumVerifier lastSeenQuorumVerifier = null; - - // Lock object that guard access to quorumVerifier and lastSeenQuorumVerifier. - final Object QV_LOCK = new Object(); - - /** - * My id - */ - private long myid; - - /** - * get the id of this quorum peer. - */ - public long getMyId() { - return myid; - } - - // VisibleForTesting - void setId(long id) { - this.myid = id; - } - - private boolean sslQuorum; - private boolean shouldUsePortUnification; - - public boolean isSslQuorum() { - return sslQuorum; - } - - public boolean shouldUsePortUnification() { - return shouldUsePortUnification; - } - - private final QuorumX509Util x509Util; - - QuorumX509Util getX509Util() { - return x509Util; - } - - /** - * This is who I think the leader currently is. - */ - private volatile Vote currentVote; - - public synchronized Vote getCurrentVote() { - return currentVote; - } - - public synchronized void setCurrentVote(Vote v) { - currentVote = v; - } - - private volatile boolean running = true; - - private String initialConfig; - - /** - * The number of milliseconds of each tick - */ - protected int tickTime; - - /** - * Whether learners in this quorum should create new sessions as local. - * False by default to preserve existing behavior. - */ - protected boolean localSessionsEnabled = false; - - /** - * Whether learners in this quorum should upgrade local sessions to - * global. Only matters if local sessions are enabled. - */ - protected boolean localSessionsUpgradingEnabled = true; - - /** - * Minimum number of milliseconds to allow for session timeout. - * A value of -1 indicates unset, use default. - */ - protected int minSessionTimeout = -1; - - /** - * Maximum number of milliseconds to allow for session timeout. - * A value of -1 indicates unset, use default. - */ - protected int maxSessionTimeout = -1; - - /** - * The ZooKeeper server's socket backlog length. The number of connections - * that will be queued to be read before new connections are dropped. A - * value of one indicates the default backlog will be used. - */ - protected int clientPortListenBacklog = -1; - - /** - * The number of ticks that the initial synchronization phase can take - */ - protected volatile int initLimit; - - /** - * The number of ticks that can pass between sending a request and getting - * an acknowledgment - */ - protected volatile int syncLimit; - - /** - * The number of ticks that can pass before retrying to connect to learner master - */ - protected volatile int connectToLearnerMasterLimit; - - /** - * Enables/Disables sync request processor. This option is enabled - * by default and is to be used with observers. - */ - protected boolean syncEnabled = true; - - /** - * The current tick - */ - protected AtomicInteger tick = new AtomicInteger(); - - /** - * Whether or not to listen on all IPs for the two quorum ports - * (broadcast and fast leader election). - */ - protected boolean quorumListenOnAllIPs = false; - - /** - * Keeps time taken for leader election in milliseconds. Sets the value to - * this variable only after the completion of leader election. - */ - private long electionTimeTaken = -1; - - /** - * Enable/Disables quorum authentication using sasl. Defaulting to false. - */ - protected boolean quorumSaslEnableAuth; - - /** - * If this is false, quorum peer server will accept another quorum peer client - * connection even if the authentication did not succeed. This can be used while - * upgrading ZooKeeper server. Defaulting to false (required). - */ - protected boolean quorumServerSaslAuthRequired; - - /** - * If this is false, quorum peer learner will talk to quorum peer server - * without authentication. This can be used while upgrading ZooKeeper - * server. Defaulting to false (required). - */ - protected boolean quorumLearnerSaslAuthRequired; - - /** - * Kerberos quorum service principal. Defaulting to 'zkquorum/localhost'. - */ - protected String quorumServicePrincipal; - - /** - * Quorum learner login context name in jaas-conf file to read the kerberos - * security details. Defaulting to 'QuorumLearner'. - */ - protected String quorumLearnerLoginContext; - - /** - * Quorum server login context name in jaas-conf file to read the kerberos - * security details. Defaulting to 'QuorumServer'. - */ - protected String quorumServerLoginContext; - - // TODO: need to tune the default value of thread size - private static final int QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE = 20; - /** - * The maximum number of threads to allow in the connectionExecutors thread - * pool which will be used to initiate quorum server connections. - */ - protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE; - - public static final String QUORUM_CNXN_TIMEOUT_MS = "zookeeper.quorumCnxnTimeoutMs"; - private static int quorumCnxnTimeoutMs; - - static { - quorumCnxnTimeoutMs = Integer.getInteger(QUORUM_CNXN_TIMEOUT_MS, -1); - LOG.info("{}={}", QUORUM_CNXN_TIMEOUT_MS, quorumCnxnTimeoutMs); - } - - /** - * @deprecated As of release 3.4.0, this class has been deprecated, since - * it is used with one of the udp-based versions of leader election, which - * we are also deprecating. - * - * This class simply responds to requests for the current leader of this - * node. - * <p> - * The request contains just an xid generated by the requestor. - * <p> - * The response has the xid, the id of this server, the id of the leader, - * and the zxid of the leader. - * - * - */ - @Deprecated - class ResponderThread extends ZooKeeperThread { - - ResponderThread() { - super("ResponderThread"); - } - - volatile boolean running = true; - - @Override - public void run() { - try { - byte[] b = new byte[36]; - ByteBuffer responseBuffer = ByteBuffer.wrap(b); - DatagramPacket packet = new DatagramPacket(b, b.length); - while (running) { - udpSocket.receive(packet); - if (packet.getLength() != 4) { - LOG.warn("Got more than just an xid! Len = {}", packet.getLength()); - } else { - responseBuffer.clear(); - responseBuffer.getInt(); // Skip the xid - responseBuffer.putLong(myid); - Vote current = getCurrentVote(); - switch (getPeerState()) { - case LOOKING: - responseBuffer.putLong(current.getId()); - responseBuffer.putLong(current.getZxid()); - break; - case LEADING: - responseBuffer.putLong(myid); - try { - long proposed; - synchronized (leader) { - proposed = leader.lastProposed; - } - responseBuffer.putLong(proposed); - } catch (NullPointerException npe) { - // This can happen in state transitions, - // just ignore the request - } - break; - case FOLLOWING: - responseBuffer.putLong(current.getId()); - try { - responseBuffer.putLong(follower.getZxid()); - } catch (NullPointerException npe) { - // This can happen in state transitions, - // just ignore the request - } - break; - case OBSERVING: - // Do nothing, Observers keep themselves to - // themselves. - break; - } - packet.setData(b); - udpSocket.send(packet); - } - packet.setLength(b.length); - } - } catch (RuntimeException e) { - LOG.warn("Unexpected runtime exception in ResponderThread", e); - } catch (IOException e) { - LOG.warn("Unexpected IO exception in ResponderThread", e); - } finally { - LOG.warn("QuorumPeer responder thread exited"); - } - } - - } - - private ServerState state = ServerState.LOOKING; - - private AtomicReference<ZabState> zabState = new AtomicReference<>(ZabState.ELECTION); - private AtomicReference<SyncMode> syncMode = new AtomicReference<>(SyncMode.NONE); - private AtomicReference<String> leaderAddress = new AtomicReference<>(""); - private AtomicLong leaderId = new AtomicLong(-1); - - private boolean reconfigFlag = false; // indicates that a reconfig just committed - - public synchronized void setPeerState(ServerState newState) { - state = newState; - if (newState == ServerState.LOOKING) { - setLeaderAddressAndId(null, -1); - setZabState(ZabState.ELECTION); - } else { - LOG.info("Peer state changed: {}", getDetailedPeerState()); - } - } - - public void setZabState(ZabState zabState) { - if ((zabState == ZabState.BROADCAST) && (unavailableStartTime != 0)) { - long unavailableTime = Time.currentElapsedTime() - unavailableStartTime; - ServerMetrics.getMetrics().UNAVAILABLE_TIME.add(unavailableTime); - if (getPeerState() == ServerState.LEADING) { - ServerMetrics.getMetrics().LEADER_UNAVAILABLE_TIME.add(unavailableTime); - } - unavailableStartTime = 0; - } - this.zabState.set(zabState); - LOG.info("Peer state changed: {}", getDetailedPeerState()); - } - - public void setSyncMode(SyncMode syncMode) { - this.syncMode.set(syncMode); - LOG.info("Peer state changed: {}", getDetailedPeerState()); - } - - public ZabState getZabState() { - return zabState.get(); - } - - public SyncMode getSyncMode() { - return syncMode.get(); - } - - public void setLeaderAddressAndId(MultipleAddresses addr, long newId) { - if (addr != null) { - leaderAddress.set(String.join("|", addr.getAllHostStrings())); - } else { - leaderAddress.set(null); - } - leaderId.set(newId); - } - - public String getLeaderAddress() { - return leaderAddress.get(); - } - - public long getLeaderId() { - return leaderId.get(); - } - - public String getDetailedPeerState() { - final StringBuilder sb = new StringBuilder(getPeerState().toString().toLowerCase()); - final ZabState zabState = getZabState(); - if (!ZabState.ELECTION.equals(zabState)) { - sb.append(" - ").append(zabState.toString().toLowerCase()); - } - final SyncMode syncMode = getSyncMode(); - if (!SyncMode.NONE.equals(syncMode)) { - sb.append(" - ").append(syncMode.toString().toLowerCase()); - } - return sb.toString(); - } - - public synchronized void reconfigFlagSet() { - reconfigFlag = true; - } - public synchronized void reconfigFlagClear() { - reconfigFlag = false; - } - public synchronized boolean isReconfigStateChange() { - return reconfigFlag; - } - public synchronized ServerState getPeerState() { - return state; - } - - DatagramSocket udpSocket; - - private final AtomicReference<AddressTuple> myAddrs = new AtomicReference<>(); - - /** - * Resolves hostname for a given server ID. - * - * This method resolves hostname for a given server ID in both quorumVerifer - * and lastSeenQuorumVerifier. If the server ID matches the local server ID, - * it also updates myAddrs. - */ - public void recreateSocketAddresses(long id) { - QuorumVerifier qv = getQuorumVerifier(); - if (qv != null) { - QuorumServer qs = qv.getAllMembers().get(id); - if (qs != null) { - qs.recreateSocketAddresses(); - if (id == getMyId()) { - setAddrs(qs.addr, qs.electionAddr, qs.clientAddr); - } - } - } - qv = getLastSeenQuorumVerifier(); - if (qv != null) { - QuorumServer qs = qv.getAllMembers().get(id); - if (qs != null) { - qs.recreateSocketAddresses(); - } - } - } - - private AddressTuple getAddrs() { - AddressTuple addrs = myAddrs.get(); - if (addrs != null) { - return addrs; - } - try { - synchronized (QV_LOCK) { - addrs = myAddrs.get(); - while (addrs == null) { - QV_LOCK.wait(); - addrs = myAddrs.get(); - } - return addrs; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - - public MultipleAddresses getQuorumAddress() { - return getAddrs().quorumAddr; - } - - public MultipleAddresses getElectionAddress() { - return getAddrs().electionAddr; - } - - public InetSocketAddress getClientAddress() { - final AddressTuple addrs = myAddrs.get(); - return (addrs == null) ? null : addrs.clientAddr; - } - - private void setAddrs(MultipleAddresses quorumAddr, MultipleAddresses electionAddr, InetSocketAddress clientAddr) { - synchronized (QV_LOCK) { - myAddrs.set(new AddressTuple(quorumAddr, electionAddr, clientAddr)); - QV_LOCK.notifyAll(); - } - } - - private int electionType; - - Election electionAlg; - - ServerCnxnFactory cnxnFactory; - ServerCnxnFactory secureCnxnFactory; - - private FileTxnSnapLog logFactory = null; - - private final QuorumStats quorumStats; - - AdminServer adminServer; - - private final boolean reconfigEnabled; - - public static QuorumPeer testingQuorumPeer() throws SaslException { - return new QuorumPeer(); - } - - public QuorumPeer() throws SaslException { - super("QuorumPeer"); - quorumStats = new QuorumStats(this); - jmxRemotePeerBean = new HashMap<>(); - adminServer = AdminServerFactory.createAdminServer(); - x509Util = createX509Util(); - initialize(); - reconfigEnabled = QuorumPeerConfig.isReconfigEnabled(); - } - - // VisibleForTesting - QuorumX509Util createX509Util() { - return new QuorumX509Util(); - } - - /** - * For backward compatibility purposes, we instantiate QuorumMaj by default. - */ - - public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, ServerCnxnFactory cnxnFactory) throws IOException { - this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false, cnxnFactory, new QuorumMaj(quorumPeers)); - } - - public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException { - this(); - this.cnxnFactory = cnxnFactory; - this.electionType = electionType; - this.myid = myid; - this.tickTime = tickTime; - this.initLimit = initLimit; - this.syncLimit = syncLimit; - this.connectToLearnerMasterLimit = connectToLearnerMasterLimit; - this.quorumListenOnAllIPs = quorumListenOnAllIPs; - this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir); - this.zkDb = new ZKDatabase(this.logFactory); - if (quorumConfig == null) { - quorumConfig = new QuorumMaj(quorumPeers); - } - setQuorumVerifier(quorumConfig, false); - adminServer = AdminServerFactory.createAdminServer(); - } - - public void initialize() throws SaslException { - // init quorum auth server & learner - if (isQuorumSaslAuthEnabled()) { - Set<String> authzHosts = new HashSet<>(); - for (QuorumServer qs : getView().values()) { - authzHosts.add(qs.hostname); - } - authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(), quorumServerLoginContext, authzHosts); - authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(), quorumServicePrincipal, quorumLearnerLoginContext); - } else { - authServer = new NullQuorumAuthServer(); - authLearner = new NullQuorumAuthLearner(); - } - } - - QuorumStats quorumStats() { - return quorumStats; - } - - @Override - public synchronized void start() { - if (!getView().containsKey(myid)) { - throw new RuntimeException("My id " + myid + " not in the peer list"); - } - loadDataBase(); - startServerCnxnFactory(); - try { - adminServer.start(); - } catch (AdminServerException e) { - LOG.warn("Problem starting AdminServer", e); - } - startLeaderElection(); - startJvmPauseMonitor(); - super.start(); - } - - private void loadDataBase() { - try { - zkDb.loadDataBase(); - - // load the epochs - long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; - long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); - try { - currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); - } catch (FileNotFoundException e) { - // pick a reasonable epoch number - // this should only happen once when moving to a - // new code version - currentEpoch = epochOfZxid; - LOG.info( - "{} not found! Creating with a reasonable default of {}. " - + "This should only happen when you are upgrading your installation", - CURRENT_EPOCH_FILENAME, - currentEpoch); - writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch); - } - if (epochOfZxid > currentEpoch) { - // acceptedEpoch.tmp file in snapshot directory - File currentTmp = new File(getTxnFactory().getSnapDir(), - CURRENT_EPOCH_FILENAME + AtomicFileOutputStream.TMP_EXTENSION); - if (currentTmp.exists()) { - long epochOfTmp = readLongFromFile(currentTmp.getName()); - LOG.info("{} found. Setting current epoch to {}.", currentTmp, epochOfTmp); - setCurrentEpoch(epochOfTmp); - } else { - throw new IOException( - "The current epoch, " + ZxidUtils.zxidToString(currentEpoch) - + ", is older than the last zxid, " + lastProcessedZxid); - } - } - try { - acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME); - } catch (FileNotFoundException e) { - // pick a reasonable epoch number - // this should only happen once when moving to a - // new code version - acceptedEpoch = epochOfZxid; - LOG.info( - "{} not found! Creating with a reasonable default of {}. " - + "This should only happen when you are upgrading your installation", - ACCEPTED_EPOCH_FILENAME, - acceptedEpoch); - writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch); - } - if (acceptedEpoch < currentEpoch) { - throw new IOException("The accepted epoch, " - + ZxidUtils.zxidToString(acceptedEpoch) - + " is less than the current epoch, " - + ZxidUtils.zxidToString(currentEpoch)); - } - } catch (IOException ie) { - LOG.error("Unable to load database on disk", ie); - throw new RuntimeException("Unable to run quorum server ", ie); - } - } - - ResponderThread responder; - - public synchronized void stopLeaderElection() { - responder.running = false; - responder.interrupt(); - } - public synchronized void startLeaderElection() { - try { - if (getPeerState() == ServerState.LOOKING) { - currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); - } - } catch (IOException e) { - RuntimeException re = new RuntimeException(e.getMessage()); - re.setStackTrace(e.getStackTrace()); - throw re; - } - - this.electionAlg = createElectionAlgorithm(electionType); - } - - private void startJvmPauseMonitor() { - if (this.jvmPauseMonitor != null) { - this.jvmPauseMonitor.serviceStart(); - } - } - - /** - * Count the number of nodes in the map that could be followers. - * @param peers - * @return The number of followers in the map - */ - protected static int countParticipants(Map<Long, QuorumServer> peers) { - int count = 0; - for (QuorumServer q : peers.values()) { - if (q.type == LearnerType.PARTICIPANT) { - count++; - } - } - return count; - } - - /** - * This constructor is only used by the existing unit test code. - * It defaults to FileLogProvider persistence provider. - */ - public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit) throws IOException { - this( - quorumPeers, - snapDir, - logDir, - electionAlg, - myid, - tickTime, - initLimit, - syncLimit, - connectToLearnerMasterLimit, - false, - ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1), - new QuorumMaj(quorumPeers)); - } - - public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, String oraclePath) throws IOException { - this( - quorumPeers, - snapDir, - logDir, - electionAlg, - myid, - tickTime, - initLimit, - syncLimit, - connectToLearnerMasterLimit, - false, - ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1), - new QuorumOracleMaj(quorumPeers, oraclePath)); - } - - /** - * This constructor is only used by the existing unit test code. - * It defaults to FileLogProvider persistence provider. - */ - public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort, int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, QuorumVerifier quorumConfig) throws IOException { - this( - quorumPeers, - snapDir, - logDir, - electionAlg, - myid, - tickTime, - initLimit, - syncLimit, - connectToLearnerMasterLimit, - false, - ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1), - quorumConfig); - } - - private static InetSocketAddress getClientAddress(Map<Long, QuorumServer> quorumPeers, long myid, int clientPort) throws IOException { - QuorumServer quorumServer = quorumPeers.get(myid); - if (null == quorumServer) { - throw new IOException("No QuorumServer correspoding to myid " + myid); - } - if (null == quorumServer.clientAddr) { - return new InetSocketAddress(clientPort); - } - if (quorumServer.clientAddr.getPort() != clientPort) { - throw new IOException("QuorumServer port " - + quorumServer.clientAddr.getPort() - + " does not match with given port " - + clientPort); - } - return quorumServer.clientAddr; - } - - /** - * returns the highest zxid that this host has seen - * - * @return the highest zxid for this host - */ - public long getLastLoggedZxid() { - if (!zkDb.isInitialized()) { - loadDataBase(); - } - return zkDb.getDataTreeLastProcessedZxid(); - } - - public Follower follower; - public Leader leader; - public Observer observer; - - protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { - return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb)); - } - - protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception { - return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb)); - } - - protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException { - return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb)); - } - - @SuppressWarnings("deprecation") - protected Election createElectionAlgorithm(int electionAlgorithm) { - Election le = null; - - //TODO: use a factory rather than a switch - switch (electionAlgorithm) { - case 1: - throw new UnsupportedOperationException("Election Algorithm 1 is not supported."); - case 2: - throw new UnsupportedOperationException("Election Algorithm 2 is not supported."); - case 3: - QuorumCnxManager qcm = createCnxnManager(); - QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm); - if (oldQcm != null) { - LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)"); - oldQcm.halt(); - } - QuorumCnxManager.Listener listener = qcm.listener; - if (listener != null) { - listener.start(); - FastLeaderElection fle = new FastLeaderElection(this, qcm); - fle.start(); - le = fle; - } else { - LOG.error("Null listener when initializing cnx manager"); - } - break; - default: - assert false; - } - return le; - } - - @SuppressWarnings("deprecation") - protected Election makeLEStrategy() { - LOG.debug("Initializing leader election protocol..."); - return electionAlg; - } - - protected synchronized void setLeader(Leader newLeader) { - leader = newLeader; - } - - protected synchronized void setFollower(Follower newFollower) { - follower = newFollower; - } - - protected synchronized void setObserver(Observer newObserver) { - observer = newObserver; - } - - public synchronized ZooKeeperServer getActiveServer() { - if (leader != null) { - return leader.zk; - } else if (follower != null) { - return follower.zk; - } else if (observer != null) { - return observer.zk; - } - return null; - } - - boolean shuttingDownLE = false; - - public void setSuspended(boolean suspended) { - this.suspended.set(suspended); - } - private void checkSuspended() { - try { - while (suspended.get()) { - Thread.sleep(10); - } - } catch (InterruptedException err) { - Thread.currentThread().interrupt(); - } - } - - @Override - public void run() { - updateThreadName(); - - LOG.debug("Starting quorum peer"); - try { - jmxQuorumBean = new QuorumBean(this); - MBeanRegistry.getInstance().register(jmxQuorumBean, null); - for (QuorumServer s : getView().values()) { - ZKMBeanInfo p; - if (getMyId() == s.id) { - p = jmxLocalPeerBean = new LocalPeerBean(this); - try { - MBeanRegistry.getInstance().register(p, jmxQuorumBean); - } catch (Exception e) { - LOG.warn("Failed to register with JMX", e); - jmxLocalPeerBean = null; - } - } else { - RemotePeerBean rBean = new RemotePeerBean(this, s); - try { - MBeanRegistry.getInstance().register(rBean, jmxQuorumBean); - jmxRemotePeerBean.put(s.id, rBean); - } catch (Exception e) { - LOG.warn("Failed to register with JMX", e); - } - } - } - } catch (Exception e) { - LOG.warn("Failed to register with JMX", e); - jmxQuorumBean = null; - } - - try { - /* - * Main loop - */ - while (running) { - if (unavailableStartTime == 0) { - unavailableStartTime = Time.currentElapsedTime(); - } - - switch (getPeerState()) { - case LOOKING: - LOG.info("LOOKING"); - ServerMetrics.getMetrics().LOOKING_COUNT.add(1); - - if (Boolean.getBoolean("readonlymode.enabled")) { - LOG.info("Attempting to start ReadOnlyZooKeeperServer"); - - // Create read-only server but don't start it immediately - final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb); - - // Instead of starting roZk immediately, wait some grace - // period before we decide we're partitioned. - // - // Thread is used here because otherwise it would require - // changes in each of election strategy classes which is - // unnecessary code coupling. - Thread roZkMgr = new Thread() { - public void run() { - try { - // lower-bound grace period to 2 secs - sleep(Math.max(2000, tickTime)); - if (ServerState.LOOKING.equals(getPeerState())) { - roZk.startup(); - } - } catch (InterruptedException e) { - LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); - } catch (Exception e) { - LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); - } - } - }; - try { - roZkMgr.start(); - reconfigFlagClear(); - if (shuttingDownLE) { - shuttingDownLE = false; - startLeaderElection(); - } - setCurrentVote(makeLEStrategy().lookForLeader()); - checkSuspended(); - } catch (Exception e) { - LOG.warn("Unexpected exception", e); - setPeerState(ServerState.LOOKING); - } finally { - // If the thread is in the the grace period, interrupt - // to come out of waiting. - roZkMgr.interrupt(); - roZk.shutdown(); - } - } else { - try { - reconfigFlagClear(); - if (shuttingDownLE) { - shuttingDownLE = false; - startLeaderElection(); - } - setCurrentVote(makeLEStrategy().lookForLeader()); - } catch (Exception e) { - LOG.warn("Unexpected exception", e); - setPeerState(ServerState.LOOKING); - } - } - break; - case OBSERVING: - try { - LOG.info("OBSERVING"); - setObserver(makeObserver(logFactory)); - observer.observeLeader(); - } catch (Exception e) { - LOG.warn("Unexpected exception", e); - } finally { - observer.shutdown(); - setObserver(null); - updateServerState(); - - // Add delay jitter before we switch to LOOKING - // state to reduce the load of ObserverMaster - if (isRunning()) { - Observer.waitForObserverElectionDelay(); - } - } - break; - case FOLLOWING: - try { - LOG.info("FOLLOWING"); - setFollower(makeFollower(logFactory)); - follower.followLeader(); - } catch (Exception e) { - LOG.warn("Unexpected exception", e); - } finally { - follower.shutdown(); - setFollower(null); - updateServerState(); - } - break; - case LEADING: - LOG.info("LEADING"); - try { - setLeader(makeLeader(logFactory)); - leader.lead(); - setLeader(null); - } catch (Exception e) { - LOG.warn("Unexpected exception", e); - } finally { - if (leader != null) { - leader.shutdown("Forcing shutdown"); - setLeader(null); - } - updateServerState(); - } - break; - } - } - } finally { - LOG.warn("QuorumPeer main thread exited"); - MBeanRegistry instance = MBeanRegistry.getInstance(); - instance.unregister(jmxQuorumBean); - instance.unregister(jmxLocalPeerBean); - - for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) { - instance.unregister(remotePeerBean); - } - - jmxQuorumBean = null; - jmxLocalPeerBean = null; - jmxRemotePeerBean = null; - } - } - - private synchronized void updateServerState() { - if (!reconfigFlag) { - setPeerState(ServerState.LOOKING); - LOG.warn("PeerState set to LOOKING"); - return; - } - - if (getMyId() == getCurrentVote().getId()) { - setPeerState(ServerState.LEADING); - LOG.debug("PeerState set to LEADING"); - } else if (getLearnerType() == LearnerType.PARTICIPANT) { - setPeerState(ServerState.FOLLOWING); - LOG.debug("PeerState set to FOLLOWING"); - } else if (getLearnerType() == LearnerType.OBSERVER) { - setPeerState(ServerState.OBSERVING); - LOG.debug("PeerState set to OBSERVER"); - } else { // currently shouldn't happen since there are only 2 learner types - setPeerState(ServerState.LOOKING); - LOG.debug("Should not be here"); - } - reconfigFlag = false; - } - - public void shutdown() { - running = false; - x509Util.close(); - if (leader != null) { - leader.shutdown("quorum Peer shutdown"); - } - if (follower != null) { - follower.shutdown(); - } - shutdownServerCnxnFactory(); - if (udpSocket != null) { - udpSocket.close(); - } - if (jvmPauseMonitor != null) { - jvmPauseMonitor.serviceStop(); - } - - try { - adminServer.shutdown(); - } catch (AdminServerException e) { - LOG.warn("Problem stopping AdminServer", e); - } - - if (getElectionAlg() != null) { - this.interrupt(); - getElectionAlg().shutdown(); - } - try { - zkDb.close(); - } catch (IOException ie) { - LOG.warn("Error closing logs ", ie); - } - } - - /** - * A 'view' is a node's current opinion of the membership of the entire - * ensemble. - */ - public Map<Long, QuorumPeer.QuorumServer> getView() { - return Collections.unmodifiableMap(getQuorumVerifier().getAllMembers()); - } - - /** - * Observers are not contained in this view, only nodes with - * PeerType=PARTICIPANT. - */ - public Map<Long, QuorumPeer.QuorumServer> getVotingView() { - return getQuorumVerifier().getVotingMembers(); - } - - /** - * Returns only observers, no followers. - */ - public Map<Long, QuorumPeer.QuorumServer> getObservingView() { - return getQuorumVerifier().getObservingMembers(); - } - - public synchronized Set<Long> getCurrentAndNextConfigVoters() { - Set<Long> voterIds = new HashSet<>(getQuorumVerifier().getVotingMembers().keySet()); - if (getLastSeenQuorumVerifier() != null) { - voterIds.addAll(getLastSeenQuorumVerifier().getVotingMembers().keySet()); - } - return voterIds; - } - - /** - * Check if a node is in the current view. With static membership, the - * result of this check will never change; only when dynamic membership - * is introduced will this be more useful. - */ - public boolean viewContains(Long sid) { - return this.getView().containsKey(sid); - } - - /** - * Only used by QuorumStats at the moment - */ - public String[] getQuorumPeers() { - List<String> l = new ArrayList<>(); - synchronized (this) { - if (leader != null) { - for (LearnerHandler fh : leader.getLearners()) { - if (fh.getSocket() != null) { - String s = formatInetAddr((InetSocketAddress) fh.getSocket().getRemoteSocketAddress()); - if (leader.isLearnerSynced(fh)) { - s += "*"; - } - l.add(s); - } - } - } else if (follower != null) { - l.add(formatInetAddr((InetSocketAddress) follower.sock.getRemoteSocketAddress())); - } - } - return l.toArray(new String[0]); - } - - public String getServerState() { - switch (getPeerState()) { - case LOOKING: - return QuorumStats.Provider.LOOKING_STATE; - case LEADING: - return QuorumStats.Provider.LEADING_STATE; - case FOLLOWING: - return QuorumStats.Provider.FOLLOWING_STATE; - case OBSERVING: - return QuorumStats.Provider.OBSERVING_STATE; - } - return QuorumStats.Provider.UNKNOWN_STATE; - } - - /** - * set the id of this quorum peer. - */ - public void setMyid(long myid) { - this.myid = myid; - } - - public void setInitialConfig(String initialConfig) { - this.initialConfig = initialConfig; - } - - public String getInitialConfig() { - return initialConfig; - } - - /** - * Get the number of milliseconds of each tick - */ - public int getTickTime() { - return tickTime; - } - - /** - * Set the number of milliseconds of each tick - */ - public void setTickTime(int tickTime) { - LOG.info("tickTime set to {}", tickTime); - this.tickTime = tickTime; - } - - /** Maximum number of connections allowed from particular host (ip) */ - public int getMaxClientCnxnsPerHost() { - if (cnxnFactory != null) { - return cnxnFactory.getMaxClientCnxnsPerHost(); - } - if (secureCnxnFactory != null) { - return secureCnxnFactory.getMaxClientCnxnsPerHost(); - } - return -1; - } - - /** Whether local sessions are enabled */ - public boolean areLocalSessionsEnabled() { - return localSessionsEnabled; - } - - /** Whether to enable local sessions */ - public void enableLocalSessions(boolean flag) { - LOG.info("Local sessions {}", (flag ? "enabled" : "disabled")); - localSessionsEnabled = flag; - } - - /** Whether local sessions are allowed to upgrade to global sessions */ - public boolean isLocalSessionsUpgradingEnabled() { - return localSessionsUpgradingEnabled; - } - - /** Whether to allow local sessions to upgrade to global sessions */ - public void enableLocalSessionsUpgrading(boolean flag) { - LOG.info("Local session upgrading {}", (flag ? "enabled" : "disabled")); - localSessionsUpgradingEnabled = flag; - } - - /** minimum session timeout in milliseconds */ - public int getMinSessionTimeout() { - return minSessionTimeout; - } - - /** minimum session timeout in milliseconds */ - public void setMinSessionTimeout(int min) { - LOG.info("minSessionTimeout set to {}", min); - this.minSessionTimeout = min; - } - - /** maximum session timeout in milliseconds */ - public int getMaxSessionTimeout() { - return maxSessionTimeout; - } - - /** maximum session timeout in milliseconds */ - public void setMaxSessionTimeout(int max) { - LOG.info("maxSessionTimeout set to {}", max); - this.maxSessionTimeout = max; - } - - /** The server socket's listen backlog length */ - public int getClientPortListenBacklog() { - return this.clientPortListenBacklog; - } - - /** Sets the server socket's listen backlog length. */ - public void setClientPortListenBacklog(int backlog) { - this.clientPortListenBacklog = backlog; - } - - /** - * Get the number of ticks that the initial synchronization phase can take - */ - public int getInitLimit() { - return initLimit; - } - - /** - * Set the number of ticks that the initial synchronization phase can take - */ - public void setInitLimit(int initLimit) { - LOG.info("initLimit set to {}", initLimit); - this.initLimit = initLimit; - } - - /** - * Get the current tick - */ - public int getTick() { - return tick.get(); - } - - public QuorumVerifier configFromString(String s) throws IOException, ConfigException { - Properties props = new Properties(); - props.load(new StringReader(s)); - return QuorumPeerConfig.parseDynamicConfig(props, electionType, false, false, getQuorumVerifier().getOraclePath()); - } - - /** - * Return QuorumVerifier object for the last committed configuration. - */ - public QuorumVerifier getQuorumVerifier() { - synchronized (QV_LOCK) { - return quorumVerifier; - } - } - - /** - * Return QuorumVerifier object for the last proposed configuration. - */ - public QuorumVerifier getLastSeenQuorumVerifier() { - synchronized (QV_LOCK) { - return lastSeenQuorumVerifier; - } - } - - public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW) { - if (qvOLD == null || !qvOLD.equals(qvNEW)) { - LOG.warn("Restarting Leader Election"); - getElectionAlg().shutdown(); - shuttingDownLE = false; - startLeaderElection(); - } - } - - public String getNextDynamicConfigFilename() { - if (configFilename == null) { - LOG.warn("configFilename is null! This should only happen in tests."); - return null; - } - return configFilename + QuorumPeerConfig.nextDynamicConfigFileSuffix; - } - - // On entry to this method, qcm must be non-null and the locks on both qcm and QV_LOCK - // must be held. We don't want quorumVerifier/lastSeenQuorumVerifier to change out from - // under us, so we have to hold QV_LOCK; and since the call to qcm.connectOne() will take - // the lock on qcm (and take QV_LOCK again inside that), the caller needs to have taken - // qcm outside QV_LOCK to avoid a deadlock against other callers of qcm.connectOne(). - private void connectNewPeers(QuorumCnxManager qcm) { - if (quorumVerifier != null && lastSeenQuorumVerifier != null) { - Map<Long, QuorumServer> committedView = quorumVerifier.getAllMembers(); - for (Entry<Long, QuorumServer> e : lastSeenQuorumVerifier.getAllMembers().entrySet()) { - if (e.getKey() != getMyId() && !committedView.containsKey(e.getKey())) { - qcm.connectOne(e.getKey()); - } - } - } - } - - public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) { - if (!isReconfigEnabled()) { - LOG.info("Dynamic reconfig is disabled, we don't store the last seen config."); - return; - } - - // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm - // and then take QV_LOCK. Take the locks in the same order to ensure that we don't - // deadlock against other callers of connectOne(). If qcmRef gets set in another - // thread while we're inside the synchronized block, that does no harm; if we didn't - // take a lock on qcm (because it was null when we sampled it), we won't call - // connectOne() on it. (Use of an AtomicReference is enough to guarantee visibility - // of updates that provably happen in another thread before entering this method.) - QuorumCnxManager qcm = qcmRef.get(); - Object outerLockObject = (qcm != null) ? qcm : QV_LOCK; - synchronized (outerLockObject) { - synchronized (QV_LOCK) { - if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() > qv.getVersion()) { - LOG.error("setLastSeenQuorumVerifier called with stale config " - + qv.getVersion() - + ". Current version: " - + quorumVerifier.getVersion()); - } - // assuming that a version uniquely identifies a configuration, so if - // version is the same, nothing to do here. - if (lastSeenQuorumVerifier != null && lastSeenQuorumVerifier.getVersion() == qv.getVersion()) { - return; - } - lastSeenQuorumVerifier = qv; - if (qcm != null) { - connectNewPeers(qcm); - } - - if (writeToDisk) { - try { - String fileName = getNextDynamicConfigFilename(); - if (fileName != null) { - QuorumPeerConfig.writeDynamicConfig(fileName, qv, true); - } - } catch (IOException e) { - LOG.error("Error writing next dynamic config file to disk", e); - } - } - } - } - } - - public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) { - synchronized (QV_LOCK) { - if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) { - // this is normal. For example - server found out about new config through FastLeaderElection gossiping - // and then got the same config in UPTODATE message so its already known - LOG.debug( - "{} setQuorumVerifier called with known or old config {}. Current version: {}", - getMyId(), - qv.getVersion(), - quorumVerifier.getVersion()); - return quorumVerifier; - } - QuorumVerifier prevQV = quorumVerifier; - quorumVerifier = qv; - if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion())) { - lastSeenQuorumVerifier = qv; - } - - if (writeToDisk) { - // some tests initialize QuorumPeer without a static config file - if (configFilename != null) { - try { - String dynamicConfigFilename = makeDynamicConfigFilename(qv.getVersion()); - QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, qv, false); - QuorumPeerConfig.editStaticConfig(configFilename, dynamicConfigFilename, needEraseClientInfoFromStaticConfig()); - } catch (IOException e) { - LOG.error("Error closing file", e); - } - } else { - LOG.info("writeToDisk == true but configFilename == null"); - } - } - - if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()) { - QuorumPeerConfig.deleteFile(getNextDynamicConfigFilename()); - } - QuorumServer qs = qv.getAllMembers().get(getMyId()); - if (qs != null) { - setAddrs(qs.addr, qs.electionAddr, qs.clientAddr); - } - updateObserverMasterList(); - return prevQV; - } - } - - private String makeDynamicConfigFilename(long version) { - return configFilename + ".dynamic." + Long.toHexString(version); - } - - private boolean needEraseClientInfoFromStaticConfig() { - QuorumServer server = quorumVerifier.getAllMembers().get(getMyId()); - return (server != null && server.clientAddr != null && !server.isClientAddrFromStatic); - } - - /** - * Get an instance of LeaderElection - */ - public Election getElectionAlg() { - return electionAlg; - } - - /** - * Get the synclimit - */ - public int getSyncLimit() { - return syncLimit; - } - - /** - * Set the synclimit - */ - public void setSyncLimit(int syncLimit) { - LOG.info("syncLimit set to {}", syncLimit); - this.syncLimit = syncLimit; - } - - /** - * Get the connectToLearnerMasterLimit - */ - public int getConnectToLearnerMasterLimit() { - return connectToLearnerMasterLimit; - } - - /** - * Set the connectToLearnerMasterLimit - */ - public void setConnectToLearnerMasterLimit(int connectToLearnerMasterLimit) { - LOG.info("connectToLearnerMasterLimit set to {}", connectToLearnerMasterLimit); - this.connectToLearnerMasterLimit = connectToLearnerMasterLimit; - } - - /** - * The syncEnabled can also be set via a system property. - */ - public static final String SYNC_ENABLED = "zookeeper.observer.syncEnabled"; - - /** - * Return syncEnabled. - */ - public boolean getSyncEnabled() { - if (System.getProperty(SYNC_ENABLED) != null) { - LOG.info("{}={}", SYNC_ENABLED, Boolean.getBoolean(SYNC_ENABLED)); - return Boolean.getBoolean(SYNC_ENABLED); - } else { - return syncEnabled; - } - } - - /** - * Set syncEnabled. - * - * @param syncEnabled - */ - public void setSyncEnabled(boolean syncEnabled) { - this.syncEnabled = syncEnabled; - } - - /** - * Gets the election type - */ - public int getElectionType() { - return electionType; - } - - /** - * Sets the election type - */ - public void setElectionType(int electionType) { - this.electionType = electionType; - } - - public boolean getQuorumListenOnAllIPs() { - return quorumListenOnAllIPs; - } - - public void setQuorumListenOnAllIPs(boolean quorumListenOnAllIPs) { - this.quorumListenOnAllIPs = quorumListenOnAllIPs; - } - - public void setCnxnFactory(ServerCnxnFactory cnxnFactory) { - this.cnxnFactory = cnxnFactory; - } - - public void setSecureCnxnFactory(ServerCnxnFactory secureCnxnFactory) { - this.secureCnxnFactory = secureCnxnFactory; - } - - public void setSslQuorum(boolean sslQuorum) { - if (sslQuorum) { - LOG.info("Using TLS encrypted quorum communication"); - } else { - LOG.info("Using insecure (non-TLS) quorum communication"); - } - this.sslQuorum = sslQuorum; - } - - public void setUsePortUnification(boolean shouldUsePortUnification) { - LOG.info("Port unification {}", shouldUsePortUnification ? "enabled" : "disabled"); - this.shouldUsePortUnification = shouldUsePortUnification; - } - - private void startServerCnxnFactory() { - if (cnxnFactory != null) { - cnxnFactory.start(); - } - if (secureCnxnFactory != null) { - secureCnxnFactory.start(); - } - } - - private void shutdownServerCnxnFactory() { - if (cnxnFactory != null) { - cnxnFactory.shutdown(); - } - if (secureCnxnFactory != null) { - secureCnxnFactory.shutdown(); - } - } - - // Leader and learner will control the zookeeper server and pass it into QuorumPeer. - public void setZooKeeperServer(ZooKeeperServer zks) { - if (cnxnFactory != null) { - cnxnFactory.setZooKeeperServer(zks); - } - if (secureCnxnFactory != null) { - secureCnxnFactory.setZooKeeperServer(zks); - } - } - - public void closeAllConnections() { - if (cnxnFactory != null) { - cnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); - } - if (secureCnxnFactory != null) { - secureCnxnFactory.closeAll(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); - } - } - - public int getClientPort() { - if (cnxnFactory != null) { - return cnxnFactory.getLocalPort(); - } - return -1; - } - - public int getSecureClientPort() { - if (secureCnxnFactory != null) { - return secureCnxnFactory.getLocalPort(); - } - return -1; - } - - public void setTxnFactory(FileTxnSnapLog factory) { - this.logFactory = factory; - } - - public FileTxnSnapLog getTxnFactory() { - return this.logFactory; - } - - /** - * set zk database for this node - * @param database - */ - public void setZKDatabase(ZKDatabase database) { - this.zkDb = database; - } - - protected ZKDatabase getZkDb() { - return zkDb; - } - - public synchronized void initConfigInZKDatabase() { - if (zkDb != null) { - zkDb.initConfigInZKDatabase(getQuorumVerifier()); - } - } - - public boolean isRunning() { - return running; - } - - /** - * get reference to QuorumCnxManager - */ - public QuorumCnxManager getQuorumCnxManager() { - return qcmRef.get(); - } - private long readLongFromFile(String name) throws IOException { - File file = new File(logFactory.getSnapDir(), name); - BufferedReader br = new BufferedReader(new FileReader(file)); - String line = ""; - try { - line = br.readLine(); - return Long.parseLong(line); - } catch (NumberFormatException e) { - throw new IOException("Found " + line + " in " + file); - } finally { - br.close(); - } - } - - private long acceptedEpoch = -1; - private long currentEpoch = -1; - - public static final String CURRENT_EPOCH_FILENAME = "currentEpoch"; - - public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch"; - - /** - * Write a long value to disk atomically. Either succeeds or an exception - * is thrown. - * @param name file name to write the long to - * @param value the long value to write to the named file - * @throws IOException if the file cannot be written atomically - */ - // visibleForTest - void writeLongToFile(String name, final long value) throws IOException { - File file = new File(logFactory.getSnapDir(), name); - new AtomicFileWritingIdiom(file, new WriterStatement() { - @Override - public void write(Writer bw) throws IOException { - bw.write(Long.toString(value)); - } - }); - } - - public long getCurrentEpoch() throws IOException { - if (currentEpoch == -1) { - currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); - } - return currentEpoch; - } - - public long getAcceptedEpoch() throws IOException { - if (acceptedEpoch == -1) { - acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME); - } - return acceptedEpoch; - } - - public void setCurrentEpoch(long e) throws IOException { - writeLongToFile(CURRENT_EPOCH_FILENAME, e); - currentEpoch = e; - } - - public void setAcceptedEpoch(long e) throws IOException { - writeLongToFile(ACCEPTED_EPOCH_FILENAME, e); - acceptedEpoch = e; - } - - public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) { - if (!isReconfigEnabled()) { - LOG.debug("Reconfig feature is disabled, skip reconfig processing."); - return false; - } - - InetSocketAddress oldClientAddr = getClientAddress(); - - // update last committed quorum verifier, write the new config to disk - // and restart leader election if config changed. - QuorumVerifier prevQV = setQuorumVerifier(qv, true); - - // There is no log record for the initial config, thus after syncing - // with leader - // /zookeeper/config is empty! it is also possible that last committed - // config is propagated during leader election - // without the propagation the corresponding log records. - // so we should explicitly do this (this is not necessary when we're - // already a Follower/Observer, only - // for Learner): - initConfigInZKDatabase(); - - if (prevQV.getVersion() < qv.getVersion() && !prevQV.equals(qv)) { - Map<Long, QuorumServer> newMembers = qv.getAllMembers(); - updateRemotePeerMXBeans(newMembers); - if (restartLE) { - restartLeaderElection(prevQV, qv); - } - - QuorumServer myNewQS = newMembers.get(getMyId()); - if (myNewQS != null && myNewQS.clientAddr != null && !myNewQS.clientAddr.equals(oldClientAddr)) { - cnxnFactory.reconfigure(myNewQS.clientAddr); - updateThreadName(); - } - - boolean roleChange = updateLearnerType(qv); - boolean leaderChange = false; - if (suggestedLeaderId != null) { - // zxid should be non-null too - leaderChange = updateVote(suggestedLeaderId, zxid); - } else { - long currentLeaderId = getCurrentVote().getId(); - QuorumServer myleaderInCurQV = prevQV.getVotingMembers().get(currentLeaderId); - QuorumServer myleaderInNewQV = qv.getVotingMembers().get(currentLeaderId); - leaderChange = (myleaderInCurQV == null - || myleaderInCurQV.addr == null - || myleaderInNewQV == null - || !myleaderInCurQV.addr.equals(myleaderInNewQV.addr)); - // we don't have a designated leader - need to go into leader - // election - reconfigFlagClear(); - } - - return roleChange || leaderChange; - } - return false; - - } - - private void updateRemotePeerMXBeans(Map<Long, QuorumServer> newMembers) { - Set<Long> existingMembers = new HashSet<>(newMembers.keySet()); - existingMembers.retainAll(jmxRemotePeerBean.keySet()); - for (Long id : existingMembers) { - RemotePeerBean rBean = jmxRemotePeerBean.get(id); - rBean.setQuorumServer(newMembers.get(id)); - } - - Set<Long> joiningMembers = new HashSet<>(newMembers.keySet()); - joiningMembers.removeAll(jmxRemotePeerBean.keySet()); - joiningMembers.remove(getMyId()); // remove self as it is local bean - for (Long id : joiningMembers) { - QuorumServer qs = newMembers.get(id); - RemotePeerBean rBean = new RemotePeerBean(this, qs); - try { - MBeanRegistry.getInstance().register(rBean, jmxQuorumBean); - jmxRemotePeerBean.put(qs.id, rBean); - } catch (Exception e) { - LOG.warn("Failed to register with JMX", e); - } - } - - Set<Long> leavingMembers = new HashSet<>(jmxRemotePeerBean.keySet()); - leavingMembers.removeAll(newMembers.keySet()); - for (Long id : leavingMembers) { - RemotePeerBean rBean = jmxRemotePeerBean.remove(id); - try { - MBeanRegistry.getInstance().unregister(rBean); - } catch (Exception e) { - LOG.warn("Failed to unregister with JMX", e); - } - } - } - - private ArrayList<QuorumServer> observerMasters = new ArrayList<>(); - private void updateObserverMasterList() { - if (observerMasterPort <= 0) { - return; // observer masters not enabled - } - observerMasters.clear(); - StringBuilder sb = new StringBuilder(); - for (QuorumServer server : quorumVerifier.getVotingMembers().values()) { - InetAddress address = server.addr.getReachableOrOne().getAddress(); - InetSocketAddress addr = new InetSocketAddress(address, observerMasterPort); - observerMasters.add(new QuorumServer(server.id, addr)); - sb.append(addr).append(","); - } - LOG.info("Updated learner master list to be {}", sb.toString()); - Collections.shuffle(observerMasters); - // Reset the internal index of the observerMaster when - // the observerMaster List is refreshed - nextObserverMaster = 0; - } - - private boolean useObserverMasters() { - return getLearnerType() == LearnerType.OBSERVER && observerMasters.size() > 0; - } - - private int nextObserverMaster = 0; - private QuorumServer nextObserverMaster() { - if (nextObserverMaster >= observerMasters.size()) { - nextObserverMaster = 0; - // Add a reconnect delay only after the observer - // has exhausted trying to connect to all the masters - // from the observerMasterList - if (isRunning()) { - Observer.waitForReconnectDelay(); - } - } - return observerMasters.get(nextObserverMaster++); - } - - QuorumServer findLearnerMaster(QuorumServer leader) { - if (useObserverMasters()) { - return nextObserverMaster(); - } else { - // Add delay jitter to reduce the load on the leader - if (isRunning()) { - Observer.waitForReconnectDelay(); - } - return leader; - } - } - - /** - * Vet a given learner master's information. - * Allows specification by server id, ip only, or ip and port - */ - QuorumServer validateLearnerMaster(String desiredMaster) { - if (useObserverMasters()) { - Long sid; - try { - sid = Long.parseLong(desiredMaster); - } catch (NumberFormatException e) { - sid = null; - } - for (QuorumServer server : observerMasters) { - if (sid == null) { - for (InetSocketAddress address : server.addr.getAllAddresses()) { - String serverAddr = address.getAddress().getHostAddress() + ':' + address.getPort(); - if (serverAddr.startsWith(desiredMaster)) { - return server; - } - } - } else { - if (sid.equals(server.id)) { - return server; - } - } - } - if (sid == null) { - LOG.info("could not find learner master address={}", desiredMaster); - } else { - LOG.warn("could not find learner master sid={}", sid); - } - } else { - LOG.info("cannot validate request, observer masters not enabled"); - } - return null; - } - - private boolean updateLearnerType(QuorumVerifier newQV) { - //check if I'm an observer in new config - if (newQV.getObservingMembers().containsKey(getMyId())) { - if (getLearnerType() != LearnerType.OBSERVER) { - setLearnerType(LearnerType.OBSERVER); - LOG.info("Becoming an observer"); - reconfigFlagSet(); - return true; - } else { - return false; - } - } else if (newQV.getVotingMembers().containsKey(getMyId())) { - if (getLearnerType() != LearnerType.PARTICIPANT) { - setLearnerType(LearnerType.PARTICIPANT); - LOG.info("Becoming a voting participant"); - reconfigFlagSet(); - return true; - } else { - return false; - } - } - // I'm not in the view - if (getLearnerType() != LearnerType.PARTICIPANT) { - setLearnerType(LearnerType.PARTICIPANT); - LOG.info("Becoming a non-voting participant"); - reconfigFlagSet(); - return true; - } - return false; - } - - private boolean updateVote(long designatedLeader, long zxid) { - Vote currentVote = getCurrentVote(); - if (currentVote != null && designatedLeader != currentVote.getId()) { - setCurrentVote(new Vote(designatedLeader, zxid)); - reconfigFlagSet(); - LOG.warn("Suggested leader: {}", designatedLeader); - return true; - } - return false; - } - - /** - * Updates leader election info to avoid inconsistencies when - * a new server tries to join the ensemble. - * - * Here is the inconsistency scenario we try to solve by updating the peer - * epoch after following leader: - * - * Let's say we have an ensemble with 3 servers z1, z2 and z3. - * - * 1. z1, z2 were following z3 with peerEpoch to be 0xb8, the new epoch is - * 0xb9, aka current accepted epoch on disk. - * 2. z2 get restarted, which will use 0xb9 as it's peer epoch when loading - * the current accept epoch from disk. - * 3. z2 received notification from z1 and z3, which is following z3 with - * epoch 0xb8, so it started following z3 again with peer epoch 0xb8. - * 4. before z2 successfully connected to z3, z3 get restarted with new - * epoch 0xb9. - * 5. z2 will retry around a few round (default 5s) before giving up, - * meanwhile it will report z3 as leader. - * 6. z1 restarted, and looking with peer epoch 0xb9. - * 7. z1 voted z3, and z3 was elected as leader again with peer epoch 0xb9. - * 8. z2 successfully connected to z3 before giving up, but with peer - * epoch 0xb8. - * 9. z1 get restarted, looking for leader with peer epoch 0xba, but cannot - * join, because z2 is reporting peer epoch 0xb8, while z3 is reporting - * 0xb9. - * - * By updating the election vote after actually following leader, we can - * avoid this kind of stuck happened. - * - * Btw, the zxid and electionEpoch could be inconsistent because of the same - * reason, it's better to update these as well after syncing with leader, but - * that required protocol change which is non trivial. This problem is worked - * around by skipping comparing the zxid and electionEpoch when counting for - * votes for out of election servers during looking for leader. - * - * See https://issues.apache.org/jira/browse/ZOOKEEPER-1732 - */ - protected void updateElectionVote(long newEpoch) { - Vote currentVote = getCurrentVote(); - if (currentVote != null) { - setCurrentVote(new Vote(currentVote.getId(), currentVote.getZxid(), currentVote.getElectionEpoch(), newEpoch, currentVote - .getState())); - } - } - - private void updateThreadName() { - String plain = cnxnFactory != null - ? cnxnFactory.getLocalAddress() != null - ? formatInetAddr(cnxnFactory.getLocalAddress()) - : "disabled" - : "disabled"; - String secure = secureCnxnFactory != null ? formatInetAddr(secureCnxnFactory.getLocalAddress()) : "disabled"; - setName(String.format("QuorumPeer[myid=%d](plain=%s)(secure=%s)", getMyId(), plain, secure)); - } - - /** - * Sets the time taken for leader election in milliseconds. - * - * @param electionTimeTaken time taken for leader election - */ - void setElectionTimeTaken(long electionTimeTaken) { - this.electionTimeTaken = electionTimeTaken; - } - - /** - * @return the time taken for leader election in milliseconds. - */ - long getElectionTimeTaken() { - return electionTimeTaken; - } - - void setQuorumServerSaslRequired(boolean serverSaslRequired) { - quorumServerSaslAuthRequired = serverSaslRequired; - LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, serverSaslRequired); - } - - void setQuorumLearnerSaslRequired(boolean learnerSaslRequired) { - quorumLearnerSaslAuthRequired = learnerSaslRequired; - LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, learnerSaslRequired); - } - - void setQuorumSaslEnabled(boolean enableAuth) { - quorumSaslEnableAuth = enableAuth; - if (!quorumSaslEnableAuth) { - LOG.info("QuorumPeer communication is not secured! (SASL auth disabled)"); - } else { - LOG.info("{} set to {}", QuorumAuth.QUORUM_SASL_AUTH_ENABLED, enableAuth); - } - } - - void setQuorumServicePrincipal(String servicePrincipal) { - quorumServicePrincipal = servicePrincipal; - LOG.info("{} set to {}", QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL, quorumServicePrincipal); - } - - void setQuorumLearnerLoginContext(String learnerContext) { - quorumLearnerLoginContext = learnerContext; - LOG.info("{} set to {}", QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT, quorumLearnerLoginContext); - } - - void setQuorumServerLoginContext(String serverContext) { - quorumServerLoginContext = serverContext; - LOG.info("{} set to {}", QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT, quorumServerLoginContext); - } - - void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) { - if (qCnxnThreadsSize > QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE) { - quorumCnxnThreadsSize = qCnxnThreadsSize; - } - LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize); - } - - boolean isQuorumSaslAuthEnabled() { - return quorumSaslEnableAuth; - } - - private boolean isQuorumServerSaslAuthRequired() { - return quorumServerSaslAuthRequired; - } - - private boolean isQuorumLearnerSaslAuthRequired() { - return quorumLearnerSaslAuthRequired; - } - - public QuorumCnxManager createCnxnManager() { - int timeout = quorumCnxnTimeoutMs > 0 ? quorumCnxnTimeoutMs : this.tickTime * this.syncLimit; - LOG.info("Using {}ms as the quorum cnxn socket timeout", timeout); - return new QuorumCnxManager( - this, - this.getMyId(), - this.getView(), - this.authServer, - this.authLearner, - timeout, - this.getQuorumListenOnAllIPs(), - this.quorumCnxnThreadsSize, - this.isQuorumSaslAuthEnabled()); - } - - boolean isLeader(long id) { - Vote vote = getCurrentVote(); - return vote != null && id == vote.getId(); - } - - public boolean isReconfigEnabled() { - return reconfigEnabled; - } - - @InterfaceAudience.Private - /** - * This is a metric that depends on the status of the peer. - */ public Integer getSynced_observers_metric() { - if (leader != null) { - return leader.getObservingLearners().size(); - } else if (follower != null) { - return follower.getSyncedObserverSize(); - } else { - return null; - } - } - - /** - * Create a new QuorumPeer and apply all the values per the already-parsed config. - * - * @param config The appertained quorum peer config. - * @return A QuorumPeer instantiated with specified peer config. Note this peer - * is not fully initialized; caller should finish initialization through - * additional configurations (connection factory settings, etc). - * - * @throws IOException - */ - public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOException { - QuorumPeer quorumPeer = new QuorumPeer(); - quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir())); - quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); - quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled()); - quorumPeer.setElectionType(config.getElectionAlg()); - quorumPeer.setMyid(config.getServerId()); - quorumPeer.setTickTime(config.getTickTime()); - quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); - quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); - quorumPeer.setInitLimit(config.getInitLimit()); - quorumPeer.setSyncLimit(config.getSyncLimit()); - quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit()); - quorumPeer.setObserverMasterPort(config.getObserverMasterPort()); - quorumPeer.setConfigFileName(config.getConfigFilename()); - quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog()); - quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); - quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); - if (config.getLastSeenQuorumVerifier() != null) { - quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false); - } - quorumPeer.initConfigInZKDatabase(); - quorumPeer.setSslQuorum(config.isSslQuorum()); - quorumPeer.setUsePortUnification(config.shouldUsePortUnification()); - quorumPeer.setLearnerType(config.getPeerType()); - quorumPeer.setSyncEnabled(config.getSyncEnabled()); - quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); - if (config.sslQuorumReloadCertFiles) { - quorumPeer.getX509Util().enableCertFileReloading(); - } - quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled()); - quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled()); - quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs()); - - // sets quorum sasl authentication configurations - quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); - if (quorumPeer.isQuorumSaslAuthEnabled()) { - quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); - quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); - quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal); - quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); - quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); - } - quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); - - if (config.jvmPauseMonitorToRun) { - quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config)); - } - - return quorumPeer; - } - -} |