summaryrefslogtreecommitdiffstats
path: root/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum
diff options
context:
space:
mode:
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum')
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java311
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/Learner.java921
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java185
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java141
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java240
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java85
6 files changed, 0 insertions, 1883 deletions
diff --git a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
deleted file mode 100644
index a44ebc3f7b8..00000000000
--- a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
+++ /dev/null
@@ -1,311 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.metrics.MetricsContext;
-import org.apache.zookeeper.server.ContainerManager;
-import org.apache.zookeeper.server.DataTreeBean;
-import org.apache.zookeeper.server.FinalRequestProcessor;
-import org.apache.zookeeper.server.PrepRequestProcessor;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ServerMetrics;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-import javax.management.JMException;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-/**
- *
- * Just like the standard ZooKeeperServer. We just replace the request
- * processors: PrepRequestProcessor -> ProposalRequestProcessor ->
- * CommitProcessor -> Leader.ToBeAppliedRequestProcessor ->
- * FinalRequestProcessor
- */
-public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
-
- private ContainerManager containerManager; // guarded by sync
-
- CommitProcessor commitProcessor;
-
- PrepRequestProcessor prepRequestProcessor;
-
- /**
- * @throws IOException
- */
- public LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
- super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
- }
-
- public Leader getLeader() {
- return self.leader;
- }
-
- @Override
- protected void setupRequestProcessors() {
- RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
- commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
- commitProcessor.start();
- ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
- proposalProcessor.initialize();
- prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
- prepRequestProcessor.start();
- firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
-
- setupContainerManager();
- }
-
- private synchronized void setupContainerManager() {
- containerManager = new ContainerManager(
- getZKDatabase(),
- prepRequestProcessor,
- Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
- Integer.getInteger("znode.container.maxPerMinute", 10000),
- Long.getLong("znode.container.maxNeverUsedIntervalMs", 0)
- );
- }
-
- @Override
- public synchronized void startup() {
- super.startup();
- if (containerManager != null) {
- containerManager.start();
- }
- }
-
- @Override
- protected void registerMetrics() {
- super.registerMetrics();
-
- MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
- rootContext.registerGauge("learners", gaugeWithLeader(
- (leader) -> leader.getLearners().size())
- );
- rootContext.registerGauge("synced_followers", gaugeWithLeader(
- (leader) -> leader.getForwardingFollowers().size()
- ));
- rootContext.registerGauge("synced_non_voting_followers", gaugeWithLeader(
- (leader) -> leader.getNonVotingFollowers().size()
- ));
- rootContext.registerGauge("synced_observers", self::getSynced_observers_metric);
- rootContext.registerGauge("pending_syncs", gaugeWithLeader(
- (leader) -> leader.getNumPendingSyncs()
- ));
- rootContext.registerGauge("leader_uptime", gaugeWithLeader(
- (leader) -> leader.getUptime()
- ));
- rootContext.registerGauge("last_proposal_size", gaugeWithLeader(
- (leader) -> leader.getProposalStats().getLastBufferSize()
- ));
- rootContext.registerGauge("max_proposal_size", gaugeWithLeader(
- (leader) -> leader.getProposalStats().getMaxBufferSize()
- ));
- rootContext.registerGauge("min_proposal_size", gaugeWithLeader(
- (leader) -> leader.getProposalStats().getMinBufferSize()
- ));
- }
-
- private org.apache.zookeeper.metrics.Gauge gaugeWithLeader(Function<Leader, Number> supplier) {
- return () -> {
- final Leader leader = getLeader();
- if (leader == null) {
- return null;
- }
- return supplier.apply(leader);
- };
- }
-
- @Override
- protected void unregisterMetrics() {
- super.unregisterMetrics();
-
- MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
- rootContext.unregisterGauge("learners");
- rootContext.unregisterGauge("synced_followers");
- rootContext.unregisterGauge("synced_non_voting_followers");
- rootContext.unregisterGauge("synced_observers");
- rootContext.unregisterGauge("pending_syncs");
- rootContext.unregisterGauge("leader_uptime");
-
- rootContext.unregisterGauge("last_proposal_size");
- rootContext.unregisterGauge("max_proposal_size");
- rootContext.unregisterGauge("min_proposal_size");
- }
-
- @Override
- public synchronized void shutdown(boolean fullyShutDown) {
- if (containerManager != null) {
- containerManager.stop();
- }
- super.shutdown(fullyShutDown);
- }
-
- @Override
- public int getGlobalOutstandingLimit() {
- int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1;
- int globalOutstandingLimit = super.getGlobalOutstandingLimit() / divisor;
- return globalOutstandingLimit;
- }
-
- @Override
- public void createSessionTracker() {
- sessionTracker = new LeaderSessionTracker(
- this,
- getZKDatabase().getSessionWithTimeOuts(),
- tickTime,
- self.getId(),
- self.areLocalSessionsEnabled(),
- getZooKeeperServerListener());
- }
-
- public boolean touch(long sess, int to) {
- return sessionTracker.touchSession(sess, to);
- }
-
- public boolean checkIfValidGlobalSession(long sess, int to) {
- if (self.areLocalSessionsEnabled() && !upgradeableSessionTracker.isGlobalSession(sess)) {
- return false;
- }
- return sessionTracker.touchSession(sess, to);
- }
-
- /**
- * Requests coming from the learner should go directly to
- * PrepRequestProcessor
- *
- * @param request
- */
- public void submitLearnerRequest(Request request) {
- /*
- * Requests coming from the learner should have gone through
- * submitRequest() on each server which already perform some request
- * validation, so we don't need to do it again.
- *
- * Additionally, LearnerHandler should start submitting requests into
- * the leader's pipeline only when the leader's server is started, so we
- * can submit the request directly into PrepRequestProcessor.
- *
- * This is done so that requests from learners won't go through
- * LeaderRequestProcessor which perform local session upgrade.
- */
- prepRequestProcessor.processRequest(request);
- }
-
- @Override
- protected void registerJMX() {
- // register with JMX
- try {
- jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
- MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxDataTreeBean = null;
- }
- }
-
- public void registerJMX(LeaderBean leaderBean, LocalPeerBean localPeerBean) {
- // register with JMX
- if (self.jmxLeaderElectionBean != null) {
- try {
- MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- }
- self.jmxLeaderElectionBean = null;
- }
-
- try {
- jmxServerBean = leaderBean;
- MBeanRegistry.getInstance().register(leaderBean, localPeerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxServerBean = null;
- }
- }
-
- boolean registerJMX(LearnerHandlerBean handlerBean) {
- try {
- MBeanRegistry.getInstance().register(handlerBean, jmxServerBean);
- return true;
- } catch (JMException e) {
- LOG.warn("Could not register connection", e);
- }
- return false;
- }
-
- @Override
- protected void unregisterJMX() {
- // unregister from JMX
- try {
- if (jmxDataTreeBean != null) {
- MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxDataTreeBean = null;
- }
-
- protected void unregisterJMX(Leader leader) {
- // unregister from JMX
- try {
- if (jmxServerBean != null) {
- MBeanRegistry.getInstance().unregister(jmxServerBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxServerBean = null;
- }
-
- @Override
- public String getState() {
- return "leader";
- }
-
- /**
- * Returns the id of the associated QuorumPeer, which will do for a unique
- * id of this server.
- */
- @Override
- public long getServerId() {
- return self.getId();
- }
-
- @Override
- protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
- super.revalidateSession(cnxn, sessionId, sessionTimeout);
- try {
- // setowner as the leader itself, unless updated
- // via the follower handlers
- setOwner(sessionId, ServerCnxn.me);
- } catch (SessionExpiredException e) {
- // this is ok, it just means that the session revalidation failed.
- }
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
deleted file mode 100644
index 1f5f2a0b225..00000000000
--- a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ /dev/null
@@ -1,921 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.net.ssl.SSLSocket;
-import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.InputArchive;
-import org.apache.jute.OutputArchive;
-import org.apache.jute.Record;
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.common.Time;
-import org.apache.zookeeper.common.X509Exception;
-import org.apache.zookeeper.server.ExitCode;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ServerMetrics;
-import org.apache.zookeeper.server.TxnLogEntry;
-import org.apache.zookeeper.server.ZooTrace;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
-import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.apache.zookeeper.server.util.ConfigUtils;
-import org.apache.zookeeper.server.util.MessageTracker;
-import org.apache.zookeeper.server.util.SerializeUtils;
-import org.apache.zookeeper.server.util.ZxidUtils;
-import org.apache.zookeeper.txn.SetDataTxn;
-import org.apache.zookeeper.txn.TxnDigest;
-import org.apache.zookeeper.txn.TxnHeader;
-import org.apache.zookeeper.util.ServiceUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is the superclass of two of the three main actors in a ZK
- * ensemble: Followers and Observers. Both Followers and Observers share
- * a good deal of code which is moved into Peer to avoid duplication.
- */
-public class Learner {
-
- static class PacketInFlight {
-
- TxnHeader hdr;
- Record rec;
- TxnDigest digest;
-
- }
-
- QuorumPeer self;
- LearnerZooKeeperServer zk;
-
- protected BufferedOutputStream bufferedOutput;
-
- protected Socket sock;
- protected MultipleAddresses leaderAddr;
- protected AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
-
- /**
- * Socket getter
- */
- public Socket getSocket() {
- return sock;
- }
-
- LearnerSender sender = null;
- protected InputArchive leaderIs;
- protected OutputArchive leaderOs;
- /** the protocol version of the leader */
- protected int leaderProtocolVersion = 0x01;
-
- private static final int BUFFERED_MESSAGE_SIZE = 10;
- protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);
-
- protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);
-
- /**
- * Time to wait after connection attempt with the Leader or LearnerMaster before this
- * Learner tries to connect again.
- */
- private static final int leaderConnectDelayDuringRetryMs = Integer.getInteger("zookeeper.leaderConnectDelayDuringRetryMs", 100);
-
- private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");
-
- public static final String LEARNER_ASYNC_SENDING = "zookeeper.learner.asyncSending";
- private static boolean asyncSending =
- Boolean.parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_ASYNC_SENDING));
- public static final String LEARNER_CLOSE_SOCKET_ASYNC = "zookeeper.learner.closeSocketAsync";
- public static final boolean closeSocketAsync = Boolean
- .parseBoolean(ConfigUtils.getPropertyBackwardCompatibleWay(LEARNER_CLOSE_SOCKET_ASYNC));
-
- static {
- LOG.info("leaderConnectDelayDuringRetryMs: {}", leaderConnectDelayDuringRetryMs);
- LOG.info("TCP NoDelay set to: {}", nodelay);
- LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
- LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync);
- }
-
- final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
-
- public int getPendingRevalidationsCount() {
- return pendingRevalidations.size();
- }
-
- // for testing
- protected static void setAsyncSending(boolean newMode) {
- asyncSending = newMode;
- LOG.info("{} = {}", LEARNER_ASYNC_SENDING, asyncSending);
-
- }
- protected static boolean getAsyncSending() {
- return asyncSending;
- }
- /**
- * validate a session for a client
- *
- * @param clientId
- * the client to be revalidated
- * @param timeout
- * the timeout for which the session is valid
- * @throws IOException
- */
- void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException {
- LOG.info("Revalidating client: 0x{}", Long.toHexString(clientId));
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- dos.writeLong(clientId);
- dos.writeInt(timeout);
- dos.close();
- QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null);
- pendingRevalidations.put(clientId, cnxn);
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(
- LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "To validate session 0x" + Long.toHexString(clientId));
- }
- writePacket(qp, true);
- }
-
- /**
- * write a packet to the leader.
- *
- * This method is called by multiple threads. We need to make sure that only one thread is writing to leaderOs at a time.
- * When packets are sent synchronously, writing is done within a synchronization block.
- * When packets are sent asynchronously, sender.queuePacket() is called, which writes to a BlockingQueue, which is thread-safe.
- * Reading from this BlockingQueue and writing to leaderOs is the learner sender thread only.
- * So we have only one thread writing to leaderOs at a time in either case.
- *
- * @param pp
- * the proposal packet to be sent to the leader
- * @throws IOException
- */
- void writePacket(QuorumPacket pp, boolean flush) throws IOException {
- if (asyncSending) {
- sender.queuePacket(pp);
- } else {
- writePacketNow(pp, flush);
- }
- }
-
- void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
- synchronized (leaderOs) {
- if (pp != null) {
- messageTracker.trackSent(pp.getType());
- leaderOs.writeRecord(pp, "packet");
- }
- if (flush) {
- bufferedOutput.flush();
- }
- }
- }
-
- /**
- * Start thread that will forward any packet in the queue to the leader
- */
- protected void startSendingThread() {
- sender = new LearnerSender(this);
- sender.start();
- }
-
- /**
- * read a packet from the leader
- *
- * @param pp
- * the packet to be instantiated
- * @throws IOException
- */
- void readPacket(QuorumPacket pp) throws IOException {
- synchronized (leaderIs) {
- leaderIs.readRecord(pp, "packet");
- messageTracker.trackReceived(pp.getType());
- }
- if (LOG.isTraceEnabled()) {
- final long traceMask =
- (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
- : ZooTrace.SERVER_PACKET_TRACE_MASK;
-
- ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
- }
- }
-
- /**
- * send a request packet to the leader
- *
- * @param request
- * the request from the client
- * @throws IOException
- */
- void request(Request request) throws IOException {
- if (request.isThrottled()) {
- LOG.error("Throttled request sent to leader: {}. Exiting", request);
- ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
- }
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream oa = new DataOutputStream(baos);
- oa.writeLong(request.sessionId);
- oa.writeInt(request.cxid);
- oa.writeInt(request.type);
- if (request.request != null) {
- request.request.rewind();
- int len = request.request.remaining();
- byte[] b = new byte[len];
- request.request.get(b);
- request.request.rewind();
- oa.write(b);
- }
- oa.close();
- QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
- writePacket(qp, true);
- }
-
- /**
- * Returns the address of the node we think is the leader.
- */
- protected QuorumServer findLeader() {
- QuorumServer leaderServer = null;
- // Find the leader by id
- Vote current = self.getCurrentVote();
- for (QuorumServer s : self.getView().values()) {
- if (s.id == current.getId()) {
- // Ensure we have the leader's correct IP address before
- // attempting to connect.
- s.recreateSocketAddresses();
- leaderServer = s;
- break;
- }
- }
- if (leaderServer == null) {
- LOG.warn("Couldn't find the leader with id = {}", current.getId());
- }
- return leaderServer;
- }
-
- /**
- * Overridable helper method to return the System.nanoTime().
- * This method behaves identical to System.nanoTime().
- */
- protected long nanoTime() {
- return System.nanoTime();
- }
-
- /**
- * Overridable helper method to simply call sock.connect(). This can be
- * overriden in tests to fake connection success/failure for connectToLeader.
- */
- protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {
- sock.connect(addr, timeout);
- }
-
- /**
- * Establish a connection with the LearnerMaster found by findLearnerMaster.
- * Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
- * Retries until either initLimit time has elapsed or 5 tries have happened.
- * @param multiAddr - the address of the Peer to connect to.
- * @throws IOException - if the socket connection fails on the 5th attempt
- * if there is an authentication failure while connecting to leader
- */
- protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
-
- this.leaderAddr = multiAddr;
- Set<InetSocketAddress> addresses;
- if (self.isMultiAddressReachabilityCheckEnabled()) {
- // even if none of the addresses are reachable, we want to try to establish connection
- // see ZOOKEEPER-3758
- addresses = multiAddr.getAllReachableAddressesOrAll();
- } else {
- addresses = multiAddr.getAllAddresses();
- }
- ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
- CountDownLatch latch = new CountDownLatch(addresses.size());
- AtomicReference<Socket> socket = new AtomicReference<>(null);
- addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);
-
- try {
- latch.await();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while trying to connect to Leader", e);
- } finally {
- executor.shutdown();
- try {
- if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
- LOG.error("not all the LeaderConnector terminated properly");
- }
- } catch (InterruptedException ie) {
- LOG.error("Interrupted while terminating LeaderConnector executor.", ie);
- }
- }
-
- if (socket.get() == null) {
- throw new IOException("Failed connect to " + multiAddr);
- } else {
- sock = socket.get();
- sockBeingClosed.set(false);
- }
-
- self.authLearner.authenticate(sock, hostname);
-
- leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
- bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
- leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
- if (asyncSending) {
- startSendingThread();
- }
- }
-
- class LeaderConnector implements Runnable {
-
- private AtomicReference<Socket> socket;
- private InetSocketAddress address;
- private CountDownLatch latch;
-
- LeaderConnector(InetSocketAddress address, AtomicReference<Socket> socket, CountDownLatch latch) {
- this.address = address;
- this.socket = socket;
- this.latch = latch;
- }
-
- @Override
- public void run() {
- try {
- Thread.currentThread().setName("LeaderConnector-" + address);
- Socket sock = connectToLeader();
-
- if (sock != null && sock.isConnected()) {
- if (socket.compareAndSet(null, sock)) {
- LOG.info("Successfully connected to leader, using address: {}", address);
- } else {
- LOG.info("Connection to the leader is already established, close the redundant connection");
- sock.close();
- }
- }
-
- } catch (Exception e) {
- LOG.error("Failed connect to {}", address, e);
- } finally {
- latch.countDown();
- }
- }
-
- private Socket connectToLeader() throws IOException, X509Exception, InterruptedException {
- Socket sock = createSocket();
-
- // leader connection timeout defaults to tickTime * initLimit
- int connectTimeout = self.tickTime * self.initLimit;
-
- // but if connectToLearnerMasterLimit is specified, use that value to calculate
- // timeout instead of using the initLimit value
- if (self.connectToLearnerMasterLimit > 0) {
- connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
- }
-
- int remainingTimeout;
- long startNanoTime = nanoTime();
-
- for (int tries = 0; tries < 5 && socket.get() == null; tries++) {
- try {
- // recalculate the init limit time because retries sleep for 1000 milliseconds
- remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
- if (remainingTimeout <= 0) {
- LOG.error("connectToLeader exceeded on retries.");
- throw new IOException("connectToLeader exceeded on retries.");
- }
-
- sockConnect(sock, address, Math.min(connectTimeout, remainingTimeout));
- if (self.isSslQuorum()) {
- ((SSLSocket) sock).startHandshake();
- }
- sock.setTcpNoDelay(nodelay);
- break;
- } catch (IOException e) {
- remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1_000_000);
-
- if (remainingTimeout <= leaderConnectDelayDuringRetryMs) {
- LOG.error(
- "Unexpected exception, connectToLeader exceeded. tries={}, remaining init limit={}, connecting to {}",
- tries,
- remainingTimeout,
- address,
- e);
- throw e;
- } else if (tries >= 4) {
- LOG.error(
- "Unexpected exception, retries exceeded. tries={}, remaining init limit={}, connecting to {}",
- tries,
- remainingTimeout,
- address,
- e);
- throw e;
- } else {
- LOG.warn(
- "Unexpected exception, tries={}, remaining init limit={}, connecting to {}",
- tries,
- remainingTimeout,
- address,
- e);
- sock = createSocket();
- }
- }
- Thread.sleep(leaderConnectDelayDuringRetryMs);
- }
-
- return sock;
- }
- }
-
- /**
- * Creating a simple or and SSL socket.
- * This can be overridden in tests to fake already connected sockets for connectToLeader.
- */
- protected Socket createSocket() throws X509Exception, IOException {
- Socket sock;
- if (self.isSslQuorum()) {
- sock = self.getX509Util().createSSLSocket();
- } else {
- sock = new Socket();
- }
- sock.setSoTimeout(self.tickTime * self.initLimit);
- return sock;
- }
-
- /**
- * Once connected to the leader or learner master, perform the handshake
- * protocol to establish a following / observing connection.
- * @param pktType
- * @return the zxid the Leader sends for synchronization purposes.
- * @throws IOException
- */
- protected long registerWithLeader(int pktType) throws IOException {
- /*
- * Send follower info, including last zxid and sid
- */
- long lastLoggedZxid = self.getLastLoggedZxid();
- QuorumPacket qp = new QuorumPacket();
- qp.setType(pktType);
- qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
-
- /*
- * Add sid to payload
- */
- LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
- ByteArrayOutputStream bsid = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
- boa.writeRecord(li, "LearnerInfo");
- qp.setData(bsid.toByteArray());
-
- writePacket(qp, true);
- readPacket(qp);
- final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
- if (qp.getType() == Leader.LEADERINFO) {
- // we are connected to a 1.0 server so accept the new epoch and read the next packet
- leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
- byte[] epochBytes = new byte[4];
- final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
- if (newEpoch > self.getAcceptedEpoch()) {
- wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
- self.setAcceptedEpoch(newEpoch);
- } else if (newEpoch == self.getAcceptedEpoch()) {
- // since we have already acked an epoch equal to the leaders, we cannot ack
- // again, but we still need to send our lastZxid to the leader so that we can
- // sync with it if it does assume leadership of the epoch.
- // the -1 indicates that this reply should not count as an ack for the new epoch
- wrappedEpochBytes.putInt(-1);
- } else {
- throw new IOException("Leaders epoch, "
- + newEpoch
- + " is less than accepted epoch, "
- + self.getAcceptedEpoch());
- }
- QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
- writePacket(ackNewEpoch, true);
- return ZxidUtils.makeZxid(newEpoch, 0);
- } else {
- if (newEpoch > self.getAcceptedEpoch()) {
- self.setAcceptedEpoch(newEpoch);
- }
- if (qp.getType() != Leader.NEWLEADER) {
- LOG.error("First packet should have been NEWLEADER");
- throw new IOException("First packet should have been NEWLEADER");
- }
- return qp.getZxid();
- }
- }
-
-    /**
-     * Finally, synchronize our history with the Leader (if Follower)
-     * or the LearnerMaster (if Observer).
-     *
-     * <p>The first packet read selects the sync mode (DIFF, SNAP or TRUNC). Packets are then
-     * consumed until UPTODATE arrives, after which a final ACK is sent, the server starts
-     * serving, and the transactions queued during the sync are logged and committed.
-     *
-     * @param newLeaderZxid zxid announced by the new leader; its epoch part becomes the
-     *                      current epoch and the full value is ACKed back on NEWLEADER
-     * @throws IOException if the snapshot received from the leader lacks the expected signature
-     *                     (packet reads/writes presumably raise it as well)
-     * @throws InterruptedException kept from the original documentation;
-     *                     NOTE(review): not visibly thrown in this method - confirm
-     * @throws Exception if a reconfig processed during the sync reports a major change
-     */
-    protected void syncWithLeader(long newLeaderZxid) throws Exception {
-        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
-        QuorumPacket qp = new QuorumPacket();
-        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
-
-        QuorumVerifier newLeaderQV = null;
-
-        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
-        // For SNAP and TRUNC the snapshot is needed to save that history
-        boolean snapshotNeeded = true;
-        boolean syncSnapshot = false;
-        readPacket(qp);
-        Deque<Long> packetsCommitted = new ArrayDeque<>();
-        Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
-        Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
-        synchronized (zk) {
-            if (qp.getType() == Leader.DIFF) {
-                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
-                self.setSyncMode(QuorumPeer.SyncMode.DIFF);
-                if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
-                    LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
-                    snapshotNeeded = true;
-                    syncSnapshot = true;
-                } else {
-                    snapshotNeeded = false;
-                }
-            } else if (qp.getType() == Leader.SNAP) {
-                self.setSyncMode(QuorumPeer.SyncMode.SNAP);
-                LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
-                // The leader is going to dump the database
-                // db is clear as part of deserializeSnapshot()
-                zk.getZKDatabase().deserializeSnapshot(leaderIs);
-                // ZOOKEEPER-2819: overwrite config node content extracted
-                // from leader snapshot with local config, to avoid potential
-                // inconsistency of config node content during rolling restart.
-                if (!self.isReconfigEnabled()) {
-                    LOG.debug("Reset config node content from local config after deserialization of snapshot.");
-                    zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
-                }
-                String signature = leaderIs.readString("signature");
-                if (!signature.equals("BenWasHere")) {
-                    LOG.error("Missing signature. Got {}", signature);
-                    throw new IOException("Missing signature");
-                }
-                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
-
-                // immediately persist the latest snapshot when there is txn log gap
-                syncSnapshot = true;
-            } else if (qp.getType() == Leader.TRUNC) {
-                //we need to truncate the log to the lastzxid of the leader
-                self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
-                LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
-                boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
-                if (!truncated) {
-                    // not able to truncate the log
-                    LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
-                    ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
-                }
-                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
-
-            } else {
-                LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
-                ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
-            }
-            zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
-            zk.createSessionTracker();
-
-            long lastQueued = 0;
-
-            // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
-            // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
-            // we need to make sure that we don't take the snapshot twice.
-            boolean isPreZAB1_0 = true;
-            //If we are not going to take the snapshot be sure the transactions are not applied in memory
-            // but written out to the transaction log
-            boolean writeToTxnLog = !snapshotNeeded;
-            TxnLogEntry logEntry;
-            // we are now going to start getting transactions to apply followed by an UPTODATE
-            outerLoop:
-            while (self.isRunning()) {
-                readPacket(qp);
-                switch (qp.getType()) {
-                case Leader.PROPOSAL:
-                    PacketInFlight pif = new PacketInFlight();
-                    logEntry = SerializeUtils.deserializeTxn(qp.getData());
-                    pif.hdr = logEntry.getHeader();
-                    pif.rec = logEntry.getTxn();
-                    pif.digest = logEntry.getDigest();
-                    if (pif.hdr.getZxid() != lastQueued + 1) {
-                        LOG.warn(
-                            "Got zxid 0x{} expected 0x{}",
-                            Long.toHexString(pif.hdr.getZxid()),
-                            Long.toHexString(lastQueued + 1));
-                    }
-                    lastQueued = pif.hdr.getZxid();
-
-                    if (pif.hdr.getType() == OpCode.reconfig) {
-                        SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
-                        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
-                        self.setLastSeenQuorumVerifier(qv, true);
-                    }
-
-                    packetsNotLogged.add(pif);
-                    packetsNotCommitted.add(pif);
-                    break;
-                case Leader.COMMIT:
-                case Leader.COMMITANDACTIVATE:
-                    pif = packetsNotCommitted.peekFirst();
-                    if (pif == null) {
-                        // A COMMIT with no outstanding proposal must not crash the sync with an NPE
-                        LOG.warn(
-                            "Committing 0x{}, but got no proposal",
-                            Long.toHexString(qp.getZxid()));
-                    } else if (pif.hdr.getZxid() != qp.getZxid()) {
-                        LOG.warn(
-                            "Committing 0x{}, but next proposal is 0x{}",
-                            Long.toHexString(qp.getZxid()),
-                            Long.toHexString(pif.hdr.getZxid()));
-                    } else {
-                        if (qp.getType() == Leader.COMMITANDACTIVATE) {
-                            QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
-                            boolean majorChange = self.processReconfig(
-                                qv,
-                                ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
-                                true);
-                            if (majorChange) {
-                                throw new Exception("changes proposed in reconfig");
-                            }
-                        }
-                        if (!writeToTxnLog) {
-                            zk.processTxn(pif.hdr, pif.rec);
-                            packetsNotLogged.remove();
-                            packetsNotCommitted.remove();
-                        } else {
-                            packetsNotCommitted.remove();
-                            packetsCommitted.add(qp.getZxid());
-                        }
-                    }
-                    break;
-                case Leader.INFORM:
-                case Leader.INFORMANDACTIVATE:
-                    PacketInFlight packet = new PacketInFlight();
-
-                    if (qp.getType() == Leader.INFORMANDACTIVATE) {
-                        // Payload layout: suggested leader id (long) followed by the serialized txn
-                        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
-                        long suggestedLeaderId = buffer.getLong();
-                        byte[] remainingdata = new byte[buffer.remaining()];
-                        buffer.get(remainingdata);
-                        logEntry = SerializeUtils.deserializeTxn(remainingdata);
-                        packet.hdr = logEntry.getHeader();
-                        packet.rec = logEntry.getTxn();
-                        packet.digest = logEntry.getDigest();
-                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8));
-                        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
-                        if (majorChange) {
-                            throw new Exception("changes proposed in reconfig");
-                        }
-                    } else {
-                        logEntry = SerializeUtils.deserializeTxn(qp.getData());
-                        packet.rec = logEntry.getTxn();
-                        packet.hdr = logEntry.getHeader();
-                        packet.digest = logEntry.getDigest();
-                        // Log warning message if txn comes out-of-order
-                        if (packet.hdr.getZxid() != lastQueued + 1) {
-                            LOG.warn(
-                                "Got zxid 0x{} expected 0x{}",
-                                Long.toHexString(packet.hdr.getZxid()),
-                                Long.toHexString(lastQueued + 1));
-                        }
-                        lastQueued = packet.hdr.getZxid();
-                    }
-                    if (!writeToTxnLog) {
-                        // Apply to db directly if we haven't taken the snapshot
-                        zk.processTxn(packet.hdr, packet.rec);
-                    } else {
-                        packetsNotLogged.add(packet);
-                        packetsCommitted.add(qp.getZxid());
-                    }
-
-                    break;
-                case Leader.UPTODATE:
-                    LOG.info("Learner received UPTODATE message");
-                    if (newLeaderQV != null) {
-                        boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
-                        if (majorChange) {
-                            throw new Exception("changes proposed in reconfig");
-                        }
-                    }
-                    if (isPreZAB1_0) {
-                        zk.takeSnapshot(syncSnapshot);
-                        self.setCurrentEpoch(newEpoch);
-                    }
-                    self.setZooKeeperServer(zk);
-                    self.adminServer.setZooKeeperServer(zk);
-                    break outerLoop;
-                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
-                    // means this is Zab 1.0
-                    LOG.info("Learner received NEWLEADER message");
-                    if (qp.getData() != null && qp.getData().length > 1) {
-                        try {
-                            QuorumVerifier qv = self.configFromString(new String(qp.getData(), UTF_8));
-                            self.setLastSeenQuorumVerifier(qv, true);
-                            newLeaderQV = qv;
-                        } catch (Exception e) {
-                            // Best effort: the sync continues without a new verifier, but the
-                            // failure goes to the server log instead of stderr
-                            LOG.warn("Failed to parse quorum config from NEWLEADER packet", e);
-                        }
-                    }
-
-                    if (snapshotNeeded) {
-                        zk.takeSnapshot(syncSnapshot);
-                    }
-
-                    self.setCurrentEpoch(newEpoch);
-                    writeToTxnLog = true;
-                    //Anything after this needs to go to the transaction log, not applied directly in memory
-                    isPreZAB1_0 = false;
-
-                    // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
-                    sock.setSoTimeout(self.tickTime * self.syncLimit);
-                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
-                    zk.startupWithoutServing();
-                    if (zk instanceof FollowerZooKeeperServer) {
-                        FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
-                        fzk.syncProcessor.setDelayForwarding(true);
-                        for (PacketInFlight p : packetsNotLogged) {
-                            fzk.logRequest(p.hdr, p.rec, p.digest);
-                        }
-                        packetsNotLogged.clear();
-                        fzk.syncProcessor.syncFlush();
-                    }
-
-                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
-
-                    if (zk instanceof FollowerZooKeeperServer) {
-                        FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
-                        fzk.syncProcessor.setDelayForwarding(false);
-                        fzk.syncProcessor.syncFlush();
-                    }
-                    break;
-                }
-            }
-        }
-        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
-        writePacket(ack, true);
-        zk.startServing();
-        /*
-         * Update the election vote here to ensure that all members of the
-         * ensemble report the same vote to new servers that start up and
-         * send leader election notifications to the ensemble.
-         *
-         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
-         */
-        self.updateElectionVote(newEpoch);
-
-        // We need to log the stuff that came in between the snapshot and the uptodate
-        if (zk instanceof FollowerZooKeeperServer) {
-            FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
-            for (PacketInFlight p : packetsNotLogged) {
-                fzk.logRequest(p.hdr, p.rec, p.digest);
-            }
-            for (Long zxid : packetsCommitted) {
-                fzk.commit(zxid);
-            }
-        } else if (zk instanceof ObserverZooKeeperServer) {
-            // Similar to follower, we need to log requests between the snapshot
-            // and UPTODATE
-            ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
-            for (PacketInFlight p : packetsNotLogged) {
-                Long zxid = packetsCommitted.peekFirst();
-                if (zxid == null) {
-                    // No commit left at all: skip instead of failing on unboxing null
-                    LOG.warn(
-                        "No commit pending, but next proposal is 0x{}",
-                        Long.toHexString(p.hdr.getZxid()));
-                    continue;
-                }
-                if (p.hdr.getZxid() != zxid) {
-                    // log warning message if there is no matching commit
-                    // old leader send outstanding proposal to observer
-                    LOG.warn(
-                        "Committing 0x{}, but next proposal is 0x{}",
-                        Long.toHexString(zxid),
-                        Long.toHexString(p.hdr.getZxid()));
-                    continue;
-                }
-                packetsCommitted.remove();
-                Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null);
-                request.setTxn(p.rec);
-                request.setHdr(p.hdr);
-                request.setTxnDigest(p.digest);
-                ozk.commitRequest(request);
-            }
-        } else {
-            // New server type need to handle in-flight packets
-            throw new UnsupportedOperationException("Unknown server type");
-        }
-    }
-
-    /**
-     * Finishes a pending session revalidation using the answer carried in {@code qp}.
-     * The packet payload is read as a session id (long) followed by a validity flag (boolean).
-     *
-     * @param qp packet whose data holds the session id and the validity verdict
-     * @throws IOException if the payload cannot be read
-     */
-    protected void revalidate(QuorumPacket qp) throws IOException {
-        DataInputStream payload = new DataInputStream(new ByteArrayInputStream(qp.getData()));
-        long sessionId = payload.readLong();
-        boolean valid = payload.readBoolean();
-
-        // The connection is dropped from the pending map whether or not it is found
-        ServerCnxn waiting = pendingRevalidations.remove(sessionId);
-        if (waiting != null) {
-            zk.finishSessionInit(waiting, valid);
-        } else {
-            LOG.warn("Missing session 0x{} for validation", Long.toHexString(sessionId));
-        }
-
-        if (!LOG.isTraceEnabled()) {
-            return;
-        }
-        String traceMessage = "Session 0x" + Long.toHexString(sessionId) + " is valid: " + valid;
-        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, traceMessage);
-    }
-
-    /**
-     * Answers a ping by echoing its type, zxid and auth info, with the payload replaced by
-     * the server's touch snapshot serialized as (long key, int value) pairs.
-     *
-     * @param qp the ping packet received
-     * @throws IOException if serializing or writing the reply fails
-     */
-    protected void ping(QuorumPacket qp) throws IOException {
-        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(buffer);
-        for (Entry<Long, Integer> touched : zk.getTouchSnapshot().entrySet()) {
-            out.writeLong(touched.getKey());
-            out.writeInt(touched.getValue());
-        }
-
-        byte[] sessionData = buffer.toByteArray();
-        writePacket(new QuorumPacket(qp.getType(), qp.getZxid(), sessionData, qp.getAuthinfo()), true);
-    }
-
- /**
- * Shutdown the Peer
- */
- public void shutdown() {
- self.setZooKeeperServer(null);
- self.closeAllConnections();
- self.adminServer.setZooKeeperServer(null);
-
- if (sender != null) {
- sender.shutdown();
- }
-
- closeSocket();
- // shutdown previous zookeeper
- if (zk != null) {
- // If we haven't finished SNAP sync, force fully shutdown
- // to avoid potential inconsistency
- zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
- }
- }
-
-    /** Returns true only while both the quorum peer and the ZooKeeper server report running. */
-    boolean isRunning() {
-        if (!self.isRunning()) {
-            return false;
-        }
-        return zk.isRunning();
-    }
-
-    /**
-     * Closes the socket at most once. The {@code sockBeingClosed} flag is flipped atomically,
-     * so only the first caller performs the close. When {@code closeSocketAsync} is set the
-     * close runs on a daemon thread; otherwise it runs on the calling thread.
-     */
-    void closeSocket() {
-        if (sock == null) {
-            return;
-        }
-        if (!sockBeingClosed.compareAndSet(false, true)) {
-            return;
-        }
-        if (closeSocketAsync) {
-            // Thread name now has its closing parenthesis
-            final Thread closingThread = new Thread(
-                this::closeSockSync,
-                "CloseSocketThread(sid:" + zk.getServerId() + ")");
-            closingThread.setDaemon(true);
-            closingThread.start();
-        } else {
-            closeSockSync();
-        }
-    }
-
- void closeSockSync() {
- try {
- long startTime = Time.currentElapsedTime();
- if (sock != null) {
- sock.close();
- sock = null;
- }
- ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - startTime);
- } catch (IOException e) {
- LOG.warn("Ignoring error closing connection to leader", e);
- }
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
deleted file mode 100644
index c1dc5cf2b8c..00000000000
--- a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
+++ /dev/null
@@ -1,185 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.server.DataTreeBean;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.SyncRequestProcessor;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServerBean;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Parent class for all ZooKeeperServers for Learners
- */
-public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
-
- /*
- * Request processors
- */
- protected CommitProcessor commitProcessor;
- protected SyncRequestProcessor syncProcessor;
-
- public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int listenBacklog, ZKDatabase zkDb, QuorumPeer self) throws IOException {
- super(logFactory, tickTime, minSessionTimeout, maxSessionTimeout, listenBacklog, zkDb, self);
- }
-
- /**
- * Abstract method to return the learner associated with this server.
- * Since the Learner may change under our feet (when QuorumPeer reassigns
- * it) we can't simply take a reference here. Instead, we need the
- * subclasses to implement this.
- */
- public abstract Learner getLearner();
-
- /**
- * Returns the current state of the session tracker. This is only currently
- * used by a Learner to build a ping response packet.
- *
- */
- protected Map<Long, Integer> getTouchSnapshot() {
- if (sessionTracker != null) {
- return ((LearnerSessionTracker) sessionTracker).snapshot();
- }
- Map<Long, Integer> map = Collections.emptyMap();
- return map;
- }
-
- /**
- * Returns the id of the associated QuorumPeer, which will do for a unique
- * id of this server.
- */
- @Override
- public long getServerId() {
- return self.getId();
- }
-
- @Override
- public void createSessionTracker() {
- sessionTracker = new LearnerSessionTracker(
- this,
- getZKDatabase().getSessionWithTimeOuts(),
- this.tickTime,
- self.getId(),
- self.areLocalSessionsEnabled(),
- getZooKeeperServerListener());
- }
-
- @Override
- protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
- if (upgradeableSessionTracker.isLocalSession(sessionId)) {
- super.revalidateSession(cnxn, sessionId, sessionTimeout);
- } else {
- getLearner().validateSession(cnxn, sessionId, sessionTimeout);
- }
- }
-
- @Override
- protected void registerJMX() {
- // register with JMX
- try {
- jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
- MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxDataTreeBean = null;
- }
- }
-
- public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) {
- // register with JMX
- if (self.jmxLeaderElectionBean != null) {
- try {
- MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- }
- self.jmxLeaderElectionBean = null;
- }
-
- try {
- jmxServerBean = serverBean;
- MBeanRegistry.getInstance().register(serverBean, localPeerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxServerBean = null;
- }
- }
-
- @Override
- protected void unregisterJMX() {
- // unregister from JMX
- try {
- if (jmxDataTreeBean != null) {
- MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxDataTreeBean = null;
- }
-
- protected void unregisterJMX(Learner peer) {
- // unregister from JMX
- try {
- if (jmxServerBean != null) {
- MBeanRegistry.getInstance().unregister(jmxServerBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxServerBean = null;
- }
-
- @Override
- public synchronized void shutdown(boolean fullyShutDown) {
- if (!canShutdown()) {
- LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
- }
- else {
- LOG.info("Shutting down");
- try {
- if (syncProcessor != null) {
- // Shutting down the syncProcessor here, first, ensures queued transactions here are written to
- // permanent storage, which ensures that crash recovery data is consistent with what is used for a
- // leader election immediately following shutdown, because of the old leader going down; and also
- // that any state on its way to being written is also loaded in the potential call to
- // fast-forward-from-edits, in super.shutdown(...), so we avoid getting a DIFF from the new leader
- // that contains entries we have already written to our transaction log.
- syncProcessor.shutdown();
- }
- }
- catch (Exception e) {
- LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e);
- }
- }
- try {
- super.shutdown(fullyShutDown);
- } catch (Exception e) {
- LOG.warn("Ignoring unexpected exception during shutdown", e);
- }
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
deleted file mode 100644
index 37ca16ed52b..00000000000
--- a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
+++ /dev/null
@@ -1,141 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.zookeeper.server.FinalRequestProcessor;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.SyncRequestProcessor;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.BiConsumer;
-
-/**
- * A ZooKeeperServer for the Observer node type. Not much is different, but
- * we anticipate specializing the request processors in the future.
- *
- */
-public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ObserverZooKeeperServer.class);
-
-    /**
-     * Enable the sync request processor for writing the txnlog to disk and
-     * taking periodic snapshots. Default is ON.
-     */
-    private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
-
-    /*
-     * Pending sync requests
-     */
-    ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<Request>();
-
-    ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
-        super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
-        LOG.info("syncEnabled ={}", syncRequestProcessorEnabled);
-    }
-
-    public Observer getObserver() {
-        return self.observer;
-    }
-
-    @Override
-    public Learner getLearner() {
-        return self.observer;
-    }
-
-    /**
-     * Unlike a Follower, which sees a full request only during the PROPOSAL
-     * phase, Observers get all the data required with the INFORM packet.
-     * This method commits a request that has been unpacked from an INFORM
-     * received from the Leader.
-     *
-     * @param request the request to hand to the sync processor (when enabled) and to commit
-     */
-    public void commitRequest(Request request) {
-        if (syncProcessor != null) {
-            // Write to txnlog and take periodic snapshot
-            syncProcessor.processRequest(request);
-        }
-        commitProcessor.commit(request);
-    }
-
-    /**
-     * Set up the request processors for an Observer:
-     * firstProcessor-&gt;commitProcessor-&gt;finalProcessor
-     */
-    @Override
-    protected void setupRequestProcessors() {
-        // We might consider changing the processor behaviour of
-        // Observers to, for example, remove the disk sync requirements.
-        // Currently, they behave almost exactly the same as followers.
-        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
-        commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
-        commitProcessor.start();
-        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
-        ((ObserverRequestProcessor) firstProcessor).start();
-
-        /*
-         * Observer should write to disk, so that it won't request
-         * too old txn from the leader which may lead to getting an entire
-         * snapshot.
-         *
-         * However, this may degrade performance as it has to write to disk
-         * and do periodic snapshot which may double the memory requirements
-         */
-        if (syncRequestProcessorEnabled) {
-            syncProcessor = new SyncRequestProcessor(this, null);
-            syncProcessor.start();
-        } else {
-            syncProcessor = null;
-        }
-    }
-
-    /**
-     * Process a sync request: commits the oldest pending sync, or logs a warning
-     * when none is pending.
-     */
-    public synchronized void sync() {
-        // poll() replaces size() == 0 followed by remove(): one queue operation instead of
-        // a full traversal plus a removal, with the same outcome
-        Request r = pendingSyncs.poll();
-        if (r == null) {
-            LOG.warn("Not expecting a sync.");
-            return;
-        }
-        commitProcessor.commit(r);
-    }
-
-    @Override
-    public String getState() {
-        return "observer";
-    }
-
-    /** Adds the id of the observer's learner master to the values dumped by the superclass. */
-    @Override
-    public void dumpMonitorValues(BiConsumer<String, Object> response) {
-        super.dumpMonitorValues(response);
-        response.accept("observer_master_id", getObserver().getLearnerMasterId());
-    }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
deleted file mode 100644
index b74ca0d716b..00000000000
--- a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
+++ /dev/null
@@ -1,240 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.jmx.MBeanRegistry;
-import org.apache.zookeeper.server.DataTreeBean;
-import org.apache.zookeeper.server.FinalRequestProcessor;
-import org.apache.zookeeper.server.PrepRequestProcessor;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.ZooKeeperServerBean;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * A ZooKeeperServer which comes into play when peer is partitioned from the
- * majority. Handles read-only clients, but drops connections from not-read-only
- * ones.
- * <p>
- * The very first processor in the chain of request processors is a
- * ReadOnlyRequestProcessor which drops state-changing requests.
- */
-public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
-
- protected final QuorumPeer self;
- private volatile boolean shutdown = false;
-
- ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) {
- super(
- logFactory,
- self.tickTime,
- self.minSessionTimeout,
- self.maxSessionTimeout,
- self.clientPortListenBacklog,
- zkDb,
- self.getInitialConfig(),
- self.isReconfigEnabled());
- this.self = self;
- }
-
- @Override
- protected void setupRequestProcessors() {
- RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- RequestProcessor prepProcessor = new PrepRequestProcessor(this, finalProcessor);
- ((PrepRequestProcessor) prepProcessor).start();
- firstProcessor = new ReadOnlyRequestProcessor(this, prepProcessor);
- ((ReadOnlyRequestProcessor) firstProcessor).start();
- }
-
- @Override
- public synchronized void startup() {
- // check to avoid startup follows shutdown
- if (shutdown) {
- LOG.warn("Not starting Read-only server as startup follows shutdown!");
- return;
- }
- registerJMX(new ReadOnlyBean(this), self.jmxLocalPeerBean);
- super.startup();
- self.setZooKeeperServer(this);
- self.adminServer.setZooKeeperServer(this);
- LOG.info("Read-only server started");
- }
-
- @Override
- public void createSessionTracker() {
- sessionTracker = new LearnerSessionTracker(
- this, getZKDatabase().getSessionWithTimeOuts(),
- this.tickTime, self.getId(), self.areLocalSessionsEnabled(),
- getZooKeeperServerListener());
- }
-
- @Override
- protected void startSessionTracker() {
- ((LearnerSessionTracker) sessionTracker).start();
- }
-
- @Override
- protected void setLocalSessionFlag(Request si) {
- switch (si.type) {
- case OpCode.createSession:
- if (self.areLocalSessionsEnabled()) {
- si.setLocalSession(true);
- }
- break;
- case OpCode.closeSession:
- if (((UpgradeableSessionTracker) sessionTracker).isLocalSession(si.sessionId)) {
- si.setLocalSession(true);
- } else {
- LOG.warn("Submitting global closeSession request for session 0x{} in ReadOnly mode",
- Long.toHexString(si.sessionId));
- }
- break;
- default:
- break;
- }
- }
-
- @Override
- protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException {
- if (((LearnerSessionTracker) sessionTracker).isGlobalSession(sessionId)) {
- String msg = "Refusing global session reconnection in RO mode " + cnxn.getRemoteSocketAddress();
- LOG.info(msg);
- throw new ServerCnxn.CloseRequestException(msg, ServerCnxn.DisconnectReason.RENEW_GLOBAL_SESSION_IN_RO_MODE);
- }
- }
-
- @Override
- protected void registerJMX() {
- // register with JMX
- try {
- jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
- MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxDataTreeBean = null;
- }
- }
-
- public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) {
- // register with JMX
- try {
- jmxServerBean = serverBean;
- MBeanRegistry.getInstance().register(serverBean, localPeerBean);
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- jmxServerBean = null;
- }
- }
-
- @Override
- protected void unregisterJMX() {
- // unregister from JMX
- try {
- if (jmxDataTreeBean != null) {
- MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxDataTreeBean = null;
- }
-
- protected void unregisterJMX(ZooKeeperServer zks) {
- // unregister from JMX
- try {
- if (jmxServerBean != null) {
- MBeanRegistry.getInstance().unregister(jmxServerBean);
- }
- } catch (Exception e) {
- LOG.warn("Failed to unregister with JMX", e);
- }
- jmxServerBean = null;
- }
-
- @Override
- public String getState() {
- return "read-only";
- }
-
- /**
- * Returns the id of the associated QuorumPeer, which will do for a unique
- * id of this server.
- */
- @Override
- public long getServerId() {
- return self.getId();
- }
-
- @Override
- public synchronized void shutdown(boolean fullyShutDown) {
- if (!canShutdown()) {
- super.shutdown(fullyShutDown);
- LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
- }
- else {
- shutdown = true;
- unregisterJMX(this);
-
- // set peer's server to null
- self.setZooKeeperServer(null);
- // clear all the connections
- self.closeAllConnections();
-
- self.adminServer.setZooKeeperServer(null);
- }
- // shutdown the server itself
- super.shutdown(fullyShutDown);
- }
-
- @Override
- public void dumpConf(PrintWriter pwriter) {
- super.dumpConf(pwriter);
-
- pwriter.print("initLimit=");
- pwriter.println(self.getInitLimit());
- pwriter.print("syncLimit=");
- pwriter.println(self.getSyncLimit());
- pwriter.print("electionAlg=");
- pwriter.println(self.getElectionType());
- pwriter.print("electionPort=");
- pwriter.println(self.getElectionAddress().getAllPorts()
- .stream().map(Objects::toString).collect(Collectors.joining("|")));
- pwriter.print("quorumPort=");
- pwriter.println(self.getQuorumAddress().getAllPorts()
- .stream().map(Objects::toString).collect(Collectors.joining("|")));
- pwriter.print("peerType=");
- pwriter.println(self.getLearnerType().ordinal());
- }
-
- @Override
- protected void setState(State state) {
- this.state = state;
- }
-
-}
diff --git a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
deleted file mode 100644
index ec4c326e9aa..00000000000
--- a/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server.quorum;
-
-import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
-import org.apache.zookeeper.server.ServerMetrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Flushable;
-import java.io.IOException;
-import java.net.Socket;
-
-public class SendAckRequestProcessor implements RequestProcessor, Flushable {
-
- private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);
-
- Learner learner;
-
- SendAckRequestProcessor(Learner peer) {
- this.learner = peer;
- }
-
- public void processRequest(Request si) {
- if (si.type != OpCode.sync) {
- QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
- try {
- si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
-
- learner.writePacket(qp, false);
- } catch (IOException e) {
- LOG.warn("Closing connection to leader, exception during packet send", e);
- try {
- if (!learner.sock.isClosed()) {
- learner.sock.close();
- }
- } catch (IOException e1) {
- // Nothing to do, we are shutting things down, so an exception here is irrelevant
- LOG.debug("Ignoring error closing the connection", e1);
- }
- }
- }
- }
-
- public void flush() throws IOException {
- try {
- learner.writePacket(null, true);
- } catch (IOException e) {
- LOG.warn("Closing connection to leader, exception during packet send", e);
- try {
- Socket socket = learner.sock;
- if (socket != null && !socket.isClosed()) {
- learner.sock.close();
- }
- } catch (IOException e1) {
- // Nothing to do, we are shutting things down, so an exception here is irrelevant
- LOG.debug("Ignoring error closing the connection", e1);
- }
- }
- }
-
- public void shutdown() {
- // Nothing needed
- }
-
-}