diff options
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java')
-rw-r--r-- | zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java | 2412 |
1 files changed, 0 insertions, 2412 deletions
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java deleted file mode 100644 index 00af31b46d4..00000000000 --- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ /dev/null @@ -1,2412 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server; - -import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintWriter; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Deque; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; -import java.util.zip.Adler32; -import java.util.zip.CheckedInputStream; -import javax.security.sasl.SaslException; -import org.apache.jute.BinaryInputArchive; -import org.apache.jute.BinaryOutputArchive; -import org.apache.jute.InputArchive; -import org.apache.jute.Record; -import org.apache.zookeeper.Environment; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.KeeperException.SessionExpiredException; -import org.apache.zookeeper.Quotas; -import org.apache.zookeeper.StatsTrack; -import org.apache.zookeeper.Version; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooDefs.OpCode; -import org.apache.zookeeper.ZookeeperBanner; -import org.apache.zookeeper.common.PathUtils; -import org.apache.zookeeper.common.StringUtils; -import org.apache.zookeeper.common.Time; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Id; -import org.apache.zookeeper.data.StatPersisted; -import org.apache.zookeeper.jmx.MBeanRegistry; -import org.apache.zookeeper.metrics.MetricsContext; -import org.apache.zookeeper.proto.AuthPacket; -import org.apache.zookeeper.proto.ConnectRequest; -import org.apache.zookeeper.proto.ConnectResponse; -import org.apache.zookeeper.proto.CreateRequest; -import org.apache.zookeeper.proto.DeleteRequest; -import org.apache.zookeeper.proto.GetSASLRequest; -import org.apache.zookeeper.proto.ReplyHeader; -import org.apache.zookeeper.proto.RequestHeader; -import org.apache.zookeeper.proto.SetACLRequest; -import org.apache.zookeeper.proto.SetDataRequest; -import org.apache.zookeeper.proto.SetSASLResponse; -import org.apache.zookeeper.server.DataTree.ProcessTxnResult; -import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException; -import org.apache.zookeeper.server.ServerCnxn.CloseRequestException; -import org.apache.zookeeper.server.SessionTracker.Session; -import org.apache.zookeeper.server.SessionTracker.SessionExpirer; -import org.apache.zookeeper.server.auth.ProviderRegistry; -import org.apache.zookeeper.server.auth.ServerAuthenticationProvider; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; -import org.apache.zookeeper.server.util.JvmPauseMonitor; -import org.apache.zookeeper.server.util.OSMXBean; -import org.apache.zookeeper.server.util.QuotaMetricsUtils; -import org.apache.zookeeper.server.util.RequestPathMetricsCollector; -import org.apache.zookeeper.txn.CreateSessionTxn; -import org.apache.zookeeper.txn.TxnDigest; -import org.apache.zookeeper.txn.TxnHeader; -import org.apache.zookeeper.util.ServiceUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class implements a simple standalone ZooKeeperServer. It sets up the - * following chain of RequestProcessors to process requests: - * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor - */ -public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { - - protected static final Logger LOG; - private static final RateLogger RATE_LOGGER; - - public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit"; - - public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck"; - public static final String SKIP_ACL = "zookeeper.skipACL"; - public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota"; - - // When enabled, will check ACL constraints appertained to the requests first, - // before sending the requests to the quorum. - static boolean enableEagerACLCheck; - - static final boolean skipACL; - - public static final boolean enforceQuota; - - public static final String SASL_SUPER_USER = "zookeeper.superUser"; - - public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients"; - public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled"; - private static boolean digestEnabled; - - public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled"; - private static boolean serializeLastProcessedZxidEnabled; - - // Add a enable/disable option for now, we should remove this one when - // this feature is confirmed to be stable - public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; - private static boolean closeSessionTxnEnabled = true; - private volatile CountDownLatch restoreLatch; - - static { - LOG = LoggerFactory.getLogger(ZooKeeperServer.class); - - RATE_LOGGER = new RateLogger(LOG); - - ZookeeperBanner.printBanner(LOG); - - Environment.logEnv("Server environment:", LOG); - - enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK); - LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck); - - skipACL = System.getProperty(SKIP_ACL, "no").equals("yes"); - if (skipACL) { - LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL); - } - - enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false")); - if (enforceQuota) { - LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota); - } - - digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true")); - LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); - - closeSessionTxnEnabled = Boolean.parseBoolean( - System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); - LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); - - setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean( - System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"))); - } - - // @VisibleForTesting - public static boolean isEnableEagerACLCheck() { - return enableEagerACLCheck; - } - - // @VisibleForTesting - public static void setEnableEagerACLCheck(boolean enabled) { - ZooKeeperServer.enableEagerACLCheck = enabled; - LOG.info("Update {} to {}", ENABLE_EAGER_ACL_CHECK, enabled); - } - - public static boolean isCloseSessionTxnEnabled() { - return closeSessionTxnEnabled; - } - - public static void setCloseSessionTxnEnabled(boolean enabled) { - ZooKeeperServer.closeSessionTxnEnabled = enabled; - LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED, - ZooKeeperServer.closeSessionTxnEnabled); - } - - protected ZooKeeperServerBean jmxServerBean; - protected DataTreeBean jmxDataTreeBean; - - public static final int DEFAULT_TICK_TIME = 3000; - protected int tickTime = DEFAULT_TICK_TIME; - public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled - protected static volatile int throttledOpWaitTime = - Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME); - /** value of -1 indicates unset, use default */ - protected int minSessionTimeout = -1; - /** value of -1 indicates unset, use default */ - protected int maxSessionTimeout = -1; - /** Socket listen backlog. Value of -1 indicates unset */ - protected int listenBacklog = -1; - protected SessionTracker sessionTracker; - private FileTxnSnapLog txnLogFactory = null; - private ZKDatabase zkDb; - private ResponseCache readResponseCache; - private ResponseCache getChildrenResponseCache; - private final AtomicLong hzxid = new AtomicLong(0); - public static final Exception ok = new Exception("No prob"); - protected RequestProcessor firstProcessor; - protected JvmPauseMonitor jvmPauseMonitor; - protected volatile State state = State.INITIAL; - private boolean isResponseCachingEnabled = true; - /* contains the configuration file content read at startup */ - protected String initialConfig; - protected boolean reconfigEnabled; - private final RequestPathMetricsCollector requestPathMetricsCollector; - private static final int DEFAULT_SNAP_COUNT = 100000; - private static final int DEFAULT_GLOBAL_OUTSTANDING_LIMIT = 1000; - - private boolean localSessionEnabled = false; - protected enum State { - INITIAL, - RUNNING, - SHUTDOWN, - ERROR - } - - /** - * This is the secret that we use to generate passwords. For the moment, - * it's more of a checksum that's used in reconnection, which carries no - * security weight, and is treated internally as if it carries no - * security weight. - */ - private static final long superSecret = 0XB3415C00L; - - private final AtomicInteger requestsInProcess = new AtomicInteger(0); - final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>(); - // this data structure must be accessed under the outstandingChanges lock - final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>(); - - protected ServerCnxnFactory serverCnxnFactory; - protected ServerCnxnFactory secureServerCnxnFactory; - - private final ServerStats serverStats; - private final ZooKeeperServerListener listener; - private ZooKeeperServerShutdownHandler zkShutdownHandler; - private volatile int createSessionTrackerServerId = 1; - - private static final String FLUSH_DELAY = "zookeeper.flushDelay"; - private static volatile long flushDelay; - private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime"; - private static volatile long maxWriteQueuePollTime; - private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize"; - private static volatile int maxBatchSize; - - /** - * Starting size of read and write ByteArroyOuputBuffers. Default is 32 bytes. - * Flag not used for small transfers like connectResponses. - */ - public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes"; - public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024; - public static final int intBufferStartingSizeBytes; - - public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize"; - public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize"; - - static { - long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0); - setFlushDelay(configuredFlushDelay); - setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3)); - setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000)); - - intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE); - - if (intBufferStartingSizeBytes < 32) { - String msg = "Buffer starting size (" + intBufferStartingSizeBytes + ") must be greater than or equal to 32. " - + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\" "; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - - LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes); - } - - // Connection throttling - private final BlueThrottle connThrottle = new BlueThrottle(); - - private RequestThrottler requestThrottler; - public static final String SNAP_COUNT = "zookeeper.snapCount"; - - /** - * This setting sets a limit on the total number of large requests that - * can be inflight and is designed to prevent ZooKeeper from accepting - * too many large requests such that the JVM runs out of usable heap and - * ultimately crashes. - * - * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)} - * method which is called by the connection layer ({@link NIOServerCnxn}, - * {@link NettyServerCnxn}) before allocating a byte buffer and pulling - * data off the TCP socket. The limit is then checked again by the - * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which - * also atomically updates {@link #currentLargeRequestBytes}. The request is - * then marked as a large request, with the request size stored in the Request - * object so that it can later be decremented from {@link #currentLargeRequestBytes}. - * - * When a request is completed or dropped, the relevant code path calls the - * {@link #requestFinished(Request)} method which performs the decrement if - * needed. - */ - private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; - - /** - * The size threshold after which a request is considered a large request - * and is checked against the large request byte limit. - */ - private volatile int largeRequestThreshold = -1; - - private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0); - - private final AuthenticationHelper authHelper = new AuthenticationHelper(); - - void removeCnxn(ServerCnxn cnxn) { - zkDb.removeCnxn(cnxn); - } - - /** - * Creates a ZooKeeperServer instance. Nothing is setup, use the setX - * methods to prepare the instance (eg datadir, datalogdir, ticktime, - * builder, etc...) - * - */ - public ZooKeeperServer() { - listener = new ZooKeeperServerListenerImpl(this); - serverStats = new ServerStats(this); - this.requestPathMetricsCollector = new RequestPathMetricsCollector(); - } - - /** - * Keeping this constructor for backward compatibility - */ - public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) { - this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled()); - } - - /** - * * Creates a ZooKeeperServer instance. It sets everything up, but doesn't - * actually start listening for clients until run() is invoked. - * - */ - public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) { - serverStats = new ServerStats(this); - this.txnLogFactory = txnLogFactory; - this.txnLogFactory.setServerStats(this.serverStats); - this.zkDb = zkDb; - this.tickTime = tickTime; - setMinSessionTimeout(minSessionTimeout); - setMaxSessionTimeout(maxSessionTimeout); - this.listenBacklog = clientPortListenBacklog; - this.reconfigEnabled = reconfigEnabled; - - listener = new ZooKeeperServerListenerImpl(this); - - readResponseCache = new ResponseCache(Integer.getInteger( - GET_DATA_RESPONSE_CACHE_SIZE, - ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getData"); - - getChildrenResponseCache = new ResponseCache(Integer.getInteger( - GET_CHILDREN_RESPONSE_CACHE_SIZE, - ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getChildren"); - - this.initialConfig = initialConfig; - - this.requestPathMetricsCollector = new RequestPathMetricsCollector(); - - this.initLargeRequestThrottlingSettings(); - - LOG.info( - "Created server with" - + " tickTime {} ms" - + " minSessionTimeout {} ms" - + " maxSessionTimeout {} ms" - + " clientPortListenBacklog {}" - + " dataLogdir {}" - + " snapdir {}", - tickTime, - getMinSessionTimeout(), - getMaxSessionTimeout(), - getClientPortListenBacklog(), - txnLogFactory.getDataLogDir(), - txnLogFactory.getSnapDir()); - } - - public String getInitialConfig() { - return initialConfig; - } - - /** - * Adds JvmPauseMonitor and calls - * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String)} - * - */ - public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) { - this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled()); - this.jvmPauseMonitor = jvmPauseMonitor; - if (jvmPauseMonitor != null) { - LOG.info("Added JvmPauseMonitor to server"); - } - } - - /** - * creates a zookeeperserver instance. - * @param txnLogFactory the file transaction snapshot logging class - * @param tickTime the ticktime for the server - */ - public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) { - this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig, QuorumPeerConfig.isReconfigEnabled()); - } - - public ServerStats serverStats() { - return serverStats; - } - - public RequestPathMetricsCollector getRequestPathMetricsCollector() { - return requestPathMetricsCollector; - } - - public BlueThrottle connThrottle() { - return connThrottle; - } - - public void dumpConf(PrintWriter pwriter) { - pwriter.print("clientPort="); - pwriter.println(getClientPort()); - pwriter.print("secureClientPort="); - pwriter.println(getSecureClientPort()); - pwriter.print("dataDir="); - pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath()); - pwriter.print("dataDirSize="); - pwriter.println(getDataDirSize()); - pwriter.print("dataLogDir="); - pwriter.println(zkDb.snapLog.getDataLogDir().getAbsolutePath()); - pwriter.print("dataLogSize="); - pwriter.println(getLogDirSize()); - pwriter.print("tickTime="); - pwriter.println(getTickTime()); - pwriter.print("maxClientCnxns="); - pwriter.println(getMaxClientCnxnsPerHost()); - pwriter.print("minSessionTimeout="); - pwriter.println(getMinSessionTimeout()); - pwriter.print("maxSessionTimeout="); - pwriter.println(getMaxSessionTimeout()); - pwriter.print("clientPortListenBacklog="); - pwriter.println(getClientPortListenBacklog()); - - pwriter.print("serverId="); - pwriter.println(getServerId()); - } - - public ZooKeeperServerConf getConf() { - return new ZooKeeperServerConf( - getClientPort(), - zkDb.snapLog.getSnapDir().getAbsolutePath(), - zkDb.snapLog.getDataLogDir().getAbsolutePath(), - getTickTime(), - getMaxClientCnxnsPerHost(), - getMinSessionTimeout(), - getMaxSessionTimeout(), - getServerId(), - getClientPortListenBacklog()); - } - - /** - * This constructor is for backward compatibility with the existing unit - * test code. - * It defaults to FileLogProvider persistence provider. - */ - public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException { - this(new FileTxnSnapLog(snapDir, logDir), tickTime, ""); - } - - /** - * Default constructor, relies on the config for its argument values - * - * @throws IOException - */ - public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException { - this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "", QuorumPeerConfig.isReconfigEnabled()); - } - - /** - * get the zookeeper database for this server - * @return the zookeeper database for this server - */ - public ZKDatabase getZKDatabase() { - return this.zkDb; - } - - /** - * set the zkdatabase for this zookeeper server - * @param zkDb - */ - public void setZKDatabase(ZKDatabase zkDb) { - this.zkDb = zkDb; - } - - /** - * Restore sessions and data - */ - public void loadData() throws IOException, InterruptedException { - /* - * When a new leader starts executing Leader#lead, it - * invokes this method. The database, however, has been - * initialized before running leader election so that - * the server could pick its zxid for its initial vote. - * It does it by invoking QuorumPeer#getLastLoggedZxid. - * Consequently, we don't need to initialize it once more - * and avoid the penalty of loading it a second time. Not - * reloading it is particularly important for applications - * that host a large database. - * - * The following if block checks whether the database has - * been initialized or not. Note that this method is - * invoked by at least one other method: - * ZooKeeperServer#startdata. - * - * See ZOOKEEPER-1642 for more detail. - */ - if (zkDb.isInitialized()) { - setZxid(zkDb.getDataTreeLastProcessedZxid()); - } else { - setZxid(zkDb.loadDataBase()); - } - - // Clean up dead sessions - zkDb.getSessions().stream() - .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null) - .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid())); - - // Make a clean snapshot - takeSnapshot(); - } - - public File takeSnapshot() throws IOException { - return takeSnapshot(false); - } - - public File takeSnapshot(boolean syncSnap) throws IOException { - return takeSnapshot(syncSnap, true, false); - } - - /** - * Takes a snapshot on the server. - * - * @param syncSnap syncSnap sync the snapshot immediately after write - * @param isSevere if true system exist, otherwise throw IOException - * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions - * - * @return file snapshot file object - * @throws IOException - */ - public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException { - long start = Time.currentElapsedTime(); - File snapFile = null; - try { - if (fastForwardFromEdits) { - zkDb.fastForwardDataBase(); - } - snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); - } catch (IOException e) { - if (isSevere) { - LOG.error("Severe unrecoverable error, exiting", e); - // This is a severe error that we cannot recover from, - // so we need to exit - ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); - } else { - throw e; - } - } - long elapsed = Time.currentElapsedTime() - start; - LOG.info("Snapshot taken in {} ms", elapsed); - ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed); - return snapFile; - } - - /** - * Restores database from a snapshot. It is used by the restore admin server command. - * - * @param inputStream input stream of snapshot - * @return last processed zxid - */ - public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException { - if (inputStream == null) { - throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot"); - } - - long start = Time.currentElapsedTime(); - LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}", - getZKDatabase().getDataTreeLastProcessedZxid(), - getZKDatabase().dataTree.getNodeCount(), - getZKDatabase().getSessionCount()); - - // restore to a new zkDatabase - final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory); - final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32()); - final InputArchive ia = BinaryInputArchive.getArchive(cis); - newZKDatabase.deserializeSnapshot(ia, cis); - LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", - newZKDatabase.getDataTreeLastProcessedZxid(), - newZKDatabase.dataTree.getNodeCount(), - newZKDatabase.getSessionCount()); - - // create a CountDownLatch - restoreLatch = new CountDownLatch(1); - - try { - // set to the new zkDatabase - setZKDatabase(newZKDatabase); - - // re-create SessionTrack - createSessionTracker(); - } finally { - // unblock request submission - restoreLatch.countDown(); - restoreLatch = null; - } - - LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", - getZKDatabase().getDataTreeLastProcessedZxid(), - getZKDatabase().dataTree.getNodeCount(), - getZKDatabase().getSessionCount()); - - long elapsed = Time.currentElapsedTime() - start; - LOG.info("Restore taken in {} ms", elapsed); - ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed); - - return getLastProcessedZxid(); - } - - public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() { - return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection(); - } - - @Override - public long getDataDirSize() { - if (zkDb == null) { - return 0L; - } - File path = zkDb.snapLog.getSnapDir(); - return getDirSize(path); - } - - @Override - public long getLogDirSize() { - if (zkDb == null) { - return 0L; - } - File path = zkDb.snapLog.getDataLogDir(); - return getDirSize(path); - } - - private long getDirSize(File file) { - long size = 0L; - if (file.isDirectory()) { - File[] files = file.listFiles(); - if (files != null) { - for (File f : files) { - size += getDirSize(f); - } - } - } else { - size = file.length(); - } - return size; - } - - public long getZxid() { - return hzxid.get(); - } - - public SessionTracker getSessionTracker() { - return sessionTracker; - } - - long getNextZxid() { - return hzxid.incrementAndGet(); - } - - public void setZxid(long zxid) { - hzxid.set(zxid); - } - - private void close(long sessionId) { - Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null); - submitRequest(si); - } - - public void closeSession(long sessionId) { - LOG.info("Closing session 0x{}", Long.toHexString(sessionId)); - - // we do not want to wait for a session close. send it as soon as we - // detect it! - close(sessionId); - } - - protected void killSession(long sessionId, long zxid) { - zkDb.killSession(sessionId, zxid); - if (LOG.isTraceEnabled()) { - ZooTrace.logTraceMessage( - LOG, - ZooTrace.SESSION_TRACE_MASK, - "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId)); - } - if (sessionTracker != null) { - sessionTracker.removeSession(sessionId); - } - } - - public void expire(Session session) { - long sessionId = session.getSessionId(); - LOG.info( - "Expiring session 0x{}, timeout of {}ms exceeded", - Long.toHexString(sessionId), - session.getTimeout()); - close(sessionId); - } - - public void expire(long sessionId) { - LOG.info("forcibly expiring session 0x{}", Long.toHexString(sessionId)); - - close(sessionId); - } - - public static class MissingSessionException extends IOException { - - private static final long serialVersionUID = 7467414635467261007L; - - public MissingSessionException(String msg) { - super(msg); - } - - } - - void touch(ServerCnxn cnxn) throws MissingSessionException { - if (cnxn == null) { - return; - } - long id = cnxn.getSessionId(); - int to = cnxn.getSessionTimeout(); - if (!sessionTracker.touchSession(id, to)) { - throw new MissingSessionException("No session with sessionid 0x" - + Long.toHexString(id) - + " exists, probably expired and removed"); - } - } - - protected void registerJMX() { - // register with JMX - try { - jmxServerBean = new ZooKeeperServerBean(this); - MBeanRegistry.getInstance().register(jmxServerBean, null); - - try { - jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree()); - MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); - } catch (Exception e) { - LOG.warn("Failed to register with JMX", e); - jmxDataTreeBean = null; - } - } catch (Exception e) { - LOG.warn("Failed to register with JMX", e); - jmxServerBean = null; - } - } - - public void startdata() throws IOException, InterruptedException { - //check to see if zkDb is not null - if (zkDb == null) { - zkDb = new ZKDatabase(this.txnLogFactory); - } - if (!zkDb.isInitialized()) { - loadData(); - } - } - - public synchronized void startup() { - startupWithServerState(State.RUNNING); - } - - public synchronized void startupWithoutServing() { - startupWithServerState(State.INITIAL); - } - - public synchronized void startServing() { - setState(State.RUNNING); - notifyAll(); - } - - private void startupWithServerState(State state) { - if (sessionTracker == null) { - createSessionTracker(); - } - startSessionTracker(); - setupRequestProcessors(); - - startRequestThrottler(); - - registerJMX(); - - startJvmPauseMonitor(); - - registerMetrics(); - - setState(state); - - requestPathMetricsCollector.start(); - - localSessionEnabled = sessionTracker.isLocalSessionsEnabled(); - - notifyAll(); - } - - protected void startJvmPauseMonitor() { - if (this.jvmPauseMonitor != null) { - this.jvmPauseMonitor.serviceStart(); - } - } - - protected void startRequestThrottler() { - requestThrottler = createRequestThrottler(); - requestThrottler.start(); - } - - protected RequestThrottler createRequestThrottler() { - return new RequestThrottler(this); - } - - protected void setupRequestProcessors() { - RequestProcessor finalProcessor = new FinalRequestProcessor(this); - RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); - ((SyncRequestProcessor) syncProcessor).start(); - firstProcessor = new PrepRequestProcessor(this, syncProcessor); - ((PrepRequestProcessor) firstProcessor).start(); - } - - public ZooKeeperServerListener getZooKeeperServerListener() { - return listener; - } - - /** - * Change the server ID used by {@link #createSessionTracker()}. Must be called prior to - * {@link #startup()} being called - * - * @param newId ID to use - */ - public void setCreateSessionTrackerServerId(int newId) { - createSessionTrackerServerId = newId; - } - - protected void createSessionTracker() { - sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener()); - } - - protected void startSessionTracker() { - ((SessionTrackerImpl) sessionTracker).start(); - } - - /** - * Sets the state of ZooKeeper server. After changing the state, it notifies - * the server state change to a registered shutdown handler, if any. - * <p> - * The following are the server state transitions: - * <ul><li>During startup the server will be in the INITIAL state.</li> - * <li>After successfully starting, the server sets the state to RUNNING. - * </li> - * <li>The server transitions to the ERROR state if it hits an internal - * error. {@link ZooKeeperServerListenerImpl} notifies any critical resource - * error events, e.g., SyncRequestProcessor not being able to write a txn to - * disk.</li> - * <li>During shutdown the server sets the state to SHUTDOWN, which - * corresponds to the server not running.</li> - * - * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE - * </li></ul> - * - * @param state new server state. - */ - protected void setState(State state) { - this.state = state; - // Notify server state changes to the registered shutdown handler, if any. - if (zkShutdownHandler != null) { - zkShutdownHandler.handle(state); - } else { - LOG.debug( - "ZKShutdownHandler is not registered, so ZooKeeper server" - + " won't take any action on ERROR or SHUTDOWN server state changes"); - } - } - - /** - * This can be used while shutting down the server to see whether the server - * is already shutdown or not. - * - * @return true if the server is running or server hits an error, false - * otherwise. - */ - protected boolean canShutdown() { - return state == State.RUNNING || state == State.ERROR; - } - - /** - * @return true if the server is running, false otherwise. - */ - public boolean isRunning() { - return state == State.RUNNING; - } - - public void shutdown() { - shutdown(false); - } - - /** - * Shut down the server instance - * @param fullyShutDown true if another server using the same database will not replace this one in the same process - */ - public synchronized void shutdown(boolean fullyShutDown) { - if (!canShutdown()) { - if (fullyShutDown && zkDb != null) { - zkDb.clear(); - } - LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - return; - } - LOG.info("shutting down"); - - // new RuntimeException("Calling shutdown").printStackTrace(); - setState(State.SHUTDOWN); - - // unregister all metrics that are keeping a strong reference to this object - // subclasses will do their specific clean up - unregisterMetrics(); - - if (requestThrottler != null) { - requestThrottler.shutdown(); - } - - // Since sessionTracker and syncThreads poll we just have to - // set running to false and they will detect it during the poll - // interval. - if (sessionTracker != null) { - sessionTracker.shutdown(); - } - if (firstProcessor != null) { - firstProcessor.shutdown(); - } - if (jvmPauseMonitor != null) { - jvmPauseMonitor.serviceStop(); - } - - if (zkDb != null) { - if (fullyShutDown) { - zkDb.clear(); - } else { - // else there is no need to clear the database - // * When a new quorum is established we can still apply the diff - // on top of the same zkDb data - // * If we fetch a new snapshot from leader, the zkDb will be - // cleared anyway before loading the snapshot - try { - // This will fast-forward the database to the latest recorded transactions - zkDb.fastForwardDataBase(); - } catch (IOException e) { - LOG.error("Error updating DB", e); - zkDb.clear(); - } - } - } - - requestPathMetricsCollector.shutdown(); - unregisterJMX(); - } - - protected void unregisterJMX() { - // unregister from JMX - try { - if (jmxDataTreeBean != null) { - MBeanRegistry.getInstance().unregister(jmxDataTreeBean); - } - } catch (Exception e) { - LOG.warn("Failed to unregister with JMX", e); - } - try { - if (jmxServerBean != null) { - MBeanRegistry.getInstance().unregister(jmxServerBean); - } - } catch (Exception e) { - LOG.warn("Failed to unregister with JMX", e); - } - jmxServerBean = null; - jmxDataTreeBean = null; - } - - public void incInProcess() { - requestsInProcess.incrementAndGet(); - } - - public void decInProcess() { - requestsInProcess.decrementAndGet(); - if (requestThrottler != null) { - requestThrottler.throttleWake(); - } - } - - public int getInProcess() { - return requestsInProcess.get(); - } - - public int getInflight() { - return requestThrottleInflight(); - } - - private int requestThrottleInflight() { - if (requestThrottler != null) { - return requestThrottler.getInflight(); - } - return 0; - } - - static class PrecalculatedDigest { - final long nodeDigest; - final long treeDigest; - - PrecalculatedDigest(long nodeDigest, long treeDigest) { - this.nodeDigest = nodeDigest; - this.treeDigest = treeDigest; - } - } - - - /** - * This structure is used to facilitate information sharing between PrepRP - * and FinalRP. - */ - static class ChangeRecord { - PrecalculatedDigest precalculatedDigest; - byte[] data; - - ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) { - this.zxid = zxid; - this.path = path; - this.stat = stat; - this.childCount = childCount; - this.acl = acl; - } - - long zxid; - - String path; - - StatPersisted stat; /* Make sure to create a new object when changing */ - - int childCount; - - List<ACL> acl; /* Make sure to create a new object when changing */ - - ChangeRecord duplicate(long zxid) { - StatPersisted stat = new StatPersisted(); - if (this.stat != null) { - DataTree.copyStatPersisted(this.stat, stat); - } - ChangeRecord changeRecord = new ChangeRecord(zxid, path, stat, childCount, - acl == null ? new ArrayList<>() : new ArrayList<>(acl)); - changeRecord.precalculatedDigest = precalculatedDigest; - changeRecord.data = data; - return changeRecord; - } - - } - - byte[] generatePasswd(long id) { - Random r = new Random(id ^ superSecret); - byte[] p = new byte[16]; - r.nextBytes(p); - return p; - } - - protected boolean checkPasswd(long sessionId, byte[] passwd) { - return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId)); - } - - long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) { - if (passwd == null) { - // Possible since it's just deserialized from a packet on the wire. - passwd = new byte[0]; - } - long sessionId = sessionTracker.createSession(timeout); - Random r = new Random(sessionId ^ superSecret); - r.nextBytes(passwd); - CreateSessionTxn txn = new CreateSessionTxn(timeout); - cnxn.setSessionId(sessionId); - Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null); - submitRequest(si); - return sessionId; - } - - /** - * set the owner of this session as owner - * @param id the session id - * @param owner the owner of the session - * @throws SessionExpiredException - */ - public void setOwner(long id, Object owner) throws SessionExpiredException { - sessionTracker.setOwner(id, owner); - } - - protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { - boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout); - if (LOG.isTraceEnabled()) { - ZooTrace.logTraceMessage( - LOG, - ZooTrace.SESSION_TRACE_MASK, - "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc); - } - finishSessionInit(cnxn, rc); - } - - public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException { - if (checkPasswd(sessionId, passwd)) { - revalidateSession(cnxn, sessionId, sessionTimeout); - } else { - LOG.warn( - "Incorrect password from {} for session 0x{}", - cnxn.getRemoteSocketAddress(), - Long.toHexString(sessionId)); - finishSessionInit(cnxn, false); - } - } - - public void finishSessionInit(ServerCnxn cnxn, boolean valid) { - // register with JMX - try { - if (valid) { - if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) { - serverCnxnFactory.registerConnection(cnxn); - } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) { - secureServerCnxnFactory.registerConnection(cnxn); - } - } - } catch (Exception e) { - LOG.warn("Failed to register with JMX", e); - } - - try { - ConnectResponse rsp = new ConnectResponse( - 0, - valid ? cnxn.getSessionTimeout() : 0, - valid ? cnxn.getSessionId() : 0, // send 0 if session is no - // longer valid - valid ? generatePasswd(cnxn.getSessionId()) : new byte[16], - this instanceof ReadOnlyZooKeeperServer); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); - bos.writeInt(-1, "len"); - rsp.serialize(bos, "connect"); - baos.close(); - ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); - bb.putInt(bb.remaining() - 4).rewind(); - cnxn.sendBuffer(bb); - - if (valid) { - LOG.debug( - "Established session 0x{} with negotiated timeout {} for client {}", - Long.toHexString(cnxn.getSessionId()), - cnxn.getSessionTimeout(), - cnxn.getRemoteSocketAddress()); - cnxn.enableRecv(); - } else { - - LOG.info( - "Invalid session 0x{} for client {}, probably expired", - Long.toHexString(cnxn.getSessionId()), - cnxn.getRemoteSocketAddress()); - cnxn.sendBuffer(ServerCnxnFactory.closeConn); - } - - } catch (Exception e) { - LOG.warn("Exception while establishing session, closing", e); - cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT); - } - } - - public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) { - closeSession(cnxn.getSessionId()); - } - - public long getServerId() { - return 0; - } - - /** - * If the underlying Zookeeper server support local session, this method - * will set a isLocalSession to true if a request is associated with - * a local session. - * - * @param si - */ - protected void setLocalSessionFlag(Request si) { - } - - public void submitRequest(Request si) { - if (restoreLatch != null) { - try { - LOG.info("Blocking request submission while restore is in progress"); - restoreLatch.await(); - } catch (final InterruptedException e) { - LOG.warn("Unexpected interruption", e); - } - } - enqueueRequest(si); - } - - public void enqueueRequest(Request si) { - if (requestThrottler == null) { - synchronized (this) { - try { - // Since all requests are passed to the request - // processor it should wait for setting up the request - // processor chain. The state will be updated to RUNNING - // after the setup. - while (state == State.INITIAL) { - wait(1000); - } - } catch (InterruptedException e) { - LOG.warn("Unexpected interruption", e); - } - if (requestThrottler == null) { - throw new RuntimeException("Not started"); - } - } - } - requestThrottler.submitRequest(si); - } - - public void submitRequestNow(Request si) { - if (firstProcessor == null) { - synchronized (this) { - try { - // Since all requests are passed to the request - // processor it should wait for setting up the request - // processor chain. The state will be updated to RUNNING - // after the setup. - while (state == State.INITIAL) { - wait(1000); - } - } catch (InterruptedException e) { - LOG.warn("Unexpected interruption", e); - } - if (firstProcessor == null || state != State.RUNNING) { - throw new RuntimeException("Not started"); - } - } - } - try { - touch(si.cnxn); - boolean validpacket = Request.isValid(si.type); - if (validpacket) { - setLocalSessionFlag(si); - firstProcessor.processRequest(si); - if (si.cnxn != null) { - incInProcess(); - } - } else { - LOG.warn("Received packet at server of unknown type {}", si.type); - // Update request accounting/throttling limits - requestFinished(si); - new UnimplementedRequestProcessor().processRequest(si); - } - } catch (MissingSessionException e) { - LOG.debug("Dropping request.", e); - // Update request accounting/throttling limits - requestFinished(si); - } catch (RequestProcessorException e) { - LOG.error("Unable to process request", e); - // Update request accounting/throttling limits - requestFinished(si); - } - } - - public static int getSnapCount() { - int snapCount = Integer.getInteger(SNAP_COUNT, DEFAULT_SNAP_COUNT); - // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor - if (snapCount < 2) { - LOG.warn("SnapCount should be 2 or more. Now, snapCount is reset to 2"); - snapCount = 2; - } - return snapCount; - } - - public int getGlobalOutstandingLimit() { - return Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, DEFAULT_GLOBAL_OUTSTANDING_LIMIT); - } - - public static long getSnapSizeInBytes() { - long size = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default - if (size <= 0) { - LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", size); - } - return size * 1024; // Convert to bytes - } - - public void setServerCnxnFactory(ServerCnxnFactory factory) { - serverCnxnFactory = factory; - } - - public ServerCnxnFactory getServerCnxnFactory() { - return serverCnxnFactory; - } - - public ServerCnxnFactory getSecureServerCnxnFactory() { - return secureServerCnxnFactory; - } - - public void setSecureServerCnxnFactory(ServerCnxnFactory factory) { - secureServerCnxnFactory = factory; - } - - /** - * return the last processed id from the - * datatree - */ - public long getLastProcessedZxid() { - return zkDb.getDataTreeLastProcessedZxid(); - } - - /** - * return the outstanding requests - * in the queue, which haven't been - * processed yet - */ - public long getOutstandingRequests() { - return getInProcess(); - } - - /** - * return the total number of client connections that are alive - * to this server - */ - public int getNumAliveConnections() { - int numAliveConnections = 0; - - if (serverCnxnFactory != null) { - numAliveConnections += serverCnxnFactory.getNumAliveConnections(); - } - - if (secureServerCnxnFactory != null) { - numAliveConnections += secureServerCnxnFactory.getNumAliveConnections(); - } - - return numAliveConnections; - } - - /** - * truncate the log to get in sync with others - * if in a quorum - * @param zxid the zxid that it needs to get in sync - * with others - * @throws IOException - */ - public void truncateLog(long zxid) throws IOException { - this.zkDb.truncateLog(zxid); - } - - public int getTickTime() { - return tickTime; - } - - public void setTickTime(int tickTime) { - LOG.info("tickTime set to {} ms", tickTime); - this.tickTime = tickTime; - } - - public static int getThrottledOpWaitTime() { - return throttledOpWaitTime; - } - - public static void setThrottledOpWaitTime(int time) { - LOG.info("throttledOpWaitTime set to {} ms", time); - throttledOpWaitTime = time; - } - - public int getMinSessionTimeout() { - return minSessionTimeout; - } - - public void setMinSessionTimeout(int min) { - this.minSessionTimeout = min == -1 ? tickTime * 2 : min; - LOG.info("minSessionTimeout set to {} ms", this.minSessionTimeout); - } - - public int getMaxSessionTimeout() { - return maxSessionTimeout; - } - - public void setMaxSessionTimeout(int max) { - this.maxSessionTimeout = max == -1 ? tickTime * 20 : max; - LOG.info("maxSessionTimeout set to {} ms", this.maxSessionTimeout); - } - - public int getClientPortListenBacklog() { - return listenBacklog; - } - - public void setClientPortListenBacklog(int backlog) { - this.listenBacklog = backlog; - LOG.info("clientPortListenBacklog set to {}", backlog); - } - - public int getClientPort() { - return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1; - } - - public int getSecureClientPort() { - return secureServerCnxnFactory != null ? secureServerCnxnFactory.getLocalPort() : -1; - } - - /** Maximum number of connections allowed from particular host (ip) */ - public int getMaxClientCnxnsPerHost() { - if (serverCnxnFactory != null) { - return serverCnxnFactory.getMaxClientCnxnsPerHost(); - } - if (secureServerCnxnFactory != null) { - return secureServerCnxnFactory.getMaxClientCnxnsPerHost(); - } - return -1; - } - - public void setTxnLogFactory(FileTxnSnapLog txnLog) { - this.txnLogFactory = txnLog; - } - - public FileTxnSnapLog getTxnLogFactory() { - return this.txnLogFactory; - } - - /** - * Returns the elapsed sync of time of transaction log in milliseconds. - */ - public long getTxnLogElapsedSyncTime() { - return txnLogFactory.getTxnLogElapsedSyncTime(); - } - - public String getState() { - return "standalone"; - } - - public void dumpEphemerals(PrintWriter pwriter) { - zkDb.dumpEphemerals(pwriter); - } - - public Map<Long, Set<String>> getEphemerals() { - return zkDb.getEphemerals(); - } - - public double getConnectionDropChance() { - return connThrottle.getDropChance(); - } - - public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException { - LOG.debug( - "Session establishment request from client {} client's lastZxid is 0x{}", - cnxn.getRemoteSocketAddress(), - Long.toHexString(request.getLastZxidSeen())); - - long sessionId = request.getSessionId(); - int tokensNeeded = 1; - if (connThrottle.isConnectionWeightEnabled()) { - if (sessionId == 0) { - if (localSessionEnabled) { - tokensNeeded = connThrottle.getRequiredTokensForLocal(); - } else { - tokensNeeded = connThrottle.getRequiredTokensForGlobal(); - } - } else { - tokensNeeded = connThrottle.getRequiredTokensForRenew(); - } - } - - if (!connThrottle.checkLimit(tokensNeeded)) { - throw new ClientCnxnLimitException(); - } - ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); - ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); - - if (!cnxn.protocolManager.isReadonlyAvailable()) { - LOG.warn( - "Connection request from old client {}; will be dropped if server is in r-o mode", - cnxn.getRemoteSocketAddress()); - } - - if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) { - String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); - LOG.info(msg); - throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); - } - if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { - String msg = "Refusing session(0x" - + Long.toHexString(sessionId) - + ") request for client " - + cnxn.getRemoteSocketAddress() - + " as it has seen zxid 0x" - + Long.toHexString(request.getLastZxidSeen()) - + " our last zxid is 0x" - + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) - + " client must try another server"; - - LOG.info(msg); - throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); - } - int sessionTimeout = request.getTimeOut(); - byte[] passwd = request.getPasswd(); - int minSessionTimeout = getMinSessionTimeout(); - if (sessionTimeout < minSessionTimeout) { - sessionTimeout = minSessionTimeout; - } - int maxSessionTimeout = getMaxSessionTimeout(); - if (sessionTimeout > maxSessionTimeout) { - sessionTimeout = maxSessionTimeout; - } - cnxn.setSessionTimeout(sessionTimeout); - // We don't want to receive any packets until we are sure that the - // session is setup - cnxn.disableRecv(); - if (sessionId == 0) { - long id = createSession(cnxn, passwd, sessionTimeout); - LOG.debug( - "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", - Long.toHexString(id), - Long.toHexString(request.getLastZxidSeen()), - request.getTimeOut(), - cnxn.getRemoteSocketAddress()); - } else { - validateSession(cnxn, sessionId); - LOG.debug( - "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", - Long.toHexString(sessionId), - Long.toHexString(request.getLastZxidSeen()), - request.getTimeOut(), - cnxn.getRemoteSocketAddress()); - if (serverCnxnFactory != null) { - serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); - } - if (secureServerCnxnFactory != null) { - secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); - } - cnxn.setSessionId(sessionId); - reopenSession(cnxn, sessionId, passwd, sessionTimeout); - ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1); - - } - } - - /** - * Validate if a particular session can be reestablished. - * - * @param cnxn - * @param sessionId - */ - protected void validateSession(ServerCnxn cnxn, long sessionId) - throws IOException { - // do nothing - } - - public boolean shouldThrottle(long outStandingCount) { - int globalOutstandingLimit = getGlobalOutstandingLimit(); - if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) { - return outStandingCount > 0; - } - return false; - } - - long getFlushDelay() { - return flushDelay; - } - - static void setFlushDelay(long delay) { - LOG.info("{} = {} ms", FLUSH_DELAY, delay); - flushDelay = delay; - } - - long getMaxWriteQueuePollTime() { - return maxWriteQueuePollTime; - } - - static void setMaxWriteQueuePollTime(long maxTime) { - LOG.info("{} = {} ms", MAX_WRITE_QUEUE_POLL_SIZE, maxTime); - maxWriteQueuePollTime = maxTime; - } - - int getMaxBatchSize() { - return maxBatchSize; - } - - static void setMaxBatchSize(int size) { - LOG.info("{}={}", MAX_BATCH_SIZE, size); - maxBatchSize = size; - } - - private void initLargeRequestThrottlingSettings() { - setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes)); - setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1)); - } - - public int getLargeRequestMaxBytes() { - return largeRequestMaxBytes; - } - - public void setLargeRequestMaxBytes(int bytes) { - if (bytes <= 0) { - LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes); - LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes); - } else { - largeRequestMaxBytes = bytes; - LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes); - } - } - - public int getLargeRequestThreshold() { - return largeRequestThreshold; - } - - public void setLargeRequestThreshold(int threshold) { - if (threshold == 0 || threshold < -1) { - LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold); - largeRequestThreshold = -1; - } else { - largeRequestThreshold = threshold; - LOG.info("The large request threshold is set to {}", largeRequestThreshold); - } - } - - public int getLargeRequestBytes() { - return currentLargeRequestBytes.get(); - } - - private boolean isLargeRequest(int length) { - // The large request limit is disabled when threshold is -1 - if (largeRequestThreshold == -1) { - return false; - } - return length > largeRequestThreshold; - } - - public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException { - if (!isLargeRequest(length)) { - return true; - } - if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) { - return true; - } else { - ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); - throw new IOException("Rejecting large request"); - } - - } - - private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException { - if (!isLargeRequest(length)) { - return true; - } - - int bytes = currentLargeRequestBytes.addAndGet(length); - if (bytes > largeRequestMaxBytes) { - currentLargeRequestBytes.addAndGet(-length); - ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); - throw new IOException("Rejecting large request"); - } - return true; - } - - public void requestFinished(Request request) { - int largeRequestLength = request.getLargeRequestSize(); - if (largeRequestLength != -1) { - currentLargeRequestBytes.addAndGet(-largeRequestLength); - } - } - - public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException { - // Need to increase the outstanding request count first, otherwise - // there might be a race condition that it enabled recv after - // processing request and then disabled when check throttling. - // - // Be aware that we're actually checking the global outstanding - // request before this request. - // - // It's fine if the IOException thrown before we decrease the count - // in cnxn, since it will close the cnxn anyway. - cnxn.incrOutstandingAndCheckThrottle(h); - - if (h.getType() == OpCode.auth) { - LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress()); - AuthPacket authPacket = request.readRecord(AuthPacket::new); - String scheme = authPacket.getScheme(); - ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); - Code authReturn = KeeperException.Code.AUTHFAILED; - if (ap != null) { - try { - // handleAuthentication may close the connection, to allow the client to choose - // a different server to connect to. - authReturn = ap.handleAuthentication( - new ServerAuthenticationProvider.ServerObjs(this, cnxn), - authPacket.getAuth()); - } catch (RuntimeException e) { - LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e); - authReturn = KeeperException.Code.AUTHFAILED; - } - } - if (authReturn == KeeperException.Code.OK) { - LOG.info("Session 0x{}: auth success for scheme {} and address {}", - Long.toHexString(cnxn.getSessionId()), scheme, - cnxn.getRemoteSocketAddress()); - ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); - cnxn.sendResponse(rh, null, null); - } else { - if (ap == null) { - LOG.warn( - "No authentication provider for scheme: {} has {}", - scheme, - ProviderRegistry.listProviders()); - } else { - LOG.warn("Authentication failed for scheme: {}", scheme); - } - // send a response... - ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue()); - cnxn.sendResponse(rh, null, null); - // ... and close connection - cnxn.sendBuffer(ServerCnxnFactory.closeConn); - cnxn.disableRecv(); - } - return; - } else if (h.getType() == OpCode.sasl) { - processSasl(request, cnxn, h); - } else { - if (!authHelper.enforceAuthentication(cnxn, h.getXid())) { - // Authentication enforcement is failed - // Already sent response to user about failure and closed the session, lets return - return; - } else { - Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo()); - int length = request.limit(); - if (isLargeRequest(length)) { - // checkRequestSize will throw IOException if request is rejected - checkRequestSizeWhenMessageReceived(length); - si.setLargeRequestSize(length); - } - si.setOwner(ServerCnxn.me); - submitRequest(si); - } - } - } - - private static boolean isSaslSuperUser(String id) { - if (id == null || id.isEmpty()) { - return false; - } - - Properties properties = System.getProperties(); - int prefixLen = SASL_SUPER_USER.length(); - - for (String k : properties.stringPropertyNames()) { - if (k.startsWith(SASL_SUPER_USER) - && (k.length() == prefixLen || k.charAt(prefixLen) == '.')) { - String value = properties.getProperty(k); - - if (value != null && value.equals(id)) { - return true; - } - } - } - - return false; - } - - private static boolean shouldAllowSaslFailedClientsConnect() { - return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS); - } - - private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { - LOG.debug("Responding to client SASL token."); - GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new); - byte[] clientToken = clientTokenRecord.getToken(); - LOG.debug("Size of client SASL token: {}", clientToken.length); - byte[] responseToken = null; - try { - ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer; - try { - // note that clientToken might be empty (clientToken.length == 0): - // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the - // SASL negotiation process. - responseToken = saslServer.evaluateResponse(clientToken); - if (saslServer.isComplete()) { - String authorizationID = saslServer.getAuthorizationID(); - LOG.info("Session 0x{}: adding SASL authorization for authorizationID: {}", - Long.toHexString(cnxn.getSessionId()), authorizationID); - cnxn.addAuthInfo(new Id("sasl", authorizationID)); - - if (isSaslSuperUser(authorizationID)) { - cnxn.addAuthInfo(new Id("super", "")); - LOG.info( - "Session 0x{}: Authenticated Id '{}' as super user", - Long.toHexString(cnxn.getSessionId()), - authorizationID); - } - } - } catch (SaslException e) { - LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e); - if (shouldAllowSaslFailedClientsConnect() && !authHelper.isSaslAuthRequired()) { - LOG.warn("Maintaining client connection despite SASL authentication failure."); - } else { - int error; - if (authHelper.isSaslAuthRequired()) { - LOG.warn( - "Closing client connection due to server requires client SASL authenticaiton," - + "but client SASL authentication has failed, or client is not configured with SASL " - + "authentication."); - error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue(); - } else { - LOG.warn("Closing client connection due to SASL authentication failure."); - error = Code.AUTHFAILED.intValue(); - } - - ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error); - cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response"); - cnxn.sendCloseSession(); - cnxn.disableRecv(); - return; - } - } - } catch (NullPointerException e) { - LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly."); - } - if (responseToken != null) { - LOG.debug("Size of server SASL response: {}", responseToken.length); - } - - ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue()); - Record record = new SetSASLResponse(responseToken); - cnxn.sendResponse(replyHeader, record, "response"); - } - - // entry point for quorum/Learner.java - public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { - processTxnForSessionEvents(null, hdr, txn); - return processTxnInDB(hdr, txn, null); - } - - // entry point for FinalRequestProcessor.java - public ProcessTxnResult processTxn(Request request) { - TxnHeader hdr = request.getHdr(); - processTxnForSessionEvents(request, hdr, request.getTxn()); - - final boolean writeRequest = (hdr != null); - final boolean quorumRequest = request.isQuorum(); - - // return fast w/o synchronization when we get a read - if (!writeRequest && !quorumRequest) { - return new ProcessTxnResult(); - } - synchronized (outstandingChanges) { - ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); - - // request.hdr is set for write requests, which are the only ones - // that add to outstandingChanges. - if (writeRequest) { - long zxid = hdr.getZxid(); - while (!outstandingChanges.isEmpty() - && outstandingChanges.peek().zxid <= zxid) { - ChangeRecord cr = outstandingChanges.remove(); - ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1); - if (cr.zxid < zxid) { - LOG.warn( - "Zxid outstanding 0x{} is less than current 0x{}", - Long.toHexString(cr.zxid), - Long.toHexString(zxid)); - } - if (outstandingChangesForPath.get(cr.path) == cr) { - outstandingChangesForPath.remove(cr.path); - } - } - } - - // do not add non quorum packets to the queue. - if (quorumRequest) { - getZKDatabase().addCommittedProposal(request); - } - return rc; - } - } - - private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) { - int opCode = (request == null) ? hdr.getType() : request.type; - long sessionId = (request == null) ? hdr.getClientId() : request.sessionId; - - if (opCode == OpCode.createSession) { - if (hdr != null && txn instanceof CreateSessionTxn) { - CreateSessionTxn cst = (CreateSessionTxn) txn; - sessionTracker.commitSession(sessionId, cst.getTimeOut()); - } else if (request == null || !request.isLocalSession()) { - LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString()); - } - } else if (opCode == OpCode.closeSession) { - sessionTracker.removeSession(sessionId); - } - } - - private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) { - if (hdr == null) { - return new ProcessTxnResult(); - } else { - return getZKDatabase().processTxn(hdr, txn, digest); - } - } - - public Map<Long, Set<Long>> getSessionExpiryMap() { - return sessionTracker.getSessionExpiryMap(); - } - - /** - * This method is used to register the ZooKeeperServerShutdownHandler to get - * server's error or shutdown state change notifications. - * {@link ZooKeeperServerShutdownHandler#handle(State)} will be called for - * every server state changes {@link #setState(State)}. - * - * @param zkShutdownHandler shutdown handler - */ - void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) { - this.zkShutdownHandler = zkShutdownHandler; - } - - public boolean isResponseCachingEnabled() { - return isResponseCachingEnabled; - } - - public void setResponseCachingEnabled(boolean isEnabled) { - isResponseCachingEnabled = isEnabled; - } - - public ResponseCache getReadResponseCache() { - return isResponseCachingEnabled ? readResponseCache : null; - } - - public ResponseCache getGetChildrenResponseCache() { - return isResponseCachingEnabled ? getChildrenResponseCache : null; - } - - protected void registerMetrics() { - MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); - - final ZKDatabase zkdb = this.getZKDatabase(); - final ServerStats stats = this.serverStats(); - - rootContext.registerGauge("avg_latency", stats::getAvgLatency); - - rootContext.registerGauge("max_latency", stats::getMaxLatency); - rootContext.registerGauge("min_latency", stats::getMinLatency); - - rootContext.registerGauge("packets_received", stats::getPacketsReceived); - rootContext.registerGauge("packets_sent", stats::getPacketsSent); - rootContext.registerGauge("num_alive_connections", stats::getNumAliveClientConnections); - - rootContext.registerGauge("outstanding_requests", stats::getOutstandingRequests); - rootContext.registerGauge("uptime", stats::getUptime); - - rootContext.registerGauge("znode_count", zkdb::getNodeCount); - - rootContext.registerGauge("watch_count", zkdb.getDataTree()::getWatchCount); - rootContext.registerGauge("ephemerals_count", zkdb.getDataTree()::getEphemeralsCount); - - rootContext.registerGauge("approximate_data_size", zkdb.getDataTree()::cachedApproximateDataSize); - - rootContext.registerGauge("global_sessions", zkdb::getSessionCount); - rootContext.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount); - - OSMXBean osMbean = new OSMXBean(); - rootContext.registerGauge("open_file_descriptor_count", osMbean::getOpenFileDescriptorCount); - rootContext.registerGauge("max_file_descriptor_count", osMbean::getMaxFileDescriptorCount); - rootContext.registerGauge("connection_drop_probability", this::getConnectionDropChance); - - rootContext.registerGauge("last_client_response_size", stats.getClientResponseStats()::getLastBufferSize); - rootContext.registerGauge("max_client_response_size", stats.getClientResponseStats()::getMaxBufferSize); - rootContext.registerGauge("min_client_response_size", stats.getClientResponseStats()::getMinBufferSize); - - rootContext.registerGauge("outstanding_tls_handshake", this::getOutstandingHandshakeNum); - rootContext.registerGauge("auth_failed_count", stats::getAuthFailedCount); - rootContext.registerGauge("non_mtls_remote_conn_count", stats::getNonMTLSRemoteConnCount); - rootContext.registerGauge("non_mtls_local_conn_count", stats::getNonMTLSLocalConnCount); - - rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE, - () -> QuotaMetricsUtils.getQuotaCountLimit(zkDb.getDataTree())); - rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE, - () -> QuotaMetricsUtils.getQuotaBytesLimit(zkDb.getDataTree())); - rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE, - () -> QuotaMetricsUtils.getQuotaCountUsage(zkDb.getDataTree())); - rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE, - () -> QuotaMetricsUtils.getQuotaBytesUsage(zkDb.getDataTree())); - } - - protected void unregisterMetrics() { - - MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); - - rootContext.unregisterGauge("avg_latency"); - - rootContext.unregisterGauge("max_latency"); - rootContext.unregisterGauge("min_latency"); - - rootContext.unregisterGauge("packets_received"); - rootContext.unregisterGauge("packets_sent"); - rootContext.unregisterGauge("num_alive_connections"); - - rootContext.unregisterGauge("outstanding_requests"); - rootContext.unregisterGauge("uptime"); - - rootContext.unregisterGauge("znode_count"); - - rootContext.unregisterGauge("watch_count"); - rootContext.unregisterGauge("ephemerals_count"); - rootContext.unregisterGauge("approximate_data_size"); - - rootContext.unregisterGauge("global_sessions"); - rootContext.unregisterGauge("local_sessions"); - - rootContext.unregisterGauge("open_file_descriptor_count"); - rootContext.unregisterGauge("max_file_descriptor_count"); - rootContext.unregisterGauge("connection_drop_probability"); - - rootContext.unregisterGauge("last_client_response_size"); - rootContext.unregisterGauge("max_client_response_size"); - rootContext.unregisterGauge("min_client_response_size"); - - rootContext.unregisterGauge("auth_failed_count"); - rootContext.unregisterGauge("non_mtls_remote_conn_count"); - rootContext.unregisterGauge("non_mtls_local_conn_count"); - - rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE); - rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE); - rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE); - rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE); - } - - /** - * Hook into admin server, useful to expose additional data - * that do not represent metrics. - * - * @param response a sink which collects the data. - */ - public void dumpMonitorValues(BiConsumer<String, Object> response) { - ServerStats stats = serverStats(); - response.accept("version", Version.getFullVersion()); - response.accept("server_state", stats.getServerState()); - } - - /** - * Grant or deny authorization to an operation on a node as a function of: - * @param cnxn : the server connection or null for admin server commands - * @param acl : set of ACLs for the node - * @param perm : the permission that the client is requesting - * @param ids : the credentials supplied by the client - * @param path : the ZNode path - * @param setAcls : for set ACL operations, the list of ACLs being set. Otherwise null. - */ - public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls) throws KeeperException.NoAuthException { - if (skipACL) { - return; - } - - LOG.debug("Permission requested: {} ", perm); - LOG.debug("ACLs for node: {}", acl); - LOG.debug("Client credentials: {}", ids); - - if (acl == null || acl.size() == 0) { - return; - } - for (Id authId : ids) { - if (authId.getScheme().equals("super")) { - return; - } - } - for (ACL a : acl) { - Id id = a.getId(); - if ((a.getPerms() & perm) != 0) { - if (id.getScheme().equals("world") && id.getId().equals("anyone")) { - return; - } - ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(id.getScheme()); - if (ap != null) { - for (Id authId : ids) { - if (authId.getScheme().equals(id.getScheme()) - && ap.matches( - new ServerAuthenticationProvider.ServerObjs(this, cnxn), - new ServerAuthenticationProvider.MatchValues(path, authId.getId(), id.getId(), perm, setAcls))) { - return; - } - } - } - } - } - throw new KeeperException.NoAuthException(); - } - - /** - * check a path whether exceeded the quota. - * - * @param path - * the path of the node, used for the quota prefix check - * @param lastData - * the current node data, {@code null} for none - * @param data - * the data to be set, or {@code null} for none - * @param type - * currently, create and setData need to check quota - */ - public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException { - if (!enforceQuota) { - return; - } - long dataBytes = (data == null) ? 0 : data.length; - ZKDatabase zkDatabase = getZKDatabase(); - String lastPrefix = zkDatabase.getDataTree().getMaxPrefixWithQuota(path); - if (StringUtils.isEmpty(lastPrefix)) { - return; - } - - final String namespace = PathUtils.getTopNamespace(path); - switch (type) { - case OpCode.create: - checkQuota(lastPrefix, dataBytes, 1, namespace); - break; - case OpCode.setData: - checkQuota(lastPrefix, dataBytes - (lastData == null ? 0 : lastData.length), 0, namespace); - break; - default: - throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type); - } - } - - /** - * check a path whether exceeded the quota. - * - * @param lastPrefix - the path of the node which has a quota. - * @param bytesDiff - * the diff to be added to number of bytes - * @param countDiff - * the diff to be added to the count - * @param namespace - * the namespace for collecting quota exceeded errors - */ - private void checkQuota(String lastPrefix, long bytesDiff, long countDiff, String namespace) - throws KeeperException.QuotaExceededException { - LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff); - - // now check the quota we set - String limitNode = Quotas.limitPath(lastPrefix); - DataNode node = getZKDatabase().getNode(limitNode); - StatsTrack limitStats; - if (node == null) { - // should not happen - LOG.error("Missing limit node for quota {}", limitNode); - return; - } - synchronized (node) { - limitStats = new StatsTrack(node.data); - } - //check the quota - boolean checkCountQuota = countDiff != 0 && (limitStats.getCount() > -1 || limitStats.getCountHardLimit() > -1); - boolean checkByteQuota = bytesDiff != 0 && (limitStats.getBytes() > -1 || limitStats.getByteHardLimit() > -1); - - if (!checkCountQuota && !checkByteQuota) { - return; - } - - //check the statPath quota - String statNode = Quotas.statPath(lastPrefix); - node = getZKDatabase().getNode(statNode); - - StatsTrack currentStats; - if (node == null) { - // should not happen - LOG.error("Missing node for stat {}", statNode); - return; - } - synchronized (node) { - currentStats = new StatsTrack(node.data); - } - - //check the Count Quota - if (checkCountQuota) { - long newCount = currentStats.getCount() + countDiff; - boolean isCountHardLimit = limitStats.getCountHardLimit() > -1; - long countLimit = isCountHardLimit ? limitStats.getCountHardLimit() : limitStats.getCount(); - - if (newCount > countLimit) { - String msg = "Quota exceeded: " + lastPrefix + " [current count=" + newCount + ", " + (isCountHardLimit ? "hard" : "soft") + "CountLimit=" + countLimit + "]"; - RATE_LOGGER.rateLimitLog(msg); - if (isCountHardLimit) { - updateQuotaExceededMetrics(namespace); - throw new KeeperException.QuotaExceededException(lastPrefix); - } - } - } - - //check the Byte Quota - if (checkByteQuota) { - long newBytes = currentStats.getBytes() + bytesDiff; - boolean isByteHardLimit = limitStats.getByteHardLimit() > -1; - long byteLimit = isByteHardLimit ? limitStats.getByteHardLimit() : limitStats.getBytes(); - if (newBytes > byteLimit) { - String msg = "Quota exceeded: " + lastPrefix + " [current bytes=" + newBytes + ", " + (isByteHardLimit ? "hard" : "soft") + "ByteLimit=" + byteLimit + "]"; - RATE_LOGGER.rateLimitLog(msg); - if (isByteHardLimit) { - updateQuotaExceededMetrics(namespace); - throw new KeeperException.QuotaExceededException(lastPrefix); - } - } - } - } - - public static boolean isDigestEnabled() { - return digestEnabled; - } - - public static void setDigestEnabled(boolean digestEnabled) { - LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); - ZooKeeperServer.digestEnabled = digestEnabled; - } - - public static boolean isSerializeLastProcessedZxidEnabled() { - return serializeLastProcessedZxidEnabled; - } - - public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) { - serializeLastProcessedZxidEnabled = serializeLastZxidEnabled; - LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled); - } - - /** - * Trim a path to get the immediate predecessor. - * - * @param path - * @return - * @throws KeeperException.BadArgumentsException - */ - private String parentPath(String path) throws KeeperException.BadArgumentsException { - int lastSlash = path.lastIndexOf('/'); - if (lastSlash == -1 || path.indexOf('\0') != -1 || getZKDatabase().isSpecialPath(path)) { - throw new KeeperException.BadArgumentsException(path); - } - return lastSlash == 0 ? "/" : path.substring(0, lastSlash); - } - - private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException { - boolean mustCheckACL = false; - String path = null; - List<ACL> acl = null; - - switch (request.type) { - case OpCode.create: - case OpCode.create2: { - CreateRequest req = request.readRequestRecordNoException(CreateRequest::new); - if (req != null) { - mustCheckACL = true; - acl = req.getAcl(); - path = parentPath(req.getPath()); - } - break; - } - case OpCode.delete: { - DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new); - if (req != null) { - path = parentPath(req.getPath()); - } - break; - } - case OpCode.setData: { - SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new); - if (req != null) { - path = req.getPath(); - } - break; - } - case OpCode.setACL: { - SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new); - if (req != null) { - mustCheckACL = true; - acl = req.getAcl(); - path = req.getPath(); - } - break; - } - } - - if (mustCheckACL) { - /* we ignore the extrapolated ACL returned by fixupACL because - * we only care about it being well-formed (and if it isn't, an - * exception will be raised). - */ - PrepRequestProcessor.fixupACL(path, request.authInfo, acl); - } - - return path; - } - - private int effectiveACLPerms(Request request) { - switch (request.type) { - case OpCode.create: - case OpCode.create2: - return ZooDefs.Perms.CREATE; - case OpCode.delete: - return ZooDefs.Perms.DELETE; - case OpCode.setData: - return ZooDefs.Perms.WRITE; - case OpCode.setACL: - return ZooDefs.Perms.ADMIN; - default: - return ZooDefs.Perms.ALL; - } - } - - /** - * Check Write Requests for Potential Access Restrictions - * <p> - * Before a request is being proposed to the quorum, lets check it - * against local ACLs. Non-write requests (read, session, etc.) - * are passed along. Invalid requests are sent a response. - * <p> - * While we are at it, if the request will set an ACL: make sure it's - * a valid one. - * - * @param request - * @return true if request is permitted, false if not. - */ - public boolean authWriteRequest(Request request) { - int err; - String pathToCheck; - - if (!enableEagerACLCheck) { - return true; - } - - err = KeeperException.Code.OK.intValue(); - - try { - pathToCheck = effectiveACLPath(request); - if (pathToCheck != null) { - checkACL(request.cnxn, zkDb.getACL(pathToCheck, null), effectiveACLPerms(request), request.authInfo, pathToCheck, null); - } - } catch (KeeperException.NoAuthException e) { - LOG.debug("Request failed ACL check", e); - err = e.code().intValue(); - } catch (KeeperException.InvalidACLException e) { - LOG.debug("Request has an invalid ACL check", e); - err = e.code().intValue(); - } catch (KeeperException.NoNodeException e) { - LOG.debug("ACL check against non-existent node: {}", e.getMessage()); - } catch (KeeperException.BadArgumentsException e) { - LOG.debug("ACL check against illegal node path: {}", e.getMessage()); - } catch (Throwable t) { - LOG.error("Uncaught exception in authWriteRequest with: ", t); - throw t; - } finally { - if (err != KeeperException.Code.OK.intValue()) { - /* This request has a bad ACL, so we are dismissing it early. */ - decInProcess(); - ReplyHeader rh = new ReplyHeader(request.cxid, 0, err); - try { - request.cnxn.sendResponse(rh, null, null); - } catch (IOException e) { - LOG.error("IOException : {}", e); - } - } - } - - return err == KeeperException.Code.OK.intValue(); - } - - public int getOutstandingHandshakeNum() { - if (serverCnxnFactory instanceof NettyServerCnxnFactory) { - return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum(); - } else { - return 0; - } - } - - public boolean isReconfigEnabled() { - return this.reconfigEnabled; - } - - public ZooKeeperServerShutdownHandler getZkShutdownHandler() { - return zkShutdownHandler; - } - - static void updateQuotaExceededMetrics(final String namespace) { - if (namespace == null) { - return; - } - ServerMetrics.getMetrics().QUOTA_EXCEEDED_ERROR_PER_NAMESPACE.add(namespace, 1); - } -} - |