aboutsummaryrefslogtreecommitdiffstats
path: root/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
diff options
context:
space:
mode:
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java')
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java928
1 files changed, 0 insertions, 928 deletions
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
deleted file mode 100644
index 3c7b2148400..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ /dev/null
@@ -1,928 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.net.ssl.SSLSocket;
-import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.InputArchive;
-import org.apache.jute.OutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.common.Time;
-import org.apache.zookeeper.common.X509Exception;
-import org.apache.zookeeper.server.ExitCode;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ServerMetrics;
-import org.apache.zookeeper.server.TxnLogEntry;
-import org.apache.zookeeper.server.ZooTrace;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.apache.zookeeper.server.util.ConfigUtils;
-import org.apache.zookeeper.server.util.MessageTracker;
-import org.apache.zookeeper.server.util.SerializeUtils;
-import org.apache.zookeeper.server.util.ZxidUtils;
-import org.apache.zookeeper.txn.SetDataTxn;
-import org.apache.zookeeper.txn.TxnDigest;
-import org.apache.zookeeper.txn.TxnHeader;
-import org.apache.zookeeper.util.ServiceUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is the superclass of two of the three main actors in a ZK
- * ensemble: Followers and Observers. Both Followers and Observers share
- * a good deal of code which is moved into Peer to avoid duplication.
- */
-public class Learner {
-
- static class PacketInFlight {
-
- TxnHeader hdr;
- Record rec;
- TxnDigest digest;
-
- }
-
- QuorumPeer self;
- LearnerZooKeeperServer zk;
-
- protected BufferedOutputStream bufferedOutput;
-
- protected Socket sock;
- protected MultipleAddresses leaderAddr;
- protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
-
- /**
- * Socket getter
- */
- public Socket getSocket() {
- return sock;
- }
-
- LearnerSender sender = null;
- protected InputArchive leaderIs;
- protected OutputArchive leaderOs;
- /** the protocol version of the leader */
- protected int leaderProtocolVersion = 0x01;
-
- private static final int BUFFERED_MESSAGE_SIZE = 10;
- protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
-
- protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);
-
- /**
- * Time to wait after connection attempt with the Leader or LearnerMaster before this
- * Learner tries to connect again.
- */
- private static final int leaderConnectDelayDuringRetryMs = Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100);
-
- private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
-
- public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending";
- private static boolean asyncSending =
- Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING));
- public static final String LEARNER_CLOSE_SOCKET_ASYNC = "zookeeper.learner.closeSocketAsync";
- public static final boolean closeSocketAsync = Boolean
- .parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_CLOSE_SOCKET_ASYNC));
-
- static {
- LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
- LOG.info("TCP NoDelay set to: {}", nodelay);
- LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
- LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync);
- }
-
- final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<>();
-
- public int getPendingRevalidationsCount() {
- return pendingRevalidations.size();
- }
-
- // for testing
- protected static void setAsyncSending(boolean newMode) {
- asyncSending = newMode;
- LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
-
- }
- protected static boolean getAsyncSending() {
- return asyncSending;
- }
- /**
- * validate a session for a client
- *
- * @param clientId
- * the client to be revalidated
- * @param timeout
- * the timeout for which the session is valid
- * @throws IOException
- */
- void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException {
- LOG.info("Revalidating client: 0x{}", Long.toHexString(clientId));
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- dos.writeLong(clientId);
- dos.writeInt(timeout);
- dos.close();
- QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null);
- pendingRevalidations.put(clientId, cnxn);
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(
- LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "To validate session 0x" + Long.toHexString(clientId));
- }
- writePacket(qp, true);
- }
-
- /**
- * write a packet to the leader.
- *
- * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time.
- * When packets are sent synchronously, writing is done within a synchronization block.
- * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe.
- * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only.
- * So we have only one thread writing to leaderOs at a time in either case.
- *
- * @param pp
- * the proposal packet to be sent to the leader
- * @throws IOException
- */
- void writePacket(QuorumPacket pp, boolean flush) throws IOException {
- if (asyncSending) {
- sender.queuePacket(pp);
- } else {
- writePacketNow(pp, flush);
- }
- }
-
- void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
- synchronized (leaderOs) {
- if (pp != null) {
- messageTracker.trackSent(pp.getType());
- leaderOs.writeRecord(pp, "packet");
- }
- if (flush) {
- bufferedOutput.flush();
- }
- }
- }
-
- /**
- * Start thread that will forward any packet in the queue to the leader
- */
- protected void startSendingThread() {
- sender = new LearnerSender(this);
- sender.start();
- }
-
- /**
- * read a packet from the leader
- *
- * @param pp
- * the packet to be instantiated
- * @throws IOException
- */
- void readPacket(QuorumPacket pp) throws IOException {
- synchronized (leaderIs) {
- leaderIs.readRecord(pp, "packet");
- messageTracker.trackReceived(pp.getType());
- }
- if (LOG.isTraceEnabled()) {
- final long traceMask =
- (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
- : ZooTrace.SERVER_PACKET_TRACE_MASK;
-
- ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
- }
- }
-
- /**
- * send a request packet to the leader
- *
- * @param request
- * the request from the client
- * @throws IOException
- */
- void request(Request request) throws IOException {
- if (request.isThrottled()) {
- LOG.error("Throttled request sent to leader: {}. Exiting", request);
- ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
- }
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream oa = new DataOutputStream(baos);
- oa.writeLong(request.sessionId);
- oa.writeInt(request.cxid);
- oa.writeInt(request.type);
- byte[] payload = request.readRequestBytes();
- if (payload != null) {
- oa.write(payload);
- }
- oa.close();
- QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
- writePacket(qp, true);
- }
-
- /**
- * Returns the address of the node we think is the leader.
- */
- protected QuorumServer findLeader() {
- QuorumServer leaderServer = null;
- // Find the leader by id
- Vote current = self.getCurrentVote();
- for (QuorumServer s : self.getView().values()) {
- if (s.id == current.getId()) {
- // Ensure we have the leader's correct IP address before
- // attempting to connect.
- s.recreateSocketAddresses();
- leaderServer = s;
- break;
- }
- }
- if (leaderServer == null) {
- LOG.warn("Couldn't find the leader with id = {}", current.getId());
- }
- return leaderServer;
- }
-
- /**
- * Overridable helper method to return the System.nanoTime().
- * This method behaves identical to System.nanoTime().
- */
- protected long nanoTime() {
- return System.nanoTime();
- }
-
- /**
- * Overridable helper method to simply call sock.connect(). This can be
- * overriden in tests to fake connection success/failure for connectToLeader.
- */
- protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {
- sock.connect(addr, timeout);
- }
-
- /**
- * Establish a connection with the LearnerMaster found by findLearnerMaster.
- * Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
- * Retries until either initLimit time has elapsed or 5 tries have happened.
- * @param multiAddr - the address of the Peer to connect to.
- * @throws IOException - if the socket connection fails on the 5th attempt
- * if there is an authentication failure while connecting to leader
- */
- protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
-
- this.leaderAddr = multiAddr;
- Set<InetSocketAddress> addresses;
- if (self.isMultiAddressReachabilityCheckEnabled()) {
- // even if none of the addresses are reachable, we want to try to establish connection
- // see ZOOKEEPER-3758
- addresses = multiAddr.getAllReachableAddressesOrAll();
- } else {
- addresses = multiAddr.getAllAddresses();
- }
- ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
- CountDownLatch latch = new CountDownLatch(addresses.size());
- AtomicReference<Socket> socket = new AtomicReference<>(null);
- addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);
-
- try {
- latch.await();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while trying to connect to Leader", e);
- } finally {
- executor.shutdown();
- try {
- if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
- LOG.error("not all the LeaderConnector terminated properly");
- }
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while terminating LeaderConnector executor.", ie);
- }
- }
-
- if (socket.get() == null) {
- throw new IOException("Failed connect to " + multiAddr);
- } else {
- sock = socket.get();
- sockBeingClosed.set(false);
- }
-
- self.authLearner.authenticate(sock, hostname);
-
- leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
- bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
- leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
- if (asyncSending) {
- startSendingThread();
- }
- }
-
- class LeaderConnector implements Runnable {
-
- private AtomicReference<Socket> socket;
- private InetSocketAddress address;
- private CountDownLatch latch;
-
- LeaderConnector(InetSocketAddress address, AtomicReference<Socket> socket, CountDownLatch latch) {
- this.address = address;
- this.socket = socket;
- this.latch = latch;
- }
-
- @Override
- public void run() {
- try {
- Thread.currentThread().setName("LeaderConnector-" + address);
- Socket sock = connectToLeader();
-
- if (sock != null && sock.isConnected()) {
- if (socket.compareAndSet(null, sock)) {
- LOG.info("Successfully connected to leader, using address: {}", address);
- } else {
- LOG.info("Connection to the leader is already established, close the redundant connection");
- sock.close();
- }
- }
-
- } catch (Exception e) {
- LOG.error("Failed connect to {}", address, e);
- } finally {
- latch.countDown();
- }
- }
-
- private Socket connectToLeader() throws IOException, X509Exception, InterruptedException {
- Socket sock = createSocket();
-
- // leader connection timeout defaults to tickTime * initLimit
- int connectTimeout = self.tickTime * self.initLimit;
-
- // but if connectToLearnerMasterLimit is specified, use that value to calculate
- // timeout instead of using the initLimit value
- if (self.connectToLearnerMasterLimit > 0) {
- connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
- }
-
- int remainingTimeout;
- long startNanoTime = nanoTime();
-
- for (int tries = 0; tries < 5 && socket.get() == null; tries++) {
- try {
- // recalculate the init limit time because retries sleep for 1000 milliseconds
- remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
- if (remainingTimeout <= 0) {
- LOG.error("connectToLeader exceeded on retries.");
- throw new IOException("connectToLeader exceeded on retries.");
- }
-
- sockConnect(sock, address, Math.min(connectTimeout, remainingTimeout));
- if (self.isSslQuorum()) {
- ((SSLSocket) sock).startHandshake();
- }
- sock.setTcpNoDelay(nodelay);
- break;
- } catch (IOException e) {
- remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
-
- if (remainingTimeout <= leaderConnectDelayDuringRetryMs) {
- LOG.error(
- "Unexpected exception, connectToLeader exceeded. tries={}, remaining init limit={}, connecting to {}",
- tries,
- remainingTimeout,
- address,
- e);
- throw e;
- } else if (tries >= 4) {
- LOG.error(
- "Unexpected exception, retries exceeded. tries={}, remaining init limit={}, connecting to {}",
- tries,
- remainingTimeout,
- address,
- e);
- throw e;
- } else {
- LOG.warn(
- "Unexpected exception, tries={}, remaining init limit={}, connecting to {}",
- tries,
- remainingTimeout,
- address,
- e);
- sock = createSocket();
- }
- }
- Thread.sleep(leaderConnectDelayDuringRetryMs);
- }
-
- return sock;
- }
- }
-
- /**
- * Creating a simple or and SSL socket.
- * This can be overridden in tests to fake already connected sockets for connectToLeader.
- */
- protected Socket createSocket() throws X509Exception, IOException {
- Socket sock;
- if (self.isSslQuorum()) {
- sock = self.getX509Util().createSSLSocket();
- } else {
- sock = new Socket();
- }
- sock.setSoTimeout(self.tickTime * self.initLimit);
- return sock;
- }
-
- /**
- * Once connected to the leader or learner master, perform the handshake
- * protocol to establish a following / observing connection.
- * @param pktType
- * @return the zxid the Leader sends for synchronization purposes.
- * @throws IOException
- */
- protected long registerWithLeader(int pktType) throws IOException {
- /*
- * Send follower info, including last zxid and sid
- */
- long lastLoggedZxid = self.getLastLoggedZxid();
- QuorumPacket qp = new QuorumPacket();
- qp.setType(pktType);
- qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
-
- /*
- * Add sid to payload
- */
- LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion());
- ByteArrayOutputStream bsid = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
- boa.writeRecord(li, "LearnerInfo");
- qp.setData(bsid.toByteArray());
-
- writePacket(qp, true);
- readPacket(qp);
- final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
- if (qp.getType() == Leader.LEADERINFO) {
- // we are connected to a 1.0 server so accept the new epoch and read the next packet
- leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
- byte[] epochBytes = new byte[4];
- final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
- if (newEpoch > self.getAcceptedEpoch()) {
- wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
- self.setAcceptedEpoch(newEpoch);
- } else if (newEpoch == self.getAcceptedEpoch()) {
- // since we have already acked an epoch equal to the leaders, we cannot ack
- // again, but we still need to send our lastZxid to the leader so that we can
- // sync with it if it does assume leadership of the epoch.
- // the -1 indicates that this reply should not count as an ack for the new epoch
- wrappedEpochBytes.putInt(-1);
- } else {
- throw new IOException("Leaders epoch, "
- + newEpoch
- + " is less than accepted epoch, "
- + self.getAcceptedEpoch());
- }
- QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
- writePacket(ackNewEpoch, true);
- return ZxidUtils.makeZxid(newEpoch, 0);
- } else {
- if (newEpoch > self.getAcceptedEpoch()) {
- self.setAcceptedEpoch(newEpoch);
- }
- if (qp.getType() != Leader.NEWLEADER) {
- LOG.error("First packet should have been NEWLEADER");
- throw new IOException("First packet should have been NEWLEADER");
- }
- return qp.getZxid();
- }
- }
-
- /**
- * Finally, synchronize our history with the Leader (if Follower)
- * or the LearnerMaster (if Observer).
- * @param newLeaderZxid
- * @throws IOException
- * @throws InterruptedException
- */
- protected void syncWithLeader(long newLeaderZxid) throws Exception {
- long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
- QuorumVerifier newLeaderQV = null;
-
- class SyncHelper {
-
- // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot.
- // For SNAP and TRUNC the snapshot is needed to save that history.
- boolean willSnapshot = true;
- boolean syncSnapshot = false;
-
- // PROPOSALs received during sync, for matching up with COMMITs.
- Deque<PacketInFlight> proposals = new ArrayDeque<>();
-
- // PROPOSALs we delay forwarding to the ZK server until sync is done.
- Deque<PacketInFlight> delayedProposals = new ArrayDeque<>();
-
- // COMMITs we delay forwarding to the ZK server until sync is done.
- Deque<Long> delayedCommits = new ArrayDeque<>();
-
- void syncSnapshot() {
- syncSnapshot = true;
- }
-
- void noSnapshot() {
- willSnapshot = false;
- }
-
- void propose(PacketInFlight pif) {
- proposals.add(pif);
- delayedProposals.add(pif);
- }
-
- PacketInFlight nextProposal() {
- return proposals.peekFirst();
- }
-
- void commit() {
- PacketInFlight packet = proposals.remove();
- if (willSnapshot) {
- zk.processTxn(packet.hdr, packet.rec);
- delayedProposals.remove();
- } else {
- delayedCommits.add(packet.hdr.getZxid());
- }
- }
-
- void writeState() throws IOException, InterruptedException {
- // Ensure all received transaction PROPOSALs are written before we ACK the NEWLEADER,
- // since this allows the leader to apply those transactions to its served state:
- if (willSnapshot) {
- zk.takeSnapshot(syncSnapshot); // either, the snapshot contains the transactions,
- willSnapshot = false; // but anything after this needs to go to the transaction log; or
- }
-
- sock.setSoTimeout(self.tickTime * self.syncLimit);
- self.setSyncMode(QuorumPeer.SyncMode.NONE);
- zk.startupWithoutServing();
-
- // if we're a follower, we need to ensure the transactions are safely logged before ACK'ing.
- if (zk instanceof FollowerZooKeeperServer) {
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- // The leader expects the NEWLEADER ACK to precede all the PROPOSAL ACKs, so we only write them first.
- fzk.syncProcessor.setDelayForwarding(true);
- for (PacketInFlight p : delayedProposals) {
- fzk.logRequest(p.hdr, p.rec, p.digest);
- }
- delayedProposals.clear();
- fzk.syncProcessor.syncFlush();
- }
-
- self.setCurrentEpoch(newEpoch);
- }
-
- void flushAcks() throws InterruptedException {
- if (zk instanceof FollowerZooKeeperServer) {
- // The NEWLEADER is ACK'ed, and we can now ACK the PROPOSALs we wrote in writeState.
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- fzk.syncProcessor.setDelayForwarding(false);
- fzk.syncProcessor.syncFlush(); // Ensure these are all ACK'ed before the UPTODATE ACK.
- }
- }
-
- void applyDelayedPackets() {
- // Any delayed packets must now be applied: all PROPOSALs first, then any COMMITs.
- if (zk instanceof FollowerZooKeeperServer) {
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- for (PacketInFlight p : delayedProposals) {
- fzk.logRequest(p.hdr, p.rec, p.digest);
- }
- for (Long zxid : delayedCommits) {
- fzk.commit(zxid);
- }
- } else if (zk instanceof ObserverZooKeeperServer) {
- ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
- for (PacketInFlight p : delayedProposals) {
- Long zxid = delayedCommits.peekFirst();
- if (p.hdr.getZxid() != zxid) {
- // log warning message if there is no matching commit
- // old leader send outstanding proposal to observer
- LOG.warn(
- "Committing 0x{}, but next proposal is 0x{}",
- Long.toHexString(zxid),
- Long.toHexString(p.hdr.getZxid()));
- continue;
- }
- delayedCommits.remove();
- Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
- request.setTxnDigest(p.digest);
- ozk.commitRequest(request);
- }
- } else {
- // New server type need to handle in-flight packets
- throw new UnsupportedOperationException("Unknown server type");
- }
- }
-
- }
-
- SyncHelper helper = new SyncHelper();
- QuorumPacket qp = new QuorumPacket();
- readPacket(qp);
- synchronized (zk) {
- if (qp.getType() == Leader.DIFF) {
- LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
- self.setSyncMode(QuorumPeer.SyncMode.DIFF);
- if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
- LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
- helper.syncSnapshot();
- } else {
- helper.noSnapshot();
- }
- } else if (qp.getType() == Leader.SNAP) {
- self.setSyncMode(QuorumPeer.SyncMode.SNAP);
- LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
- // The leader is going to dump the database
- zk.getZKDatabase().deserializeSnapshot(leaderIs);
- // ZOOKEEPER-2819: overwrite config node content extracted
- // from leader snapshot with local config, to avoid potential
- // inconsistency of config node content during rolling restart.
- if (!self.isReconfigEnabled()) {
- LOG.debug("Reset config node content from local config after deserialization of snapshot.");
- zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
- }
- String signature = leaderIs.readString("signature");
- if (!signature.equals("BenWasHere")) {
- LOG.error("Missing signature. Got {}", signature);
- throw new IOException("Missing signature");
- }
- zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
-
- // Immediately persist the latest snapshot when there is txn log gap
- helper.syncSnapshot();
- } else if (qp.getType() == Leader.TRUNC) {
- // We need to truncate the log to the lastZxid of the leader
- self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
- LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
- boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
- if (!truncated) {
- LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
- ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
- }
- zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
- } else {
- LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
- ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
- }
- zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
- zk.createSessionTracker();
-
-
- // we are now going to start getting transactions to apply followed by an UPTODATE
- long lastQueued = 0;
- TxnLogEntry logEntry;
- outerLoop:
- while (self.isRunning()) {
- readPacket(qp);
- switch (qp.getType()) {
- case Leader.PROPOSAL:
- PacketInFlight pif = new PacketInFlight();
- logEntry = SerializeUtils.deserializeTxn(qp.getData());
- pif.hdr = logEntry.getHeader();
- pif.rec = logEntry.getTxn();
- pif.digest = logEntry.getDigest();
- if (pif.hdr.getZxid() != lastQueued + 1) {
- LOG.warn(
- "Got zxid 0x{} expected 0x{}",
- Long.toHexString(pif.hdr.getZxid()),
- Long.toHexString(lastQueued + 1));
- }
- lastQueued = pif.hdr.getZxid();
-
- if (pif.hdr.getType() == OpCode.reconfig) {
- SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
- QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
- self.setLastSeenQuorumVerifier(qv, true);
- }
- helper.propose(pif);
- break;
- case Leader.COMMIT:
- case Leader.COMMITANDACTIVATE:
- pif = helper.nextProposal();
- if (pif.hdr.getZxid() != qp.getZxid()) {
- LOG.warn(
- "Committing 0x{}, but next proposal is 0x{}",
- Long.toHexString(qp.getZxid()),
- Long.toHexString(pif.hdr.getZxid()));
- } else {
- if (qp.getType() == Leader.COMMITANDACTIVATE) {
- tryReconfig(pif, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid());
- }
- helper.commit();
- }
- break;
- case Leader.INFORM:
- case Leader.INFORMANDACTIVATE:
- PacketInFlight packet = new PacketInFlight();
- if (qp.getType() == Leader.INFORMANDACTIVATE) {
- ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
- long suggestedLeaderId = buffer.getLong();
- byte[] remainingData = new byte[buffer.remaining()];
- buffer.get(remainingData);
- logEntry = SerializeUtils.deserializeTxn(remainingData);
- packet.hdr = logEntry.getHeader();
- packet.rec = logEntry.getTxn();
- packet.digest = logEntry.getDigest();
- tryReconfig(packet, suggestedLeaderId, qp.getZxid());
- } else {
- logEntry = SerializeUtils.deserializeTxn(qp.getData());
- packet.rec = logEntry.getTxn();
- packet.hdr = logEntry.getHeader();
- packet.digest = logEntry.getDigest();
- // Log warning message if txn comes out-of-order
- if (packet.hdr.getZxid() != lastQueued + 1) {
- LOG.warn(
- "Got zxid 0x{} expected 0x{}",
- Long.toHexString(packet.hdr.getZxid()),
- Long.toHexString(lastQueued + 1));
- }
- lastQueued = packet.hdr.getZxid();
- }
- helper.propose(packet);
- helper.commit();
- break;
- case Leader.UPTODATE:
- LOG.info("Learner received UPTODATE message");
- if (newLeaderQV != null) {
- boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
- if (majorChange) {
- throw new Exception("changes proposed in reconfig");
- }
- }
- helper.flushAcks();
- self.setZooKeeperServer(zk);
- self.adminServer.setZooKeeperServer(zk);
- break outerLoop;
- case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
- LOG.info("Learner received NEWLEADER message");
- if (qp.getData() != null && qp.getData().length > 1) {
- try {
- QuorumVerifier qv = self.configFromString(new String(qp.getData(), UTF_8));
- self.setLastSeenQuorumVerifier(qv, true);
- newLeaderQV = qv;
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- helper.writeState();
- writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
- break;
- }
- }
- }
- QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
- ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
- writePacket(ack, true);
- zk.startServing();
- /*
- * Update the election vote here to ensure that all members of the
- * ensemble report the same vote to new servers that start up and
- * send leader election notifications to the ensemble.
- *
- * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
- */
- self.updateElectionVote(newEpoch);
-
- helper.applyDelayedPackets();
- }
-
- private void tryReconfig(PacketInFlight pif, long newLeader, long zxid) throws Exception {
- QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
- boolean majorChange = self.processReconfig(qv, newLeader, zxid, true);
- if (majorChange) {
- throw new Exception("changes proposed in reconfig");
- }
- }
-
- protected void revalidate(QuorumPacket qp) throws IOException {
- ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
- DataInputStream dis = new DataInputStream(bis);
- long sessionId = dis.readLong();
- boolean valid = dis.readBoolean();
- ServerCnxn cnxn = pendingRevalidations.remove(sessionId);
- if (cnxn == null) {
- LOG.warn("Missing session 0x{} for validation", Long.toHexString(sessionId));
- } else {
- zk.finishSessionInit(cnxn, valid);
- }
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(
- LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "Session 0x" + Long.toHexString(sessionId) + " is valid: " + valid);
- }
- }
-
- protected void ping(QuorumPacket qp) throws IOException {
- // Send back the ping with our session data
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- Map<Long, Integer> touchTable = zk.getTouchSnapshot();
- for (Entry<Long, Integer> entry : touchTable.entrySet()) {
- dos.writeLong(entry.getKey());
- dos.writeInt(entry.getValue());
- }
-
- QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo());
- writePacket(pingReply, true);
- }
-
- /**
- * Shutdown the Peer
- */
- public void shutdown() {
- self.setZooKeeperServer(null);
- self.closeAllConnections();
- self.adminServer.setZooKeeperServer(null);
-
- if (sender != null) {
- sender.shutdown();
- }
-
- closeSocket();
- // shutdown previous zookeeper
- if (zk != null) {
- // If we haven't finished SNAP sync, force fully shutdown
- // to avoid potential inconsistency
- zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
- }
- }
-
- boolean isRunning() {
- return self.isRunning() && zk.isRunning();
- }
-
- void closeSocket() {
- if (sock != null) {
- if (sockBeingClosed.compareAndSet(false, true)) {
- if (closeSocketAsync) {
- final Thread closingThread = new Thread(() -> closeSockSync(), "CloseSocketThread(sid:" + zk.getServerId());
- closingThread.setDaemon(true);
- closingThread.start();
- } else {
- closeSockSync();
- }
- }
- }
- }
-
- void closeSockSync() {
- try {
- long startTime = Time.currentElapsedTime();
- if (sock != null) {
- sock.close();
- sock = null;
- }
- ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - startTime);
- } catch (IOException e) {
- LOG.warn("Ignoring error closing connection to leader", e);
- }
- }
-
-}