diff options
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum')
6 files changed, 1877 insertions, 0 deletions
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java new file mode 100644 index 00000000000..fd2ea277a40 --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.metrics.MetricsContext; +import org.apache.zookeeper.server.ContainerManager; +import org.apache.zookeeper.server.DataTreeBean; +import org.apache.zookeeper.server.FinalRequestProcessor; +import org.apache.zookeeper.server.PrepRequestProcessor; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; + +import javax.management.JMException; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +/** + * + * Just like the standard ZooKeeperServer. We just replace the request + * processors: PrepRequestProcessor -> ProposalRequestProcessor -> + * CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> + * FinalRequestProcessor + */ +public class LeaderZooKeeperServer extends QuorumZooKeeperServer { + + private ContainerManager containerManager; // guarded by sync + + CommitProcessor commitProcessor; + + PrepRequestProcessor prepRequestProcessor; + + /** + * @throws IOException + */ + public LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { + super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self); + } + + public Leader getLeader() { + return self.leader; + } + + @Override + protected void setupRequestProcessors() { + RequestProcessor finalProcessor = new FinalRequestProcessor(this); + RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader()); + commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener()); + commitProcessor.start(); + ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); + proposalProcessor.initialize(); + prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor); + prepRequestProcessor.start(); + firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor); + + setupContainerManager(); + } + + private synchronized void setupContainerManager() { + containerManager = new ContainerManager( + getZKDatabase(), + prepRequestProcessor, + Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)), + Integer.getInteger("znode.container.maxPerMinute", 10000), + Long.getLong("znode.container.maxNeverUsedIntervalMs", 0) + ); + } + + @Override + public synchronized void startup() { + super.startup(); + if (containerManager != null) { + containerManager.start(); + } + } + + @Override + protected void registerMetrics() { + super.registerMetrics(); + + MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); + rootContext.registerGauge("learners", gaugeWithLeader( + (leader) -> leader.getLearners().size()) + ); + rootContext.registerGauge("synced_followers", gaugeWithLeader( + (leader) -> leader.getForwardingFollowers().size() + )); + rootContext.registerGauge("synced_non_voting_followers", gaugeWithLeader( + (leader) -> leader.getNonVotingFollowers().size() + )); + rootContext.registerGauge("synced_observers", self::getSynced_observers_metric); + rootContext.registerGauge("pending_syncs", gaugeWithLeader( + (leader) -> leader.getNumPendingSyncs() + )); + rootContext.registerGauge("leader_uptime", gaugeWithLeader( + (leader) -> leader.getUptime() + )); + rootContext.registerGauge("last_proposal_size", gaugeWithLeader( + (leader) -> leader.getProposalStats().getLastBufferSize() + )); + rootContext.registerGauge("max_proposal_size", gaugeWithLeader( + (leader) -> leader.getProposalStats().getMaxBufferSize() + )); + rootContext.registerGauge("min_proposal_size", gaugeWithLeader( + (leader) -> leader.getProposalStats().getMinBufferSize() + )); + } + + private org.apache.zookeeper.metrics.Gauge gaugeWithLeader(Function<Leader, Number> supplier) { + return () -> { + final Leader leader = getLeader(); + if (leader == null) { + return null; + } + return supplier.apply(leader); + }; + } + + @Override + protected void unregisterMetrics() { + super.unregisterMetrics(); + + MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); + rootContext.unregisterGauge("learners"); + rootContext.unregisterGauge("synced_followers"); + rootContext.unregisterGauge("synced_non_voting_followers"); + rootContext.unregisterGauge("synced_observers"); + rootContext.unregisterGauge("pending_syncs"); + rootContext.unregisterGauge("leader_uptime"); + + rootContext.unregisterGauge("last_proposal_size"); + rootContext.unregisterGauge("max_proposal_size"); + rootContext.unregisterGauge("min_proposal_size"); + } + + @Override + public synchronized void shutdown(boolean fullyShutDown) { + if (containerManager != null) { + containerManager.stop(); + } + super.shutdown(fullyShutDown); + } + + @Override + public int getGlobalOutstandingLimit() { + int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1; + int globalOutstandingLimit = super.getGlobalOutstandingLimit() / divisor; + return globalOutstandingLimit; + } + + @Override + public void createSessionTracker() { + sessionTracker = new LeaderSessionTracker( + this, + getZKDatabase().getSessionWithTimeOuts(), + tickTime, + self.getId(), + self.areLocalSessionsEnabled(), + getZooKeeperServerListener()); + } + + public boolean touch(long sess, int to) { + return sessionTracker.touchSession(sess, to); + } + + public boolean checkIfValidGlobalSession(long sess, int to) { + if (self.areLocalSessionsEnabled() && !upgradeableSessionTracker.isGlobalSession(sess)) { + return false; + } + return sessionTracker.touchSession(sess, to); + } + + /** + * Requests coming from the learner should go directly to + * PrepRequestProcessor + * + * @param request + */ + public void submitLearnerRequest(Request request) { + /* + * Requests coming from the learner should have gone through + * submitRequest() on each server which already perform some request + * validation, so we don't need to do it again. + * + * Additionally, LearnerHandler should start submitting requests into + * the leader's pipeline only when the leader's server is started, so we + * can submit the request directly into PrepRequestProcessor. + * + * This is done so that requests from learners won't go through + * LeaderRequestProcessor which perform local session upgrade. + */ + prepRequestProcessor.processRequest(request); + } + + @Override + protected void registerJMX() { + // register with JMX + try { + jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree()); + MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxDataTreeBean = null; + } + } + + public void registerJMX(LeaderBean leaderBean, LocalPeerBean localPeerBean) { + // register with JMX + if (self.jmxLeaderElectionBean != null) { + try { + MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + } + self.jmxLeaderElectionBean = null; + } + + try { + jmxServerBean = leaderBean; + MBeanRegistry.getInstance().register(leaderBean, localPeerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxServerBean = null; + } + } + + boolean registerJMX(LearnerHandlerBean handlerBean) { + try { + MBeanRegistry.getInstance().register(handlerBean, jmxServerBean); + return true; + } catch (JMException e) { + LOG.warn("Could not register connection", e); + } + return false; + } + + @Override + protected void unregisterJMX() { + // unregister from JMX + try { + if (jmxDataTreeBean != null) { + MBeanRegistry.getInstance().unregister(jmxDataTreeBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + jmxDataTreeBean = null; + } + + protected void unregisterJMX(Leader leader) { + // unregister from JMX + try { + if (jmxServerBean != null) { + MBeanRegistry.getInstance().unregister(jmxServerBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + jmxServerBean = null; + } + + @Override + public String getState() { + return "leader"; + } + + /** + * Returns the id of the associated QuorumPeer, which will do for a unique + * id of this server. + */ + @Override + public long getServerId() { + return self.getId(); + } + + @Override + protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { + super.revalidateSession(cnxn, sessionId, sessionTimeout); + try { + // setowner as the leader itself, unless updated + // via the follower handlers + setOwner(sessionId, ServerCnxn.me); + } catch (SessionExpiredException e) { + // this is ok, it just means that the session revalidation failed. + } + } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java new file mode 100644 index 00000000000..8e80fae57dc --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -0,0 +1,920 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static java.nio.charset.StandardCharsets.UTF_8; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.net.ssl.SSLSocket; +import org.apache.jute.BinaryInputArchive; +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.InputArchive; +import org.apache.jute.OutputArchive; +import org.apache.jute.Record; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.common.Time; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.server.ExitCode; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ServerMetrics; +import org.apache.zookeeper.server.TxnLogEntry; +import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.server.util.ConfigUtils; +import org.apache.zookeeper.server.util.MessageTracker; +import org.apache.zookeeper.server.util.SerializeUtils; +import org.apache.zookeeper.server.util.ZxidUtils; +import org.apache.zookeeper.txn.SetDataTxn; +import org.apache.zookeeper.txn.TxnDigest; +import org.apache.zookeeper.txn.TxnHeader; +import org.apache.zookeeper.util.ServiceUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is the superclass of two of the three main actors in a ZK + * ensemble: Followers and Observers. Both Followers and Observers share + * a good deal of code which is moved into Peer to avoid duplication. + */ +public class Learner { + + static class PacketInFlight { + + TxnHeader hdr; + Record rec; + TxnDigest digest; + + } + + QuorumPeer self; + LearnerZooKeeperServer zk; + + protected BufferedOutputStream bufferedOutput; + + protected Socket sock; + protected MultipleAddresses leaderAddr; + protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false); + + /** + * Socket getter + */ + public Socket getSocket() { + return sock; + } + + LearnerSender sender = null; + protected InputArchive leaderIs; + protected OutputArchive leaderOs; + /** the protocol version of the leader */ + protected int leaderProtocolVersion = 0x01; + + private static final int BUFFERED_MESSAGE_SIZE = 10; + protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE); + + protected static final Logger LOG = LoggerFactory.getLogger(Learner.class); + + /** + * Time to wait after connection attempt with the Leader or LearnerMaster before this + * Learner tries to connect again. + */ + private static final int leaderConnectDelayDuringRetryMs = Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100); + + private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true"); + + public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending"; + private static boolean asyncSending = + Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING)); + public static final String LEARNER_CLOSE_SOCKET_ASYNC = "zookeeper.learner.closeSocketAsync"; + public static final boolean closeSocketAsync = Boolean + .parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_CLOSE_SOCKET_ASYNC)); + + static { + LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs); + LOG.info("TCP NoDelay set to: {}", nodelay); + LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending); + LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync); + } + + final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>(); + + public int getPendingRevalidationsCount() { + return pendingRevalidations.size(); + } + + // for testing + protected static void setAsyncSending(boolean newMode) { + asyncSending = newMode; + LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending); + + } + protected static boolean getAsyncSending() { + return asyncSending; + } + /** + * validate a session for a client + * + * @param clientId + * the client to be revalidated + * @param timeout + * the timeout for which the session is valid + * @throws IOException + */ + void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException { + LOG.info("Revalidating client: 0x{}", Long.toHexString(clientId)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeLong(clientId); + dos.writeInt(timeout); + dos.close(); + QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null); + pendingRevalidations.put(clientId, cnxn); + if (LOG.isTraceEnabled()) { + ZooTrace.logTraceMessage( + LOG, + ZooTrace.SESSION_TRACE_MASK, + "To validate session 0x" + Long.toHexString(clientId)); + } + writePacket(qp, true); + } + + /** + * write a packet to the leader. + * + * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time. + * When packets are sent synchronously, writing is done within a synchronization block. + * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe. + * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only. + * So we have only one thread writing to leaderOs at a time in either case. + * + * @param pp + * the proposal packet to be sent to the leader + * @throws IOException + */ + void writePacket(QuorumPacket pp, boolean flush) throws IOException { + if (asyncSending) { + sender.queuePacket(pp); + } else { + writePacketNow(pp, flush); + } + } + + void writePacketNow(QuorumPacket pp, boolean flush) throws IOException { + synchronized (leaderOs) { + if (pp != null) { + messageTracker.trackSent(pp.getType()); + leaderOs.writeRecord(pp, "packet"); + } + if (flush) { + bufferedOutput.flush(); + } + } + } + + /** + * Start thread that will forward any packet in the queue to the leader + */ + protected void startSendingThread() { + sender = new LearnerSender(this); + sender.start(); + } + + /** + * read a packet from the leader + * + * @param pp + * the packet to be instantiated + * @throws IOException + */ + void readPacket(QuorumPacket pp) throws IOException { + synchronized (leaderIs) { + leaderIs.readRecord(pp, "packet"); + messageTracker.trackReceived(pp.getType()); + } + if (LOG.isTraceEnabled()) { + final long traceMask = + (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK + : ZooTrace.SERVER_PACKET_TRACE_MASK; + + ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp); + } + } + + /** + * send a request packet to the leader + * + * @param request + * the request from the client + * @throws IOException + */ + void request(Request request) throws IOException { + if (request.isThrottled()) { + LOG.error("Throttled request sent to leader: {}. Exiting", request); + ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue()); + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream oa = new DataOutputStream(baos); + oa.writeLong(request.sessionId); + oa.writeInt(request.cxid); + oa.writeInt(request.type); + if (request.request != null) { + request.request.rewind(); + int len = request.request.remaining(); + byte[] b = new byte[len]; + request.request.get(b); + request.request.rewind(); + oa.write(b); + } + oa.close(); + QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo); + writePacket(qp, true); + } + + /** + * Returns the address of the node we think is the leader. + */ + protected QuorumServer findLeader() { + QuorumServer leaderServer = null; + // Find the leader by id + Vote current = self.getCurrentVote(); + for (QuorumServer s : self.getView().values()) { + if (s.id == current.getId()) { + // Ensure we have the leader's correct IP address before + // attempting to connect. + s.recreateSocketAddresses(); + leaderServer = s; + break; + } + } + if (leaderServer == null) { + LOG.warn("Couldn't find the leader with id = {}", current.getId()); + } + return leaderServer; + } + + /** + * Overridable helper method to return the System.nanoTime(). + * This method behaves identical to System.nanoTime(). + */ + protected long nanoTime() { + return System.nanoTime(); + } + + /** + * Overridable helper method to simply call sock.connect(). This can be + * overriden in tests to fake connection success/failure for connectToLeader. + */ + protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException { + sock.connect(addr, timeout); + } + + /** + * Establish a connection with the LearnerMaster found by findLearnerMaster. + * Followers only connect to Leaders, Observers can connect to any active LearnerMaster. + * Retries until either initLimit time has elapsed or 5 tries have happened. + * @param multiAddr - the address of the Peer to connect to. + * @throws IOException - if the socket connection fails on the 5th attempt + * if there is an authentication failure while connecting to leader + */ + protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException { + + this.leaderAddr = multiAddr; + Set<InetSocketAddress> addresses; + if (self.isMultiAddressReachabilityCheckEnabled()) { + // even if none of the addresses are reachable, we want to try to establish connection + // see ZOOKEEPER-3758 + addresses = multiAddr.getAllReachableAddressesOrAll(); + } else { + addresses = multiAddr.getAllAddresses(); + } + ExecutorService executor = Executors.newFixedThreadPool(addresses.size()); + CountDownLatch latch = new CountDownLatch(addresses.size()); + AtomicReference<Socket> socket = new AtomicReference<>(null); + addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit); + + try { + latch.await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while trying to connect to Leader", e); + } finally { + executor.shutdown(); + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + LOG.error("not all the LeaderConnector terminated properly"); + } + } catch (InterruptedException ie) { + LOG.error("Interrupted while terminating LeaderConnector executor.", ie); + } + } + + if (socket.get() == null) { + throw new IOException("Failed connect to " + multiAddr); + } else { + sock = socket.get(); + sockBeingClosed.set(false); + } + + self.authLearner.authenticate(sock, hostname); + + leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream())); + bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); + leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); + if (asyncSending) { + startSendingThread(); + } + } + + class LeaderConnector implements Runnable { + + private AtomicReference<Socket> socket; + private InetSocketAddress address; + private CountDownLatch latch; + + LeaderConnector(InetSocketAddress address, AtomicReference<Socket> socket, CountDownLatch latch) { + this.address = address; + this.socket = socket; + this.latch = latch; + } + + @Override + public void run() { + try { + Thread.currentThread().setName("LeaderConnector-" + address); + Socket sock = connectToLeader(); + + if (sock != null && sock.isConnected()) { + if (socket.compareAndSet(null, sock)) { + LOG.info("Successfully connected to leader, using address: {}", address); + } else { + LOG.info("Connection to the leader is already established, close the redundant connection"); + sock.close(); + } + } + + } catch (Exception e) { + LOG.error("Failed connect to {}", address, e); + } finally { + latch.countDown(); + } + } + + private Socket connectToLeader() throws IOException, X509Exception, InterruptedException { + Socket sock = createSocket(); + + // leader connection timeout defaults to tickTime * initLimit + int connectTimeout = self.tickTime * self.initLimit; + + // but if connectToLearnerMasterLimit is specified, use that value to calculate + // timeout instead of using the initLimit value + if (self.connectToLearnerMasterLimit > 0) { + connectTimeout = self.tickTime * self.connectToLearnerMasterLimit; + } + + int remainingTimeout; + long startNanoTime = nanoTime(); + + for (int tries = 0; tries < 5 && socket.get() == null; tries++) { + try { + // recalculate the init limit time because retries sleep for 1000 milliseconds + remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000); + if (remainingTimeout <= 0) { + LOG.error("connectToLeader exceeded on retries."); + throw new IOException("connectToLeader exceeded on retries."); + } + + sockConnect(sock, address, Math.min(connectTimeout, remainingTimeout)); + if (self.isSslQuorum()) { + ((SSLSocket) sock).startHandshake(); + } + sock.setTcpNoDelay(nodelay); + break; + } catch (IOException e) { + remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000); + + if (remainingTimeout <= leaderConnectDelayDuringRetryMs) { + LOG.error( + "Unexpected exception, connectToLeader exceeded. tries={}, remaining init limit={}, connecting to {}", + tries, + remainingTimeout, + address, + e); + throw e; + } else if (tries >= 4) { + LOG.error( + "Unexpected exception, retries exceeded. tries={}, remaining init limit={}, connecting to {}", + tries, + remainingTimeout, + address, + e); + throw e; + } else { + LOG.warn( + "Unexpected exception, tries={}, remaining init limit={}, connecting to {}", + tries, + remainingTimeout, + address, + e); + sock = createSocket(); + } + } + Thread.sleep(leaderConnectDelayDuringRetryMs); + } + + return sock; + } + } + + /** + * Creating a simple or and SSL socket. + * This can be overridden in tests to fake already connected sockets for connectToLeader. + */ + protected Socket createSocket() throws X509Exception, IOException { + Socket sock; + if (self.isSslQuorum()) { + sock = self.getX509Util().createSSLSocket(); + } else { + sock = new Socket(); + } + sock.setSoTimeout(self.tickTime * self.initLimit); + return sock; + } + + /** + * Once connected to the leader or learner master, perform the handshake + * protocol to establish a following / observing connection. + * @param pktType + * @return the zxid the Leader sends for synchronization purposes. + * @throws IOException + */ + protected long registerWithLeader(int pktType) throws IOException { + /* + * Send follower info, including last zxid and sid + */ + long lastLoggedZxid = self.getLastLoggedZxid(); + QuorumPacket qp = new QuorumPacket(); + qp.setType(pktType); + qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); + + /* + * Add sid to payload + */ + LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion()); + ByteArrayOutputStream bsid = new ByteArrayOutputStream(); + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); + boa.writeRecord(li, "LearnerInfo"); + qp.setData(bsid.toByteArray()); + + writePacket(qp, true); + readPacket(qp); + final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); + if (qp.getType() == Leader.LEADERINFO) { + // we are connected to a 1.0 server so accept the new epoch and read the next packet + leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt(); + byte[] epochBytes = new byte[4]; + final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes); + if (newEpoch > self.getAcceptedEpoch()) { + wrappedEpochBytes.putInt((int) self.getCurrentEpoch()); + self.setAcceptedEpoch(newEpoch); + } else if (newEpoch == self.getAcceptedEpoch()) { + // since we have already acked an epoch equal to the leaders, we cannot ack + // again, but we still need to send our lastZxid to the leader so that we can + // sync with it if it does assume leadership of the epoch. + // the -1 indicates that this reply should not count as an ack for the new epoch + wrappedEpochBytes.putInt(-1); + } else { + throw new IOException("Leaders epoch, " + + newEpoch + + " is less than accepted epoch, " + + self.getAcceptedEpoch()); + } + QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null); + writePacket(ackNewEpoch, true); + return ZxidUtils.makeZxid(newEpoch, 0); + } else { + if (newEpoch > self.getAcceptedEpoch()) { + self.setAcceptedEpoch(newEpoch); + } + if (qp.getType() != Leader.NEWLEADER) { + LOG.error("First packet should have been NEWLEADER"); + throw new IOException("First packet should have been NEWLEADER"); + } + return qp.getZxid(); + } + } + + /** + * Finally, synchronize our history with the Leader (if Follower) + * or the LearnerMaster (if Observer). + * @param newLeaderZxid + * @throws IOException + * @throws InterruptedException + */ + protected void syncWithLeader(long newLeaderZxid) throws Exception { + QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); + QuorumPacket qp = new QuorumPacket(); + long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); + + QuorumVerifier newLeaderQV = null; + + // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot + // For SNAP and TRUNC the snapshot is needed to save that history + boolean snapshotNeeded = true; + boolean syncSnapshot = false; + readPacket(qp); + Deque<Long> packetsCommitted = new ArrayDeque<>(); + Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>(); + Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>(); + synchronized (zk) { + if (qp.getType() == Leader.DIFF) { + LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); + self.setSyncMode(QuorumPeer.SyncMode.DIFF); + if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) { + LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading."); + snapshotNeeded = true; + syncSnapshot = true; + } else { + snapshotNeeded = false; + } + } else if (qp.getType() == Leader.SNAP) { + self.setSyncMode(QuorumPeer.SyncMode.SNAP); + LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid())); + // The leader is going to dump the database + // db is clear as part of deserializeSnapshot() + zk.getZKDatabase().deserializeSnapshot(leaderIs); + // ZOOKEEPER-2819: overwrite config node content extracted + // from leader snapshot with local config, to avoid potential + // inconsistency of config node content during rolling restart. + if (!self.isReconfigEnabled()) { + LOG.debug("Reset config node content from local config after deserialization of snapshot."); + zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); + } + String signature = leaderIs.readString("signature"); + if (!signature.equals("BenWasHere")) { + LOG.error("Missing signature. Got {}", signature); + throw new IOException("Missing signature"); + } + zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); + + // immediately persist the latest snapshot when there is txn log gap + syncSnapshot = true; + } else if (qp.getType() == Leader.TRUNC) { + //we need to truncate the log to the lastzxid of the leader + self.setSyncMode(QuorumPeer.SyncMode.TRUNC); + LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid())); + boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid()); + if (!truncated) { + // not able to truncate the log + LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid())); + ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); + } + zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); + + } else { + LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp)); + ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); + } + zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); + zk.createSessionTracker(); + + long lastQueued = 0; + + // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 + // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) + // we need to make sure that we don't take the snapshot twice. + boolean isPreZAB1_0 = true; + //If we are not going to take the snapshot be sure the transactions are not applied in memory + // but written out to the transaction log + boolean writeToTxnLog = !snapshotNeeded; + TxnLogEntry logEntry; + // we are now going to start getting transactions to apply followed by an UPTODATE + outerLoop: + while (self.isRunning()) { + readPacket(qp); + switch (qp.getType()) { + case Leader.PROPOSAL: + PacketInFlight pif = new PacketInFlight(); + logEntry = SerializeUtils.deserializeTxn(qp.getData()); + pif.hdr = logEntry.getHeader(); + pif.rec = logEntry.getTxn(); + pif.digest = logEntry.getDigest(); + if (pif.hdr.getZxid() != lastQueued + 1) { + LOG.warn( + "Got zxid 0x{} expected 0x{}", + Long.toHexString(pif.hdr.getZxid()), + Long.toHexString(lastQueued + 1)); + } + lastQueued = pif.hdr.getZxid(); + + if (pif.hdr.getType() == OpCode.reconfig) { + SetDataTxn setDataTxn = (SetDataTxn) pif.rec; + QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8)); + self.setLastSeenQuorumVerifier(qv, true); + } + + packetsNotLogged.add(pif); + packetsNotCommitted.add(pif); + break; + case Leader.COMMIT: + case Leader.COMMITANDACTIVATE: + pif = packetsNotCommitted.peekFirst(); + if (pif.hdr.getZxid() != qp.getZxid()) { + LOG.warn( + "Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(qp.getZxid()), + Long.toHexString(pif.hdr.getZxid())); + } else { + if (qp.getType() == Leader.COMMITANDACTIVATE) { + QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); + boolean majorChange = self.processReconfig( + qv, + ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), + true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); + } + } + if (!writeToTxnLog) { + zk.processTxn(pif.hdr, pif.rec); + packetsNotLogged.remove(); + packetsNotCommitted.remove(); + } else { + packetsNotCommitted.remove(); + packetsCommitted.add(qp.getZxid()); + } + } + break; + case Leader.INFORM: + case Leader.INFORMANDACTIVATE: + PacketInFlight packet = new PacketInFlight(); + + if (qp.getType() == Leader.INFORMANDACTIVATE) { + ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); + long suggestedLeaderId = buffer.getLong(); + byte[] remainingdata = new byte[buffer.remaining()]; + buffer.get(remainingdata); + logEntry = SerializeUtils.deserializeTxn(remainingdata); + packet.hdr = logEntry.getHeader(); + packet.rec = logEntry.getTxn(); + packet.digest = logEntry.getDigest(); + QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8)); + boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); + } + } else { + logEntry = SerializeUtils.deserializeTxn(qp.getData()); + packet.rec = logEntry.getTxn(); + packet.hdr = logEntry.getHeader(); + packet.digest = logEntry.getDigest(); + // Log warning message if txn comes out-of-order + if (packet.hdr.getZxid() != lastQueued + 1) { + LOG.warn( + "Got zxid 0x{} expected 0x{}", + Long.toHexString(packet.hdr.getZxid()), + Long.toHexString(lastQueued + 1)); + } + lastQueued = packet.hdr.getZxid(); + } + if (!writeToTxnLog) { + // Apply to db directly if we haven't taken the snapshot + zk.processTxn(packet.hdr, packet.rec); + } else { + packetsNotLogged.add(packet); + packetsCommitted.add(qp.getZxid()); + } + + break; + case Leader.UPTODATE: + LOG.info("Learner received UPTODATE message"); + if (newLeaderQV != null) { + boolean majorChange = self.processReconfig(newLeaderQV, null, null, true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); + } + } + if (isPreZAB1_0) { + zk.takeSnapshot(syncSnapshot); + self.setCurrentEpoch(newEpoch); + } + self.setZooKeeperServer(zk); + self.adminServer.setZooKeeperServer(zk); + break outerLoop; + case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery + // means this is Zab 1.0 + LOG.info("Learner received NEWLEADER message"); + if (qp.getData() != null && qp.getData().length > 1) { + try { + QuorumVerifier qv = self.configFromString(new String(qp.getData(), UTF_8)); + self.setLastSeenQuorumVerifier(qv, true); + newLeaderQV = qv; + } catch (Exception e) { + e.printStackTrace(); + } + } + + if (snapshotNeeded) { + zk.takeSnapshot(syncSnapshot); + } + + self.setCurrentEpoch(newEpoch); + writeToTxnLog = true; + //Anything after this needs to go to the transaction log, not applied directly in memory + isPreZAB1_0 = false; + + // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). + sock.setSoTimeout(self.tickTime * self.syncLimit); + self.setSyncMode(QuorumPeer.SyncMode.NONE); + zk.startupWithoutServing(); + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(true); + for (PacketInFlight p : packetsNotLogged) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + packetsNotLogged.clear(); + fzk.syncProcessor.syncFlush(); + } + + writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); + + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(false); + fzk.syncProcessor.syncFlush(); + } + break; + } + } + } + ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); + writePacket(ack, true); + zk.startServing(); + /* + * Update the election vote here to ensure that all members of the + * ensemble report the same vote to new servers that start up and + * send leader election notifications to the ensemble. + * + * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732 + */ + self.updateElectionVote(newEpoch); + + // We need to log the stuff that came in between the snapshot and the uptodate + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + for (PacketInFlight p : packetsNotLogged) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + for (Long zxid : packetsCommitted) { + fzk.commit(zxid); + } + } else if (zk instanceof ObserverZooKeeperServer) { + // Similar to follower, we need to log requests between the snapshot + // and UPTODATE + ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; + for (PacketInFlight p : packetsNotLogged) { + Long zxid = packetsCommitted.peekFirst(); + if (p.hdr.getZxid() != zxid) { + // log warning message if there is no matching commit + // old leader send outstanding proposal to observer + LOG.warn( + "Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(zxid), + Long.toHexString(p.hdr.getZxid())); + continue; + } + packetsCommitted.remove(); + Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null); + request.setTxn(p.rec); + request.setHdr(p.hdr); + request.setTxnDigest(p.digest); + ozk.commitRequest(request); + } + } else { + // New server type need to handle in-flight packets + throw new UnsupportedOperationException("Unknown server type"); + } + } + + protected void revalidate(QuorumPacket qp) throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData()); + DataInputStream dis = new DataInputStream(bis); + long sessionId = dis.readLong(); + boolean valid = dis.readBoolean(); + ServerCnxn cnxn = pendingRevalidations.remove(sessionId); + if (cnxn == null) { + LOG.warn("Missing session 0x{} for validation", Long.toHexString(sessionId)); + } else { + zk.finishSessionInit(cnxn, valid); + } + if (LOG.isTraceEnabled()) { + ZooTrace.logTraceMessage( + LOG, + ZooTrace.SESSION_TRACE_MASK, + "Session 0x" + Long.toHexString(sessionId) + " is valid: " + valid); + } + } + + protected void ping(QuorumPacket qp) throws IOException { + // Send back the ping with our session data + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + Map<Long, Integer> touchTable = zk.getTouchSnapshot(); + for (Entry<Long, Integer> entry : touchTable.entrySet()) { + dos.writeLong(entry.getKey()); + dos.writeInt(entry.getValue()); + } + + QuorumPacket pingReply = new QuorumPacket(qp.getType(), qp.getZxid(), bos.toByteArray(), qp.getAuthinfo()); + writePacket(pingReply, true); + } + + /** + * Shutdown the Peer + */ + public void shutdown() { + self.setZooKeeperServer(null); + self.closeAllConnections(); + self.adminServer.setZooKeeperServer(null); + + if (sender != null) { + sender.shutdown(); + } + + closeSocket(); + // shutdown previous zookeeper + if (zk != null) { + // If we haven't finished SNAP sync, force fully shutdown + // to avoid potential inconsistency + zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP)); + } + } + + boolean isRunning() { + return self.isRunning() && zk.isRunning(); + } + + void closeSocket() { + if (sock != null) { + if (sockBeingClosed.compareAndSet(false, true)) { + if (closeSocketAsync) { + final Thread closingThread = new Thread(() -> closeSockSync(), "CloseSocketThread(sid:" + zk.getServerId()); + closingThread.setDaemon(true); + closingThread.start(); + } else { + closeSockSync(); + } + } + } + } + + void closeSockSync() { + try { + long startTime = Time.currentElapsedTime(); + if (sock != null) { + sock.close(); + sock = null; + } + ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - startTime); + } catch (IOException e) { + LOG.warn("Ignoring error closing connection to leader", e); + } + } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java new file mode 100644 index 00000000000..990f75bee9e --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.server.DataTreeBean; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.SyncRequestProcessor; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.ZooKeeperServerBean; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +/** + * Parent class for all ZooKeeperServers for Learners + */ +public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer { + + /* + * Request processors + */ + protected CommitProcessor commitProcessor; + protected SyncRequestProcessor syncProcessor; + + public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int listenBacklog, ZKDatabase zkDb, QuorumPeer self) throws IOException { + super(logFactory, tickTime, minSessionTimeout, maxSessionTimeout, listenBacklog, zkDb, self); + } + + /** + * Abstract method to return the learner associated with this server. + * Since the Learner may change under our feet (when QuorumPeer reassigns + * it) we can't simply take a reference here. Instead, we need the + * subclasses to implement this. + */ + public abstract Learner getLearner(); + + /** + * Returns the current state of the session tracker. This is only currently + * used by a Learner to build a ping response packet. + * + */ + protected Map<Long, Integer> getTouchSnapshot() { + if (sessionTracker != null) { + return ((LearnerSessionTracker) sessionTracker).snapshot(); + } + Map<Long, Integer> map = Collections.emptyMap(); + return map; + } + + /** + * Returns the id of the associated QuorumPeer, which will do for a unique + * id of this server. + */ + @Override + public long getServerId() { + return self.getId(); + } + + @Override + public void createSessionTracker() { + sessionTracker = new LearnerSessionTracker( + this, + getZKDatabase().getSessionWithTimeOuts(), + this.tickTime, + self.getId(), + self.areLocalSessionsEnabled(), + getZooKeeperServerListener()); + } + + @Override + protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { + if (upgradeableSessionTracker.isLocalSession(sessionId)) { + super.revalidateSession(cnxn, sessionId, sessionTimeout); + } else { + getLearner().validateSession(cnxn, sessionId, sessionTimeout); + } + } + + @Override + protected void registerJMX() { + // register with JMX + try { + jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree()); + MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxDataTreeBean = null; + } + } + + public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) { + // register with JMX + if (self.jmxLeaderElectionBean != null) { + try { + MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + } + self.jmxLeaderElectionBean = null; + } + + try { + jmxServerBean = serverBean; + MBeanRegistry.getInstance().register(serverBean, localPeerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxServerBean = null; + } + } + + @Override + protected void unregisterJMX() { + // unregister from JMX + try { + if (jmxDataTreeBean != null) { + MBeanRegistry.getInstance().unregister(jmxDataTreeBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + jmxDataTreeBean = null; + } + + protected void unregisterJMX(Learner peer) { + // unregister from JMX + try { + if (jmxServerBean != null) { + MBeanRegistry.getInstance().unregister(jmxServerBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + jmxServerBean = null; + } + + @Override + public synchronized void shutdown(boolean fullyShutDown) { + if (!canShutdown()) { + LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); + } + else { + LOG.info("Shutting down"); + try { + if (syncProcessor != null) { + // Shutting down the syncProcessor here, first, ensures queued transactions here are written to + // permanent storage, which ensures that crash recovery data is consistent with what is used for a + // leader election immediately following shutdown, because of the old leader going down; and also + // that any state on its way to being written is also loaded in the potential call to + // fast-forward-from-edits, in super.shutdown(...), so we avoid getting a DIFF from the new leader + // that contains entries we have already written to our transaction log. + syncProcessor.shutdown(); + } + } + catch (Exception e) { + LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e); + } + } + try { + super.shutdown(fullyShutDown); + } catch (Exception e) { + LOG.warn("Ignoring unexpected exception during shutdown", e); + } + } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java new file mode 100644 index 00000000000..b71720aec89 --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.server.FinalRequestProcessor; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.SyncRequestProcessor; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.BiConsumer; + +/** + * A ZooKeeperServer for the Observer node type. Not much is different, but + * we anticipate specializing the request processors in the future. + * + */ +public class ObserverZooKeeperServer extends LearnerZooKeeperServer { + + private static final Logger LOG = LoggerFactory.getLogger(ObserverZooKeeperServer.class); + + /** + * Enable since request processor for writing txnlog to disk and + * take periodic snapshot. Default is ON. + */ + + private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled(); + + /* + * Pending sync requests + */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<Request>(); + + ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { + super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self); + LOG.info("syncEnabled ={}", syncRequestProcessorEnabled); + } + + public Observer getObserver() { + return self.observer; + } + + @Override + public Learner getLearner() { + return self.observer; + } + + /** + * Unlike a Follower, which sees a full request only during the PROPOSAL + * phase, Observers get all the data required with the INFORM packet. + * This method commits a request that has been unpacked by from an INFORM + * received from the Leader. + * + * @param request + */ + public void commitRequest(Request request) { + if (syncProcessor != null) { + // Write to txnlog and take periodic snapshot + syncProcessor.processRequest(request); + } + commitProcessor.commit(request); + } + + /** + * Set up the request processors for an Observer: + * firstProcesor->commitProcessor->finalProcessor + */ + @Override + protected void setupRequestProcessors() { + // We might consider changing the processor behaviour of + // Observers to, for example, remove the disk sync requirements. + // Currently, they behave almost exactly the same as followers. + RequestProcessor finalProcessor = new FinalRequestProcessor(this); + commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); + commitProcessor.start(); + firstProcessor = new ObserverRequestProcessor(this, commitProcessor); + ((ObserverRequestProcessor) firstProcessor).start(); + + /* + * Observer should write to disk, so that the it won't request + * too old txn from the leader which may lead to getting an entire + * snapshot. + * + * However, this may degrade performance as it has to write to disk + * and do periodic snapshot which may double the memory requirements + */ + if (syncRequestProcessorEnabled) { + syncProcessor = new SyncRequestProcessor(this, null); + syncProcessor.start(); + } + else { + syncProcessor = null; + } + } + + /* + * Process a sync request + */ + public synchronized void sync() { + if (pendingSyncs.size() == 0) { + LOG.warn("Not expecting a sync."); + return; + } + + Request r = pendingSyncs.remove(); + commitProcessor.commit(r); + } + + @Override + public String getState() { + return "observer"; + } + + @Override + public void dumpMonitorValues(BiConsumer<String, Object> response) { + super.dumpMonitorValues(response); + response.accept("observer_master_id", getObserver().getLearnerMasterId()); + } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java new file mode 100644 index 00000000000..b1403ecd59b --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.jmx.MBeanRegistry; +import org.apache.zookeeper.server.DataTreeBean; +import org.apache.zookeeper.server.FinalRequestProcessor; +import org.apache.zookeeper.server.PrepRequestProcessor; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.ServerCnxn; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServerBean; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * A ZooKeeperServer which comes into play when peer is partitioned from the + * majority. Handles read-only clients, but drops connections from not-read-only + * ones. + * <p> + * The very first processor in the chain of request processors is a + * ReadOnlyRequestProcessor which drops state-changing requests. + */ +public class ReadOnlyZooKeeperServer extends ZooKeeperServer { + + protected final QuorumPeer self; + private volatile boolean shutdown = false; + + ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) { + super( + logFactory, + self.tickTime, + self.minSessionTimeout, + self.maxSessionTimeout, + self.clientPortListenBacklog, + zkDb, + self.getInitialConfig(), + self.isReconfigEnabled()); + this.self = self; + } + + @Override + protected void setupRequestProcessors() { + RequestProcessor finalProcessor = new FinalRequestProcessor(this); + RequestProcessor prepProcessor = new PrepRequestProcessor(this, finalProcessor); + ((PrepRequestProcessor) prepProcessor).start(); + firstProcessor = new ReadOnlyRequestProcessor(this, prepProcessor); + ((ReadOnlyRequestProcessor) firstProcessor).start(); + } + + @Override + public synchronized void startup() { + // check to avoid startup follows shutdown + if (shutdown) { + LOG.warn("Not starting Read-only server as startup follows shutdown!"); + return; + } + registerJMX(new ReadOnlyBean(this), self.jmxLocalPeerBean); + super.startup(); + self.setZooKeeperServer(this); + self.adminServer.setZooKeeperServer(this); + LOG.info("Read-only server started"); + } + + @Override + public void createSessionTracker() { + sessionTracker = new LearnerSessionTracker( + this, getZKDatabase().getSessionWithTimeOuts(), + this.tickTime, self.getId(), self.areLocalSessionsEnabled(), + getZooKeeperServerListener()); + } + + @Override + protected void startSessionTracker() { + ((LearnerSessionTracker) sessionTracker).start(); + } + + @Override + protected void setLocalSessionFlag(Request si) { + switch (si.type) { + case OpCode.createSession: + if (self.areLocalSessionsEnabled()) { + si.setLocalSession(true); + } + break; + case OpCode.closeSession: + if (((UpgradeableSessionTracker) sessionTracker).isLocalSession(si.sessionId)) { + si.setLocalSession(true); + } else { + LOG.warn("Submitting global closeSession request for session 0x{} in ReadOnly mode", + Long.toHexString(si.sessionId)); + } + break; + default: + break; + } + } + + @Override + protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException { + if (((LearnerSessionTracker) sessionTracker).isGlobalSession(sessionId)) { + String msg = "Refusing global session reconnection in RO mode " + cnxn.getRemoteSocketAddress(); + LOG.info(msg); + throw new ServerCnxn.CloseRequestException(msg, ServerCnxn.DisconnectReason.RENEW_GLOBAL_SESSION_IN_RO_MODE); + } + } + + @Override + protected void registerJMX() { + // register with JMX + try { + jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree()); + MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxDataTreeBean = null; + } + } + + public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) { + // register with JMX + try { + jmxServerBean = serverBean; + MBeanRegistry.getInstance().register(serverBean, localPeerBean); + } catch (Exception e) { + LOG.warn("Failed to register with JMX", e); + jmxServerBean = null; + } + } + + @Override + protected void unregisterJMX() { + // unregister from JMX + try { + if (jmxDataTreeBean != null) { + MBeanRegistry.getInstance().unregister(jmxDataTreeBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + jmxDataTreeBean = null; + } + + protected void unregisterJMX(ZooKeeperServer zks) { + // unregister from JMX + try { + if (jmxServerBean != null) { + MBeanRegistry.getInstance().unregister(jmxServerBean); + } + } catch (Exception e) { + LOG.warn("Failed to unregister with JMX", e); + } + jmxServerBean = null; + } + + @Override + public String getState() { + return "read-only"; + } + + /** + * Returns the id of the associated QuorumPeer, which will do for a unique + * id of this server. + */ + @Override + public long getServerId() { + return self.getId(); + } + + @Override + public synchronized void shutdown(boolean fullyShutDown) { + if (!canShutdown()) { + super.shutdown(fullyShutDown); + LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); + } + else { + shutdown = true; + unregisterJMX(this); + + // set peer's server to null + self.setZooKeeperServer(null); + // clear all the connections + self.closeAllConnections(); + + self.adminServer.setZooKeeperServer(null); + } + // shutdown the server itself + super.shutdown(fullyShutDown); + } + + @Override + public void dumpConf(PrintWriter pwriter) { + super.dumpConf(pwriter); + + pwriter.print("initLimit="); + pwriter.println(self.getInitLimit()); + pwriter.print("syncLimit="); + pwriter.println(self.getSyncLimit()); + pwriter.print("electionAlg="); + pwriter.println(self.getElectionType()); + pwriter.print("electionPort="); + pwriter.println(self.getElectionAddress().getAllPorts() + .stream().map(Objects::toString).collect(Collectors.joining("|"))); + pwriter.print("quorumPort="); + pwriter.println(self.getQuorumAddress().getAllPorts() + .stream().map(Objects::toString).collect(Collectors.joining("|"))); + pwriter.print("peerType="); + pwriter.println(self.getLearnerType().ordinal()); + } + + @Override + protected void setState(State state) { + this.state = state; + } + +} diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java new file mode 100644 index 00000000000..3b7a9dfc331 --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.ServerMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Flushable; +import java.io.IOException; +import java.net.Socket; + +public class SendAckRequestProcessor implements RequestProcessor, Flushable { + + private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class); + + Learner learner; + + SendAckRequestProcessor(Learner peer) { + this.learner = peer; + } + + public void processRequest(Request si) { + if (si.type != OpCode.sync) { + QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null); + try { + si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY); + + learner.writePacket(qp, false); + } catch (IOException e) { + LOG.warn("Closing connection to leader, exception during packet send", e); + try { + if (!learner.sock.isClosed()) { + learner.sock.close(); + } + } catch (IOException e1) { + // Nothing to do, we are shutting things down, so an exception here is irrelevant + LOG.debug("Ignoring error closing the connection", e1); + } + } + } + } + + public void flush() throws IOException { + try { + learner.writePacket(null, true); + } catch (IOException e) { + LOG.warn("Closing connection to leader, exception during packet send", e); + try { + Socket socket = learner.sock; + if (socket != null && !socket.isClosed()) { + learner.sock.close(); + } + } catch (IOException e1) { + // Nothing to do, we are shutting things down, so an exception here is irrelevant + LOG.debug("Ignoring error closing the connection", e1); + } + } + } + + public void shutdown() { + // Nothing needed + } + +} |