diff options
Diffstat (limited to 'zookeeper-server')
33 files changed, 728 insertions, 728 deletions
diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml index 1bb2f5971ab..269e0d2479a 100644 --- a/zookeeper-server/pom.xml +++ b/zookeeper-server/pom.xml @@ -13,8 +13,8 @@ <version>8-SNAPSHOT</version> <modules> <module>zookeeper-server-common</module> + <module>zookeeper-server-3.8.0</module> <module>zookeeper-server</module> - <module>zookeeper-server-3.9.1</module> </modules> <dependencies> <dependency> diff --git a/zookeeper-server/zookeeper-server-3.8.0/CMakeLists.txt b/zookeeper-server/zookeeper-server-3.8.0/CMakeLists.txt new file mode 100644 index 00000000000..15d4c2082c4 --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.0/CMakeLists.txt @@ -0,0 +1,4 @@ +# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +install_jar(zookeeper-server-3.8.0-jar-with-dependencies.jar) +# Make symlink so that we have a default version, should be done only in zookeeper-server module +install_symlink(lib/jars/zookeeper-server-3.8.0-jar-with-dependencies.jar lib/jars/zookeeper-server-jar-with-dependencies.jar) diff --git a/zookeeper-server/zookeeper-server-3.9.1/pom.xml b/zookeeper-server/zookeeper-server-3.8.0/pom.xml index 77aec63a781..2037b9cf7c5 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/pom.xml +++ b/zookeeper-server/zookeeper-server-3.8.0/pom.xml @@ -8,11 +8,11 @@ <version>8-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> - <artifactId>zookeeper-server-3.9.1</artifactId> + <artifactId>zookeeper-server-3.8.0</artifactId> <packaging>container-plugin</packaging> <version>8-SNAPSHOT</version> <properties> - <zookeeper.version>3.9.1</zookeeper.version> + <zookeeper.version>3.8.0</zookeeper.version> </properties> <dependencies> <dependency> diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java index d986f02d89a..d986f02d89a 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java index 1b469beb1b8..1b469beb1b8 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java index 100de4894ae..68f7459530e 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java @@ -3,13 +3,11 @@ package com.yahoo.vespa.zookeeper; import com.yahoo.security.X509SslContext; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.common.X509Exception; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.auth.AuthenticationProvider; import org.apache.zookeeper.server.auth.X509AuthenticationProvider; -import javax.net.ssl.KeyManager; import javax.net.ssl.X509KeyManager; import javax.net.ssl.X509TrustManager; import java.security.cert.X509Certificate; diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java index dd5ac4e252b..dd5ac4e252b 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java index 1f15c758583..1f15c758583 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java index 4a7f85d6985..4a7f85d6985 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/common/NetUtils.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/common/NetUtils.java index baa69f12968..baa69f12968 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/common/NetUtils.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/common/NetUtils.java diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java index cf7f4c44015..2610b96d94c 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -1,3 +1,4 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -63,8 +64,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req } } - private static final Request TURN_FORWARDING_DELAY_ON_REQUEST = new Request(null, 0, 0, 0, null, null); - private static final Request TURN_FORWARDING_DELAY_OFF_REQUEST = new Request(null, 0, 0, 0, null, null); + private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null); + private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null); private static class DelayingProcessor implements RequestProcessor, Flushable { private final RequestProcessor next; @@ -90,12 +91,12 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req public void shutdown() { next.shutdown(); } - private void startDelaying() { + private void close() { if (delayed == null) { delayed = new ArrayDeque<>(); } } - private void flushAndStopDelaying() throws RequestProcessorException { + private void open() throws RequestProcessorException { if (delayed != null) { for (Request request : delayed) { next.processRequest(request); @@ -119,7 +120,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private int randRoll; private long randSize; - private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<>(); + private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>(); private final Semaphore snapThreadMutex = new Semaphore(1); @@ -224,12 +225,12 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req break; } - if (si == TURN_FORWARDING_DELAY_ON_REQUEST) { - nextProcessor.startDelaying(); + if (si == turnForwardingDelayOn) { + nextProcessor.close(); continue; } - if (si == TURN_FORWARDING_DELAY_OFF_REQUEST) { - nextProcessor.flushAndStopDelaying(); + if (si == turnForwardingDelayOff) { + nextProcessor.open(); continue; } @@ -295,7 +296,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req } public void setDelayForwarding(boolean delayForwarding) { - queuedRequests.add(delayForwarding ? TURN_FORWARDING_DELAY_ON_REQUEST : TURN_FORWARDING_DELAY_OFF_REQUEST); + queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff); } private void flush() throws IOException, RequestProcessorException { diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java index 114d2987fe2..114d2987fe2 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 895bbeffa5f..12d9d46fc3a 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1,3 +1,4 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,33 +19,8 @@ package org.apache.zookeeper.server; -import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintWriter; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Deque; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; -import java.util.zip.Adler32; -import java.util.zip.CheckedInputStream; -import javax.security.sasl.SaslException; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; -import org.apache.jute.InputArchive; import org.apache.jute.Record; import org.apache.zookeeper.Environment; import org.apache.zookeeper.KeeperException; @@ -96,6 +72,27 @@ import org.apache.zookeeper.util.ServiceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.security.sasl.SaslException; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + /** * This class implements a simple standalone ZooKeeperServer. It sets up the * following chain of RequestProcessors to process requests: @@ -114,7 +111,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { // When enabled, will check ACL constraints appertained to the requests first, // before sending the requests to the quorum. - static boolean enableEagerACLCheck; + static final boolean enableEagerACLCheck; static final boolean skipACL; @@ -126,14 +123,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled"; private static boolean digestEnabled; - public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled"; - private static boolean serializeLastProcessedZxidEnabled; - // Add a enable/disable option for now, we should remove this one when // this feature is confirmed to be stable public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; private static boolean closeSessionTxnEnabled = true; - private volatile CountDownLatch restoreLatch; static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); @@ -163,20 +156,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { closeSessionTxnEnabled = Boolean.parseBoolean( System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); - - setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean( - System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"))); - } - - // @VisibleForTesting - public static boolean isEnableEagerACLCheck() { - return enableEagerACLCheck; - } - - // @VisibleForTesting - public static void setEnableEagerACLCheck(boolean enabled) { - ZooKeeperServer.enableEagerACLCheck = enabled; - LOG.info("Update {} to {}", ENABLE_EAGER_ACL_CHECK, enabled); } public static boolean isCloseSessionTxnEnabled() { @@ -240,7 +219,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { private final AtomicInteger requestsInProcess = new AtomicInteger(0); final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>(); // this data structure must be accessed under the outstandingChanges lock - final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>(); + final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>(); protected ServerCnxnFactory serverCnxnFactory; protected ServerCnxnFactory secureServerCnxnFactory; @@ -287,7 +266,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } // Connection throttling - private final BlueThrottle connThrottle = new BlueThrottle(); + private BlueThrottle connThrottle = new BlueThrottle(); private RequestThrottler requestThrottler; public static final String SNAP_COUNT = "zookeeper.snapCount"; @@ -298,17 +277,17 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { * too many large requests such that the JVM runs out of usable heap and * ultimately crashes. * - * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)} + * The limit is enforced by the {@link checkRequestSize(int, boolean)} * method which is called by the connection layer ({@link NIOServerCnxn}, * {@link NettyServerCnxn}) before allocating a byte buffer and pulling * data off the TCP socket. The limit is then checked again by the - * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which - * also atomically updates {@link #currentLargeRequestBytes}. The request is + * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which + * also atomically updates {@link currentLargeRequestBytes}. The request is * then marked as a large request, with the request size stored in the Request - * object so that it can later be decremented from {@link #currentLargeRequestBytes}. + * object so that it can later be decremented from {@link currentLargeRequestsBytes}. * * When a request is completed or dropped, the relevant code path calls the - * {@link #requestFinished(Request)} method which performs the decrement if + * {@link requestFinished(Request)} method which performs the decrement if * needed. */ private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; @@ -321,7 +300,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0); - private final AuthenticationHelper authHelper = new AuthenticationHelper(); + private AuthenticationHelper authHelper; void removeCnxn(ServerCnxn cnxn) { zkDb.removeCnxn(cnxn); @@ -337,6 +316,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { listener = new ZooKeeperServerListenerImpl(this); serverStats = new ServerStats(this); this.requestPathMetricsCollector = new RequestPathMetricsCollector(); + this.authHelper = new AuthenticationHelper(); } /** @@ -378,8 +358,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { this.initLargeRequestThrottlingSettings(); + this.authHelper = new AuthenticationHelper(); + LOG.info( - "Created server with" + "Created patched server with" + " tickTime {} ms" + " minSessionTimeout {} ms" + " maxSessionTimeout {} ms" @@ -544,100 +526,23 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { takeSnapshot(); } - public File takeSnapshot() throws IOException { - return takeSnapshot(false); + public void takeSnapshot() { + takeSnapshot(false); } - public File takeSnapshot(boolean syncSnap) throws IOException { - return takeSnapshot(syncSnap, true, false); - } - - /** - * Takes a snapshot on the server. - * - * @param syncSnap syncSnap sync the snapshot immediately after write - * @param isSevere if true system exist, otherwise throw IOException - * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions - * - * @return file snapshot file object - * @throws IOException - */ - public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException { + public void takeSnapshot(boolean syncSnap) { long start = Time.currentElapsedTime(); - File snapFile = null; try { - if (fastForwardFromEdits) { - zkDb.fastForwardDataBase(); - } - snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); + txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); } catch (IOException e) { - if (isSevere) { - LOG.error("Severe unrecoverable error, exiting", e); - // This is a severe error that we cannot recover from, - // so we need to exit - ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); - } else { - throw e; - } + LOG.error("Severe unrecoverable error, exiting", e); + // This is a severe error that we cannot recover from, + // so we need to exit + ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); } long elapsed = Time.currentElapsedTime() - start; LOG.info("Snapshot taken in {} ms", elapsed); ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed); - return snapFile; - } - - /** - * Restores database from a snapshot. It is used by the restore admin server command. - * - * @param inputStream input stream of snapshot - * @return last processed zxid - */ - public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException { - if (inputStream == null) { - throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot"); - } - - long start = Time.currentElapsedTime(); - LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}", - getZKDatabase().getDataTreeLastProcessedZxid(), - getZKDatabase().dataTree.getNodeCount(), - getZKDatabase().getSessionCount()); - - // restore to a new zkDatabase - final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory); - final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32()); - final InputArchive ia = BinaryInputArchive.getArchive(cis); - newZKDatabase.deserializeSnapshot(ia, cis); - LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", - newZKDatabase.getDataTreeLastProcessedZxid(), - newZKDatabase.dataTree.getNodeCount(), - newZKDatabase.getSessionCount()); - - // create a CountDownLatch - restoreLatch = new CountDownLatch(1); - - try { - // set to the new zkDatabase - setZKDatabase(newZKDatabase); - - // re-create SessionTrack - createSessionTracker(); - } finally { - // unblock request submission - restoreLatch.countDown(); - restoreLatch = null; - } - - LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", - getZKDatabase().getDataTreeLastProcessedZxid(), - getZKDatabase().dataTree.getNodeCount(), - getZKDatabase().getSessionCount()); - - long elapsed = Time.currentElapsedTime() - start; - LOG.info("Restore taken in {} ms", elapsed); - ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed); - - return getLastProcessedZxid(); } public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() { @@ -830,12 +735,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } protected void startRequestThrottler() { - requestThrottler = createRequestThrottler(); + requestThrottler = new RequestThrottler(this); requestThrottler.start(); - } - protected RequestThrottler createRequestThrottler() { - return new RequestThrottler(this); } protected void setupRequestProcessors() { @@ -881,10 +783,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { * error events, e.g., SyncRequestProcessor not being able to write a txn to * disk.</li> * <li>During shutdown the server sets the state to SHUTDOWN, which - * corresponds to the server not running.</li> - * - * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE - * </li></ul> + * corresponds to the server not running.</li></ul> * * @param state new server state. */ @@ -1099,9 +998,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); - CreateSessionTxn txn = new CreateSessionTxn(timeout); + ByteBuffer to = ByteBuffer.allocate(4); + to.putInt(timeout); cnxn.setSessionId(sessionId); - Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null); + Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null); submitRequest(si); return sessionId; } @@ -1159,12 +1059,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { valid ? cnxn.getSessionTimeout() : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no // longer valid - valid ? generatePasswd(cnxn.getSessionId()) : new byte[16], - this instanceof ReadOnlyZooKeeperServer); + valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]); ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); bos.writeInt(-1, "len"); rsp.serialize(bos, "connect"); + if (!cnxn.isOldClient) { + bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly"); + } baos.close(); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); bb.putInt(bb.remaining() - 4).rewind(); @@ -1211,14 +1113,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } public void submitRequest(Request si) { - if (restoreLatch != null) { - try { - LOG.info("Blocking request submission while restore is in progress"); - restoreLatch.await(); - } catch (final InterruptedException e) { - LOG.warn("Unexpected interruption", e); - } - } enqueueRequest(si); } @@ -1468,13 +1362,18 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { return connThrottle.getDropChance(); } - public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException { + public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) + throws IOException, ClientCnxnLimitException { + + BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); + ConnectRequest connReq = new ConnectRequest(); + connReq.deserialize(bia, "connect"); LOG.debug( "Session establishment request from client {} client's lastZxid is 0x{}", cnxn.getRemoteSocketAddress(), - Long.toHexString(request.getLastZxidSeen())); + Long.toHexString(connReq.getLastZxidSeen())); - long sessionId = request.getSessionId(); + long sessionId = connReq.getSessionId(); int tokensNeeded = 1; if (connThrottle.isConnectionWeightEnabled()) { if (sessionId == 0) { @@ -1492,24 +1391,30 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { throw new ClientCnxnLimitException(); } ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); + ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); - if (!cnxn.protocolManager.isReadonlyAvailable()) { + boolean readOnly = false; + try { + readOnly = bia.readBool("readOnly"); + cnxn.isOldClient = false; + } catch (IOException e) { + // this is ok -- just a packet from an old client which + // doesn't contain readOnly field LOG.warn( "Connection request from old client {}; will be dropped if server is in r-o mode", cnxn.getRemoteSocketAddress()); } - - if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) { + if (!readOnly && this instanceof ReadOnlyZooKeeperServer) { String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); LOG.info(msg); throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); } - if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { + if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { String msg = "Refusing session request for client " + cnxn.getRemoteSocketAddress() + " as it has seen zxid 0x" - + Long.toHexString(request.getLastZxidSeen()) + + Long.toHexString(connReq.getLastZxidSeen()) + " our last zxid is 0x" + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) + " client must try another server"; @@ -1517,8 +1422,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { LOG.info(msg); throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); } - int sessionTimeout = request.getTimeOut(); - byte[] passwd = request.getPasswd(); + int sessionTimeout = connReq.getTimeOut(); + byte[] passwd = connReq.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; @@ -1536,16 +1441,16 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { LOG.debug( "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", Long.toHexString(id), - Long.toHexString(request.getLastZxidSeen()), - request.getTimeOut(), + Long.toHexString(connReq.getLastZxidSeen()), + connReq.getTimeOut(), cnxn.getRemoteSocketAddress()); } else { validateSession(cnxn, sessionId); LOG.debug( "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", Long.toHexString(sessionId), - Long.toHexString(request.getLastZxidSeen()), - request.getTimeOut(), + Long.toHexString(connReq.getLastZxidSeen()), + connReq.getTimeOut(), cnxn.getRemoteSocketAddress()); if (serverCnxnFactory != null) { serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); @@ -1685,7 +1590,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } } - public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException { + public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { + // We have the request, now process and setup for next + InputStream bais = new ByteBufferInputStream(incomingBuffer); + BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); + RequestHeader h = new RequestHeader(); + h.deserialize(bia, "header"); + // Need to increase the outstanding request count first, otherwise // there might be a race condition that it enabled recv after // processing request and then disabled when check throttling. @@ -1697,12 +1608,17 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { // in cnxn, since it will close the cnxn anyway. cnxn.incrOutstandingAndCheckThrottle(h); + // Through the magic of byte buffers, txn will not be + // pointing + // to the start of the txn + incomingBuffer = incomingBuffer.slice(); if (h.getType() == OpCode.auth) { LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress()); - AuthPacket authPacket = request.readRecord(AuthPacket::new); + AuthPacket authPacket = new AuthPacket(); + ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket); String scheme = authPacket.getScheme(); ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); - Code authReturn = KeeperException.Code.AUTHFAILED; + Code authReturn = Code.AUTHFAILED; if (ap != null) { try { // handleAuthentication may close the connection, to allow the client to choose @@ -1712,14 +1628,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { authPacket.getAuth()); } catch (RuntimeException e) { LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e); - authReturn = KeeperException.Code.AUTHFAILED; + authReturn = Code.AUTHFAILED; } } - if (authReturn == KeeperException.Code.OK) { + if (authReturn == Code.OK) { LOG.info("Session 0x{}: auth success for scheme {} and address {}", Long.toHexString(cnxn.getSessionId()), scheme, cnxn.getRemoteSocketAddress()); - ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); + ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.OK.intValue()); cnxn.sendResponse(rh, null, null); } else { if (ap == null) { @@ -1731,7 +1647,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { LOG.warn("Authentication failed for scheme: {}", scheme); } // send a response... - ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue()); + ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.AUTHFAILED.intValue()); cnxn.sendResponse(rh, null, null); // ... and close connection cnxn.sendBuffer(ServerCnxnFactory.closeConn); @@ -1739,15 +1655,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } return; } else if (h.getType() == OpCode.sasl) { - processSasl(request, cnxn, h); + processSasl(incomingBuffer, cnxn, h); } else { if (!authHelper.enforceAuthentication(cnxn, h.getXid())) { // Authentication enforcement is failed // Already sent response to user about failure and closed the session, lets return return; } else { - Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo()); - int length = request.limit(); + Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); + int length = incomingBuffer.limit(); if (isLargeRequest(length)) { // checkRequestSize will throw IOException if request is rejected checkRequestSizeWhenMessageReceived(length); @@ -1785,9 +1701,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS); } - private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { + private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { LOG.debug("Responding to client SASL token."); - GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new); + GetSASLRequest clientTokenRecord = new GetSASLRequest(); + ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord); byte[] clientToken = clientTokenRecord.getToken(); LOG.debug("Size of client SASL token: {}", clientToken.length); byte[] responseToken = null; @@ -2062,7 +1979,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { /** * Grant or deny authorization to an operation on a node as a function of: - * @param cnxn : the server connection or null for admin server commands + * @param cnxn : the server connection * @param acl : set of ACLs for the node * @param perm : the permission that the client is requesting * @param ids : the credentials supplied by the client @@ -2235,15 +2152,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { ZooKeeperServer.digestEnabled = digestEnabled; } - public static boolean isSerializeLastProcessedZxidEnabled() { - return serializeLastProcessedZxidEnabled; - } - - public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) { - serializeLastProcessedZxidEnabled = serializeLastZxidEnabled; - LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled); - } - /** * Trim a path to get the immediate predecessor. * @@ -2267,8 +2175,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { switch (request.type) { case OpCode.create: case OpCode.create2: { - CreateRequest req = request.readRequestRecordNoException(CreateRequest::new); - if (req != null) { + CreateRequest req = new CreateRequest(); + if (buffer2Record(request.request, req)) { mustCheckACL = true; acl = req.getAcl(); path = parentPath(req.getPath()); @@ -2276,22 +2184,22 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { break; } case OpCode.delete: { - DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new); - if (req != null) { + DeleteRequest req = new DeleteRequest(); + if (buffer2Record(request.request, req)) { path = parentPath(req.getPath()); } break; } case OpCode.setData: { - SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new); - if (req != null) { + SetDataRequest req = new SetDataRequest(); + if (buffer2Record(request.request, req)) { path = req.getPath(); } break; } case OpCode.setACL: { - SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new); - if (req != null) { + SetACLRequest req = new SetACLRequest(); + if (buffer2Record(request.request, req)) { mustCheckACL = true; acl = req.getAcl(); path = req.getPath(); @@ -2348,7 +2256,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { return true; } - err = KeeperException.Code.OK.intValue(); + err = Code.OK.intValue(); try { pathToCheck = effectiveACLPath(request); @@ -2369,7 +2277,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { LOG.error("Uncaught exception in authWriteRequest with: ", t); throw t; } finally { - if (err != KeeperException.Code.OK.intValue()) { + if (err != Code.OK.intValue()) { /* This request has a bad ACL, so we are dismissing it early. */ decInProcess(); ReplyHeader rh = new ReplyHeader(request.cxid, 0, err); @@ -2381,7 +2289,19 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } } - return err == KeeperException.Code.OK.intValue(); + return err == Code.OK.intValue(); + } + + private boolean buffer2Record(ByteBuffer request, Record record) { + boolean rv = false; + try { + ByteBufferInputStream.byteBuffer2Record(request, record); + request.rewind(); + rv = true; + } catch (IOException ex) { + } + + return rv; } public int getOutstandingHandshakeNum() { diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index 1f629bed73d..a44ebc3f7b8 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -1,3 +1,4 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,10 +19,6 @@ package org.apache.zookeeper.server.quorum; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import javax.management.JMException; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.metrics.MetricsContext; @@ -36,6 +33,11 @@ import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import javax.management.JMException; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + /** * * Just like the standard ZooKeeperServer. We just replace the request @@ -175,7 +177,7 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { this, getZKDatabase().getSessionWithTimeOuts(), tickTime, - self.getMyId(), + self.getId(), self.areLocalSessionsEnabled(), getZooKeeperServerListener()); } @@ -291,7 +293,7 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { */ @Override public long getServerId() { - return self.getMyId(); + return self.getId(); } @Override diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 8d8b6dabce8..1f5f2a0b225 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -1,3 +1,4 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -133,7 +134,7 @@ public class Learner { LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync); } - final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<>(); + final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>(); public int getPendingRevalidationsCount() { return pendingRevalidations.size(); @@ -254,9 +255,13 @@ public class Learner { oa.writeLong(request.sessionId); oa.writeInt(request.cxid); oa.writeInt(request.type); - byte[] payload = request.readRequestBytes(); - if (payload != null) { - oa.write(payload); + if (request.request != null) { + request.request.rewind(); + int len = request.request.remaining(); + byte[] b = new byte[len]; + request.request.get(b); + request.request.rewind(); + oa.write(b); } oa.close(); QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo); @@ -491,7 +496,7 @@ public class Learner { /* * Add sid to payload */ - LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion()); + LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion()); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); @@ -543,140 +548,36 @@ public class Learner { * @throws InterruptedException */ protected void syncWithLeader(long newLeaderZxid) throws Exception { + QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); + QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); - QuorumVerifier newLeaderQV = null; - - class SyncHelper { - - // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot. - // For SNAP and TRUNC the snapshot is needed to save that history. - boolean willSnapshot = true; - boolean syncSnapshot = false; - // PROPOSALs received during sync, for matching up with COMMITs. - Deque<PacketInFlight> proposals = new ArrayDeque<>(); - - // PROPOSALs we delay forwarding to the ZK server until sync is done. - Deque<PacketInFlight> delayedProposals = new ArrayDeque<>(); - - // COMMITs we delay forwarding to the ZK server until sync is done. - Deque<Long> delayedCommits = new ArrayDeque<>(); - - void syncSnapshot() { - syncSnapshot = true; - } - - void noSnapshot() { - willSnapshot = false; - } - - void propose(PacketInFlight pif) { - proposals.add(pif); - delayedProposals.add(pif); - } - - PacketInFlight nextProposal() { - return proposals.peekFirst(); - } - - void commit() { - PacketInFlight packet = proposals.remove(); - if (willSnapshot) { - zk.processTxn(packet.hdr, packet.rec); - delayedProposals.remove(); - } else { - delayedCommits.add(packet.hdr.getZxid()); - } - } - - void writeState() throws IOException, InterruptedException { - // Ensure all received transaction PROPOSALs are written before we ACK the NEWLEADER, - // since this allows the leader to apply those transactions to its served state: - if (willSnapshot) { - zk.takeSnapshot(syncSnapshot); // either, the snapshot contains the transactions, - willSnapshot = false; // but anything after this needs to go to the transaction log; or - } - - self.setCurrentEpoch(newEpoch); - sock.setSoTimeout(self.tickTime * self.syncLimit); - self.setSyncMode(QuorumPeer.SyncMode.NONE); - zk.startupWithoutServing(); - - // if we're a follower, we need to ensure the transactions are safely logged before ACK'ing. - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - // The leader expects the NEWLEADER ACK to precede all the PROPOSAL ACKs, so we only write them first. - fzk.syncProcessor.setDelayForwarding(true); - for (PacketInFlight p : delayedProposals) { - fzk.logRequest(p.hdr, p.rec, p.digest); - } - delayedProposals.clear(); - fzk.syncProcessor.syncFlush(); - } - } - - void flushAcks() throws InterruptedException { - if (zk instanceof FollowerZooKeeperServer) { - // The NEWLEADER is ACK'ed, and we can now ACK the PROPOSALs we wrote in writeState. - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - fzk.syncProcessor.setDelayForwarding(false); - fzk.syncProcessor.syncFlush(); // Ensure these are all ACK'ed before the UPTODATE ACK. - } - } - - void applyDelayedPackets() { - // Any delayed packets must now be applied: all PROPOSALs first, then any COMMITs. - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : delayedProposals) { - fzk.logRequest(p.hdr, p.rec, p.digest); - } - for (Long zxid : delayedCommits) { - fzk.commit(zxid); - } - } else if (zk instanceof ObserverZooKeeperServer) { - ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; - for (PacketInFlight p : delayedProposals) { - Long zxid = delayedCommits.peekFirst(); - if (p.hdr.getZxid() != zxid) { - // log warning message if there is no matching commit - // old leader send outstanding proposal to observer - LOG.warn( - "Committing 0x{}, but next proposal is 0x{}", - Long.toHexString(zxid), - Long.toHexString(p.hdr.getZxid())); - continue; - } - delayedCommits.remove(); - Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1); - request.setTxnDigest(p.digest); - ozk.commitRequest(request); - } - } else { - // New server type need to handle in-flight packets - throw new UnsupportedOperationException("Unknown server type"); - } - } - - } + QuorumVerifier newLeaderQV = null; - SyncHelper helper = new SyncHelper(); - QuorumPacket qp = new QuorumPacket(); + // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot + // For SNAP and TRUNC the snapshot is needed to save that history + boolean snapshotNeeded = true; + boolean syncSnapshot = false; readPacket(qp); + Deque<Long> packetsCommitted = new ArrayDeque<>(); + Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>(); + Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); self.setSyncMode(QuorumPeer.SyncMode.DIFF); if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) { LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading."); - helper.syncSnapshot(); + snapshotNeeded = true; + syncSnapshot = true; } else { - helper.noSnapshot(); + snapshotNeeded = false; } } else if (qp.getType() == Leader.SNAP) { self.setSyncMode(QuorumPeer.SyncMode.SNAP); LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid())); // The leader is going to dump the database + // db is clear as part of deserializeSnapshot() zk.getZKDatabase().deserializeSnapshot(leaderIs); // ZOOKEEPER-2819: overwrite config node content extracted // from leader snapshot with local config, to avoid potential @@ -692,18 +593,20 @@ public class Learner { } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); - // Immediately persist the latest snapshot when there is txn log gap - helper.syncSnapshot(); + // immediately persist the latest snapshot when there is txn log gap + syncSnapshot = true; } else if (qp.getType() == Leader.TRUNC) { - // We need to truncate the log to the lastZxid of the leader + //we need to truncate the log to the lastzxid of the leader self.setSyncMode(QuorumPeer.SyncMode.TRUNC); LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid())); boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid()); if (!truncated) { + // not able to truncate the log LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid())); ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); + } else { LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp)); ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); @@ -711,10 +614,17 @@ public class Learner { zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); zk.createSessionTracker(); - - // we are now going to start getting transactions to apply followed by an UPTODATE long lastQueued = 0; + + // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 + // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) + // we need to make sure that we don't take the snapshot twice. + boolean isPreZAB1_0 = true; + //If we are not going to take the snapshot be sure the transactions are not applied in memory + // but written out to the transaction log + boolean writeToTxnLog = !snapshotNeeded; TxnLogEntry logEntry; + // we are now going to start getting transactions to apply followed by an UPTODATE outerLoop: while (self.isRunning()) { readPacket(qp); @@ -738,11 +648,13 @@ public class Learner { QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8)); self.setLastSeenQuorumVerifier(qv, true); } - helper.propose(pif); + + packetsNotLogged.add(pif); + packetsNotCommitted.add(pif); break; case Leader.COMMIT: case Leader.COMMITANDACTIVATE: - pif = helper.nextProposal(); + pif = packetsNotCommitted.peekFirst(); if (pif.hdr.getZxid() != qp.getZxid()) { LOG.warn( "Committing 0x{}, but next proposal is 0x{}", @@ -750,24 +662,43 @@ public class Learner { Long.toHexString(pif.hdr.getZxid())); } else { if (qp.getType() == Leader.COMMITANDACTIVATE) { - tryReconfig(pif, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid()); + QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); + boolean majorChange = self.processReconfig( + qv, + ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), + true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); + } + } + if (!writeToTxnLog) { + zk.processTxn(pif.hdr, pif.rec); + packetsNotLogged.remove(); + packetsNotCommitted.remove(); + } else { + packetsNotCommitted.remove(); + packetsCommitted.add(qp.getZxid()); } - helper.commit(); } break; case Leader.INFORM: case Leader.INFORMANDACTIVATE: PacketInFlight packet = new PacketInFlight(); + if (qp.getType() == Leader.INFORMANDACTIVATE) { ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); long suggestedLeaderId = buffer.getLong(); - byte[] remainingData = new byte[buffer.remaining()]; - buffer.get(remainingData); - logEntry = SerializeUtils.deserializeTxn(remainingData); + byte[] remainingdata = new byte[buffer.remaining()]; + buffer.get(remainingdata); + logEntry = SerializeUtils.deserializeTxn(remainingdata); packet.hdr = logEntry.getHeader(); packet.rec = logEntry.getTxn(); packet.digest = logEntry.getDigest(); - tryReconfig(packet, suggestedLeaderId, qp.getZxid()); + QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8)); + boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); + } } else { logEntry = SerializeUtils.deserializeTxn(qp.getData()); packet.rec = logEntry.getTxn(); @@ -782,8 +713,14 @@ public class Learner { } lastQueued = packet.hdr.getZxid(); } - helper.propose(packet); - helper.commit(); + if (!writeToTxnLog) { + // Apply to db directly if we haven't taken the snapshot + zk.processTxn(packet.hdr, packet.rec); + } else { + packetsNotLogged.add(packet); + packetsCommitted.add(qp.getZxid()); + } + break; case Leader.UPTODATE: LOG.info("Learner received UPTODATE message"); @@ -793,11 +730,15 @@ public class Learner { throw new Exception("changes proposed in reconfig"); } } - helper.flushAcks(); + if (isPreZAB1_0) { + zk.takeSnapshot(syncSnapshot); + self.setCurrentEpoch(newEpoch); + } self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery + // means this is Zab 1.0 LOG.info("Learner received NEWLEADER message"); if (qp.getData() != null && qp.getData().length > 1) { try { @@ -809,13 +750,40 @@ public class Learner { } } - helper.writeState(); + if (snapshotNeeded) { + zk.takeSnapshot(syncSnapshot); + } + + self.setCurrentEpoch(newEpoch); + writeToTxnLog = true; + //Anything after this needs to go to the transaction log, not applied directly in memory + isPreZAB1_0 = false; + + // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). + sock.setSoTimeout(self.tickTime * self.syncLimit); + self.setSyncMode(QuorumPeer.SyncMode.NONE); + zk.startupWithoutServing(); + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(true); + for (PacketInFlight p : packetsNotLogged) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + packetsNotLogged.clear(); + fzk.syncProcessor.syncFlush(); + } + writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); + + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(false); + fzk.syncProcessor.syncFlush(); + } break; } } } - QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); zk.startServing(); @@ -828,14 +796,40 @@ public class Learner { */ self.updateElectionVote(newEpoch); - helper.applyDelayedPackets(); - } - - private void tryReconfig(PacketInFlight pif, long newLeader, long zxid) throws Exception { - QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); - boolean majorChange = self.processReconfig(qv, newLeader, zxid, true); - if (majorChange) { - throw new Exception("changes proposed in reconfig"); + // We need to log the stuff that came in between the snapshot and the uptodate + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + for (PacketInFlight p : packetsNotLogged) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + for (Long zxid : packetsCommitted) { + fzk.commit(zxid); + } + } else if (zk instanceof ObserverZooKeeperServer) { + // Similar to follower, we need to log requests between the snapshot + // and UPTODATE + ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; + for (PacketInFlight p : packetsNotLogged) { + Long zxid = packetsCommitted.peekFirst(); + if (p.hdr.getZxid() != zxid) { + // log warning message if there is no matching commit + // old leader send outstanding proposal to observer + LOG.warn( + "Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(zxid), + Long.toHexString(p.hdr.getZxid())); + continue; + } + packetsCommitted.remove(); + Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null); + request.setTxn(p.rec); + request.setHdr(p.hdr); + request.setTxnDigest(p.digest); + ozk.commitRequest(request); + } + } else { + // New server type need to handle in-flight packets + throw new UnsupportedOperationException("Unknown server type"); } } diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java index 8ea94fd4daf..c1dc5cf2b8c 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java @@ -1,3 +1,4 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,9 +19,6 @@ package org.apache.zookeeper.server.quorum; -import java.io.IOException; -import java.util.Collections; -import java.util.Map; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.DataTreeBean; import org.apache.zookeeper.server.ServerCnxn; @@ -29,6 +27,10 @@ import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServerBean; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + /** * Parent class for all ZooKeeperServers for Learners */ @@ -71,7 +73,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer { */ @Override public long getServerId() { - return self.getMyId(); + return self.getId(); } @Override @@ -80,7 +82,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer { this, getZKDatabase().getSessionWithTimeOuts(), this.tickTime, - self.getMyId(), + self.getId(), self.areLocalSessionsEnabled(), getZooKeeperServerListener()); } @@ -155,7 +157,8 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer { public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - } else { + } + else { LOG.info("Shutting down"); try { if (syncProcessor != null) { @@ -167,7 +170,8 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer { // that contains entries we have already written to our transaction log. syncProcessor.shutdown(); } - } catch (Exception e) { + } + catch (Exception e) { LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e); } } diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java index 1a44a98e6e7..37ca16ed52b 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java @@ -1,3 +1,4 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,9 +19,6 @@ package org.apache.zookeeper.server.quorum; -import java.io.IOException; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.BiConsumer; import org.apache.zookeeper.server.FinalRequestProcessor; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; @@ -30,6 +28,10 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.BiConsumer; + /** * A ZooKeeperServer for the Observer node type. Not much is different, but * we anticipate specializing the request processors in the future. @@ -48,7 +50,7 @@ public class ObserverZooKeeperServer extends LearnerZooKeeperServer { /* * Pending sync requests - */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<>(); + */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<Request>(); ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self); @@ -107,6 +109,9 @@ public class ObserverZooKeeperServer extends LearnerZooKeeperServer { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); } + else { + syncProcessor = null; + } } /* diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java index a96a395b03b..b74ca0d716b 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java @@ -1,3 +1,4 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,10 +19,6 @@ package org.apache.zookeeper.server.quorum; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.Objects; -import java.util.stream.Collectors; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.DataTreeBean; @@ -35,6 +32,11 @@ import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServerBean; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Objects; +import java.util.stream.Collectors; + /** * A ZooKeeperServer which comes into play when peer is partitioned from the * majority. Handles read-only clients, but drops connections from not-read-only @@ -88,7 +90,7 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer { public void createSessionTracker() { sessionTracker = new LearnerSessionTracker( this, getZKDatabase().getSessionWithTimeOuts(), - this.tickTime, self.getMyId(), self.areLocalSessionsEnabled(), + this.tickTime, self.getId(), self.areLocalSessionsEnabled(), getZooKeeperServerListener()); } @@ -186,14 +188,16 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer { */ @Override public long getServerId() { - return self.getMyId(); + return self.getId(); } @Override public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { + super.shutdown(fullyShutDown); LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - } else { + } + else { shutdown = true; unregisterJMX(this); diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java index d65ead216f0..ec4c326e9aa 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java +++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -1,3 +1,4 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,9 +19,6 @@ package org.apache.zookeeper.server.quorum; -import java.io.Flushable; -import java.io.IOException; -import java.net.Socket; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; @@ -28,11 +26,15 @@ import org.apache.zookeeper.server.ServerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Flushable; +import java.io.IOException; +import java.net.Socket; + public class SendAckRequestProcessor implements RequestProcessor, Flushable { private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class); - final Learner learner; + Learner learner; SendAckRequestProcessor(Learner peer) { this.learner = peer; diff --git a/zookeeper-server/zookeeper-server-3.9.1/CMakeLists.txt b/zookeeper-server/zookeeper-server-3.9.1/CMakeLists.txt deleted file mode 100644 index 295693f22d7..00000000000 --- a/zookeeper-server/zookeeper-server-3.9.1/CMakeLists.txt +++ /dev/null @@ -1,2 +0,0 @@ -# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -install_jar(zookeeper-server-3.9.1-jar-with-dependencies.jar) diff --git a/zookeeper-server/zookeeper-server/CMakeLists.txt b/zookeeper-server/zookeeper-server/CMakeLists.txt index 15d4c2082c4..295693f22d7 100644 --- a/zookeeper-server/zookeeper-server/CMakeLists.txt +++ b/zookeeper-server/zookeeper-server/CMakeLists.txt @@ -1,4 +1,2 @@ # Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -install_jar(zookeeper-server-3.8.0-jar-with-dependencies.jar) -# Make symlink so that we have a default version, should be done only in zookeeper-server module -install_symlink(lib/jars/zookeeper-server-3.8.0-jar-with-dependencies.jar lib/jars/zookeeper-server-jar-with-dependencies.jar) +install_jar(zookeeper-server-3.9.1-jar-with-dependencies.jar) diff --git a/zookeeper-server/zookeeper-server/pom.xml b/zookeeper-server/zookeeper-server/pom.xml index 2037b9cf7c5..77aec63a781 100644 --- a/zookeeper-server/zookeeper-server/pom.xml +++ b/zookeeper-server/zookeeper-server/pom.xml @@ -8,11 +8,11 @@ <version>8-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> - <artifactId>zookeeper-server-3.8.0</artifactId> + <artifactId>zookeeper-server-3.9.1</artifactId> <packaging>container-plugin</packaging> <version>8-SNAPSHOT</version> <properties> - <zookeeper.version>3.8.0</zookeeper.version> + <zookeeper.version>3.9.1</zookeeper.version> </properties> <dependencies> <dependency> diff --git a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java index 68f7459530e..100de4894ae 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java +++ b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java @@ -3,11 +3,13 @@ package com.yahoo.vespa.zookeeper; import com.yahoo.security.X509SslContext; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.common.X509Exception; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.auth.AuthenticationProvider; import org.apache.zookeeper.server.auth.X509AuthenticationProvider; +import javax.net.ssl.KeyManager; import javax.net.ssl.X509KeyManager; import javax.net.ssl.X509TrustManager; import java.security.cert.X509Certificate; diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/common/ClientX509Util.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/common/ClientX509Util.java index c0034a4723f..c0034a4723f 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/common/ClientX509Util.java +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/common/ClientX509Util.java diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java index 2610b96d94c..cf7f4c44015 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -1,4 +1,3 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -64,8 +63,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req } } - private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null); - private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null); + private static final Request TURN_FORWARDING_DELAY_ON_REQUEST = new Request(null, 0, 0, 0, null, null); + private static final Request TURN_FORWARDING_DELAY_OFF_REQUEST = new Request(null, 0, 0, 0, null, null); private static class DelayingProcessor implements RequestProcessor, Flushable { private final RequestProcessor next; @@ -91,12 +90,12 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req public void shutdown() { next.shutdown(); } - private void close() { + private void startDelaying() { if (delayed == null) { delayed = new ArrayDeque<>(); } } - private void open() throws RequestProcessorException { + private void flushAndStopDelaying() throws RequestProcessorException { if (delayed != null) { for (Request request : delayed) { next.processRequest(request); @@ -120,7 +119,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req private int randRoll; private long randSize; - private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>(); + private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<>(); private final Semaphore snapThreadMutex = new Semaphore(1); @@ -225,12 +224,12 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req break; } - if (si == turnForwardingDelayOn) { - nextProcessor.close(); + if (si == TURN_FORWARDING_DELAY_ON_REQUEST) { + nextProcessor.startDelaying(); continue; } - if (si == turnForwardingDelayOff) { - nextProcessor.open(); + if (si == TURN_FORWARDING_DELAY_OFF_REQUEST) { + nextProcessor.flushAndStopDelaying(); continue; } @@ -296,7 +295,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req } public void setDelayForwarding(boolean delayForwarding) { - queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff); + queuedRequests.add(delayForwarding ? TURN_FORWARDING_DELAY_ON_REQUEST : TURN_FORWARDING_DELAY_OFF_REQUEST); } private void flush() throws IOException, RequestProcessorException { diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 12d9d46fc3a..895bbeffa5f 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1,4 +1,3 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,8 +18,33 @@ package org.apache.zookeeper.server; +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.zip.Adler32; +import java.util.zip.CheckedInputStream; +import javax.security.sasl.SaslException; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.InputArchive; import org.apache.jute.Record; import org.apache.zookeeper.Environment; import org.apache.zookeeper.KeeperException; @@ -72,27 +96,6 @@ import org.apache.zookeeper.util.ServiceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.security.sasl.SaslException; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintWriter; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Deque; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; - /** * This class implements a simple standalone ZooKeeperServer. It sets up the * following chain of RequestProcessors to process requests: @@ -111,7 +114,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { // When enabled, will check ACL constraints appertained to the requests first, // before sending the requests to the quorum. - static final boolean enableEagerACLCheck; + static boolean enableEagerACLCheck; static final boolean skipACL; @@ -123,10 +126,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled"; private static boolean digestEnabled; + public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled"; + private static boolean serializeLastProcessedZxidEnabled; + // Add a enable/disable option for now, we should remove this one when // this feature is confirmed to be stable public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; private static boolean closeSessionTxnEnabled = true; + private volatile CountDownLatch restoreLatch; static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); @@ -156,6 +163,20 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { closeSessionTxnEnabled = Boolean.parseBoolean( System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); + + setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean( + System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"))); + } + + // @VisibleForTesting + public static boolean isEnableEagerACLCheck() { + return enableEagerACLCheck; + } + + // @VisibleForTesting + public static void setEnableEagerACLCheck(boolean enabled) { + ZooKeeperServer.enableEagerACLCheck = enabled; + LOG.info("Update {} to {}", ENABLE_EAGER_ACL_CHECK, enabled); } public static boolean isCloseSessionTxnEnabled() { @@ -219,7 +240,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { private final AtomicInteger requestsInProcess = new AtomicInteger(0); final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>(); // this data structure must be accessed under the outstandingChanges lock - final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>(); + final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>(); protected ServerCnxnFactory serverCnxnFactory; protected ServerCnxnFactory secureServerCnxnFactory; @@ -266,7 +287,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } // Connection throttling - private BlueThrottle connThrottle = new BlueThrottle(); + private final BlueThrottle connThrottle = new BlueThrottle(); private RequestThrottler requestThrottler; public static final String SNAP_COUNT = "zookeeper.snapCount"; @@ -277,17 +298,17 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { * too many large requests such that the JVM runs out of usable heap and * ultimately crashes. * - * The limit is enforced by the {@link checkRequestSize(int, boolean)} + * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)} * method which is called by the connection layer ({@link NIOServerCnxn}, * {@link NettyServerCnxn}) before allocating a byte buffer and pulling * data off the TCP socket. The limit is then checked again by the - * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which - * also atomically updates {@link currentLargeRequestBytes}. The request is + * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which + * also atomically updates {@link #currentLargeRequestBytes}. The request is * then marked as a large request, with the request size stored in the Request - * object so that it can later be decremented from {@link currentLargeRequestsBytes}. + * object so that it can later be decremented from {@link #currentLargeRequestBytes}. * * When a request is completed or dropped, the relevant code path calls the - * {@link requestFinished(Request)} method which performs the decrement if + * {@link #requestFinished(Request)} method which performs the decrement if * needed. */ private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; @@ -300,7 +321,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0); - private AuthenticationHelper authHelper; + private final AuthenticationHelper authHelper = new AuthenticationHelper(); void removeCnxn(ServerCnxn cnxn) { zkDb.removeCnxn(cnxn); @@ -316,7 +337,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { listener = new ZooKeeperServerListenerImpl(this); serverStats = new ServerStats(this); this.requestPathMetricsCollector = new RequestPathMetricsCollector(); - this.authHelper = new AuthenticationHelper(); } /** @@ -358,10 +378,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { this.initLargeRequestThrottlingSettings(); - this.authHelper = new AuthenticationHelper(); - LOG.info( - "Created patched server with" + "Created server with" + " tickTime {} ms" + " minSessionTimeout {} ms" + " maxSessionTimeout {} ms" @@ -526,23 +544,100 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { takeSnapshot(); } - public void takeSnapshot() { - takeSnapshot(false); + public File takeSnapshot() throws IOException { + return takeSnapshot(false); } - public void takeSnapshot(boolean syncSnap) { + public File takeSnapshot(boolean syncSnap) throws IOException { + return takeSnapshot(syncSnap, true, false); + } + + /** + * Takes a snapshot on the server. + * + * @param syncSnap syncSnap sync the snapshot immediately after write + * @param isSevere if true system exist, otherwise throw IOException + * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions + * + * @return file snapshot file object + * @throws IOException + */ + public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException { long start = Time.currentElapsedTime(); + File snapFile = null; try { - txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); + if (fastForwardFromEdits) { + zkDb.fastForwardDataBase(); + } + snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); } catch (IOException e) { - LOG.error("Severe unrecoverable error, exiting", e); - // This is a severe error that we cannot recover from, - // so we need to exit - ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); + if (isSevere) { + LOG.error("Severe unrecoverable error, exiting", e); + // This is a severe error that we cannot recover from, + // so we need to exit + ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); + } else { + throw e; + } } long elapsed = Time.currentElapsedTime() - start; LOG.info("Snapshot taken in {} ms", elapsed); ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed); + return snapFile; + } + + /** + * Restores database from a snapshot. It is used by the restore admin server command. + * + * @param inputStream input stream of snapshot + * @return last processed zxid + */ + public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException { + if (inputStream == null) { + throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot"); + } + + long start = Time.currentElapsedTime(); + LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}", + getZKDatabase().getDataTreeLastProcessedZxid(), + getZKDatabase().dataTree.getNodeCount(), + getZKDatabase().getSessionCount()); + + // restore to a new zkDatabase + final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory); + final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32()); + final InputArchive ia = BinaryInputArchive.getArchive(cis); + newZKDatabase.deserializeSnapshot(ia, cis); + LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", + newZKDatabase.getDataTreeLastProcessedZxid(), + newZKDatabase.dataTree.getNodeCount(), + newZKDatabase.getSessionCount()); + + // create a CountDownLatch + restoreLatch = new CountDownLatch(1); + + try { + // set to the new zkDatabase + setZKDatabase(newZKDatabase); + + // re-create SessionTrack + createSessionTracker(); + } finally { + // unblock request submission + restoreLatch.countDown(); + restoreLatch = null; + } + + LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", + getZKDatabase().getDataTreeLastProcessedZxid(), + getZKDatabase().dataTree.getNodeCount(), + getZKDatabase().getSessionCount()); + + long elapsed = Time.currentElapsedTime() - start; + LOG.info("Restore taken in {} ms", elapsed); + ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed); + + return getLastProcessedZxid(); } public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() { @@ -735,9 +830,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } protected void startRequestThrottler() { - requestThrottler = new RequestThrottler(this); + requestThrottler = createRequestThrottler(); requestThrottler.start(); + } + protected RequestThrottler createRequestThrottler() { + return new RequestThrottler(this); } protected void setupRequestProcessors() { @@ -783,7 +881,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { * error events, e.g., SyncRequestProcessor not being able to write a txn to * disk.</li> * <li>During shutdown the server sets the state to SHUTDOWN, which - * corresponds to the server not running.</li></ul> + * corresponds to the server not running.</li> + * + * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE + * </li></ul> * * @param state new server state. */ @@ -998,10 +1099,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); - ByteBuffer to = ByteBuffer.allocate(4); - to.putInt(timeout); + CreateSessionTxn txn = new CreateSessionTxn(timeout); cnxn.setSessionId(sessionId); - Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null); + Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null); submitRequest(si); return sessionId; } @@ -1059,14 +1159,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { valid ? cnxn.getSessionTimeout() : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no // longer valid - valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]); + valid ? generatePasswd(cnxn.getSessionId()) : new byte[16], + this instanceof ReadOnlyZooKeeperServer); ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); bos.writeInt(-1, "len"); rsp.serialize(bos, "connect"); - if (!cnxn.isOldClient) { - bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly"); - } baos.close(); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); bb.putInt(bb.remaining() - 4).rewind(); @@ -1113,6 +1211,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } public void submitRequest(Request si) { + if (restoreLatch != null) { + try { + LOG.info("Blocking request submission while restore is in progress"); + restoreLatch.await(); + } catch (final InterruptedException e) { + LOG.warn("Unexpected interruption", e); + } + } enqueueRequest(si); } @@ -1362,18 +1468,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { return connThrottle.getDropChance(); } - public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) - throws IOException, ClientCnxnLimitException { - - BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); - ConnectRequest connReq = new ConnectRequest(); - connReq.deserialize(bia, "connect"); + public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException { LOG.debug( "Session establishment request from client {} client's lastZxid is 0x{}", cnxn.getRemoteSocketAddress(), - Long.toHexString(connReq.getLastZxidSeen())); + Long.toHexString(request.getLastZxidSeen())); - long sessionId = connReq.getSessionId(); + long sessionId = request.getSessionId(); int tokensNeeded = 1; if (connThrottle.isConnectionWeightEnabled()) { if (sessionId == 0) { @@ -1391,30 +1492,24 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { throw new ClientCnxnLimitException(); } ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); - ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); - boolean readOnly = false; - try { - readOnly = bia.readBool("readOnly"); - cnxn.isOldClient = false; - } catch (IOException e) { - // this is ok -- just a packet from an old client which - // doesn't contain readOnly field + if (!cnxn.protocolManager.isReadonlyAvailable()) { LOG.warn( "Connection request from old client {}; will be dropped if server is in r-o mode", cnxn.getRemoteSocketAddress()); } - if (!readOnly && this instanceof ReadOnlyZooKeeperServer) { + + if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) { String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); LOG.info(msg); throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); } - if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { + if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { String msg = "Refusing session request for client " + cnxn.getRemoteSocketAddress() + " as it has seen zxid 0x" - + Long.toHexString(connReq.getLastZxidSeen()) + + Long.toHexString(request.getLastZxidSeen()) + " our last zxid is 0x" + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) + " client must try another server"; @@ -1422,8 +1517,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { LOG.info(msg); throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); } - int sessionTimeout = connReq.getTimeOut(); - byte[] passwd = connReq.getPasswd(); + int sessionTimeout = request.getTimeOut(); + byte[] passwd = request.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; @@ -1441,16 +1536,16 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { LOG.debug( "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", Long.toHexString(id), - Long.toHexString(connReq.getLastZxidSeen()), - connReq.getTimeOut(), + Long.toHexString(request.getLastZxidSeen()), + request.getTimeOut(), cnxn.getRemoteSocketAddress()); } else { validateSession(cnxn, sessionId); LOG.debug( "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", Long.toHexString(sessionId), - Long.toHexString(connReq.getLastZxidSeen()), - connReq.getTimeOut(), + Long.toHexString(request.getLastZxidSeen()), + request.getTimeOut(), cnxn.getRemoteSocketAddress()); if (serverCnxnFactory != null) { serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); @@ -1590,13 +1685,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } } - public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { - // We have the request, now process and setup for next - InputStream bais = new ByteBufferInputStream(incomingBuffer); - BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); - RequestHeader h = new RequestHeader(); - h.deserialize(bia, "header"); - + public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException { // Need to increase the outstanding request count first, otherwise // there might be a race condition that it enabled recv after // processing request and then disabled when check throttling. @@ -1608,17 +1697,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { // in cnxn, since it will close the cnxn anyway. cnxn.incrOutstandingAndCheckThrottle(h); - // Through the magic of byte buffers, txn will not be - // pointing - // to the start of the txn - incomingBuffer = incomingBuffer.slice(); if (h.getType() == OpCode.auth) { LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress()); - AuthPacket authPacket = new AuthPacket(); - ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket); + AuthPacket authPacket = request.readRecord(AuthPacket::new); String scheme = authPacket.getScheme(); ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); - Code authReturn = Code.AUTHFAILED; + Code authReturn = KeeperException.Code.AUTHFAILED; if (ap != null) { try { // handleAuthentication may close the connection, to allow the client to choose @@ -1628,14 +1712,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { authPacket.getAuth()); } catch (RuntimeException e) { LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e); - authReturn = Code.AUTHFAILED; + authReturn = KeeperException.Code.AUTHFAILED; } } - if (authReturn == Code.OK) { + if (authReturn == KeeperException.Code.OK) { LOG.info("Session 0x{}: auth success for scheme {} and address {}", Long.toHexString(cnxn.getSessionId()), scheme, cnxn.getRemoteSocketAddress()); - ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.OK.intValue()); + ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); cnxn.sendResponse(rh, null, null); } else { if (ap == null) { @@ -1647,7 +1731,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { LOG.warn("Authentication failed for scheme: {}", scheme); } // send a response... - ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.AUTHFAILED.intValue()); + ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue()); cnxn.sendResponse(rh, null, null); // ... and close connection cnxn.sendBuffer(ServerCnxnFactory.closeConn); @@ -1655,15 +1739,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } return; } else if (h.getType() == OpCode.sasl) { - processSasl(incomingBuffer, cnxn, h); + processSasl(request, cnxn, h); } else { if (!authHelper.enforceAuthentication(cnxn, h.getXid())) { // Authentication enforcement is failed // Already sent response to user about failure and closed the session, lets return return; } else { - Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); - int length = incomingBuffer.limit(); + Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo()); + int length = request.limit(); if (isLargeRequest(length)) { // checkRequestSize will throw IOException if request is rejected checkRequestSizeWhenMessageReceived(length); @@ -1701,10 +1785,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS); } - private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { + private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { LOG.debug("Responding to client SASL token."); - GetSASLRequest clientTokenRecord = new GetSASLRequest(); - ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord); + GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new); byte[] clientToken = clientTokenRecord.getToken(); LOG.debug("Size of client SASL token: {}", clientToken.length); byte[] responseToken = null; @@ -1979,7 +2062,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { /** * Grant or deny authorization to an operation on a node as a function of: - * @param cnxn : the server connection + * @param cnxn : the server connection or null for admin server commands * @param acl : set of ACLs for the node * @param perm : the permission that the client is requesting * @param ids : the credentials supplied by the client @@ -2152,6 +2235,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { ZooKeeperServer.digestEnabled = digestEnabled; } + public static boolean isSerializeLastProcessedZxidEnabled() { + return serializeLastProcessedZxidEnabled; + } + + public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) { + serializeLastProcessedZxidEnabled = serializeLastZxidEnabled; + LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled); + } + /** * Trim a path to get the immediate predecessor. * @@ -2175,8 +2267,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { switch (request.type) { case OpCode.create: case OpCode.create2: { - CreateRequest req = new CreateRequest(); - if (buffer2Record(request.request, req)) { + CreateRequest req = request.readRequestRecordNoException(CreateRequest::new); + if (req != null) { mustCheckACL = true; acl = req.getAcl(); path = parentPath(req.getPath()); @@ -2184,22 +2276,22 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { break; } case OpCode.delete: { - DeleteRequest req = new DeleteRequest(); - if (buffer2Record(request.request, req)) { + DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new); + if (req != null) { path = parentPath(req.getPath()); } break; } case OpCode.setData: { - SetDataRequest req = new SetDataRequest(); - if (buffer2Record(request.request, req)) { + SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new); + if (req != null) { path = req.getPath(); } break; } case OpCode.setACL: { - SetACLRequest req = new SetACLRequest(); - if (buffer2Record(request.request, req)) { + SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new); + if (req != null) { mustCheckACL = true; acl = req.getAcl(); path = req.getPath(); @@ -2256,7 +2348,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { return true; } - err = Code.OK.intValue(); + err = KeeperException.Code.OK.intValue(); try { pathToCheck = effectiveACLPath(request); @@ -2277,7 +2369,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { LOG.error("Uncaught exception in authWriteRequest with: ", t); throw t; } finally { - if (err != Code.OK.intValue()) { + if (err != KeeperException.Code.OK.intValue()) { /* This request has a bad ACL, so we are dismissing it early. */ decInProcess(); ReplyHeader rh = new ReplyHeader(request.cxid, 0, err); @@ -2289,19 +2381,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { } } - return err == Code.OK.intValue(); - } - - private boolean buffer2Record(ByteBuffer request, Record record) { - boolean rv = false; - try { - ByteBufferInputStream.byteBuffer2Record(request, record); - request.rewind(); - rv = true; - } catch (IOException ex) { - } - - return rv; + return err == KeeperException.Code.OK.intValue(); } public int getOutstandingHandshakeNum() { diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java index a44ebc3f7b8..1f629bed73d 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java @@ -1,4 +1,3 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,6 +18,10 @@ package org.apache.zookeeper.server.quorum; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import javax.management.JMException; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.metrics.MetricsContext; @@ -33,11 +36,6 @@ import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import javax.management.JMException; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - /** * * Just like the standard ZooKeeperServer. We just replace the request @@ -177,7 +175,7 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { this, getZKDatabase().getSessionWithTimeOuts(), tickTime, - self.getId(), + self.getMyId(), self.areLocalSessionsEnabled(), getZooKeeperServerListener()); } @@ -293,7 +291,7 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer { */ @Override public long getServerId() { - return self.getId(); + return self.getMyId(); } @Override diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 1f5f2a0b225..8d8b6dabce8 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -1,4 +1,3 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -134,7 +133,7 @@ public class Learner { LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync); } - final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>(); + final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<>(); public int getPendingRevalidationsCount() { return pendingRevalidations.size(); @@ -255,13 +254,9 @@ public class Learner { oa.writeLong(request.sessionId); oa.writeInt(request.cxid); oa.writeInt(request.type); - if (request.request != null) { - request.request.rewind(); - int len = request.request.remaining(); - byte[] b = new byte[len]; - request.request.get(b); - request.request.rewind(); - oa.write(b); + byte[] payload = request.readRequestBytes(); + if (payload != null) { + oa.write(payload); } oa.close(); QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo); @@ -496,7 +491,7 @@ public class Learner { /* * Add sid to payload */ - LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion()); + LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion()); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); @@ -548,36 +543,140 @@ public class Learner { * @throws InterruptedException */ protected void syncWithLeader(long newLeaderZxid) throws Exception { - QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); - QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); - QuorumVerifier newLeaderQV = null; - // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot - // For SNAP and TRUNC the snapshot is needed to save that history - boolean snapshotNeeded = true; - boolean syncSnapshot = false; + class SyncHelper { + + // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot. + // For SNAP and TRUNC the snapshot is needed to save that history. + boolean willSnapshot = true; + boolean syncSnapshot = false; + + // PROPOSALs received during sync, for matching up with COMMITs. + Deque<PacketInFlight> proposals = new ArrayDeque<>(); + + // PROPOSALs we delay forwarding to the ZK server until sync is done. + Deque<PacketInFlight> delayedProposals = new ArrayDeque<>(); + + // COMMITs we delay forwarding to the ZK server until sync is done. + Deque<Long> delayedCommits = new ArrayDeque<>(); + + void syncSnapshot() { + syncSnapshot = true; + } + + void noSnapshot() { + willSnapshot = false; + } + + void propose(PacketInFlight pif) { + proposals.add(pif); + delayedProposals.add(pif); + } + + PacketInFlight nextProposal() { + return proposals.peekFirst(); + } + + void commit() { + PacketInFlight packet = proposals.remove(); + if (willSnapshot) { + zk.processTxn(packet.hdr, packet.rec); + delayedProposals.remove(); + } else { + delayedCommits.add(packet.hdr.getZxid()); + } + } + + void writeState() throws IOException, InterruptedException { + // Ensure all received transaction PROPOSALs are written before we ACK the NEWLEADER, + // since this allows the leader to apply those transactions to its served state: + if (willSnapshot) { + zk.takeSnapshot(syncSnapshot); // either, the snapshot contains the transactions, + willSnapshot = false; // but anything after this needs to go to the transaction log; or + } + + self.setCurrentEpoch(newEpoch); + sock.setSoTimeout(self.tickTime * self.syncLimit); + self.setSyncMode(QuorumPeer.SyncMode.NONE); + zk.startupWithoutServing(); + + // if we're a follower, we need to ensure the transactions are safely logged before ACK'ing. + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + // The leader expects the NEWLEADER ACK to precede all the PROPOSAL ACKs, so we only write them first. + fzk.syncProcessor.setDelayForwarding(true); + for (PacketInFlight p : delayedProposals) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + delayedProposals.clear(); + fzk.syncProcessor.syncFlush(); + } + } + + void flushAcks() throws InterruptedException { + if (zk instanceof FollowerZooKeeperServer) { + // The NEWLEADER is ACK'ed, and we can now ACK the PROPOSALs we wrote in writeState. + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + fzk.syncProcessor.setDelayForwarding(false); + fzk.syncProcessor.syncFlush(); // Ensure these are all ACK'ed before the UPTODATE ACK. + } + } + + void applyDelayedPackets() { + // Any delayed packets must now be applied: all PROPOSALs first, then any COMMITs. + if (zk instanceof FollowerZooKeeperServer) { + FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; + for (PacketInFlight p : delayedProposals) { + fzk.logRequest(p.hdr, p.rec, p.digest); + } + for (Long zxid : delayedCommits) { + fzk.commit(zxid); + } + } else if (zk instanceof ObserverZooKeeperServer) { + ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; + for (PacketInFlight p : delayedProposals) { + Long zxid = delayedCommits.peekFirst(); + if (p.hdr.getZxid() != zxid) { + // log warning message if there is no matching commit + // old leader send outstanding proposal to observer + LOG.warn( + "Committing 0x{}, but next proposal is 0x{}", + Long.toHexString(zxid), + Long.toHexString(p.hdr.getZxid())); + continue; + } + delayedCommits.remove(); + Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1); + request.setTxnDigest(p.digest); + ozk.commitRequest(request); + } + } else { + // New server type need to handle in-flight packets + throw new UnsupportedOperationException("Unknown server type"); + } + } + + } + + SyncHelper helper = new SyncHelper(); + QuorumPacket qp = new QuorumPacket(); readPacket(qp); - Deque<Long> packetsCommitted = new ArrayDeque<>(); - Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>(); - Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>(); synchronized (zk) { if (qp.getType() == Leader.DIFF) { LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid())); self.setSyncMode(QuorumPeer.SyncMode.DIFF); if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) { LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading."); - snapshotNeeded = true; - syncSnapshot = true; + helper.syncSnapshot(); } else { - snapshotNeeded = false; + helper.noSnapshot(); } } else if (qp.getType() == Leader.SNAP) { self.setSyncMode(QuorumPeer.SyncMode.SNAP); LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid())); // The leader is going to dump the database - // db is clear as part of deserializeSnapshot() zk.getZKDatabase().deserializeSnapshot(leaderIs); // ZOOKEEPER-2819: overwrite config node content extracted // from leader snapshot with local config, to avoid potential @@ -593,20 +692,18 @@ public class Learner { } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); - // immediately persist the latest snapshot when there is txn log gap - syncSnapshot = true; + // Immediately persist the latest snapshot when there is txn log gap + helper.syncSnapshot(); } else if (qp.getType() == Leader.TRUNC) { - //we need to truncate the log to the lastzxid of the leader + // We need to truncate the log to the lastZxid of the leader self.setSyncMode(QuorumPeer.SyncMode.TRUNC); LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid())); boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid()); if (!truncated) { - // not able to truncate the log LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid())); ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); } zk.getZKDatabase().setlastProcessedZxid(qp.getZxid()); - } else { LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp)); ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue()); @@ -614,17 +711,10 @@ public class Learner { zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); zk.createSessionTracker(); - long lastQueued = 0; - // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0 - // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER) - // we need to make sure that we don't take the snapshot twice. - boolean isPreZAB1_0 = true; - //If we are not going to take the snapshot be sure the transactions are not applied in memory - // but written out to the transaction log - boolean writeToTxnLog = !snapshotNeeded; - TxnLogEntry logEntry; // we are now going to start getting transactions to apply followed by an UPTODATE + long lastQueued = 0; + TxnLogEntry logEntry; outerLoop: while (self.isRunning()) { readPacket(qp); @@ -648,13 +738,11 @@ public class Learner { QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8)); self.setLastSeenQuorumVerifier(qv, true); } - - packetsNotLogged.add(pif); - packetsNotCommitted.add(pif); + helper.propose(pif); break; case Leader.COMMIT: case Leader.COMMITANDACTIVATE: - pif = packetsNotCommitted.peekFirst(); + pif = helper.nextProposal(); if (pif.hdr.getZxid() != qp.getZxid()) { LOG.warn( "Committing 0x{}, but next proposal is 0x{}", @@ -662,43 +750,24 @@ public class Learner { Long.toHexString(pif.hdr.getZxid())); } else { if (qp.getType() == Leader.COMMITANDACTIVATE) { - QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); - boolean majorChange = self.processReconfig( - qv, - ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), - true); - if (majorChange) { - throw new Exception("changes proposed in reconfig"); - } - } - if (!writeToTxnLog) { - zk.processTxn(pif.hdr, pif.rec); - packetsNotLogged.remove(); - packetsNotCommitted.remove(); - } else { - packetsNotCommitted.remove(); - packetsCommitted.add(qp.getZxid()); + tryReconfig(pif, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid()); } + helper.commit(); } break; case Leader.INFORM: case Leader.INFORMANDACTIVATE: PacketInFlight packet = new PacketInFlight(); - if (qp.getType() == Leader.INFORMANDACTIVATE) { ByteBuffer buffer = ByteBuffer.wrap(qp.getData()); long suggestedLeaderId = buffer.getLong(); - byte[] remainingdata = new byte[buffer.remaining()]; - buffer.get(remainingdata); - logEntry = SerializeUtils.deserializeTxn(remainingdata); + byte[] remainingData = new byte[buffer.remaining()]; + buffer.get(remainingData); + logEntry = SerializeUtils.deserializeTxn(remainingData); packet.hdr = logEntry.getHeader(); packet.rec = logEntry.getTxn(); packet.digest = logEntry.getDigest(); - QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8)); - boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true); - if (majorChange) { - throw new Exception("changes proposed in reconfig"); - } + tryReconfig(packet, suggestedLeaderId, qp.getZxid()); } else { logEntry = SerializeUtils.deserializeTxn(qp.getData()); packet.rec = logEntry.getTxn(); @@ -713,14 +782,8 @@ public class Learner { } lastQueued = packet.hdr.getZxid(); } - if (!writeToTxnLog) { - // Apply to db directly if we haven't taken the snapshot - zk.processTxn(packet.hdr, packet.rec); - } else { - packetsNotLogged.add(packet); - packetsCommitted.add(qp.getZxid()); - } - + helper.propose(packet); + helper.commit(); break; case Leader.UPTODATE: LOG.info("Learner received UPTODATE message"); @@ -730,15 +793,11 @@ public class Learner { throw new Exception("changes proposed in reconfig"); } } - if (isPreZAB1_0) { - zk.takeSnapshot(syncSnapshot); - self.setCurrentEpoch(newEpoch); - } + helper.flushAcks(); self.setZooKeeperServer(zk); self.adminServer.setZooKeeperServer(zk); break outerLoop; case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery - // means this is Zab 1.0 LOG.info("Learner received NEWLEADER message"); if (qp.getData() != null && qp.getData().length > 1) { try { @@ -750,40 +809,13 @@ public class Learner { } } - if (snapshotNeeded) { - zk.takeSnapshot(syncSnapshot); - } - - self.setCurrentEpoch(newEpoch); - writeToTxnLog = true; - //Anything after this needs to go to the transaction log, not applied directly in memory - isPreZAB1_0 = false; - - // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER). - sock.setSoTimeout(self.tickTime * self.syncLimit); - self.setSyncMode(QuorumPeer.SyncMode.NONE); - zk.startupWithoutServing(); - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - fzk.syncProcessor.setDelayForwarding(true); - for (PacketInFlight p : packetsNotLogged) { - fzk.logRequest(p.hdr, p.rec, p.digest); - } - packetsNotLogged.clear(); - fzk.syncProcessor.syncFlush(); - } - + helper.writeState(); writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); - - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - fzk.syncProcessor.setDelayForwarding(false); - fzk.syncProcessor.syncFlush(); - } break; } } } + QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); zk.startServing(); @@ -796,40 +828,14 @@ public class Learner { */ self.updateElectionVote(newEpoch); - // We need to log the stuff that came in between the snapshot and the uptodate - if (zk instanceof FollowerZooKeeperServer) { - FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk; - for (PacketInFlight p : packetsNotLogged) { - fzk.logRequest(p.hdr, p.rec, p.digest); - } - for (Long zxid : packetsCommitted) { - fzk.commit(zxid); - } - } else if (zk instanceof ObserverZooKeeperServer) { - // Similar to follower, we need to log requests between the snapshot - // and UPTODATE - ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk; - for (PacketInFlight p : packetsNotLogged) { - Long zxid = packetsCommitted.peekFirst(); - if (p.hdr.getZxid() != zxid) { - // log warning message if there is no matching commit - // old leader send outstanding proposal to observer - LOG.warn( - "Committing 0x{}, but next proposal is 0x{}", - Long.toHexString(zxid), - Long.toHexString(p.hdr.getZxid())); - continue; - } - packetsCommitted.remove(); - Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null); - request.setTxn(p.rec); - request.setHdr(p.hdr); - request.setTxnDigest(p.digest); - ozk.commitRequest(request); - } - } else { - // New server type need to handle in-flight packets - throw new UnsupportedOperationException("Unknown server type"); + helper.applyDelayedPackets(); + } + + private void tryReconfig(PacketInFlight pif, long newLeader, long zxid) throws Exception { + QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8)); + boolean majorChange = self.processReconfig(qv, newLeader, zxid, true); + if (majorChange) { + throw new Exception("changes proposed in reconfig"); } } diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java index c1dc5cf2b8c..8ea94fd4daf 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java @@ -1,4 +1,3 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,6 +18,9 @@ package org.apache.zookeeper.server.quorum; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.DataTreeBean; import org.apache.zookeeper.server.ServerCnxn; @@ -27,10 +29,6 @@ import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServerBean; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import java.io.IOException; -import java.util.Collections; -import java.util.Map; - /** * Parent class for all ZooKeeperServers for Learners */ @@ -73,7 +71,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer { */ @Override public long getServerId() { - return self.getId(); + return self.getMyId(); } @Override @@ -82,7 +80,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer { this, getZKDatabase().getSessionWithTimeOuts(), this.tickTime, - self.getId(), + self.getMyId(), self.areLocalSessionsEnabled(), getZooKeeperServerListener()); } @@ -157,8 +155,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer { public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - } - else { + } else { LOG.info("Shutting down"); try { if (syncProcessor != null) { @@ -170,8 +167,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer { // that contains entries we have already written to our transaction log. syncProcessor.shutdown(); } - } - catch (Exception e) { + } catch (Exception e) { LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e); } } diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java index 37ca16ed52b..1a44a98e6e7 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java @@ -1,4 +1,3 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,6 +18,9 @@ package org.apache.zookeeper.server.quorum; +import java.io.IOException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.BiConsumer; import org.apache.zookeeper.server.FinalRequestProcessor; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; @@ -28,10 +30,6 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.function.BiConsumer; - /** * A ZooKeeperServer for the Observer node type. Not much is different, but * we anticipate specializing the request processors in the future. @@ -50,7 +48,7 @@ public class ObserverZooKeeperServer extends LearnerZooKeeperServer { /* * Pending sync requests - */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<Request>(); + */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<>(); ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException { super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self); @@ -109,9 +107,6 @@ public class ObserverZooKeeperServer extends LearnerZooKeeperServer { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); } - else { - syncProcessor = null; - } } /* diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index f6fc87d7716..f6fc87d7716 100644 --- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java index b74ca0d716b..a96a395b03b 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java @@ -1,4 +1,3 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,6 +18,10 @@ package org.apache.zookeeper.server.quorum; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Objects; +import java.util.stream.Collectors; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.DataTreeBean; @@ -32,11 +35,6 @@ import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServerBean; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.Objects; -import java.util.stream.Collectors; - /** * A ZooKeeperServer which comes into play when peer is partitioned from the * majority. Handles read-only clients, but drops connections from not-read-only @@ -90,7 +88,7 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer { public void createSessionTracker() { sessionTracker = new LearnerSessionTracker( this, getZKDatabase().getSessionWithTimeOuts(), - this.tickTime, self.getId(), self.areLocalSessionsEnabled(), + this.tickTime, self.getMyId(), self.areLocalSessionsEnabled(), getZooKeeperServerListener()); } @@ -188,16 +186,14 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer { */ @Override public long getServerId() { - return self.getId(); + return self.getMyId(); } @Override public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { - super.shutdown(fullyShutDown); LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); - } - else { + } else { shutdown = true; unregisterJMX(this); diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java index ec4c326e9aa..d65ead216f0 100644 --- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java +++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java @@ -1,4 +1,3 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,6 +18,9 @@ package org.apache.zookeeper.server.quorum; +import java.io.Flushable; +import java.io.IOException; +import java.net.Socket; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; @@ -26,15 +28,11 @@ import org.apache.zookeeper.server.ServerMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Flushable; -import java.io.IOException; -import java.net.Socket; - public class SendAckRequestProcessor implements RequestProcessor, Flushable { private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class); - Learner learner; + final Learner learner; SendAckRequestProcessor(Learner peer) { this.learner = peer; |