/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.zookeeper.server; import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.PrintWriter; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.zip.Adler32; import java.util.zip.CheckedInputStream; import javax.security.sasl.SaslException; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.InputArchive; import org.apache.jute.Record; import org.apache.zookeeper.Environment; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.Quotas; import 
org.apache.zookeeper.StatsTrack; import org.apache.zookeeper.Version; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.ZookeeperBanner; import org.apache.zookeeper.common.PathUtils; import org.apache.zookeeper.common.StringUtils; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.StatPersisted; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.metrics.MetricsContext; import org.apache.zookeeper.proto.AuthPacket; import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ConnectResponse; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.proto.DeleteRequest; import org.apache.zookeeper.proto.GetSASLRequest; import org.apache.zookeeper.proto.ReplyHeader; import org.apache.zookeeper.proto.RequestHeader; import org.apache.zookeeper.proto.SetACLRequest; import org.apache.zookeeper.proto.SetDataRequest; import org.apache.zookeeper.proto.SetSASLResponse; import org.apache.zookeeper.server.DataTree.ProcessTxnResult; import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException; import org.apache.zookeeper.server.ServerCnxn.CloseRequestException; import org.apache.zookeeper.server.SessionTracker.Session; import org.apache.zookeeper.server.SessionTracker.SessionExpirer; import org.apache.zookeeper.server.auth.ProviderRegistry; import org.apache.zookeeper.server.auth.ServerAuthenticationProvider; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; import org.apache.zookeeper.server.util.JvmPauseMonitor; import org.apache.zookeeper.server.util.OSMXBean; import org.apache.zookeeper.server.util.QuotaMetricsUtils; import org.apache.zookeeper.server.util.RequestPathMetricsCollector; import 
org.apache.zookeeper.txn.CreateSessionTxn; import org.apache.zookeeper.txn.TxnDigest; import org.apache.zookeeper.txn.TxnHeader; import org.apache.zookeeper.util.ServiceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This class implements a simple standalone ZooKeeperServer. It sets up the * following chain of RequestProcessors to process requests: * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor */ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { protected static final Logger LOG; private static final RateLogger RATE_LOGGER; public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit"; public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck"; public static final String SKIP_ACL = "zookeeper.skipACL"; public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota"; // When enabled, will check ACL constraints appertained to the requests first, // before sending the requests to the quorum. 
static boolean enableEagerACLCheck; static final boolean skipACL; public static final boolean enforceQuota; public static final String SASL_SUPER_USER = "zookeeper.superUser"; public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients"; public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled"; private static boolean digestEnabled; public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled"; private static boolean serializeLastProcessedZxidEnabled; // Add a enable/disable option for now, we should remove this one when // this feature is confirmed to be stable public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; private static boolean closeSessionTxnEnabled = true; private volatile CountDownLatch restoreLatch; static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); RATE_LOGGER = new RateLogger(LOG); ZookeeperBanner.printBanner(LOG); Environment.logEnv("Server environment:", LOG); enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK); LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck); skipACL = System.getProperty(SKIP_ACL, "no").equals("yes"); if (skipACL) { LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL); } enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false")); if (enforceQuota) { LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota); } digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true")); LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); closeSessionTxnEnabled = Boolean.parseBoolean( System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean( System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"))); } // @VisibleForTesting 
public static boolean isEnableEagerACLCheck() { return enableEagerACLCheck; } // @VisibleForTesting public static void setEnableEagerACLCheck(boolean enabled) { ZooKeeperServer.enableEagerACLCheck = enabled; LOG.info("Update {} to {}", ENABLE_EAGER_ACL_CHECK, enabled); } public static boolean isCloseSessionTxnEnabled() { return closeSessionTxnEnabled; } public static void setCloseSessionTxnEnabled(boolean enabled) { ZooKeeperServer.closeSessionTxnEnabled = enabled; LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED, ZooKeeperServer.closeSessionTxnEnabled); } protected ZooKeeperServerBean jmxServerBean; protected DataTreeBean jmxDataTreeBean; public static final int DEFAULT_TICK_TIME = 3000; protected int tickTime = DEFAULT_TICK_TIME; public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled protected static volatile int throttledOpWaitTime = Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME); /** value of -1 indicates unset, use default */ protected int minSessionTimeout = -1; /** value of -1 indicates unset, use default */ protected int maxSessionTimeout = -1; /** Socket listen backlog. 
Value of -1 indicates unset */
protected int listenBacklog = -1;

protected SessionTracker sessionTracker;
private FileTxnSnapLog txnLogFactory;
private ZKDatabase zkDb;

private ResponseCache readResponseCache;
private ResponseCache getChildrenResponseCache;

// Highest zxid handed out so far; see getZxid(), getNextZxid() and setZxid().
private final AtomicLong hzxid = new AtomicLong();

public static final Exception ok = new Exception("No prob");

protected RequestProcessor firstProcessor;
protected JvmPauseMonitor jvmPauseMonitor;
protected volatile State state = State.INITIAL;
private boolean isResponseCachingEnabled = true;

/* contains the configuration file content read at startup */
protected String initialConfig;
protected boolean reconfigEnabled;

private final RequestPathMetricsCollector requestPathMetricsCollector;

private static final int DEFAULT_SNAP_COUNT = 100000;
private static final int DEFAULT_GLOBAL_OUTSTANDING_LIMIT = 1000;

private boolean localSessionEnabled;

/** Lifecycle states of the server. */
protected enum State {
    INITIAL,
    RUNNING,
    SHUTDOWN,
    ERROR
}

