diff options
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java')
-rw-r--r-- | zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java | 928 |
1 files changed, 0 insertions, 928 deletions
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java deleted file mode 100644 index 3c7b2148400..00000000000 --- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ /dev/null @@ -1,928 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server.quorum; - -import static java.nio.charset.StandardCharsets.UTF_8; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import javax.net.ssl.SSLSocket; -import org.apache.jute.BinaryInputArchive; -import org.apache.jute.BinaryOutputArchive; -import org.apache.jute.InputArchive; -import org.apache.jute.OutputArchive; -import org.apache.jute.Record; -import org.apache.zookeeper.ZooDefs.OpCode; -import org.apache.zookeeper.common.Time; -import org.apache.zookeeper.common.X509Exception; -import org.apache.zookeeper.server.ExitCode; -import org.apache.zookeeper.server.Request; -import org.apache.zookeeper.server.ServerCnxn; -import org.apache.zookeeper.server.ServerMetrics; -import org.apache.zookeeper.server.TxnLogEntry; -import org.apache.zookeeper.server.ZooTrace; -import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; -import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; -import org.apache.zookeeper.server.util.ConfigUtils; -import org.apache.zookeeper.server.util.MessageTracker; -import org.apache.zookeeper.server.util.SerializeUtils; -import org.apache.zookeeper.server.util.ZxidUtils; -import org.apache.zookeeper.txn.SetDataTxn; -import org.apache.zookeeper.txn.TxnDigest; -import org.apache.zookeeper.txn.TxnHeader; -import org.apache.zookeeper.util.ServiceUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is the superclass of two of the three main actors in a ZK - * ensemble: Followers and Observers. Both Followers and Observers share - * a good deal of code which is moved into Peer to avoid duplication. - */ -public class Learner { - - static class PacketInFlight { - - TxnHeader hdr; - Record rec; - TxnDigest digest; - - } - - QuorumPeer self; - LearnerZooKeeperServer zk; - - protected BufferedOutputStream bufferedOutput; - - protected Socket sock; - protected MultipleAddresses leaderAddr; - protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false); - - /** - * Socket getter - */ - public Socket getSocket() { - return sock; - } - - LearnerSender sender = null; - protected InputArchive leaderIs; - protected OutputArchive leaderOs; - /** the protocol version of the leader */ - protected int leaderProtocolVersion = 0x01; - - private static final int BUFFERED_MESSAGE_SIZE = 10; - protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE); - - protected static final Logger LOG = LoggerFactory.getLogger(Learner.class); - - /** - * Time to wait after connection attempt with the Leader or LearnerMaster before this - * Learner tries to connect again. - */ - private static final int leaderConnectDelayDuringRetryMs = Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100); - - private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); - - public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending"; - private static boolean asyncSending = - Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING)); - public static final String LEARNER_CLOSE_SOCKET_ASYNC = "zookeeper.learner.closeSocketAsync"; - public static final boolean closeSocketAsync = Boolean - .parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_CLOSE_SOCKET_ASYNC)); - - static { - LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs); - LOG.info("TCP NoDelay set to: {}", nodelay); - LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending); - LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync); - } - - final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<>(); - - public int getPendingRevalidationsCount() { - return pendingRevalidations.size(); - } - - // for testing - protected static void setAsyncSending(boolean newMode) { - asyncSending = newMode; - LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending); - - } - protected static boolean getAsyncSending() { - return asyncSending; - } - /** - * validate a session for a client - * - * @param clientId - * the client to be revalidated - * @param timeout - * the timeout for which the session is valid - * @throws IOException - */ - void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException { - LOG.info("Revalidating client: 0x{}", Long.toHexString(clientId)); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - dos.writeLong(clientId); - dos.writeInt(timeout); - dos.close(); - QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null); - pendingRevalidations.put(clientId, cnxn); - if (LOG.isTraceEnabled()) { - ZooTrace.logTraceMessage( - LOG, - ZooTrace.SESSION_TRACE_MASK, - "To validate session 0x" + Long.toHexString(clientId)); - } - writePacket(qp, true); - } - - /** - * write a packet to the leader. - * - * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time. - * When packets are sent synchronously, writing is done within a synchronization block. - * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe. - * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only. - * So we have only one thread writing to leaderOs at a time in either case. - * - * @param pp - * the proposal packet to be sent to the leader - * @throws IOException - */ - void writePacket(QuorumPacket pp, boolean flush) throws IOException { - if (asyncSending) { - sender.queuePacket(pp); - } else { - writePacketNow(pp, flush); - } - } - - void writePacketNow(QuorumPacket pp, boolean flush) throws IOException { - synchronized (leaderOs) { - if (pp != null) { - messageTracker.trackSent(pp.getType()); - leaderOs.writeRecord(pp, "packet"); - } - if (flush) { - bufferedOutput.flush(); - } - } - } - - /** - * Start thread that will forward any packet in the queue to the leader - */ - protected void startSendingThread() { - sender = new LearnerSender(this); - sender.start(); - } - - /** - * read a packet from the leader - * - * @param pp - * the packet to be instantiated - * @throws IOException - */ - void readPacket(QuorumPacket pp) throws IOException { - synchronized (leaderIs) { - leaderIs.readRecord(pp, "packet"); - messageTracker.trackReceived(pp.getType()); - } - if (LOG.isTraceEnabled()) { - final long traceMask = - (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK - : ZooTrace.SERVER_PACKET_TRACE_MASK; - - ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp); - } - } - - /** - * send a request packet to the leader - * - * @param request - * the request from the client - * @throws IOException - */ - void request(Request request) throws IOException { - if (request.isThrottled()) { - LOG.error("Throttled request sent to leader: {}. Exiting", request); - ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue()); - } - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream oa = new DataOutputStream(baos); - oa.writeLong(request.sessionId); - oa.writeInt(request.cxid); - oa.writeInt(request.type); - byte[] payload = request.readRequestBytes(); - if (payload != null) { - oa.write(payload); - } - oa.close(); - QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo); - writePacket(qp, true); - } - - /** - * Returns the address of the node we think is the leader. - */ - protected QuorumServer findLeader() { - QuorumServer leaderServer = null; - // Find the leader by id - Vote current = self.getCurrentVote(); - for (QuorumServer s : self.getView().values()) { - if (s.id == current.getId()) { - // Ensure we have the leader's correct IP address before - // attempting to connect. - s.recreateSocketAddresses(); - leaderServer = s; - break; - } - } - if (leaderServer == null) { - LOG.warn("Couldn't find the leader with id = {}", current.getId()); - } - return leaderServer; - } - - /** - * Overridable helper method to return the System.nanoTime(). - * This method behaves identical to System.nanoTime(). - */ - protected long nanoTime() { - return System.nanoTime(); - } - - /** - * Overridable helper method to simply call sock.connect(). This can be - * overriden in tests to fake connection success/failure for connectToLeader. - */ - protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException { - sock.connect(addr, timeout); - } - - /** - * Establish a connection with the LearnerMaster found by findLearnerMaster. - * Followers only connect to Leaders, Observers can connect to any active LearnerMaster. - * Retries until either initLimit time has elapsed or 5 tries have happened. - * @param multiAddr - the address of the Peer to connect to. - * @throws IOException - if the socket connection fails on the 5th attempt - * if there is an authentication failure while connecting to leader - */ - protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException { - - this.leaderAddr = multiAddr; - Set<InetSocketAddress> addresses; - if (self.isMultiAddressReachabilityCheckEnabled()) { - // even if none of the addresses are reachable, we want to try to establish connection - // see ZOOKEEPER-3758 - addresses = multiAddr.getAllReachableAddressesOrAll(); - } else { - addresses = multiAddr.getAllAddresses(); - } - ExecutorService executor = Executors.newFixedThreadPool(addresses.size()); - CountDownLatch latch = new CountDownLatch(addresses.size()); - AtomicReference<Socket> socket = new AtomicReference<>(null); - addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit); - - try { - latch.await(); - } catch (InterruptedException e) { - LOG.warn("Interrupted while trying to connect to Leader", e); - } finally { - executor.shutdown(); - try { - if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { - LOG.error("not all the LeaderConnector terminated properly"); - } - } catch (InterruptedException ie) { - LOG.error("Interrupted while terminating LeaderConnector executor.", ie); - } - } - - if (socket.get() == null) { - throw new IOException("Failed connect to " + multiAddr); - } else { - sock = socket.get(); - sockBeingClosed.set(false); - } - - self.authLearner.authenticate(sock, hostname); - - leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream())); - bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); - leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); - if (asyncSending) { - startSendingThread(); - } - } - - class LeaderConnector implements Runnable { - - private AtomicReference<Socket> socket; - private InetSocketAddress address; - private CountDownLatch latch; - - LeaderConnector(InetSocketAddress address, AtomicReference<Socket> socket, CountDownLatch latch) { - this.address = address; - this.socket = socket; - this.latch = latch; - } - - @Override - public void run() { - try { - Thread.currentThread().setName("LeaderConnector-" + address); - Socket sock = connectToLeader(); - - if (sock != null && sock.isConnected()) { - if (socket.compareAndSet(null, sock)) { - LOG.info("Successfully connected to leader, using address: {}", address); - } else { - LOG.info("Connection to the leader is already established, close the redundant connection"); - sock.close(); - } - } - - } catch (Exception e) { - LOG.error("Failed connect to {}", address, e); - } finally { - latch.countDown(); - } - } - - private Socket connectToLeader() throws IOException, X509Exception, InterruptedException { - Socket sock = createSocket(); - - // leader connection timeout defaults to tickTime * initLimit - int connectTimeout = self.tickTime * self.initLimit; - - // but if connectToLearnerMasterLimit is specified, use that value to calculate - // timeout instead of using the initLimit value - if (self.connectToLearnerMasterLimit > 0) { - connectTimeout = self.tickTime * self.connectToLearnerMasterLimit; - } - - int remainingTimeout; - long startNanoTime = nanoTime(); - - for (int tries = 0; tries < 5 && socket.get() == null; tries++) { - try { - // recalculate the init limit time because retries sleep for 1000 milliseconds - remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000); - if (remainingTimeout <= 0) { - LOG.error("connectToLeader exceeded on retries."); - throw new IOException("connectToLeader exceeded on retries."); - } - - sockConnect(sock, address, Math.min(connectTimeout, remainingTimeout)); - if (self.isSslQuorum()) { - ((SSLSocket) sock).startHandshake(); - } - sock.setTcpNoDelay(nodelay); - break; - } catch (IOException e) { - remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000); - - if (remainingTimeout <= leaderConnectDelayDuringRetryMs) { - LOG.error( - "Unexpected exception, connectToLeader exceeded. tries={}, remaining init limit={}, connecting to {}", - tries, - remainingTimeout, - address, - e); - throw e; - } else if (tries >= 4) { - LOG.error( - "Unexpected exception, retries exceeded. tries={}, remaining init limit={}, connecting to {}", - tries, - remainingTimeout, - address, - e); - throw e; - } else { - LOG.warn( - "Unexpected exception, tries={}, remaining init limit={}, connecting to {}", - tries, - remainingTimeout, - address, - e); - sock = createSocket(); - } - } - Thread.sleep(leaderConnectDelayDuringRetryMs); - } - - return sock; - } - } - - /** - * Creating a simple or and SSL socket. - * This can be overridden in tests to fake already connected sockets for connectToLeader. - */ - protected Socket createSocket() throws X509Exception, IOException { - Socket sock; - if (self.isSslQuorum()) { - sock = self.getX509Util().createSSLSocket(); - } else { - sock = new Socket(); - } - sock.setSoTimeout(self.tickTime * self.initLimit); - return sock; - } - - /** - * Once connected to the leader or learner master, perform the handshake - * protocol to establish a following / observing connection. - * @param pktType - * @return the zxid the Leader sends for synchronization purposes. - * @throws IOException - */ - protected long registerWithLeader(int pktType) throws IOException { - /* - * Send follower info, including last zxid and sid - */ - long lastLoggedZxid = self.getLastLoggedZxid(); - QuorumPacket qp = new QuorumPacket(); - qp.setType(pktType); - qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); - - /* - * Add sid to payload - */ - LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion()); - ByteArrayOutputStream bsid = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); - boa.writeRecord(li, "LearnerInfo"); - qp.setData(bsid.toByteArray()); - - writePacket(qp, true); - readPacket(qp); - final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); - if (qp.getType() == Leader.LEADERINFO) { - // we are connected to a 1.0 server so accept the new epoch and read the next packet - leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt(); - byte[] epochBytes = new byte[4]; - final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes); - if (newEpoch > self.getAcceptedEpoch()) { - wrappedEpochBytes.putInt((int) self.getCurrentEpoch()); - self.setAcceptedEpoch(newEpoch); - } else if (newEpoch == self.getAcceptedEpoch()) { - // since we have already acked an epoch equal to the leaders, we cannot ack - // again, but we still need to send our lastZxid to the leader so that we can - // sync with it if it does assume leadership of the epoch. - // the -1 indicates that this reply should not count as an ack for the new epoch - wrappedEpochBytes.putInt(-1); - } else { - throw new IOException("Leaders epoch, " - + newEpoch - + " is less than accepted epoch, " - + self.getAcceptedEpoch()); - } - QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null); - writePacket(ackNewEpoch, true); - return ZxidUtils.makeZxid(newEpoch, 0); - } else { - if (newEpoch > self.getAcceptedEpoch()) { - self.setAcceptedEpoch(newEpoch); - } - if (qp.getType() != Leader.NEWLEADER) { - LOG.error("First packet should have been NEWLEADER"); - throw new IOException("First packet should have been NEWLEADER"); - } - return qp.getZxid(); - } - } - - /** - * Finally, synchronize our history with the Leader (if Follower) - * or the LearnerMaster (if Observer). - * @param newLeaderZxid - * @throws IOException - * @throws InterruptedException - */ - protected void syncWithLeader(long newLeaderZxid) throws Exception { - long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); - QuorumVerifier newLeaderQV = null; - - class SyncHelper { - - // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot. - // For SNAP and TRUNC the snapshot is needed to save that history. - boolean willSnapshot = true; - boolean syncSnapshot = false; - - // PROPOSALs received during sync, for matching up with COMMITs. - Deque<PacketInFlight> proposals = new ArrayDeque<>(); - - // PROPOSALs we delay forwarding to the ZK server until sync is done. - Deque<PacketInFlight> delayedProposals = new ArrayDeque<>(); - - // COMMITs we delay forwarding to the ZK server until sync is done. - Deque<Long> delayedCommits = new ArrayDeque<>(); - - void syncSnapshot() { - syncSnapshot = true; - } - - void noSnapshot() { - willSnapshot = false; - } - - void propose(PacketInFlight pif) { - proposals.add(pif); - delayedProposals.add(pif); - } - - PacketInFlight nextProposal() { - return proposals.peekFirst(); - } - - void commit() { - PacketInFlight packet = proposals.remove(); - if (willSnapshot) { - zk.processTxn(packet.hdr, packet.rec); - delayedProposals.remove(); - } else { - delayedCommits.add(packet.hdr.getZxid()); - } - } - - void writeState() throws IOException, InterruptedException { - // Ensure all received transaction PROPOSALs are written before we ACK the NEWLEADER, - // since this allows the leader to apply those transactions to its served state: - if (willSnapshot) { - zk.takeSnapshot(syncSnapshot); // either, the snapshot contains the transactions, - willSnapshot = false; // but anything after this needs to go to the transaction log; or - } - - sock.setSoTimeout(self.tickTime * self.syncLimit); - self.setSyncMode(QuorumPeer.SyncMode.NONE); - zk.startupWithoutServing(); - - // if we're a follower, we need to ensure the transactions are safely logged before ACK'ing. - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - // The leader expects the NEWLEADER ACK to precede all the PROPOSAL ACKs, so we only write them first. - fzk.syncProcessor.setDelayForwarding(true); - for (PacketInFlight p : delayedProposals) { - fzk.logRequest(p.hdr, p.rec, p.digest); - } - delayedProposals.clear(); - fzk.syncProcessor.syncFlush(); - } - - self.setCurrentEpoch(newEpoch); - } - - void flushAcks() throws InterruptedException { - if (zk instanceof FollowerZooKeeperServer) { - // The NEWLEADER is ACK'ed, and we can now ACK the PROPOSALs we wrote in writeState. - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - fzk.syncProcessor.setDelayForwarding(false); - fzk.syncProcessor.syncFlush(); // Ensure these are all ACK'ed before the UPTODATE ACK. - } - } - - void applyDelayedPackets() { - // Any delayed packets must now be applied: all PROPOSALs first, then any COMMITs. - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : delayedProposals) { - fzk.logRequest(p.hdr, p.rec, p.digest); - } - for (Long zxid : delayedCommits) { - fzk.commit(zxid); - } - } else if (zk instanceof ObserverZooKeeperServer) { - ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; - for (PacketInFlight p : delayedProposals) { - Long zxid = delayedCommits.peekFirst(); - if (p.hdr.getZxid() != zxid) { - // log warning message if there is no matching commit - // old leader send outstanding proposal to observer - LOG.warn( - "Committing 0x{}, but next proposal is 0x{}", - Long.toHexString(zxid), - Long.toHexString(p.hdr.getZxid())); - continue; - } - delayedCommits.remove(); - Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1); - request.setTxnDigest(p.digest); - ozk.commitRequest(request); - } - } else { - // New server type need to handle in-flight packets - throw new UnsupportedOperationException("Unknown server type"); - } - } - - } - - SyncHelper helper = new SyncHelper(); - QuorumPacket qp = new QuorumPacket(); - readPacket(qp); - synchronized (zk) { - if (qp.getType() == Leader.DIFF) { - LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); - self.setSyncMode(QuorumPeer.SyncMode.DIFF); - if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) { - LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading."); - helper.syncSnapshot(); - } else { - helper.noSnapshot(); - } - } else if (qp.getType() == Leader.SNAP) { - self.setSyncMode(QuorumPeer.SyncMode.SNAP); - LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid())); - // The leader is going to dump the database - zk.getZKDatabase().deserializeSnapshot(leaderIs); - // ZOOKEEPER-2819: overwrite config node content extracted - // from leader snapshot with local config, to avoid potential - // inconsistency of config node content during rolling restart. - if (!self.isReconfigEnabled()) { - LOG.debug("Reset config node content from local config after deserialization of snapshot."); - zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); - } - String signature = leaderIs.readString("signature"); - if (!signature.equals("BenWasHere")) { - LOG.error("Missing signature. Got {}", signature); - throw new IOException("Missing signature"); - } - zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); - - // Immediately persist the latest snapshot when there is txn log gap - helper.syncSnapshot(); - } else if (qp.getType() == Leader.TRUNC) { - // We need to truncate the log to the lastZxid of the leader - self.setSyncMode(QuorumPeer.SyncMode.TRUNC); - LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid())); - boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid()); - if (!truncated) { - LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid())); - ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); - } - zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); - } else { - LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp)); - ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); - } - zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); - zk.createSessionTracker(); - - - // we are now going to start getting transactions to apply followed by an UPTODATE - long lastQueued = 0; - TxnLogEntry logEntry; - outerLoop: - while (self.isRunning()) { - readPacket(qp); - switch (qp.getType()) { - case Leader.PROPOSAL: - PacketInFlight pif = new PacketInFlight(); - logEntry = SerializeUtils.deserializeTxn(qp.getData()); - pif.hdr = logEntry.getHeader(); - pif.rec = logEntry.getTxn(); - pif.digest = logEntry.getDigest(); - if (pif.hdr.getZxid() != lastQueued + 1) { - LOG.warn( - "Got zxid 0x{} expected 0x{}", - Long.toHexString(pif.hdr.getZxid()), - Long.toHexString(lastQueued + 1)); - } - lastQueued = pif.hdr.getZxid(); - - if (pif.hdr.getType() == OpCode.reconfig) { - SetDataTxn setDataTxn = (SetDataTxn) pif.rec; - QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8)); - self.setLastSeenQuorumVerifier(qv, true); - } - helper.propose(pif); - break; - case Leader.COMMIT: - case Leader.COMMITANDACTIVATE: - pif = helper.nextProposal(); - if (pif.hdr.getZxid() != qp.getZxid()) { - LOG.warn( - "Committing 0x{}, but next proposal is 0x{}", - Long.toHexString(qp.getZxid()), - Long.toHexString(pif.hdr.getZxid())); - } else { - if (qp.getType() == Leader.COMMITANDACTIVATE) { - tryReconfig(pif, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid()); - } - helper.commit(); - } - break; - case Leader.INFORM: - case Leader.INFORMANDACTIVATE: - PacketInFlight packet = new PacketInFlight(); - if (qp.getType() == Leader.INFORMANDACTIVATE) { - ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); - long suggestedLeaderId = buffer.getLong(); - byte[] remainingData = new byte[buffer.remaining()]; - buffer.get(remainingData); - logEntry = SerializeUtils.deserializeTxn(remainingData); - packet.hdr = logEntry.getHeader(); - packet.rec = logEntry.getTxn(); - packet.digest = logEntry.getDigest(); - tryReconfig(packet, suggestedLeaderId, qp.getZxid()); - } else { - logEntry = SerializeUtils.deserializeTxn(qp.getData()); - packet.rec = logEntry.getTxn(); - packet.hdr = logEntry.getHeader(); - packet.digest = logEntry.getDigest(); - // Log warning message if txn comes out-of-order - if (packet.hdr.getZxid() != lastQueued + 1) { - LOG.warn( - "Got zxid 0x{} expected 0x{}", - Long.toHexString(packet.hdr.getZxid()), - Long.toHexString(lastQueued + 1)); - } - lastQueued = packet.hdr.getZxid(); - } - helper.propose(packet); - helper.commit(); - break; - case Leader.UPTODATE: - LOG.info("Learner received UPTODATE message"); - if (newLeaderQV != null) { - boolean majorChange = self.processReconfig(newLeaderQV, null, null, true); - if (majorChange) { - throw new Exception("changes proposed in reconfig"); - } - } - helper.flushAcks(); - self.setZooKeeperServer(zk); - self.adminServer.setZooKeeperServer(zk); - break outerLoop; - case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery - LOG.info("Learner received NEWLEADER message"); - if (qp.getData() != null && qp.getData().length > 1) { - try { - QuorumVerifier qv = self.configFromString(new String(qp.getData(), UTF_8)); - self.setLastSeenQuorumVerifier(qv, true); - newLeaderQV = qv; - } catch (Exception e) { - e.printStackTrace(); - } - } - - helper.writeState(); - writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); - break; - } - } - } - QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); - ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); - writePacket(ack, true); - zk.startServing(); - /* - * Update the election vote here to ensure that all members of the - * ensemble report the same vote to new servers that start up and - * send leader election notifications to the ensemble. - * - * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732 - */ - self.updateElectionVote(newEpoch); - - helper.applyDelayedPackets(); - } - - private void tryReconfig(PacketInFlight pif, long newLeader, long zxid) throws Exception { - QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); - boolean majorChange = self.processReconfig(qv, newLeader, zxid, true); - if (majorChange) { - throw new Exception("changes proposed in reconfig"); - } - } - - protected void revalidate(QuorumPacket qp) throws IOException { - ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); - DataInputStream dis = new DataInputStream(bis); - long sessionId = dis.readLong(); - boolean valid = dis.readBoolean(); - ServerCnxn cnxn = pendingRevalidations.remove(sessionId); - if (cnxn == null) { - LOG.warn("Missing session 0x{} for validation", Long.toHexString(sessionId)); - } else { - zk.finishSessionInit(cnxn, valid); - } - if (LOG.isTraceEnabled()) { - ZooTrace.logTraceMessage( - LOG, - ZooTrace.SESSION_TRACE_MASK, - "Session 0x" + Long.toHexString(sessionId) + " is valid: " + valid); - } - } - - protected void ping(QuorumPacket qp) throws IOException { - // Send back the ping with our session data - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); - Map<Long, Integer> touchTable = zk.getTouchSnapshot(); - for (Entry<Long, Integer> entry : touchTable.entrySet()) { - dos.writeLong(entry.getKey()); - dos.writeInt(entry.getValue()); - } - - QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo()); - writePacket(pingReply, true); - } - - /** - * Shutdown the Peer - */ - public void shutdown() { - self.setZooKeeperServer(null); - self.closeAllConnections(); - self.adminServer.setZooKeeperServer(null); - - if (sender != null) { - sender.shutdown(); - } - - closeSocket(); - // shutdown previous zookeeper - if (zk != null) { - // If we haven't finished SNAP sync, force fully shutdown - // to avoid potential inconsistency - zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP)); - } - } - - boolean isRunning() { - return self.isRunning() && zk.isRunning(); - } - - void closeSocket() { - if (sock != null) { - if (sockBeingClosed.compareAndSet(false, true)) { - if (closeSocketAsync) { - final Thread closingThread = new Thread(() -> closeSockSync(), "CloseSocketThread(sid:" + zk.getServerId()); - closingThread.setDaemon(true); - closingThread.start(); - } else { - closeSockSync(); - } - } - } - } - - void closeSockSync() { - try { - long startTime = Time.currentElapsedTime(); - if (sock != null) { - sock.close(); - sock = null; - } - ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - startTime); - } catch (IOException e) { - LOG.warn("Ignoring error closing connection to leader", e); - } - } - -} |