/**
 * This is the secret that we use to generate passwords. For the moment,
 * it's more of a checksum that's used in reconnection, which carries no
 * security weight, and is treated internally as if it carries no
 * security weight.
*/ private static final long superSecret = 0XB3415C00L; private final AtomicInteger requestsInProcess = new AtomicInteger(0); final Deque outstandingChanges = new ArrayDeque<>(); // this data structure must be accessed under the outstandingChanges lock final Map outstandingChangesForPath = new HashMap<>(); protected ServerCnxnFactory serverCnxnFactory; protected ServerCnxnFactory secureServerCnxnFactory; private final ServerStats serverStats; private final ZooKeeperServerListener listener; private ZooKeeperServerShutdownHandler zkShutdownHandler; private volatile int createSessionTrackerServerId = 1; private static final String FLUSH_DELAY = "zookeeper.flushDelay"; private static volatile long flushDelay; private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime"; private static volatile long maxWriteQueuePollTime; private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize"; private static volatile int maxBatchSize; /** * Starting size of read and write ByteArroyOuputBuffers. Default is 32 bytes. * Flag not used for small transfers like connectResponses. 
*/ public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes"; public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024; public static final int intBufferStartingSizeBytes; public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize"; public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize"; static { long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0); setFlushDelay(configuredFlushDelay); setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3)); setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000)); intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE); if (intBufferStartingSizeBytes < 32) { String msg = "Buffer starting size (" + intBufferStartingSizeBytes + ") must be greater than or equal to 32. " + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=\" "; LOG.error(msg); throw new IllegalArgumentException(msg); } LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes); } // Connection throttling private final BlueThrottle connThrottle = new BlueThrottle(); private RequestThrottler requestThrottler; public static final String SNAP_COUNT = "zookeeper.snapCount"; /** * This setting sets a limit on the total number of large requests that * can be inflight and is designed to prevent ZooKeeper from accepting * too many large requests such that the JVM runs out of usable heap and * ultimately crashes. * * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)} * method which is called by the connection layer ({@link NIOServerCnxn}, * {@link NettyServerCnxn}) before allocating a byte buffer and pulling * data off the TCP socket. 
The limit is then checked again by the * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which * also atomically updates {@link #currentLargeRequestBytes}. The request is * then marked as a large request, with the request size stored in the Request * object so that it can later be decremented from {@link #currentLargeRequestBytes}. * * When a request is completed or dropped, the relevant code path calls the * {@link #requestFinished(Request)} method which performs the decrement if * needed. */ private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; /** * The size threshold after which a request is considered a large request * and is checked against the large request byte limit. */ private volatile int largeRequestThreshold = -1; private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0); private final AuthenticationHelper authHelper = new AuthenticationHelper(); void removeCnxn(ServerCnxn cnxn) { zkDb.removeCnxn(cnxn); } /** * Creates a ZooKeeperServer instance. Nothing is setup, use the setX * methods to prepare the instance (eg datadir, datalogdir, ticktime, * builder, etc...) * */ public ZooKeeperServer() { listener = new ZooKeeperServerListenerImpl(this); serverStats = new ServerStats(this); this.requestPathMetricsCollector = new RequestPathMetricsCollector(); } /** * Keeping this constructor for backward compatibility */ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) { this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled()); } /** * * Creates a ZooKeeperServer instance. It sets everything up, but doesn't * actually start listening for clients until run() is invoked. 
* */ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) { serverStats = new ServerStats(this); this.txnLogFactory = txnLogFactory; this.txnLogFactory.setServerStats(this.serverStats); this.zkDb = zkDb; this.tickTime = tickTime; setMinSessionTimeout(minSessionTimeout); setMaxSessionTimeout(maxSessionTimeout); this.listenBacklog = clientPortListenBacklog; this.reconfigEnabled = reconfigEnabled; listener = new ZooKeeperServerListenerImpl(this); readResponseCache = new ResponseCache(Integer.getInteger( GET_DATA_RESPONSE_CACHE_SIZE, ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getData"); getChildrenResponseCache = new ResponseCache(Integer.getInteger( GET_CHILDREN_RESPONSE_CACHE_SIZE, ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getChildren"); this.initialConfig = initialConfig; this.requestPathMetricsCollector = new RequestPathMetricsCollector(); this.initLargeRequestThrottlingSettings(); LOG.info( "Created server with" + " tickTime {} ms" + " minSessionTimeout {} ms" + " maxSessionTimeout {} ms" + " clientPortListenBacklog {}" + " datadir {}" + " snapdir {}", tickTime, getMinSessionTimeout(), getMaxSessionTimeout(), getClientPortListenBacklog(), txnLogFactory.getDataDir(), txnLogFactory.getSnapDir()); } public String getInitialConfig() { return initialConfig; } /** * Adds JvmPauseMonitor and calls * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String)} * */ public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) { this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled()); this.jvmPauseMonitor = jvmPauseMonitor; if (jvmPauseMonitor != null) { 
LOG.info("Added JvmPauseMonitor to server"); } } /** * creates a zookeeperserver instance. * @param txnLogFactory the file transaction snapshot logging class * @param tickTime the ticktime for the server */ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) { this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig, QuorumPeerConfig.isReconfigEnabled()); } public ServerStats serverStats() { return serverStats; } public RequestPathMetricsCollector getRequestPathMetricsCollector() { return requestPathMetricsCollector; } public BlueThrottle connThrottle() { return connThrottle; } public void dumpConf(PrintWriter pwriter) { pwriter.print("clientPort="); pwriter.println(getClientPort()); pwriter.print("secureClientPort="); pwriter.println(getSecureClientPort()); pwriter.print("dataDir="); pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath()); pwriter.print("dataDirSize="); pwriter.println(getDataDirSize()); pwriter.print("dataLogDir="); pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath()); pwriter.print("dataLogSize="); pwriter.println(getLogDirSize()); pwriter.print("tickTime="); pwriter.println(getTickTime()); pwriter.print("maxClientCnxns="); pwriter.println(getMaxClientCnxnsPerHost()); pwriter.print("minSessionTimeout="); pwriter.println(getMinSessionTimeout()); pwriter.print("maxSessionTimeout="); pwriter.println(getMaxSessionTimeout()); pwriter.print("clientPortListenBacklog="); pwriter.println(getClientPortListenBacklog()); pwriter.print("serverId="); pwriter.println(getServerId()); } public ZooKeeperServerConf getConf() { return new ZooKeeperServerConf( getClientPort(), zkDb.snapLog.getSnapDir().getAbsolutePath(), zkDb.snapLog.getDataDir().getAbsolutePath(), getTickTime(), getMaxClientCnxnsPerHost(), getMinSessionTimeout(), getMaxSessionTimeout(), getServerId(), getClientPortListenBacklog()); } /** * This constructor is for backward compatibility with the existing unit * test 
code. * It defaults to FileLogProvider persistence provider. */ public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException { this(new FileTxnSnapLog(snapDir, logDir), tickTime, ""); } /** * Default constructor, relies on the config for its argument values * * @throws IOException */ public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException { this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "", QuorumPeerConfig.isReconfigEnabled()); } /** * get the zookeeper database for this server * @return the zookeeper database for this server */ public ZKDatabase getZKDatabase() { return this.zkDb; } /** * set the zkdatabase for this zookeeper server * @param zkDb */ public void setZKDatabase(ZKDatabase zkDb) { this.zkDb = zkDb; } /** * Restore sessions and data */ public void loadData() throws IOException, InterruptedException { /* * When a new leader starts executing Leader#lead, it * invokes this method. The database, however, has been * initialized before running leader election so that * the server could pick its zxid for its initial vote. * It does it by invoking QuorumPeer#getLastLoggedZxid. * Consequently, we don't need to initialize it once more * and avoid the penalty of loading it a second time. Not * reloading it is particularly important for applications * that host a large database. * * The following if block checks whether the database has * been initialized or not. Note that this method is * invoked by at least one other method: * ZooKeeperServer#startdata. * * See ZOOKEEPER-1642 for more detail. 
*/ if (zkDb.isInitialized()) { setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { setZxid(zkDb.loadDataBase()); } // Clean up dead sessions zkDb.getSessions().stream() .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null) .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid())); // Make a clean snapshot takeSnapshot(); } public File takeSnapshot() throws IOException { return takeSnapshot(false); } public File takeSnapshot(boolean syncSnap) throws IOException { return takeSnapshot(syncSnap, true, false); } /** * Takes a snapshot on the server. * * @param syncSnap syncSnap sync the snapshot immediately after write * @param isSevere if true system exist, otherwise throw IOException * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions * * @return file snapshot file object * @throws IOException */ public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException { long start = Time.currentElapsedTime(); File snapFile = null; try { if (fastForwardFromEdits) { zkDb.fastForwardDataBase(); } snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); } catch (IOException e) { if (isSevere) { LOG.error("Severe unrecoverable error, exiting", e); // This is a severe error that we cannot recover from, // so we need to exit ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); } else { throw e; } } long elapsed = Time.currentElapsedTime() - start; LOG.info("Snapshot taken in {} ms", elapsed); ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed); return snapFile; } /** * Restores database from a snapshot. It is used by the restore admin server command. 
* * @param inputStream input stream of snapshot * @return last processed zxid */ public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException { if (inputStream == null) { throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot"); } long start = Time.currentElapsedTime(); LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}", getZKDatabase().getDataTreeLastProcessedZxid(), getZKDatabase().dataTree.getNodeCount(), getZKDatabase().getSessionCount()); // restore to a new zkDatabase final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory); final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32()); final InputArchive ia = BinaryInputArchive.getArchive(cis); newZKDatabase.deserializeSnapshot(ia, cis); LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", newZKDatabase.getDataTreeLastProcessedZxid(), newZKDatabase.dataTree.getNodeCount(), newZKDatabase.getSessionCount()); // create a CountDownLatch restoreLatch = new CountDownLatch(1); try { // set to the new zkDatabase setZKDatabase(newZKDatabase); // re-create SessionTrack createSessionTracker(); } finally { // unblock request submission restoreLatch.countDown(); restoreLatch = null; } LOG.info("After restore database. 
lastProcessedZxid={}, nodeCount={}, sessionCount={}", getZKDatabase().getDataTreeLastProcessedZxid(), getZKDatabase().dataTree.getNodeCount(), getZKDatabase().getSessionCount()); long elapsed = Time.currentElapsedTime() - start; LOG.info("Restore taken in {} ms", elapsed); ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed); return getLastProcessedZxid(); } public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() { return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection(); } @Override public long getDataDirSize() { if (zkDb == null) { return 0L; } File path = zkDb.snapLog.getDataDir(); return getDirSize(path); } @Override public long getLogDirSize() { if (zkDb == null) { return 0L; } File path = zkDb.snapLog.getSnapDir(); return getDirSize(path); } private long getDirSize(File file) { long size = 0L; if (file.isDirectory()) { File[] files = file.listFiles(); if (files != null) { for (File f : files) { size += getDirSize(f); } } } else { size = file.length(); } return size; } public long getZxid() { return hzxid.get(); } public SessionTracker getSessionTracker() { return sessionTracker; } long getNextZxid() { return hzxid.incrementAndGet(); } public void setZxid(long zxid) { hzxid.set(zxid); } private void close(long sessionId) { Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null); submitRequest(si); } public void closeSession(long sessionId) { LOG.info("Closing session 0x{}", Long.toHexString(sessionId)); // we do not want to wait for a session close. send it as soon as we // detect it! 
close(sessionId); } protected void killSession(long sessionId, long zxid) { zkDb.killSession(sessionId, zxid); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, ZooTrace.SESSION_TRACE_MASK, "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId)); } if (sessionTracker != null) { sessionTracker.removeSession(sessionId); } } public void expire(Session session) { long sessionId = session.getSessionId(); LOG.info( "Expiring session 0x{}, timeout of {}ms exceeded", Long.toHexString(sessionId), session.getTimeout()); close(sessionId); } public void expire(long sessionId) { LOG.info("forcibly expiring session 0x{}", Long.toHexString(sessionId)); close(sessionId); } public static class MissingSessionException extends IOException { private static final long serialVersionUID = 7467414635467261007L; public MissingSessionException(String msg) { super(msg); } } void touch(ServerCnxn cnxn) throws MissingSessionException { if (cnxn == null) { return; } long id = cnxn.getSessionId(); int to = cnxn.getSessionTimeout(); if (!sessionTracker.touchSession(id, to)) { throw new MissingSessionException("No session with sessionid 0x" + Long.toHexString(id) + " exists, probably expired and removed"); } } protected void registerJMX() { // register with JMX try { jmxServerBean = new ZooKeeperServerBean(this); MBeanRegistry.getInstance().register(jmxServerBean, null); try { jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree()); MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxDataTreeBean = null; } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxServerBean = null; } } public void startdata() throws IOException, InterruptedException { //check to see if zkDb is not null if (zkDb == null) { zkDb = new ZKDatabase(this.txnLogFactory); } if (!zkDb.isInitialized()) { loadData(); } } public synchronized void startup() { startupWithServerState(State.RUNNING); } public 
synchronized void startupWithoutServing() { startupWithServerState(State.INITIAL); } public synchronized void startServing() { setState(State.RUNNING); notifyAll(); } private void startupWithServerState(State state) { if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); setupRequestProcessors(); startRequestThrottler(); registerJMX(); startJvmPauseMonitor(); registerMetrics(); setState(state); requestPathMetricsCollector.start(); localSessionEnabled = sessionTracker.isLocalSessionsEnabled(); notifyAll(); } protected void startJvmPauseMonitor() { if (this.jvmPauseMonitor != null) { this.jvmPauseMonitor.serviceStart(); } } protected void startRequestThrottler() { requestThrottler = createRequestThrottler(); requestThrottler.start(); } protected RequestThrottler createRequestThrottler() { return new RequestThrottler(this); } protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor) syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor) firstProcessor).start(); } public ZooKeeperServerListener getZooKeeperServerListener() { return listener; } /** * Change the server ID used by {@link #createSessionTracker()}. Must be called prior to * {@link #startup()} being called * * @param newId ID to use */ public void setCreateSessionTrackerServerId(int newId) { createSessionTrackerServerId = newId; } protected void createSessionTracker() { sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener()); } protected void startSessionTracker() { ((SessionTrackerImpl) sessionTracker).start(); } /** * Sets the state of ZooKeeper server. After changing the state, it notifies * the server state change to a registered shutdown handler, if any. *

* The following are the server state transitions:
 * <ul>
 * <li>During startup the server will be in the INITIAL state.</li>
 * <li>After successfully starting, the server sets the state to RUNNING.</li>
 * <li>The server transitions to the ERROR state if it hits an internal
 * error. {@link ZooKeeperServerListenerImpl} notifies any critical resource
 * error events, e.g., SyncRequestProcessor not being able to write a txn to
 * disk.</li>
 * <li>During shutdown the server sets the state to SHUTDOWN, which
 * corresponds to the server not running.</li>
 * <li>During maintenance (e.g. restore) the server is described as setting the
 * state to MAINTENANCE; note, however, that the {@code State} enum of this
 * class declares only INITIAL, RUNNING, SHUTDOWN and ERROR.</li>
 * </ul>
* * @param state new server state. */ protected void setState(State state) { this.state = state; // Notify server state changes to the registered shutdown handler, if any. if (zkShutdownHandler != null) { zkShutdownHandler.handle(state); } else { LOG.debug( "ZKShutdownHandler is not registered, so ZooKeeper server" + " won't take any action on ERROR or SHUTDOWN server state changes"); } } /** * This can be used while shutting down the server to see whether the server * is already shutdown or not. * * @return true if the server is running or server hits an error, false * otherwise. */ protected boolean canShutdown() { return state == State.RUNNING || state == State.ERROR; } /** * @return true if the server is running, false otherwise. */ public boolean isRunning() { return state == State.RUNNING; } public void shutdown() { shutdown(false); } /** * Shut down the server instance * @param fullyShutDown true if another server using the same database will not replace this one in the same process */ public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { if (fullyShutDown && zkDb != null) { zkDb.clear(); } LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); return; } LOG.info("shutting down"); // new RuntimeException("Calling shutdown").printStackTrace(); setState(State.SHUTDOWN); // unregister all metrics that are keeping a strong reference to this object // subclasses will do their specific clean up unregisterMetrics(); if (requestThrottler != null) { requestThrottler.shutdown(); } // Since sessionTracker and syncThreads poll we just have to // set running to false and they will detect it during the poll // interval. 
if (sessionTracker != null) { sessionTracker.shutdown(); } if (firstProcessor != null) { firstProcessor.shutdown(); } if (jvmPauseMonitor != null) { jvmPauseMonitor.serviceStop(); } if (zkDb != null) { if (fullyShutDown) { zkDb.clear(); } else { // else there is no need to clear the database // * When a new quorum is established we can still apply the diff // on top of the same zkDb data // * If we fetch a new snapshot from leader, the zkDb will be // cleared anyway before loading the snapshot try { // This will fast-forward the database to the latest recorded transactions zkDb.fastForwardDataBase(); } catch (IOException e) { LOG.error("Error updating DB", e); zkDb.clear(); } } } requestPathMetricsCollector.shutdown(); unregisterJMX(); } protected void unregisterJMX() { // unregister from JMX try { if (jmxDataTreeBean != null) { MBeanRegistry.getInstance().unregister(jmxDataTreeBean); } } catch (Exception e) { LOG.warn("Failed to unregister with JMX", e); } try { if (jmxServerBean != null) { MBeanRegistry.getInstance().unregister(jmxServerBean); } } catch (Exception e) { LOG.warn("Failed to unregister with JMX", e); } jmxServerBean = null; jmxDataTreeBean = null; } public void incInProcess() { requestsInProcess.incrementAndGet(); } public void decInProcess() { requestsInProcess.decrementAndGet(); if (requestThrottler != null) { requestThrottler.throttleWake(); } } public int getInProcess() { return requestsInProcess.get(); } public int getInflight() { return requestThrottleInflight(); } private int requestThrottleInflight() { if (requestThrottler != null) { return requestThrottler.getInflight(); } return 0; } static class PrecalculatedDigest { final long nodeDigest; final long treeDigest; PrecalculatedDigest(long nodeDigest, long treeDigest) { this.nodeDigest = nodeDigest; this.treeDigest = treeDigest; } } /** * This structure is used to facilitate information sharing between PrepRP * and FinalRP. 
*/ static class ChangeRecord { PrecalculatedDigest precalculatedDigest; byte[] data; ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List acl) { this.zxid = zxid; this.path = path; this.stat = stat; this.childCount = childCount; this.acl = acl; } long zxid; String path; StatPersisted stat; /* Make sure to create a new object when changing */ int childCount; List acl; /* Make sure to create a new object when changing */ ChangeRecord duplicate(long zxid) { StatPersisted stat = new StatPersisted(); if (this.stat != null) { DataTree.copyStatPersisted(this.stat, stat); } ChangeRecord changeRecord = new ChangeRecord(zxid, path, stat, childCount, acl == null ? new ArrayList<>() : new ArrayList<>(acl)); changeRecord.precalculatedDigest = precalculatedDigest; changeRecord.data = data; return changeRecord; } } byte[] generatePasswd(long id) { Random r = new Random(id ^ superSecret); byte[] p = new byte[16]; r.nextBytes(p); return p; } protected boolean checkPasswd(long sessionId, byte[] passwd) { return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId)); } long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) { if (passwd == null) { // Possible since it's just deserialized from a packet on the wire. 
passwd = new byte[0]; } long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); CreateSessionTxn txn = new CreateSessionTxn(timeout); cnxn.setSessionId(sessionId); Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null); submitRequest(si); return sessionId; } /** * set the owner of this session as owner * @param id the session id * @param owner the owner of the session * @throws SessionExpiredException */ public void setOwner(long id, Object owner) throws SessionExpiredException { sessionTracker.setOwner(id, owner); } protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, ZooTrace.SESSION_TRACE_MASK, "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc); } finishSessionInit(cnxn, rc); } public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException { if (checkPasswd(sessionId, passwd)) { revalidateSession(cnxn, sessionId, sessionTimeout); } else { LOG.warn( "Incorrect password from {} for session 0x{}", cnxn.getRemoteSocketAddress(), Long.toHexString(sessionId)); finishSessionInit(cnxn, false); } } public void finishSessionInit(ServerCnxn cnxn, boolean valid) { // register with JMX try { if (valid) { if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) { serverCnxnFactory.registerConnection(cnxn); } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) { secureServerCnxnFactory.registerConnection(cnxn); } } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); } try { ConnectResponse rsp = new ConnectResponse( 0, valid ? cnxn.getSessionTimeout() : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no // longer valid valid ? 
generatePasswd(cnxn.getSessionId()) : new byte[16], this instanceof ReadOnlyZooKeeperServer); ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); bos.writeInt(-1, "len"); rsp.serialize(bos, "connect"); baos.close(); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); bb.putInt(bb.remaining() - 4).rewind(); cnxn.sendBuffer(bb); if (valid) { LOG.debug( "Established session 0x{} with negotiated timeout {} for client {}", Long.toHexString(cnxn.getSessionId()), cnxn.getSessionTimeout(), cnxn.getRemoteSocketAddress()); cnxn.enableRecv(); } else { LOG.info( "Invalid session 0x{} for client {}, probably expired", Long.toHexString(cnxn.getSessionId()), cnxn.getRemoteSocketAddress()); cnxn.sendBuffer(ServerCnxnFactory.closeConn); } } catch (Exception e) { LOG.warn("Exception while establishing session, closing", e); cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT); } } public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) { closeSession(cnxn.getSessionId()); } public long getServerId() { return 0; } /** * If the underlying Zookeeper server support local session, this method * will set a isLocalSession to true if a request is associated with * a local session. * * @param si */ protected void setLocalSessionFlag(Request si) { } public void submitRequest(Request si) { if (restoreLatch != null) { try { LOG.info("Blocking request submission while restore is in progress"); restoreLatch.await(); } catch (final InterruptedException e) { LOG.warn("Unexpected interruption", e); } } enqueueRequest(si); } public void enqueueRequest(Request si) { if (requestThrottler == null) { synchronized (this) { try { // Since all requests are passed to the request // processor it should wait for setting up the request // processor chain. The state will be updated to RUNNING // after the setup. 
while (state == State.INITIAL) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (requestThrottler == null) { throw new RuntimeException("Not started"); } } } requestThrottler.submitRequest(si); } public void submitRequestNow(Request si) { if (firstProcessor == null) { synchronized (this) { try { // Since all requests are passed to the request // processor it should wait for setting up the request // processor chain. The state will be updated to RUNNING // after the setup. while (state == State.INITIAL) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (firstProcessor == null || state != State.RUNNING) { throw new RuntimeException("Not started"); } } } try { touch(si.cnxn); boolean validpacket = Request.isValid(si.type); if (validpacket) { setLocalSessionFlag(si); firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type {}", si.type); // Update request accounting/throttling limits requestFinished(si); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { LOG.debug("Dropping request.", e); // Update request accounting/throttling limits requestFinished(si); } catch (RequestProcessorException e) { LOG.error("Unable to process request", e); // Update request accounting/throttling limits requestFinished(si); } } public static int getSnapCount() { int snapCount = Integer.getInteger(SNAP_COUNT, DEFAULT_SNAP_COUNT); // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor if (snapCount < 2) { LOG.warn("SnapCount should be 2 or more. 
Now, snapCount is reset to 2"); snapCount = 2; } return snapCount; } public int getGlobalOutstandingLimit() { return Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, DEFAULT_GLOBAL_OUTSTANDING_LIMIT); } public static long getSnapSizeInBytes() { long size = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default if (size <= 0) { LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", size); } return size * 1024; // Convert to bytes } public void setServerCnxnFactory(ServerCnxnFactory factory) { serverCnxnFactory = factory; } public ServerCnxnFactory getServerCnxnFactory() { return serverCnxnFactory; } public ServerCnxnFactory getSecureServerCnxnFactory() { return secureServerCnxnFactory; } public void setSecureServerCnxnFactory(ServerCnxnFactory factory) { secureServerCnxnFactory = factory; } /** * return the last processed id from the * datatree */ public long getLastProcessedZxid() { return zkDb.getDataTreeLastProcessedZxid(); } /** * return the outstanding requests * in the queue, which haven't been * processed yet */ public long getOutstandingRequests() { return getInProcess(); } /** * return the total number of client connections that are alive * to this server */ public int getNumAliveConnections() { int numAliveConnections = 0; if (serverCnxnFactory != null) { numAliveConnections += serverCnxnFactory.getNumAliveConnections(); } if (secureServerCnxnFactory != null) { numAliveConnections += secureServerCnxnFactory.getNumAliveConnections(); } return numAliveConnections; } /** * truncate the log to get in sync with others * if in a quorum * @param zxid the zxid that it needs to get in sync * with others * @throws IOException */ public void truncateLog(long zxid) throws IOException { this.zkDb.truncateLog(zxid); } public int getTickTime() { return tickTime; } public void setTickTime(int tickTime) { LOG.info("tickTime set to {} ms", tickTime); this.tickTime = tickTime; } public static int getThrottledOpWaitTime() { 
return throttledOpWaitTime; } public static void setThrottledOpWaitTime(int time) { LOG.info("throttledOpWaitTime set to {} ms", time); throttledOpWaitTime = time; } public int getMinSessionTimeout() { return minSessionTimeout; } public void setMinSessionTimeout(int min) { this.minSessionTimeout = min == -1 ? tickTime * 2 : min; LOG.info("minSessionTimeout set to {} ms", this.minSessionTimeout); } public int getMaxSessionTimeout() { return maxSessionTimeout; } public void setMaxSessionTimeout(int max) { this.maxSessionTimeout = max == -1 ? tickTime * 20 : max; LOG.info("maxSessionTimeout set to {} ms", this.maxSessionTimeout); } public int getClientPortListenBacklog() { return listenBacklog; } public void setClientPortListenBacklog(int backlog) { this.listenBacklog = backlog; LOG.info("clientPortListenBacklog set to {}", backlog); } public int getClientPort() { return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1; } public int getSecureClientPort() { return secureServerCnxnFactory != null ? secureServerCnxnFactory.getLocalPort() : -1; } /** Maximum number of connections allowed from particular host (ip) */ public int getMaxClientCnxnsPerHost() { if (serverCnxnFactory != null) { return serverCnxnFactory.getMaxClientCnxnsPerHost(); } if (secureServerCnxnFactory != null) { return secureServerCnxnFactory.getMaxClientCnxnsPerHost(); } return -1; } public void setTxnLogFactory(FileTxnSnapLog txnLog) { this.txnLogFactory = txnLog; } public FileTxnSnapLog getTxnLogFactory() { return this.txnLogFactory; } /** * Returns the elapsed sync of time of transaction log in milliseconds. 
*/ public long getTxnLogElapsedSyncTime() { return txnLogFactory.getTxnLogElapsedSyncTime(); } public String getState() { return "standalone"; } public void dumpEphemerals(PrintWriter pwriter) { zkDb.dumpEphemerals(pwriter); } public Map> getEphemerals() { return zkDb.getEphemerals(); } public double getConnectionDropChance() { return connThrottle.getDropChance(); } public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException { LOG.debug( "Session establishment request from client {} client's lastZxid is 0x{}", cnxn.getRemoteSocketAddress(), Long.toHexString(request.getLastZxidSeen())); long sessionId = request.getSessionId(); int tokensNeeded = 1; if (connThrottle.isConnectionWeightEnabled()) { if (sessionId == 0) { if (localSessionEnabled) { tokensNeeded = connThrottle.getRequiredTokensForLocal(); } else { tokensNeeded = connThrottle.getRequiredTokensForGlobal(); } } else { tokensNeeded = connThrottle.getRequiredTokensForRenew(); } } if (!connThrottle.checkLimit(tokensNeeded)) { throw new ClientCnxnLimitException(); } ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); if (!cnxn.protocolManager.isReadonlyAvailable()) { LOG.warn( "Connection request from old client {}; will be dropped if server is in r-o mode", cnxn.getRemoteSocketAddress()); } if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) { String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); LOG.info(msg); throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); } if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { String msg = "Refusing session request for client " + cnxn.getRemoteSocketAddress() + " as it has seen zxid 0x" + Long.toHexString(request.getLastZxidSeen()) + " our last zxid is 0x" + 
Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) + " client must try another server"; LOG.info(msg); throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); } int sessionTimeout = request.getTimeOut(); byte[] passwd = request.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout); // We don't want to receive any packets until we are sure that the // session is setup cnxn.disableRecv(); if (sessionId == 0) { long id = createSession(cnxn, passwd, sessionTimeout); LOG.debug( "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", Long.toHexString(id), Long.toHexString(request.getLastZxidSeen()), request.getTimeOut(), cnxn.getRemoteSocketAddress()); } else { validateSession(cnxn, sessionId); LOG.debug( "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", Long.toHexString(sessionId), Long.toHexString(request.getLastZxidSeen()), request.getTimeOut(), cnxn.getRemoteSocketAddress()); if (serverCnxnFactory != null) { serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); } if (secureServerCnxnFactory != null) { secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); } cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1); } } /** * Validate if a particular session can be reestablished. 
* * @param cnxn * @param sessionId */ protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException { // do nothing } public boolean shouldThrottle(long outStandingCount) { int globalOutstandingLimit = getGlobalOutstandingLimit(); if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) { return outStandingCount > 0; } return false; } long getFlushDelay() { return flushDelay; } static void setFlushDelay(long delay) { LOG.info("{} = {} ms", FLUSH_DELAY, delay); flushDelay = delay; } long getMaxWriteQueuePollTime() { return maxWriteQueuePollTime; } static void setMaxWriteQueuePollTime(long maxTime) { LOG.info("{} = {} ms", MAX_WRITE_QUEUE_POLL_SIZE, maxTime); maxWriteQueuePollTime = maxTime; } int getMaxBatchSize() { return maxBatchSize; } static void setMaxBatchSize(int size) { LOG.info("{}={}", MAX_BATCH_SIZE, size); maxBatchSize = size; } private void initLargeRequestThrottlingSettings() { setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes)); setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1)); } public int getLargeRequestMaxBytes() { return largeRequestMaxBytes; } public void setLargeRequestMaxBytes(int bytes) { if (bytes <= 0) { LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes); LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes); } else { largeRequestMaxBytes = bytes; LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes); } } public int getLargeRequestThreshold() { return largeRequestThreshold; } public void setLargeRequestThreshold(int threshold) { if (threshold == 0 || threshold < -1) { LOG.warn("Invalid large request threshold {}. It should be -1 or positive. 
Setting to -1 ", threshold); largeRequestThreshold = -1; } else { largeRequestThreshold = threshold; LOG.info("The large request threshold is set to {}", largeRequestThreshold); } } public int getLargeRequestBytes() { return currentLargeRequestBytes.get(); } private boolean isLargeRequest(int length) { // The large request limit is disabled when threshold is -1 if (largeRequestThreshold == -1) { return false; } return length > largeRequestThreshold; } public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException { if (!isLargeRequest(length)) { return true; } if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) { return true; } else { ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); throw new IOException("Rejecting large request"); } } private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException { if (!isLargeRequest(length)) { return true; } int bytes = currentLargeRequestBytes.addAndGet(length); if (bytes > largeRequestMaxBytes) { currentLargeRequestBytes.addAndGet(-length); ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); throw new IOException("Rejecting large request"); } return true; } public void requestFinished(Request request) { int largeRequestLength = request.getLargeRequestSize(); if (largeRequestLength != -1) { currentLargeRequestBytes.addAndGet(-largeRequestLength); } } public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException { // Need to increase the outstanding request count first, otherwise // there might be a race condition that it enabled recv after // processing request and then disabled when check throttling. // // Be aware that we're actually checking the global outstanding // request before this request. // // It's fine if the IOException thrown before we decrease the count // in cnxn, since it will close the cnxn anyway. 
cnxn.incrOutstandingAndCheckThrottle(h); if (h.getType() == OpCode.auth) { LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress()); AuthPacket authPacket = request.readRecord(AuthPacket::new); String scheme = authPacket.getScheme(); ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); Code authReturn = KeeperException.Code.AUTHFAILED; if (ap != null) { try { // handleAuthentication may close the connection, to allow the client to choose // a different server to connect to. authReturn = ap.handleAuthentication( new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth()); } catch (RuntimeException e) { LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e); authReturn = KeeperException.Code.AUTHFAILED; } } if (authReturn == KeeperException.Code.OK) { LOG.info("Session 0x{}: auth success for scheme {} and address {}", Long.toHexString(cnxn.getSessionId()), scheme, cnxn.getRemoteSocketAddress()); ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); cnxn.sendResponse(rh, null, null); } else { if (ap == null) { LOG.warn( "No authentication provider for scheme: {} has {}", scheme, ProviderRegistry.listProviders()); } else { LOG.warn("Authentication failed for scheme: {}", scheme); } // send a response... ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue()); cnxn.sendResponse(rh, null, null); // ... 
and close connection cnxn.sendBuffer(ServerCnxnFactory.closeConn); cnxn.disableRecv(); } return; } else if (h.getType() == OpCode.sasl) { processSasl(request, cnxn, h); } else { if (!authHelper.enforceAuthentication(cnxn, h.getXid())) { // Authentication enforcement is failed // Already sent response to user about failure and closed the session, lets return return; } else { Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo()); int length = request.limit(); if (isLargeRequest(length)) { // checkRequestSize will throw IOException if request is rejected checkRequestSizeWhenMessageReceived(length); si.setLargeRequestSize(length); } si.setOwner(ServerCnxn.me); submitRequest(si); } } } private static boolean isSaslSuperUser(String id) { if (id == null || id.isEmpty()) { return false; } Properties properties = System.getProperties(); int prefixLen = SASL_SUPER_USER.length(); for (String k : properties.stringPropertyNames()) { if (k.startsWith(SASL_SUPER_USER) && (k.length() == prefixLen || k.charAt(prefixLen) == '.')) { String value = properties.getProperty(k); if (value != null && value.equals(id)) { return true; } } } return false; } private static boolean shouldAllowSaslFailedClientsConnect() { return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS); } private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { LOG.debug("Responding to client SASL token."); GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new); byte[] clientToken = clientTokenRecord.getToken(); LOG.debug("Size of client SASL token: {}", clientToken.length); byte[] responseToken = null; try { ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer; try { // note that clientToken might be empty (clientToken.length == 0): // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the // SASL negotiation process. 
responseToken = saslServer.evaluateResponse(clientToken); if (saslServer.isComplete()) { String authorizationID = saslServer.getAuthorizationID(); LOG.info("Session 0x{}: adding SASL authorization for authorizationID: {}", Long.toHexString(cnxn.getSessionId()), authorizationID); cnxn.addAuthInfo(new Id("sasl", authorizationID)); if (isSaslSuperUser(authorizationID)) { cnxn.addAuthInfo(new Id("super", "")); LOG.info( "Session 0x{}: Authenticated Id '{}' as super user", Long.toHexString(cnxn.getSessionId()), authorizationID); } } } catch (SaslException e) { LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e); if (shouldAllowSaslFailedClientsConnect() && !authHelper.isSaslAuthRequired()) { LOG.warn("Maintaining client connection despite SASL authentication failure."); } else { int error; if (authHelper.isSaslAuthRequired()) { LOG.warn( "Closing client connection due to server requires client SASL authenticaiton," + "but client SASL authentication has failed, or client is not configured with SASL " + "authentication."); error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue(); } else { LOG.warn("Closing client connection due to SASL authentication failure."); error = Code.AUTHFAILED.intValue(); } ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error); cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response"); cnxn.sendCloseSession(); cnxn.disableRecv(); return; } } } catch (NullPointerException e) { LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly."); } if (responseToken != null) { LOG.debug("Size of server SASL response: {}", responseToken.length); } ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue()); Record record = new SetSASLResponse(responseToken); cnxn.sendResponse(replyHeader, record, "response"); } // entry point for quorum/Learner.java public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { 
processTxnForSessionEvents(null, hdr, txn); return processTxnInDB(hdr, txn, null); } // entry point for FinalRequestProcessor.java public ProcessTxnResult processTxn(Request request) { TxnHeader hdr = request.getHdr(); processTxnForSessionEvents(request, hdr, request.getTxn()); final boolean writeRequest = (hdr != null); final boolean quorumRequest = request.isQuorum(); // return fast w/o synchronization when we get a read if (!writeRequest && !quorumRequest) { return new ProcessTxnResult(); } synchronized (outstandingChanges) { ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); // request.hdr is set for write requests, which are the only ones // that add to outstandingChanges. if (writeRequest) { long zxid = hdr.getZxid(); while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) { ChangeRecord cr = outstandingChanges.remove(); ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1); if (cr.zxid < zxid) { LOG.warn( "Zxid outstanding 0x{} is less than current 0x{}", Long.toHexString(cr.zxid), Long.toHexString(zxid)); } if (outstandingChangesForPath.get(cr.path) == cr) { outstandingChangesForPath.remove(cr.path); } } } // do not add non quorum packets to the queue. if (quorumRequest) { getZKDatabase().addCommittedProposal(request); } return rc; } } private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) { int opCode = (request == null) ? hdr.getType() : request.type; long sessionId = (request == null) ? 
hdr.getClientId() : request.sessionId; if (opCode == OpCode.createSession) { if (hdr != null && txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) txn; sessionTracker.commitSession(sessionId, cst.getTimeOut()); } else if (request == null || !request.isLocalSession()) { LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString()); } } else if (opCode == OpCode.closeSession) { sessionTracker.removeSession(sessionId); } } private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) { if (hdr == null) { return new ProcessTxnResult(); } else { return getZKDatabase().processTxn(hdr, txn, digest); } } public Map> getSessionExpiryMap() { return sessionTracker.getSessionExpiryMap(); } /** * This method is used to register the ZooKeeperServerShutdownHandler to get * server's error or shutdown state change notifications. * {@link ZooKeeperServerShutdownHandler#handle(State)} will be called for * every server state changes {@link #setState(State)}. * * @param zkShutdownHandler shutdown handler */ void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) { this.zkShutdownHandler = zkShutdownHandler; } public boolean isResponseCachingEnabled() { return isResponseCachingEnabled; } public void setResponseCachingEnabled(boolean isEnabled) { isResponseCachingEnabled = isEnabled; } public ResponseCache getReadResponseCache() { return isResponseCachingEnabled ? readResponseCache : null; } public ResponseCache getGetChildrenResponseCache() { return isResponseCachingEnabled ? 
getChildrenResponseCache : null; } protected void registerMetrics() { MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); final ZKDatabase zkdb = this.getZKDatabase(); final ServerStats stats = this.serverStats(); rootContext.registerGauge("avg_latency", stats::getAvgLatency); rootContext.registerGauge("max_latency", stats::getMaxLatency); rootContext.registerGauge("min_latency", stats::getMinLatency); rootContext.registerGauge("packets_received", stats::getPacketsReceived); rootContext.registerGauge("packets_sent", stats::getPacketsSent); rootContext.registerGauge("num_alive_connections", stats::getNumAliveClientConnections); rootContext.registerGauge("outstanding_requests", stats::getOutstandingRequests); rootContext.registerGauge("uptime", stats::getUptime); rootContext.registerGauge("znode_count", zkdb::getNodeCount); rootContext.registerGauge("watch_count", zkdb.getDataTree()::getWatchCount); rootContext.registerGauge("ephemerals_count", zkdb.getDataTree()::getEphemeralsCount); rootContext.registerGauge("approximate_data_size", zkdb.getDataTree()::cachedApproximateDataSize); rootContext.registerGauge("global_sessions", zkdb::getSessionCount); rootContext.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount); OSMXBean osMbean = new OSMXBean(); rootContext.registerGauge("open_file_descriptor_count", osMbean::getOpenFileDescriptorCount); rootContext.registerGauge("max_file_descriptor_count", osMbean::getMaxFileDescriptorCount); rootContext.registerGauge("connection_drop_probability", this::getConnectionDropChance); rootContext.registerGauge("last_client_response_size", stats.getClientResponseStats()::getLastBufferSize); rootContext.registerGauge("max_client_response_size", stats.getClientResponseStats()::getMaxBufferSize); rootContext.registerGauge("min_client_response_size", stats.getClientResponseStats()::getMinBufferSize); rootContext.registerGauge("outstanding_tls_handshake", 
this::getOutstandingHandshakeNum); rootContext.registerGauge("auth_failed_count", stats::getAuthFailedCount); rootContext.registerGauge("non_mtls_remote_conn_count", stats::getNonMTLSRemoteConnCount); rootContext.registerGauge("non_mtls_local_conn_count", stats::getNonMTLSLocalConnCount); rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE, () -> QuotaMetricsUtils.getQuotaCountLimit(zkDb.getDataTree())); rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE, () -> QuotaMetricsUtils.getQuotaBytesLimit(zkDb.getDataTree())); rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE, () -> QuotaMetricsUtils.getQuotaCountUsage(zkDb.getDataTree())); rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE, () -> QuotaMetricsUtils.getQuotaBytesUsage(zkDb.getDataTree())); } protected void unregisterMetrics() { MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); rootContext.unregisterGauge("avg_latency"); rootContext.unregisterGauge("max_latency"); rootContext.unregisterGauge("min_latency"); rootContext.unregisterGauge("packets_received"); rootContext.unregisterGauge("packets_sent"); rootContext.unregisterGauge("num_alive_connections"); rootContext.unregisterGauge("outstanding_requests"); rootContext.unregisterGauge("uptime"); rootContext.unregisterGauge("znode_count"); rootContext.unregisterGauge("watch_count"); rootContext.unregisterGauge("ephemerals_count"); rootContext.unregisterGauge("approximate_data_size"); rootContext.unregisterGauge("global_sessions"); rootContext.unregisterGauge("local_sessions"); rootContext.unregisterGauge("open_file_descriptor_count"); rootContext.unregisterGauge("max_file_descriptor_count"); rootContext.unregisterGauge("connection_drop_probability"); rootContext.unregisterGauge("last_client_response_size"); rootContext.unregisterGauge("max_client_response_size"); 
rootContext.unregisterGauge("min_client_response_size"); rootContext.unregisterGauge("auth_failed_count"); rootContext.unregisterGauge("non_mtls_remote_conn_count"); rootContext.unregisterGauge("non_mtls_local_conn_count"); rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE); rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE); rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE); rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE); } /** * Hook into admin server, useful to expose additional data * that do not represent metrics. * * @param response a sink which collects the data. */ public void dumpMonitorValues(BiConsumer response) { ServerStats stats = serverStats(); response.accept("version", Version.getFullVersion()); response.accept("server_state", stats.getServerState()); } /** * Grant or deny authorization to an operation on a node as a function of: * @param cnxn : the server connection or null for admin server commands * @param acl : set of ACLs for the node * @param perm : the permission that the client is requesting * @param ids : the credentials supplied by the client * @param path : the ZNode path * @param setAcls : for set ACL operations, the list of ACLs being set. Otherwise null. 
*/ public void checkACL(ServerCnxn cnxn, List acl, int perm, List ids, String path, List setAcls) throws KeeperException.NoAuthException { if (skipACL) { return; } LOG.debug("Permission requested: {} ", perm); LOG.debug("ACLs for node: {}", acl); LOG.debug("Client credentials: {}", ids); if (acl == null || acl.size() == 0) { return; } for (Id authId : ids) { if (authId.getScheme().equals("super")) { return; } } for (ACL a : acl) { Id id = a.getId(); if ((a.getPerms() & perm) != 0) { if (id.getScheme().equals("world") && id.getId().equals("anyone")) { return; } ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(id.getScheme()); if (ap != null) { for (Id authId : ids) { if (authId.getScheme().equals(id.getScheme()) && ap.matches( new ServerAuthenticationProvider.ServerObjs(this, cnxn), new ServerAuthenticationProvider.MatchValues(path, authId.getId(), id.getId(), perm, setAcls))) { return; } } } } } throw new KeeperException.NoAuthException(); } /** * check a path whether exceeded the quota. * * @param path * the path of the node, used for the quota prefix check * @param lastData * the current node data, {@code null} for none * @param data * the data to be set, or {@code null} for none * @param type * currently, create and setData need to check quota */ public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException { if (!enforceQuota) { return; } long dataBytes = (data == null) ? 0 : data.length; ZKDatabase zkDatabase = getZKDatabase(); String lastPrefix = zkDatabase.getDataTree().getMaxPrefixWithQuota(path); if (StringUtils.isEmpty(lastPrefix)) { return; } final String namespace = PathUtils.getTopNamespace(path); switch (type) { case OpCode.create: checkQuota(lastPrefix, dataBytes, 1, namespace); break; case OpCode.setData: checkQuota(lastPrefix, dataBytes - (lastData == null ? 
0 : lastData.length), 0, namespace); break; default: throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type); } } /** * check a path whether exceeded the quota. * * @param lastPrefix the path of the node which has a quota. * @param bytesDiff * the diff to be added to number of bytes * @param countDiff * the diff to be added to the count * @param namespace * the namespace for collecting quota exceeded errors */ private void checkQuota(String lastPrefix, long bytesDiff, long countDiff, String namespace) throws KeeperException.QuotaExceededException { LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff); // now check the quota we set String limitNode = Quotas.limitPath(lastPrefix); DataNode node = getZKDatabase().getNode(limitNode); StatsTrack limitStats; if (node == null) { // should not happen LOG.error("Missing limit node for quota {}", limitNode); return; } synchronized (node) { limitStats = new StatsTrack(node.data); } //check the quota boolean checkCountQuota = countDiff != 0 && (limitStats.getCount() > -1 || limitStats.getCountHardLimit() > -1); boolean checkByteQuota = bytesDiff != 0 && (limitStats.getBytes() > -1 || limitStats.getByteHardLimit() > -1); if (!checkCountQuota && !checkByteQuota) { return; } //check the statPath quota String statNode = Quotas.statPath(lastPrefix); node = getZKDatabase().getNode(statNode); StatsTrack currentStats; if (node == null) { // should not happen LOG.error("Missing node for stat {}", statNode); return; } synchronized (node) { currentStats = new StatsTrack(node.data); } //check the Count Quota if (checkCountQuota) { long newCount = currentStats.getCount() + countDiff; boolean isCountHardLimit = limitStats.getCountHardLimit() > -1; long countLimit = isCountHardLimit ? 
limitStats.getCountHardLimit() : limitStats.getCount(); if (newCount > countLimit) { String msg = "Quota exceeded: " + lastPrefix + " [current count=" + newCount + ", " + (isCountHardLimit ? "hard" : "soft") + "CountLimit=" + countLimit + "]"; RATE_LOGGER.rateLimitLog(msg); if (isCountHardLimit) { updateQuotaExceededMetrics(namespace); throw new KeeperException.QuotaExceededException(lastPrefix); } } } //check the Byte Quota if (checkByteQuota) { long newBytes = currentStats.getBytes() + bytesDiff; boolean isByteHardLimit = limitStats.getByteHardLimit() > -1; long byteLimit = isByteHardLimit ? limitStats.getByteHardLimit() : limitStats.getBytes(); if (newBytes > byteLimit) { String msg = "Quota exceeded: " + lastPrefix + " [current bytes=" + newBytes + ", " + (isByteHardLimit ? "hard" : "soft") + "ByteLimit=" + byteLimit + "]"; RATE_LOGGER.rateLimitLog(msg); if (isByteHardLimit) { updateQuotaExceededMetrics(namespace); throw new KeeperException.QuotaExceededException(lastPrefix); } } } } public static boolean isDigestEnabled() { return digestEnabled; } public static void setDigestEnabled(boolean digestEnabled) { LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); ZooKeeperServer.digestEnabled = digestEnabled; } public static boolean isSerializeLastProcessedZxidEnabled() { return serializeLastProcessedZxidEnabled; } public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) { serializeLastProcessedZxidEnabled = serializeLastZxidEnabled; LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled); } /** * Trim a path to get the immediate predecessor. 
* * @param path * @return * @throws KeeperException.BadArgumentsException */ private String parentPath(String path) throws KeeperException.BadArgumentsException { int lastSlash = path.lastIndexOf('/'); if (lastSlash == -1 || path.indexOf('\0') != -1 || getZKDatabase().isSpecialPath(path)) { throw new KeeperException.BadArgumentsException(path); } return lastSlash == 0 ? "/" : path.substring(0, lastSlash); } private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException { boolean mustCheckACL = false; String path = null; List acl = null; switch (request.type) { case OpCode.create: case OpCode.create2: { CreateRequest req = request.readRequestRecordNoException(CreateRequest::new); if (req != null) { mustCheckACL = true; acl = req.getAcl(); path = parentPath(req.getPath()); } break; } case OpCode.delete: { DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new); if (req != null) { path = parentPath(req.getPath()); } break; } case OpCode.setData: { SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new); if (req != null) { path = req.getPath(); } break; } case OpCode.setACL: { SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new); if (req != null) { mustCheckACL = true; acl = req.getAcl(); path = req.getPath(); } break; } } if (mustCheckACL) { /* we ignore the extrapolated ACL returned by fixupACL because * we only care about it being well-formed (and if it isn't, an * exception will be raised). 
*/ PrepRequestProcessor.fixupACL(path, request.authInfo, acl); } return path; } private int effectiveACLPerms(Request request) { switch (request.type) { case OpCode.create: case OpCode.create2: return ZooDefs.Perms.CREATE; case OpCode.delete: return ZooDefs.Perms.DELETE; case OpCode.setData: return ZooDefs.Perms.WRITE; case OpCode.setACL: return ZooDefs.Perms.ADMIN; default: return ZooDefs.Perms.ALL; } } /** * Check Write Requests for Potential Access Restrictions *

* Before a request is being proposed to the quorum, lets check it * against local ACLs. Non-write requests (read, session, etc.) * are passed along. Invalid requests are sent a response. *

* While we are at it, if the request will set an ACL: make sure it's * a valid one. * * @param request * @return true if request is permitted, false if not. */ public boolean authWriteRequest(Request request) { int err; String pathToCheck; if (!enableEagerACLCheck) { return true; } err = KeeperException.Code.OK.intValue(); try { pathToCheck = effectiveACLPath(request); if (pathToCheck != null) { checkACL(request.cnxn, zkDb.getACL(pathToCheck, null), effectiveACLPerms(request), request.authInfo, pathToCheck, null); } } catch (KeeperException.NoAuthException e) { LOG.debug("Request failed ACL check", e); err = e.code().intValue(); } catch (KeeperException.InvalidACLException e) { LOG.debug("Request has an invalid ACL check", e); err = e.code().intValue(); } catch (KeeperException.NoNodeException e) { LOG.debug("ACL check against non-existent node: {}", e.getMessage()); } catch (KeeperException.BadArgumentsException e) { LOG.debug("ACL check against illegal node path: {}", e.getMessage()); } catch (Throwable t) { LOG.error("Uncaught exception in authWriteRequest with: ", t); throw t; } finally { if (err != KeeperException.Code.OK.intValue()) { /* This request has a bad ACL, so we are dismissing it early. 
*/ decInProcess(); ReplyHeader rh = new ReplyHeader(request.cxid, 0, err); try { request.cnxn.sendResponse(rh, null, null); } catch (IOException e) { LOG.error("IOException : {}", e); } } } return err == KeeperException.Code.OK.intValue(); } public int getOutstandingHandshakeNum() { if (serverCnxnFactory instanceof NettyServerCnxnFactory) { return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum(); } else { return 0; } } public boolean isReconfigEnabled() { return this.reconfigEnabled; } public ZooKeeperServerShutdownHandler getZkShutdownHandler() { return zkShutdownHandler; } static void updateQuotaExceededMetrics(final String namespace) { if (namespace == null) { return; } ServerMetrics.getMetrics().QUOTA_EXCEEDED_ERROR_PER_NAMESPACE.add(namespace, 1); } }