summaryrefslogtreecommitdiffstats
path: root/zookeeper-server
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2023-12-06 10:08:15 +0100
committerjonmv <venstad@gmail.com>2023-12-06 10:08:15 +0100
commit7d57cf528d45b3d320170be6c50429bcfef731be (patch)
tree4efb72ac17151b5d9910ff766af5c19831423f42 /zookeeper-server
parent891c9170a629ff6b598333cd4a981f2b0d74aa06 (diff)
Change default to 3.9.1 and put 3.8.0 as alternative
Diffstat (limited to 'zookeeper-server')
-rw-r--r--zookeeper-server/pom.xml2
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/CMakeLists.txt4
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/pom.xml (renamed from zookeeper-server/zookeeper-server-3.9.1/pom.xml)4
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java)0
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java)0
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java)2
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java)0
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java)0
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java)0
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/common/NetUtils.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/common/NetUtils.java)0
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java)21
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java)0
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java)330
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java)14
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/Learner.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java)292
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java)18
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java)13
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java)18
-rw-r--r--zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java)10
-rw-r--r--zookeeper-server/zookeeper-server-3.9.1/CMakeLists.txt2
-rw-r--r--zookeeper-server/zookeeper-server/CMakeLists.txt4
-rw-r--r--zookeeper-server/zookeeper-server/pom.xml4
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java2
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/common/ClientX509Util.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/common/ClientX509Util.java)0
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java21
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java330
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java14
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java292
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java18
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java13
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java (renamed from zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java)0
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java18
-rw-r--r--zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java10
33 files changed, 728 insertions, 728 deletions
diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml
index 1bb2f5971ab..269e0d2479a 100644
--- a/zookeeper-server/pom.xml
+++ b/zookeeper-server/pom.xml
@@ -13,8 +13,8 @@
<version>8-SNAPSHOT</version>
<modules>
<module>zookeeper-server-common</module>
+ <module>zookeeper-server-3.8.0</module>
<module>zookeeper-server</module>
- <module>zookeeper-server-3.9.1</module>
</modules>
<dependencies>
<dependency>
diff --git a/zookeeper-server/zookeeper-server-3.8.0/CMakeLists.txt b/zookeeper-server/zookeeper-server-3.8.0/CMakeLists.txt
new file mode 100644
index 00000000000..15d4c2082c4
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.0/CMakeLists.txt
@@ -0,0 +1,4 @@
+# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+install_jar(zookeeper-server-3.8.0-jar-with-dependencies.jar)
+# Make symlink so that we have a default version, should be done only in zookeeper-server module
+install_symlink(lib/jars/zookeeper-server-3.8.0-jar-with-dependencies.jar lib/jars/zookeeper-server-jar-with-dependencies.jar)
diff --git a/zookeeper-server/zookeeper-server-3.9.1/pom.xml b/zookeeper-server/zookeeper-server-3.8.0/pom.xml
index 77aec63a781..2037b9cf7c5 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/pom.xml
+++ b/zookeeper-server/zookeeper-server-3.8.0/pom.xml
@@ -8,11 +8,11 @@
<version>8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>zookeeper-server-3.9.1</artifactId>
+ <artifactId>zookeeper-server-3.8.0</artifactId>
<packaging>container-plugin</packaging>
<version>8-SNAPSHOT</version>
<properties>
- <zookeeper.version>3.9.1</zookeeper.version>
+ <zookeeper.version>3.8.0</zookeeper.version>
</properties>
<dependencies>
<dependency>
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java
index d986f02d89a..d986f02d89a 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/ConfigServerZooKeeperServer.java
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
index 1b469beb1b8..1b469beb1b8 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/ReconfigurableVespaZooKeeperServer.java
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java
index 100de4894ae..68f7459530e 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java
@@ -3,13 +3,11 @@ package com.yahoo.vespa.zookeeper;
import com.yahoo.security.X509SslContext;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.auth.AuthenticationProvider;
import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
-import javax.net.ssl.KeyManager;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;
import java.security.cert.X509Certificate;
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java
index dd5ac4e252b..dd5ac4e252b 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaQuorumPeer.java
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
index 1f15c758583..1f15c758583 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperAdminImpl.java
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java
index 4a7f85d6985..4a7f85d6985 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/com/yahoo/vespa/zookeeper/VespaZooKeeperServerImpl.java
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/common/NetUtils.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/common/NetUtils.java
index baa69f12968..baa69f12968 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/common/NetUtils.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/common/NetUtils.java
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
index cf7f4c44015..2610b96d94c 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -1,3 +1,4 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -63,8 +64,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
}
}
- private static final Request TURN_FORWARDING_DELAY_ON_REQUEST = new Request(null, 0, 0, 0, null, null);
- private static final Request TURN_FORWARDING_DELAY_OFF_REQUEST = new Request(null, 0, 0, 0, null, null);
+ private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null);
+ private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null);
private static class DelayingProcessor implements RequestProcessor, Flushable {
private final RequestProcessor next;
@@ -90,12 +91,12 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
public void shutdown() {
next.shutdown();
}
- private void startDelaying() {
+ private void close() {
if (delayed == null) {
delayed = new ArrayDeque<>();
}
}
- private void flushAndStopDelaying() throws RequestProcessorException {
+ private void open() throws RequestProcessorException {
if (delayed != null) {
for (Request request : delayed) {
next.processRequest(request);
@@ -119,7 +120,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
private int randRoll;
private long randSize;
- private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<>();
+ private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
private final Semaphore snapThreadMutex = new Semaphore(1);
@@ -224,12 +225,12 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
break;
}
- if (si == TURN_FORWARDING_DELAY_ON_REQUEST) {
- nextProcessor.startDelaying();
+ if (si == turnForwardingDelayOn) {
+ nextProcessor.close();
continue;
}
- if (si == TURN_FORWARDING_DELAY_OFF_REQUEST) {
- nextProcessor.flushAndStopDelaying();
+ if (si == turnForwardingDelayOff) {
+ nextProcessor.open();
continue;
}
@@ -295,7 +296,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
}
public void setDelayForwarding(boolean delayForwarding) {
- queuedRequests.add(delayForwarding ? TURN_FORWARDING_DELAY_ON_REQUEST : TURN_FORWARDING_DELAY_OFF_REQUEST);
+ queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff);
}
private void flush() throws IOException, RequestProcessorException {
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java
index 114d2987fe2..114d2987fe2 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/VespaNettyServerCnxnFactory.java
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 895bbeffa5f..12d9d46fc3a 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -1,3 +1,4 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,33 +19,8 @@
package org.apache.zookeeper.server;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-import java.util.zip.Adler32;
-import java.util.zip.CheckedInputStream;
-import javax.security.sasl.SaslException;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
-import org.apache.jute.InputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.KeeperException;
@@ -96,6 +72,27 @@ import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.security.sasl.SaslException;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
/**
* This class implements a simple standalone ZooKeeperServer. It sets up the
* following chain of RequestProcessors to process requests:
@@ -114,7 +111,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// When enabled, will check ACL constraints appertained to the requests first,
// before sending the requests to the quorum.
- static boolean enableEagerACLCheck;
+ static final boolean enableEagerACLCheck;
static final boolean skipACL;
@@ -126,14 +123,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
private static boolean digestEnabled;
- public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled";
- private static boolean serializeLastProcessedZxidEnabled;
-
// Add a enable/disable option for now, we should remove this one when
// this feature is confirmed to be stable
public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
private static boolean closeSessionTxnEnabled = true;
- private volatile CountDownLatch restoreLatch;
static {
LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
@@ -163,20 +156,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
closeSessionTxnEnabled = Boolean.parseBoolean(
System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
-
- setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean(
- System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true")));
- }
-
- // @VisibleForTesting
- public static boolean isEnableEagerACLCheck() {
- return enableEagerACLCheck;
- }
-
- // @VisibleForTesting
- public static void setEnableEagerACLCheck(boolean enabled) {
- ZooKeeperServer.enableEagerACLCheck = enabled;
- LOG.info("Update {} to {}", ENABLE_EAGER_ACL_CHECK, enabled);
}
public static boolean isCloseSessionTxnEnabled() {
@@ -240,7 +219,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private final AtomicInteger requestsInProcess = new AtomicInteger(0);
final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
// this data structure must be accessed under the outstandingChanges lock
- final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();
+ final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();
protected ServerCnxnFactory serverCnxnFactory;
protected ServerCnxnFactory secureServerCnxnFactory;
@@ -287,7 +266,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
// Connection throttling
- private final BlueThrottle connThrottle = new BlueThrottle();
+ private BlueThrottle connThrottle = new BlueThrottle();
private RequestThrottler requestThrottler;
public static final String SNAP_COUNT = "zookeeper.snapCount";
@@ -298,17 +277,17 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
* too many large requests such that the JVM runs out of usable heap and
* ultimately crashes.
*
- * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)}
+ * The limit is enforced by the {@link checkRequestSize(int, boolean)}
* method which is called by the connection layer ({@link NIOServerCnxn},
* {@link NettyServerCnxn}) before allocating a byte buffer and pulling
* data off the TCP socket. The limit is then checked again by the
- * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which
- * also atomically updates {@link #currentLargeRequestBytes}. The request is
+ * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which
+ * also atomically updates {@link currentLargeRequestBytes}. The request is
* then marked as a large request, with the request size stored in the Request
- * object so that it can later be decremented from {@link #currentLargeRequestBytes}.
+ * object so that it can later be decremented from {@link currentLargeRequestsBytes}.
*
* When a request is completed or dropped, the relevant code path calls the
- * {@link #requestFinished(Request)} method which performs the decrement if
+ * {@link requestFinished(Request)} method which performs the decrement if
* needed.
*/
private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
@@ -321,7 +300,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);
- private final AuthenticationHelper authHelper = new AuthenticationHelper();
+ private AuthenticationHelper authHelper;
void removeCnxn(ServerCnxn cnxn) {
zkDb.removeCnxn(cnxn);
@@ -337,6 +316,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
listener = new ZooKeeperServerListenerImpl(this);
serverStats = new ServerStats(this);
this.requestPathMetricsCollector = new RequestPathMetricsCollector();
+ this.authHelper = new AuthenticationHelper();
}
/**
@@ -378,8 +358,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
this.initLargeRequestThrottlingSettings();
+ this.authHelper = new AuthenticationHelper();
+
LOG.info(
- "Created server with"
+ "Created patched server with"
+ " tickTime {} ms"
+ " minSessionTimeout {} ms"
+ " maxSessionTimeout {} ms"
@@ -544,100 +526,23 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
takeSnapshot();
}
- public File takeSnapshot() throws IOException {
- return takeSnapshot(false);
+ public void takeSnapshot() {
+ takeSnapshot(false);
}
- public File takeSnapshot(boolean syncSnap) throws IOException {
- return takeSnapshot(syncSnap, true, false);
- }
-
- /**
- * Takes a snapshot on the server.
- *
- * @param syncSnap syncSnap sync the snapshot immediately after write
- * @param isSevere if true system exist, otherwise throw IOException
- * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions
- *
- * @return file snapshot file object
- * @throws IOException
- */
- public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException {
+ public void takeSnapshot(boolean syncSnap) {
long start = Time.currentElapsedTime();
- File snapFile = null;
try {
- if (fastForwardFromEdits) {
- zkDb.fastForwardDataBase();
- }
- snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
+ txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
} catch (IOException e) {
- if (isSevere) {
- LOG.error("Severe unrecoverable error, exiting", e);
- // This is a severe error that we cannot recover from,
- // so we need to exit
- ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
- } else {
- throw e;
- }
+ LOG.error("Severe unrecoverable error, exiting", e);
+ // This is a severe error that we cannot recover from,
+ // so we need to exit
+ ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
}
long elapsed = Time.currentElapsedTime() - start;
LOG.info("Snapshot taken in {} ms", elapsed);
ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
- return snapFile;
- }
-
- /**
- * Restores database from a snapshot. It is used by the restore admin server command.
- *
- * @param inputStream input stream of snapshot
- * @return last processed zxid
- */
- public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException {
- if (inputStream == null) {
- throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot");
- }
-
- long start = Time.currentElapsedTime();
- LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}",
- getZKDatabase().getDataTreeLastProcessedZxid(),
- getZKDatabase().dataTree.getNodeCount(),
- getZKDatabase().getSessionCount());
-
- // restore to a new zkDatabase
- final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory);
- final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32());
- final InputArchive ia = BinaryInputArchive.getArchive(cis);
- newZKDatabase.deserializeSnapshot(ia, cis);
- LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
- newZKDatabase.getDataTreeLastProcessedZxid(),
- newZKDatabase.dataTree.getNodeCount(),
- newZKDatabase.getSessionCount());
-
- // create a CountDownLatch
- restoreLatch = new CountDownLatch(1);
-
- try {
- // set to the new zkDatabase
- setZKDatabase(newZKDatabase);
-
- // re-create SessionTrack
- createSessionTracker();
- } finally {
- // unblock request submission
- restoreLatch.countDown();
- restoreLatch = null;
- }
-
- LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
- getZKDatabase().getDataTreeLastProcessedZxid(),
- getZKDatabase().dataTree.getNodeCount(),
- getZKDatabase().getSessionCount());
-
- long elapsed = Time.currentElapsedTime() - start;
- LOG.info("Restore taken in {} ms", elapsed);
- ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed);
-
- return getLastProcessedZxid();
}
public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
@@ -830,12 +735,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
protected void startRequestThrottler() {
- requestThrottler = createRequestThrottler();
+ requestThrottler = new RequestThrottler(this);
requestThrottler.start();
- }
- protected RequestThrottler createRequestThrottler() {
- return new RequestThrottler(this);
}
protected void setupRequestProcessors() {
@@ -881,10 +783,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
* error events, e.g., SyncRequestProcessor not being able to write a txn to
* disk.</li>
* <li>During shutdown the server sets the state to SHUTDOWN, which
- * corresponds to the server not running.</li>
- *
- * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE
- * </li></ul>
+ * corresponds to the server not running.</li></ul>
*
* @param state new server state.
*/
@@ -1099,9 +998,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
long sessionId = sessionTracker.createSession(timeout);
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
- CreateSessionTxn txn = new CreateSessionTxn(timeout);
+ ByteBuffer to = ByteBuffer.allocate(4);
+ to.putInt(timeout);
cnxn.setSessionId(sessionId);
- Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
+ Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
submitRequest(si);
return sessionId;
}
@@ -1159,12 +1059,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
valid ? cnxn.getSessionTimeout() : 0,
valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
- valid ? generatePasswd(cnxn.getSessionId()) : new byte[16],
- this instanceof ReadOnlyZooKeeperServer);
+ valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");
rsp.serialize(bos, "connect");
+ if (!cnxn.isOldClient) {
+ bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
+ }
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
@@ -1211,14 +1113,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
public void submitRequest(Request si) {
- if (restoreLatch != null) {
- try {
- LOG.info("Blocking request submission while restore is in progress");
- restoreLatch.await();
- } catch (final InterruptedException e) {
- LOG.warn("Unexpected interruption", e);
- }
- }
enqueueRequest(si);
}
@@ -1468,13 +1362,18 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return connThrottle.getDropChance();
}
- public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
+ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
+ throws IOException, ClientCnxnLimitException {
+
+ BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
+ ConnectRequest connReq = new ConnectRequest();
+ connReq.deserialize(bia, "connect");
LOG.debug(
"Session establishment request from client {} client's lastZxid is 0x{}",
cnxn.getRemoteSocketAddress(),
- Long.toHexString(request.getLastZxidSeen()));
+ Long.toHexString(connReq.getLastZxidSeen()));
- long sessionId = request.getSessionId();
+ long sessionId = connReq.getSessionId();
int tokensNeeded = 1;
if (connThrottle.isConnectionWeightEnabled()) {
if (sessionId == 0) {
@@ -1492,24 +1391,30 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
throw new ClientCnxnLimitException();
}
ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
+
ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
- if (!cnxn.protocolManager.isReadonlyAvailable()) {
+ boolean readOnly = false;
+ try {
+ readOnly = bia.readBool("readOnly");
+ cnxn.isOldClient = false;
+ } catch (IOException e) {
+ // this is ok -- just a packet from an old client which
+ // doesn't contain readOnly field
LOG.warn(
"Connection request from old client {}; will be dropped if server is in r-o mode",
cnxn.getRemoteSocketAddress());
}
-
- if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
+ if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}
- if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
+ if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
- + Long.toHexString(request.getLastZxidSeen())
+ + Long.toHexString(connReq.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
@@ -1517,8 +1422,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
}
- int sessionTimeout = request.getTimeOut();
- byte[] passwd = request.getPasswd();
+ int sessionTimeout = connReq.getTimeOut();
+ byte[] passwd = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
@@ -1536,16 +1441,16 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.debug(
"Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(id),
- Long.toHexString(request.getLastZxidSeen()),
- request.getTimeOut(),
+ Long.toHexString(connReq.getLastZxidSeen()),
+ connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
} else {
validateSession(cnxn, sessionId);
LOG.debug(
"Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(sessionId),
- Long.toHexString(request.getLastZxidSeen()),
- request.getTimeOut(),
+ Long.toHexString(connReq.getLastZxidSeen()),
+ connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
@@ -1685,7 +1590,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
}
- public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
+ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
+ // We have the request, now process and setup for next
+ InputStream bais = new ByteBufferInputStream(incomingBuffer);
+ BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
+ RequestHeader h = new RequestHeader();
+ h.deserialize(bia, "header");
+
// Need to increase the outstanding request count first, otherwise
// there might be a race condition that it enabled recv after
// processing request and then disabled when check throttling.
@@ -1697,12 +1608,17 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// in cnxn, since it will close the cnxn anyway.
cnxn.incrOutstandingAndCheckThrottle(h);
+ // Through the magic of byte buffers, txn will not be
+ // pointing
+ // to the start of the txn
+ incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
- AuthPacket authPacket = request.readRecord(AuthPacket::new);
+ AuthPacket authPacket = new AuthPacket();
+ ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
- Code authReturn = KeeperException.Code.AUTHFAILED;
+ Code authReturn = Code.AUTHFAILED;
if (ap != null) {
try {
// handleAuthentication may close the connection, to allow the client to choose
@@ -1712,14 +1628,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
authPacket.getAuth());
} catch (RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
- authReturn = KeeperException.Code.AUTHFAILED;
+ authReturn = Code.AUTHFAILED;
}
}
- if (authReturn == KeeperException.Code.OK) {
+ if (authReturn == Code.OK) {
LOG.info("Session 0x{}: auth success for scheme {} and address {}",
Long.toHexString(cnxn.getSessionId()), scheme,
cnxn.getRemoteSocketAddress());
- ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
+ ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
if (ap == null) {
@@ -1731,7 +1647,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.warn("Authentication failed for scheme: {}", scheme);
}
// send a response...
- ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
+ ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
@@ -1739,15 +1655,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
return;
} else if (h.getType() == OpCode.sasl) {
- processSasl(request, cnxn, h);
+ processSasl(incomingBuffer, cnxn, h);
} else {
if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
// Authentication enforcement is failed
// Already sent response to user about failure and closed the session, lets return
return;
} else {
- Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
- int length = request.limit();
+ Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
+ int length = incomingBuffer.limit();
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
checkRequestSizeWhenMessageReceived(length);
@@ -1785,9 +1701,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS);
}
- private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
+ private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
LOG.debug("Responding to client SASL token.");
- GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new);
+ GetSASLRequest clientTokenRecord = new GetSASLRequest();
+ ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord);
byte[] clientToken = clientTokenRecord.getToken();
LOG.debug("Size of client SASL token: {}", clientToken.length);
byte[] responseToken = null;
@@ -2062,7 +1979,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
/**
* Grant or deny authorization to an operation on a node as a function of:
- * @param cnxn : the server connection or null for admin server commands
+ * @param cnxn : the server connection
* @param acl : set of ACLs for the node
* @param perm : the permission that the client is requesting
* @param ids : the credentials supplied by the client
@@ -2235,15 +2152,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
ZooKeeperServer.digestEnabled = digestEnabled;
}
- public static boolean isSerializeLastProcessedZxidEnabled() {
- return serializeLastProcessedZxidEnabled;
- }
-
- public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) {
- serializeLastProcessedZxidEnabled = serializeLastZxidEnabled;
- LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled);
- }
-
/**
* Trim a path to get the immediate predecessor.
*
@@ -2267,8 +2175,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
switch (request.type) {
case OpCode.create:
case OpCode.create2: {
- CreateRequest req = request.readRequestRecordNoException(CreateRequest::new);
- if (req != null) {
+ CreateRequest req = new CreateRequest();
+ if (buffer2Record(request.request, req)) {
mustCheckACL = true;
acl = req.getAcl();
path = parentPath(req.getPath());
@@ -2276,22 +2184,22 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
break;
}
case OpCode.delete: {
- DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new);
- if (req != null) {
+ DeleteRequest req = new DeleteRequest();
+ if (buffer2Record(request.request, req)) {
path = parentPath(req.getPath());
}
break;
}
case OpCode.setData: {
- SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new);
- if (req != null) {
+ SetDataRequest req = new SetDataRequest();
+ if (buffer2Record(request.request, req)) {
path = req.getPath();
}
break;
}
case OpCode.setACL: {
- SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new);
- if (req != null) {
+ SetACLRequest req = new SetACLRequest();
+ if (buffer2Record(request.request, req)) {
mustCheckACL = true;
acl = req.getAcl();
path = req.getPath();
@@ -2348,7 +2256,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return true;
}
- err = KeeperException.Code.OK.intValue();
+ err = Code.OK.intValue();
try {
pathToCheck = effectiveACLPath(request);
@@ -2369,7 +2277,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.error("Uncaught exception in authWriteRequest with: ", t);
throw t;
} finally {
- if (err != KeeperException.Code.OK.intValue()) {
+ if (err != Code.OK.intValue()) {
/* This request has a bad ACL, so we are dismissing it early. */
decInProcess();
ReplyHeader rh = new ReplyHeader(request.cxid, 0, err);
@@ -2381,7 +2289,19 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
}
- return err == KeeperException.Code.OK.intValue();
+ return err == Code.OK.intValue();
+ }
+
+ private boolean buffer2Record(ByteBuffer request, Record record) {
+ boolean rv = false;
+ try {
+ ByteBufferInputStream.byteBuffer2Record(request, record);
+ request.rewind();
+ rv = true;
+ } catch (IOException ex) {
+ }
+
+ return rv;
}
public int getOutstandingHandshakeNum() {
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
index 1f629bed73d..a44ebc3f7b8 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
@@ -1,3 +1,4 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,10 +19,6 @@
package org.apache.zookeeper.server.quorum;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import javax.management.JMException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.metrics.MetricsContext;
@@ -36,6 +33,11 @@ import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import javax.management.JMException;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
/**
*
* Just like the standard ZooKeeperServer. We just replace the request
@@ -175,7 +177,7 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
this,
getZKDatabase().getSessionWithTimeOuts(),
tickTime,
- self.getMyId(),
+ self.getId(),
self.areLocalSessionsEnabled(),
getZooKeeperServerListener());
}
@@ -291,7 +293,7 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
*/
@Override
public long getServerId() {
- return self.getMyId();
+ return self.getId();
}
@Override
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 8d8b6dabce8..1f5f2a0b225 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -1,3 +1,4 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -133,7 +134,7 @@ public class Learner {
LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync);
}
- final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
public int getPendingRevalidationsCount() {
return pendingRevalidations.size();
@@ -254,9 +255,13 @@ public class Learner {
oa.writeLong(request.sessionId);
oa.writeInt(request.cxid);
oa.writeInt(request.type);
- byte[] payload = request.readRequestBytes();
- if (payload != null) {
- oa.write(payload);
+ if (request.request != null) {
+ request.request.rewind();
+ int len = request.request.remaining();
+ byte[] b = new byte[len];
+ request.request.get(b);
+ request.request.rewind();
+ oa.write(b);
}
oa.close();
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
@@ -491,7 +496,7 @@ public class Learner {
/*
* Add sid to payload
*/
- LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion());
+ LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
@@ -543,140 +548,36 @@ public class Learner {
* @throws InterruptedException
*/
protected void syncWithLeader(long newLeaderZxid) throws Exception {
+ QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
+ QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
- QuorumVerifier newLeaderQV = null;
-
- class SyncHelper {
-
- // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot.
- // For SNAP and TRUNC the snapshot is needed to save that history.
- boolean willSnapshot = true;
- boolean syncSnapshot = false;
- // PROPOSALs received during sync, for matching up with COMMITs.
- Deque<PacketInFlight> proposals = new ArrayDeque<>();
-
- // PROPOSALs we delay forwarding to the ZK server until sync is done.
- Deque<PacketInFlight> delayedProposals = new ArrayDeque<>();
-
- // COMMITs we delay forwarding to the ZK server until sync is done.
- Deque<Long> delayedCommits = new ArrayDeque<>();
-
- void syncSnapshot() {
- syncSnapshot = true;
- }
-
- void noSnapshot() {
- willSnapshot = false;
- }
-
- void propose(PacketInFlight pif) {
- proposals.add(pif);
- delayedProposals.add(pif);
- }
-
- PacketInFlight nextProposal() {
- return proposals.peekFirst();
- }
-
- void commit() {
- PacketInFlight packet = proposals.remove();
- if (willSnapshot) {
- zk.processTxn(packet.hdr, packet.rec);
- delayedProposals.remove();
- } else {
- delayedCommits.add(packet.hdr.getZxid());
- }
- }
-
- void writeState() throws IOException, InterruptedException {
- // Ensure all received transaction PROPOSALs are written before we ACK the NEWLEADER,
- // since this allows the leader to apply those transactions to its served state:
- if (willSnapshot) {
- zk.takeSnapshot(syncSnapshot); // either, the snapshot contains the transactions,
- willSnapshot = false; // but anything after this needs to go to the transaction log; or
- }
-
- self.setCurrentEpoch(newEpoch);
- sock.setSoTimeout(self.tickTime * self.syncLimit);
- self.setSyncMode(QuorumPeer.SyncMode.NONE);
- zk.startupWithoutServing();
-
- // if we're a follower, we need to ensure the transactions are safely logged before ACK'ing.
- if (zk instanceof FollowerZooKeeperServer) {
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- // The leader expects the NEWLEADER ACK to precede all the PROPOSAL ACKs, so we only write them first.
- fzk.syncProcessor.setDelayForwarding(true);
- for (PacketInFlight p : delayedProposals) {
- fzk.logRequest(p.hdr, p.rec, p.digest);
- }
- delayedProposals.clear();
- fzk.syncProcessor.syncFlush();
- }
- }
-
- void flushAcks() throws InterruptedException {
- if (zk instanceof FollowerZooKeeperServer) {
- // The NEWLEADER is ACK'ed, and we can now ACK the PROPOSALs we wrote in writeState.
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- fzk.syncProcessor.setDelayForwarding(false);
- fzk.syncProcessor.syncFlush(); // Ensure these are all ACK'ed before the UPTODATE ACK.
- }
- }
-
- void applyDelayedPackets() {
- // Any delayed packets must now be applied: all PROPOSALs first, then any COMMITs.
- if (zk instanceof FollowerZooKeeperServer) {
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- for (PacketInFlight p : delayedProposals) {
- fzk.logRequest(p.hdr, p.rec, p.digest);
- }
- for (Long zxid : delayedCommits) {
- fzk.commit(zxid);
- }
- } else if (zk instanceof ObserverZooKeeperServer) {
- ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
- for (PacketInFlight p : delayedProposals) {
- Long zxid = delayedCommits.peekFirst();
- if (p.hdr.getZxid() != zxid) {
- // log warning message if there is no matching commit
- // old leader send outstanding proposal to observer
- LOG.warn(
- "Committing 0x{}, but next proposal is 0x{}",
- Long.toHexString(zxid),
- Long.toHexString(p.hdr.getZxid()));
- continue;
- }
- delayedCommits.remove();
- Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
- request.setTxnDigest(p.digest);
- ozk.commitRequest(request);
- }
- } else {
- // New server type need to handle in-flight packets
- throw new UnsupportedOperationException("Unknown server type");
- }
- }
-
- }
+ QuorumVerifier newLeaderQV = null;
- SyncHelper helper = new SyncHelper();
- QuorumPacket qp = new QuorumPacket();
+ // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
+ // For SNAP and TRUNC the snapshot is needed to save that history
+ boolean snapshotNeeded = true;
+ boolean syncSnapshot = false;
readPacket(qp);
+ Deque<Long> packetsCommitted = new ArrayDeque<>();
+ Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
+ Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
self.setSyncMode(QuorumPeer.SyncMode.DIFF);
if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
- helper.syncSnapshot();
+ snapshotNeeded = true;
+ syncSnapshot = true;
} else {
- helper.noSnapshot();
+ snapshotNeeded = false;
}
} else if (qp.getType() == Leader.SNAP) {
self.setSyncMode(QuorumPeer.SyncMode.SNAP);
LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
// The leader is going to dump the database
+ // db is clear as part of deserializeSnapshot()
zk.getZKDatabase().deserializeSnapshot(leaderIs);
// ZOOKEEPER-2819: overwrite config node content extracted
// from leader snapshot with local config, to avoid potential
@@ -692,18 +593,20 @@ public class Learner {
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
- // Immediately persist the latest snapshot when there is txn log gap
- helper.syncSnapshot();
+ // immediately persist the latest snapshot when there is txn log gap
+ syncSnapshot = true;
} else if (qp.getType() == Leader.TRUNC) {
- // We need to truncate the log to the lastZxid of the leader
+ //we need to truncate the log to the lastzxid of the leader
self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
+ // not able to truncate the log
LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
+
} else {
LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
@@ -711,10 +614,17 @@ public class Learner {
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
zk.createSessionTracker();
-
- // we are now going to start getting transactions to apply followed by an UPTODATE
long lastQueued = 0;
+
+ // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
+ // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
+ // we need to make sure that we don't take the snapshot twice.
+ boolean isPreZAB1_0 = true;
+ //If we are not going to take the snapshot be sure the transactions are not applied in memory
+ // but written out to the transaction log
+ boolean writeToTxnLog = !snapshotNeeded;
TxnLogEntry logEntry;
+ // we are now going to start getting transactions to apply followed by an UPTODATE
outerLoop:
while (self.isRunning()) {
readPacket(qp);
@@ -738,11 +648,13 @@ public class Learner {
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
self.setLastSeenQuorumVerifier(qv, true);
}
- helper.propose(pif);
+
+ packetsNotLogged.add(pif);
+ packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
- pif = helper.nextProposal();
+ pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn(
"Committing 0x{}, but next proposal is 0x{}",
@@ -750,24 +662,43 @@ public class Learner {
Long.toHexString(pif.hdr.getZxid()));
} else {
if (qp.getType() == Leader.COMMITANDACTIVATE) {
- tryReconfig(pif, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid());
+ QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
+ boolean majorChange = self.processReconfig(
+ qv,
+ ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
+ true);
+ if (majorChange) {
+ throw new Exception("changes proposed in reconfig");
+ }
+ }
+ if (!writeToTxnLog) {
+ zk.processTxn(pif.hdr, pif.rec);
+ packetsNotLogged.remove();
+ packetsNotCommitted.remove();
+ } else {
+ packetsNotCommitted.remove();
+ packetsCommitted.add(qp.getZxid());
}
- helper.commit();
}
break;
case Leader.INFORM:
case Leader.INFORMANDACTIVATE:
PacketInFlight packet = new PacketInFlight();
+
if (qp.getType() == Leader.INFORMANDACTIVATE) {
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
- byte[] remainingData = new byte[buffer.remaining()];
- buffer.get(remainingData);
- logEntry = SerializeUtils.deserializeTxn(remainingData);
+ byte[] remainingdata = new byte[buffer.remaining()];
+ buffer.get(remainingdata);
+ logEntry = SerializeUtils.deserializeTxn(remainingdata);
packet.hdr = logEntry.getHeader();
packet.rec = logEntry.getTxn();
packet.digest = logEntry.getDigest();
- tryReconfig(packet, suggestedLeaderId, qp.getZxid());
+ QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8));
+ boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
+ if (majorChange) {
+ throw new Exception("changes proposed in reconfig");
+ }
} else {
logEntry = SerializeUtils.deserializeTxn(qp.getData());
packet.rec = logEntry.getTxn();
@@ -782,8 +713,14 @@ public class Learner {
}
lastQueued = packet.hdr.getZxid();
}
- helper.propose(packet);
- helper.commit();
+ if (!writeToTxnLog) {
+ // Apply to db directly if we haven't taken the snapshot
+ zk.processTxn(packet.hdr, packet.rec);
+ } else {
+ packetsNotLogged.add(packet);
+ packetsCommitted.add(qp.getZxid());
+ }
+
break;
case Leader.UPTODATE:
LOG.info("Learner received UPTODATE message");
@@ -793,11 +730,15 @@ public class Learner {
throw new Exception("changes proposed in reconfig");
}
}
- helper.flushAcks();
+ if (isPreZAB1_0) {
+ zk.takeSnapshot(syncSnapshot);
+ self.setCurrentEpoch(newEpoch);
+ }
self.setZooKeeperServer(zk);
self.adminServer.setZooKeeperServer(zk);
break outerLoop;
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
+ // means this is Zab 1.0
LOG.info("Learner received NEWLEADER message");
if (qp.getData() != null && qp.getData().length > 1) {
try {
@@ -809,13 +750,40 @@ public class Learner {
}
}
- helper.writeState();
+ if (snapshotNeeded) {
+ zk.takeSnapshot(syncSnapshot);
+ }
+
+ self.setCurrentEpoch(newEpoch);
+ writeToTxnLog = true;
+ //Anything after this needs to go to the transaction log, not applied directly in memory
+ isPreZAB1_0 = false;
+
+ // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
+ sock.setSoTimeout(self.tickTime * self.syncLimit);
+ self.setSyncMode(QuorumPeer.SyncMode.NONE);
+ zk.startupWithoutServing();
+ if (zk instanceof FollowerZooKeeperServer) {
+ FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+ fzk.syncProcessor.setDelayForwarding(true);
+ for (PacketInFlight p : packetsNotLogged) {
+ fzk.logRequest(p.hdr, p.rec, p.digest);
+ }
+ packetsNotLogged.clear();
+ fzk.syncProcessor.syncFlush();
+ }
+
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
+
+ if (zk instanceof FollowerZooKeeperServer) {
+ FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+ fzk.syncProcessor.setDelayForwarding(false);
+ fzk.syncProcessor.syncFlush();
+ }
break;
}
}
}
- QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
zk.startServing();
@@ -828,14 +796,40 @@ public class Learner {
*/
self.updateElectionVote(newEpoch);
- helper.applyDelayedPackets();
- }
-
- private void tryReconfig(PacketInFlight pif, long newLeader, long zxid) throws Exception {
- QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
- boolean majorChange = self.processReconfig(qv, newLeader, zxid, true);
- if (majorChange) {
- throw new Exception("changes proposed in reconfig");
+ // We need to log the stuff that came in between the snapshot and the uptodate
+ if (zk instanceof FollowerZooKeeperServer) {
+ FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+ for (PacketInFlight p : packetsNotLogged) {
+ fzk.logRequest(p.hdr, p.rec, p.digest);
+ }
+ for (Long zxid : packetsCommitted) {
+ fzk.commit(zxid);
+ }
+ } else if (zk instanceof ObserverZooKeeperServer) {
+ // Similar to follower, we need to log requests between the snapshot
+ // and UPTODATE
+ ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
+ for (PacketInFlight p : packetsNotLogged) {
+ Long zxid = packetsCommitted.peekFirst();
+ if (p.hdr.getZxid() != zxid) {
+ // log warning message if there is no matching commit
+ // old leader send outstanding proposal to observer
+ LOG.warn(
+ "Committing 0x{}, but next proposal is 0x{}",
+ Long.toHexString(zxid),
+ Long.toHexString(p.hdr.getZxid()));
+ continue;
+ }
+ packetsCommitted.remove();
+ Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null);
+ request.setTxn(p.rec);
+ request.setHdr(p.hdr);
+ request.setTxnDigest(p.digest);
+ ozk.commitRequest(request);
+ }
+ } else {
+ // New server type need to handle in-flight packets
+ throw new UnsupportedOperationException("Unknown server type");
}
}
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
index 8ea94fd4daf..c1dc5cf2b8c 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
@@ -1,3 +1,4 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,9 +19,6 @@
package org.apache.zookeeper.server.quorum;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.DataTreeBean;
import org.apache.zookeeper.server.ServerCnxn;
@@ -29,6 +27,10 @@ import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServerBean;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
/**
* Parent class for all ZooKeeperServers for Learners
*/
@@ -71,7 +73,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
*/
@Override
public long getServerId() {
- return self.getMyId();
+ return self.getId();
}
@Override
@@ -80,7 +82,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
this,
getZKDatabase().getSessionWithTimeOuts(),
this.tickTime,
- self.getMyId(),
+ self.getId(),
self.areLocalSessionsEnabled(),
getZooKeeperServerListener());
}
@@ -155,7 +157,8 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
- } else {
+ }
+ else {
LOG.info("Shutting down");
try {
if (syncProcessor != null) {
@@ -167,7 +170,8 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
// that contains entries we have already written to our transaction log.
syncProcessor.shutdown();
}
- } catch (Exception e) {
+ }
+ catch (Exception e) {
LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e);
}
}
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
index 1a44a98e6e7..37ca16ed52b 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
@@ -1,3 +1,4 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,9 +19,6 @@
package org.apache.zookeeper.server.quorum;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.BiConsumer;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
@@ -30,6 +28,10 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiConsumer;
+
/**
* A ZooKeeperServer for the Observer node type. Not much is different, but
* we anticipate specializing the request processors in the future.
@@ -48,7 +50,7 @@ public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
/*
* Pending sync requests
- */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<>();
+ */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<Request>();
ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
@@ -107,6 +109,9 @@ public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
+ else {
+ syncProcessor = null;
+ }
}
/*
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
index a96a395b03b..b74ca0d716b 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
@@ -1,3 +1,4 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,10 +19,6 @@
package org.apache.zookeeper.server.quorum;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Objects;
-import java.util.stream.Collectors;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.DataTreeBean;
@@ -35,6 +32,11 @@ import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerBean;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
/**
* A ZooKeeperServer which comes into play when peer is partitioned from the
* majority. Handles read-only clients, but drops connections from not-read-only
@@ -88,7 +90,7 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
public void createSessionTracker() {
sessionTracker = new LearnerSessionTracker(
this, getZKDatabase().getSessionWithTimeOuts(),
- this.tickTime, self.getMyId(), self.areLocalSessionsEnabled(),
+ this.tickTime, self.getId(), self.areLocalSessionsEnabled(),
getZooKeeperServerListener());
}
@@ -186,14 +188,16 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
*/
@Override
public long getServerId() {
- return self.getMyId();
+ return self.getId();
}
@Override
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
+ super.shutdown(fullyShutDown);
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
- } else {
+ }
+ else {
shutdown = true;
unregisterJMX(this);
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
index d65ead216f0..ec4c326e9aa 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
+++ b/zookeeper-server/zookeeper-server-3.8.0/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
@@ -1,3 +1,4 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,9 +19,6 @@
package org.apache.zookeeper.server.quorum;
-import java.io.Flushable;
-import java.io.IOException;
-import java.net.Socket;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
@@ -28,11 +26,15 @@ import org.apache.zookeeper.server.ServerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Flushable;
+import java.io.IOException;
+import java.net.Socket;
+
public class SendAckRequestProcessor implements RequestProcessor, Flushable {
private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);
- final Learner learner;
+ Learner learner;
SendAckRequestProcessor(Learner peer) {
this.learner = peer;
diff --git a/zookeeper-server/zookeeper-server-3.9.1/CMakeLists.txt b/zookeeper-server/zookeeper-server-3.9.1/CMakeLists.txt
deleted file mode 100644
index 295693f22d7..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.1/CMakeLists.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-install_jar(zookeeper-server-3.9.1-jar-with-dependencies.jar)
diff --git a/zookeeper-server/zookeeper-server/CMakeLists.txt b/zookeeper-server/zookeeper-server/CMakeLists.txt
index 15d4c2082c4..295693f22d7 100644
--- a/zookeeper-server/zookeeper-server/CMakeLists.txt
+++ b/zookeeper-server/zookeeper-server/CMakeLists.txt
@@ -1,4 +1,2 @@
# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-install_jar(zookeeper-server-3.8.0-jar-with-dependencies.jar)
-# Make symlink so that we have a default version, should be done only in zookeeper-server module
-install_symlink(lib/jars/zookeeper-server-3.8.0-jar-with-dependencies.jar lib/jars/zookeeper-server-jar-with-dependencies.jar)
+install_jar(zookeeper-server-3.9.1-jar-with-dependencies.jar)
diff --git a/zookeeper-server/zookeeper-server/pom.xml b/zookeeper-server/zookeeper-server/pom.xml
index 2037b9cf7c5..77aec63a781 100644
--- a/zookeeper-server/zookeeper-server/pom.xml
+++ b/zookeeper-server/zookeeper-server/pom.xml
@@ -8,11 +8,11 @@
<version>8-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>zookeeper-server-3.8.0</artifactId>
+ <artifactId>zookeeper-server-3.9.1</artifactId>
<packaging>container-plugin</packaging>
<version>8-SNAPSHOT</version>
<properties>
- <zookeeper.version>3.8.0</zookeeper.version>
+ <zookeeper.version>3.9.1</zookeeper.version>
</properties>
<dependencies>
<dependency>
diff --git a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java
index 68f7459530e..100de4894ae 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/com/yahoo/vespa/zookeeper/VespaMtlsAuthenticationProvider.java
@@ -3,11 +3,13 @@ package com.yahoo.vespa.zookeeper;
import com.yahoo.security.X509SslContext;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.auth.AuthenticationProvider;
import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
+import javax.net.ssl.KeyManager;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;
import java.security.cert.X509Certificate;
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/common/ClientX509Util.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/common/ClientX509Util.java
index c0034a4723f..c0034a4723f 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/common/ClientX509Util.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/common/ClientX509Util.java
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
index 2610b96d94c..cf7f4c44015 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -1,4 +1,3 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -64,8 +63,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
}
}
- private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null);
- private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null);
+ private static final Request TURN_FORWARDING_DELAY_ON_REQUEST = new Request(null, 0, 0, 0, null, null);
+ private static final Request TURN_FORWARDING_DELAY_OFF_REQUEST = new Request(null, 0, 0, 0, null, null);
private static class DelayingProcessor implements RequestProcessor, Flushable {
private final RequestProcessor next;
@@ -91,12 +90,12 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
public void shutdown() {
next.shutdown();
}
- private void close() {
+ private void startDelaying() {
if (delayed == null) {
delayed = new ArrayDeque<>();
}
}
- private void open() throws RequestProcessorException {
+ private void flushAndStopDelaying() throws RequestProcessorException {
if (delayed != null) {
for (Request request : delayed) {
next.processRequest(request);
@@ -120,7 +119,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
private int randRoll;
private long randSize;
- private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
+ private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<>();
private final Semaphore snapThreadMutex = new Semaphore(1);
@@ -225,12 +224,12 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
break;
}
- if (si == turnForwardingDelayOn) {
- nextProcessor.close();
+ if (si == TURN_FORWARDING_DELAY_ON_REQUEST) {
+ nextProcessor.startDelaying();
continue;
}
- if (si == turnForwardingDelayOff) {
- nextProcessor.open();
+ if (si == TURN_FORWARDING_DELAY_OFF_REQUEST) {
+ nextProcessor.flushAndStopDelaying();
continue;
}
@@ -296,7 +295,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements Req
}
public void setDelayForwarding(boolean delayForwarding) {
- queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff);
+ queuedRequests.add(delayForwarding ? TURN_FORWARDING_DELAY_ON_REQUEST : TURN_FORWARDING_DELAY_OFF_REQUEST);
}
private void flush() throws IOException, RequestProcessorException {
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 12d9d46fc3a..895bbeffa5f 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -1,4 +1,3 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,8 +18,33 @@
package org.apache.zookeeper.server;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+import javax.security.sasl.SaslException;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.KeeperException;
@@ -72,27 +96,6 @@ import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.sasl.SaslException;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-
/**
* This class implements a simple standalone ZooKeeperServer. It sets up the
* following chain of RequestProcessors to process requests:
@@ -111,7 +114,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// When enabled, will check ACL constraints appertained to the requests first,
// before sending the requests to the quorum.
- static final boolean enableEagerACLCheck;
+ static boolean enableEagerACLCheck;
static final boolean skipACL;
@@ -123,10 +126,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
private static boolean digestEnabled;
+ public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled";
+ private static boolean serializeLastProcessedZxidEnabled;
+
// Add a enable/disable option for now, we should remove this one when
// this feature is confirmed to be stable
public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
private static boolean closeSessionTxnEnabled = true;
+ private volatile CountDownLatch restoreLatch;
static {
LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
@@ -156,6 +163,20 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
closeSessionTxnEnabled = Boolean.parseBoolean(
System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
+
+ setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean(
+ System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true")));
+ }
+
+ // @VisibleForTesting
+ public static boolean isEnableEagerACLCheck() {
+ return enableEagerACLCheck;
+ }
+
+ // @VisibleForTesting
+ public static void setEnableEagerACLCheck(boolean enabled) {
+ ZooKeeperServer.enableEagerACLCheck = enabled;
+ LOG.info("Update {} to {}", ENABLE_EAGER_ACL_CHECK, enabled);
}
public static boolean isCloseSessionTxnEnabled() {
@@ -219,7 +240,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private final AtomicInteger requestsInProcess = new AtomicInteger(0);
final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
// this data structure must be accessed under the outstandingChanges lock
- final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();
+ final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();
protected ServerCnxnFactory serverCnxnFactory;
protected ServerCnxnFactory secureServerCnxnFactory;
@@ -266,7 +287,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
// Connection throttling
- private BlueThrottle connThrottle = new BlueThrottle();
+ private final BlueThrottle connThrottle = new BlueThrottle();
private RequestThrottler requestThrottler;
public static final String SNAP_COUNT = "zookeeper.snapCount";
@@ -277,17 +298,17 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
* too many large requests such that the JVM runs out of usable heap and
* ultimately crashes.
*
- * The limit is enforced by the {@link checkRequestSize(int, boolean)}
+ * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)}
* method which is called by the connection layer ({@link NIOServerCnxn},
* {@link NettyServerCnxn}) before allocating a byte buffer and pulling
* data off the TCP socket. The limit is then checked again by the
- * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which
- * also atomically updates {@link currentLargeRequestBytes}. The request is
+ * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which
+ * also atomically updates {@link #currentLargeRequestBytes}. The request is
* then marked as a large request, with the request size stored in the Request
- * object so that it can later be decremented from {@link currentLargeRequestsBytes}.
+ * object so that it can later be decremented from {@link #currentLargeRequestBytes}.
*
* When a request is completed or dropped, the relevant code path calls the
- * {@link requestFinished(Request)} method which performs the decrement if
+ * {@link #requestFinished(Request)} method which performs the decrement if
* needed.
*/
private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
@@ -300,7 +321,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);
- private AuthenticationHelper authHelper;
+ private final AuthenticationHelper authHelper = new AuthenticationHelper();
void removeCnxn(ServerCnxn cnxn) {
zkDb.removeCnxn(cnxn);
@@ -316,7 +337,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
listener = new ZooKeeperServerListenerImpl(this);
serverStats = new ServerStats(this);
this.requestPathMetricsCollector = new RequestPathMetricsCollector();
- this.authHelper = new AuthenticationHelper();
}
/**
@@ -358,10 +378,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
this.initLargeRequestThrottlingSettings();
- this.authHelper = new AuthenticationHelper();
-
LOG.info(
- "Created patched server with"
+ "Created server with"
+ " tickTime {} ms"
+ " minSessionTimeout {} ms"
+ " maxSessionTimeout {} ms"
@@ -526,23 +544,100 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
takeSnapshot();
}
- public void takeSnapshot() {
- takeSnapshot(false);
+ public File takeSnapshot() throws IOException {
+ return takeSnapshot(false);
}
- public void takeSnapshot(boolean syncSnap) {
+ public File takeSnapshot(boolean syncSnap) throws IOException {
+ return takeSnapshot(syncSnap, true, false);
+ }
+
+ /**
+ * Takes a snapshot on the server.
+ *
+ * @param syncSnap syncSnap sync the snapshot immediately after write
+ * @param isSevere if true system exist, otherwise throw IOException
+ * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions
+ *
+ * @return file snapshot file object
+ * @throws IOException
+ */
+ public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException {
long start = Time.currentElapsedTime();
+ File snapFile = null;
try {
- txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
+ if (fastForwardFromEdits) {
+ zkDb.fastForwardDataBase();
+ }
+ snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
} catch (IOException e) {
- LOG.error("Severe unrecoverable error, exiting", e);
- // This is a severe error that we cannot recover from,
- // so we need to exit
- ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
+ if (isSevere) {
+ LOG.error("Severe unrecoverable error, exiting", e);
+ // This is a severe error that we cannot recover from,
+ // so we need to exit
+ ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
+ } else {
+ throw e;
+ }
}
long elapsed = Time.currentElapsedTime() - start;
LOG.info("Snapshot taken in {} ms", elapsed);
ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
+ return snapFile;
+ }
+
+ /**
+ * Restores database from a snapshot. It is used by the restore admin server command.
+ *
+ * @param inputStream input stream of snapshot
+ * @return last processed zxid
+ */
+ public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException {
+ if (inputStream == null) {
+ throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot");
+ }
+
+ long start = Time.currentElapsedTime();
+ LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}",
+ getZKDatabase().getDataTreeLastProcessedZxid(),
+ getZKDatabase().dataTree.getNodeCount(),
+ getZKDatabase().getSessionCount());
+
+ // restore to a new zkDatabase
+ final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory);
+ final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32());
+ final InputArchive ia = BinaryInputArchive.getArchive(cis);
+ newZKDatabase.deserializeSnapshot(ia, cis);
+ LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
+ newZKDatabase.getDataTreeLastProcessedZxid(),
+ newZKDatabase.dataTree.getNodeCount(),
+ newZKDatabase.getSessionCount());
+
+ // create a CountDownLatch
+ restoreLatch = new CountDownLatch(1);
+
+ try {
+ // set to the new zkDatabase
+ setZKDatabase(newZKDatabase);
+
+ // re-create SessionTrack
+ createSessionTracker();
+ } finally {
+ // unblock request submission
+ restoreLatch.countDown();
+ restoreLatch = null;
+ }
+
+ LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
+ getZKDatabase().getDataTreeLastProcessedZxid(),
+ getZKDatabase().dataTree.getNodeCount(),
+ getZKDatabase().getSessionCount());
+
+ long elapsed = Time.currentElapsedTime() - start;
+ LOG.info("Restore taken in {} ms", elapsed);
+ ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed);
+
+ return getLastProcessedZxid();
}
public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
@@ -735,9 +830,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
protected void startRequestThrottler() {
- requestThrottler = new RequestThrottler(this);
+ requestThrottler = createRequestThrottler();
requestThrottler.start();
+ }
+ protected RequestThrottler createRequestThrottler() {
+ return new RequestThrottler(this);
}
protected void setupRequestProcessors() {
@@ -783,7 +881,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
* error events, e.g., SyncRequestProcessor not being able to write a txn to
* disk.</li>
* <li>During shutdown the server sets the state to SHUTDOWN, which
- * corresponds to the server not running.</li></ul>
+ * corresponds to the server not running.</li>
+ *
+ * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE
+ * </li></ul>
*
* @param state new server state.
*/
@@ -998,10 +1099,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
long sessionId = sessionTracker.createSession(timeout);
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
- ByteBuffer to = ByteBuffer.allocate(4);
- to.putInt(timeout);
+ CreateSessionTxn txn = new CreateSessionTxn(timeout);
cnxn.setSessionId(sessionId);
- Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
+ Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
submitRequest(si);
return sessionId;
}
@@ -1059,14 +1159,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
valid ? cnxn.getSessionTimeout() : 0,
valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
- valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
+ valid ? generatePasswd(cnxn.getSessionId()) : new byte[16],
+ this instanceof ReadOnlyZooKeeperServer);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");
rsp.serialize(bos, "connect");
- if (!cnxn.isOldClient) {
- bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
- }
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
@@ -1113,6 +1211,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
public void submitRequest(Request si) {
+ if (restoreLatch != null) {
+ try {
+ LOG.info("Blocking request submission while restore is in progress");
+ restoreLatch.await();
+ } catch (final InterruptedException e) {
+ LOG.warn("Unexpected interruption", e);
+ }
+ }
enqueueRequest(si);
}
@@ -1362,18 +1468,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return connThrottle.getDropChance();
}
- public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
- throws IOException, ClientCnxnLimitException {
-
- BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
- ConnectRequest connReq = new ConnectRequest();
- connReq.deserialize(bia, "connect");
+ public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
LOG.debug(
"Session establishment request from client {} client's lastZxid is 0x{}",
cnxn.getRemoteSocketAddress(),
- Long.toHexString(connReq.getLastZxidSeen()));
+ Long.toHexString(request.getLastZxidSeen()));
- long sessionId = connReq.getSessionId();
+ long sessionId = request.getSessionId();
int tokensNeeded = 1;
if (connThrottle.isConnectionWeightEnabled()) {
if (sessionId == 0) {
@@ -1391,30 +1492,24 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
throw new ClientCnxnLimitException();
}
ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
-
ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
- boolean readOnly = false;
- try {
- readOnly = bia.readBool("readOnly");
- cnxn.isOldClient = false;
- } catch (IOException e) {
- // this is ok -- just a packet from an old client which
- // doesn't contain readOnly field
+ if (!cnxn.protocolManager.isReadonlyAvailable()) {
LOG.warn(
"Connection request from old client {}; will be dropped if server is in r-o mode",
cnxn.getRemoteSocketAddress());
}
- if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
+
+ if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}
- if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
+ if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
- + Long.toHexString(connReq.getLastZxidSeen())
+ + Long.toHexString(request.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
@@ -1422,8 +1517,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
}
- int sessionTimeout = connReq.getTimeOut();
- byte[] passwd = connReq.getPasswd();
+ int sessionTimeout = request.getTimeOut();
+ byte[] passwd = request.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
@@ -1441,16 +1536,16 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.debug(
"Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(id),
- Long.toHexString(connReq.getLastZxidSeen()),
- connReq.getTimeOut(),
+ Long.toHexString(request.getLastZxidSeen()),
+ request.getTimeOut(),
cnxn.getRemoteSocketAddress());
} else {
validateSession(cnxn, sessionId);
LOG.debug(
"Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(sessionId),
- Long.toHexString(connReq.getLastZxidSeen()),
- connReq.getTimeOut(),
+ Long.toHexString(request.getLastZxidSeen()),
+ request.getTimeOut(),
cnxn.getRemoteSocketAddress());
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
@@ -1590,13 +1685,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
}
- public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
- // We have the request, now process and setup for next
- InputStream bais = new ByteBufferInputStream(incomingBuffer);
- BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
- RequestHeader h = new RequestHeader();
- h.deserialize(bia, "header");
-
+ public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
// Need to increase the outstanding request count first, otherwise
// there might be a race condition that it enabled recv after
// processing request and then disabled when check throttling.
@@ -1608,17 +1697,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// in cnxn, since it will close the cnxn anyway.
cnxn.incrOutstandingAndCheckThrottle(h);
- // Through the magic of byte buffers, txn will not be
- // pointing
- // to the start of the txn
- incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
- AuthPacket authPacket = new AuthPacket();
- ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
+ AuthPacket authPacket = request.readRecord(AuthPacket::new);
String scheme = authPacket.getScheme();
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
- Code authReturn = Code.AUTHFAILED;
+ Code authReturn = KeeperException.Code.AUTHFAILED;
if (ap != null) {
try {
// handleAuthentication may close the connection, to allow the client to choose
@@ -1628,14 +1712,14 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
authPacket.getAuth());
} catch (RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
- authReturn = Code.AUTHFAILED;
+ authReturn = KeeperException.Code.AUTHFAILED;
}
}
- if (authReturn == Code.OK) {
+ if (authReturn == KeeperException.Code.OK) {
LOG.info("Session 0x{}: auth success for scheme {} and address {}",
Long.toHexString(cnxn.getSessionId()), scheme,
cnxn.getRemoteSocketAddress());
- ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.OK.intValue());
+ ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
if (ap == null) {
@@ -1647,7 +1731,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.warn("Authentication failed for scheme: {}", scheme);
}
// send a response...
- ReplyHeader rh = new ReplyHeader(h.getXid(), 0, Code.AUTHFAILED.intValue());
+ ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
@@ -1655,15 +1739,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
return;
} else if (h.getType() == OpCode.sasl) {
- processSasl(incomingBuffer, cnxn, h);
+ processSasl(request, cnxn, h);
} else {
if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
// Authentication enforcement is failed
// Already sent response to user about failure and closed the session, lets return
return;
} else {
- Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
- int length = incomingBuffer.limit();
+ Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
+ int length = request.limit();
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
checkRequestSizeWhenMessageReceived(length);
@@ -1701,10 +1785,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS);
}
- private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
+ private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
LOG.debug("Responding to client SASL token.");
- GetSASLRequest clientTokenRecord = new GetSASLRequest();
- ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord);
+ GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new);
byte[] clientToken = clientTokenRecord.getToken();
LOG.debug("Size of client SASL token: {}", clientToken.length);
byte[] responseToken = null;
@@ -1979,7 +2062,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
/**
* Grant or deny authorization to an operation on a node as a function of:
- * @param cnxn : the server connection
+ * @param cnxn : the server connection or null for admin server commands
* @param acl : set of ACLs for the node
* @param perm : the permission that the client is requesting
* @param ids : the credentials supplied by the client
@@ -2152,6 +2235,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
ZooKeeperServer.digestEnabled = digestEnabled;
}
+ public static boolean isSerializeLastProcessedZxidEnabled() {
+ return serializeLastProcessedZxidEnabled;
+ }
+
+ public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) {
+ serializeLastProcessedZxidEnabled = serializeLastZxidEnabled;
+ LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled);
+ }
+
/**
* Trim a path to get the immediate predecessor.
*
@@ -2175,8 +2267,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
switch (request.type) {
case OpCode.create:
case OpCode.create2: {
- CreateRequest req = new CreateRequest();
- if (buffer2Record(request.request, req)) {
+ CreateRequest req = request.readRequestRecordNoException(CreateRequest::new);
+ if (req != null) {
mustCheckACL = true;
acl = req.getAcl();
path = parentPath(req.getPath());
@@ -2184,22 +2276,22 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
break;
}
case OpCode.delete: {
- DeleteRequest req = new DeleteRequest();
- if (buffer2Record(request.request, req)) {
+ DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new);
+ if (req != null) {
path = parentPath(req.getPath());
}
break;
}
case OpCode.setData: {
- SetDataRequest req = new SetDataRequest();
- if (buffer2Record(request.request, req)) {
+ SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new);
+ if (req != null) {
path = req.getPath();
}
break;
}
case OpCode.setACL: {
- SetACLRequest req = new SetACLRequest();
- if (buffer2Record(request.request, req)) {
+ SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new);
+ if (req != null) {
mustCheckACL = true;
acl = req.getAcl();
path = req.getPath();
@@ -2256,7 +2348,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return true;
}
- err = Code.OK.intValue();
+ err = KeeperException.Code.OK.intValue();
try {
pathToCheck = effectiveACLPath(request);
@@ -2277,7 +2369,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
LOG.error("Uncaught exception in authWriteRequest with: ", t);
throw t;
} finally {
- if (err != Code.OK.intValue()) {
+ if (err != KeeperException.Code.OK.intValue()) {
/* This request has a bad ACL, so we are dismissing it early. */
decInProcess();
ReplyHeader rh = new ReplyHeader(request.cxid, 0, err);
@@ -2289,19 +2381,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
}
- return err == Code.OK.intValue();
- }
-
- private boolean buffer2Record(ByteBuffer request, Record record) {
- boolean rv = false;
- try {
- ByteBufferInputStream.byteBuffer2Record(request, record);
- request.rewind();
- rv = true;
- } catch (IOException ex) {
- }
-
- return rv;
+ return err == KeeperException.Code.OK.intValue();
}
public int getOutstandingHandshakeNum() {
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
index a44ebc3f7b8..1f629bed73d 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
@@ -1,4 +1,3 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,6 +18,10 @@
package org.apache.zookeeper.server.quorum;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.management.JMException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.metrics.MetricsContext;
@@ -33,11 +36,6 @@ import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import javax.management.JMException;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
/**
*
* Just like the standard ZooKeeperServer. We just replace the request
@@ -177,7 +175,7 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
this,
getZKDatabase().getSessionWithTimeOuts(),
tickTime,
- self.getId(),
+ self.getMyId(),
self.areLocalSessionsEnabled(),
getZooKeeperServerListener());
}
@@ -293,7 +291,7 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
*/
@Override
public long getServerId() {
- return self.getId();
+ return self.getMyId();
}
@Override
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 1f5f2a0b225..8d8b6dabce8 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -1,4 +1,3 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -134,7 +133,7 @@ public class Learner {
LOG.info("{} = {}", LEARNER_CLOSE_SOCKET_ASYNC, closeSocketAsync);
}
- final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
+ final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<>();
public int getPendingRevalidationsCount() {
return pendingRevalidations.size();
@@ -255,13 +254,9 @@ public class Learner {
oa.writeLong(request.sessionId);
oa.writeInt(request.cxid);
oa.writeInt(request.type);
- if (request.request != null) {
- request.request.rewind();
- int len = request.request.remaining();
- byte[] b = new byte[len];
- request.request.get(b);
- request.request.rewind();
- oa.write(b);
+ byte[] payload = request.readRequestBytes();
+ if (payload != null) {
+ oa.write(payload);
}
oa.close();
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
@@ -496,7 +491,7 @@ public class Learner {
/*
* Add sid to payload
*/
- LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
+ LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
@@ -548,36 +543,140 @@ public class Learner {
* @throws InterruptedException
*/
protected void syncWithLeader(long newLeaderZxid) throws Exception {
- QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
- QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
-
QuorumVerifier newLeaderQV = null;
- // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
- // For SNAP and TRUNC the snapshot is needed to save that history
- boolean snapshotNeeded = true;
- boolean syncSnapshot = false;
+ class SyncHelper {
+
+ // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot.
+ // For SNAP and TRUNC the snapshot is needed to save that history.
+ boolean willSnapshot = true;
+ boolean syncSnapshot = false;
+
+ // PROPOSALs received during sync, for matching up with COMMITs.
+ Deque<PacketInFlight> proposals = new ArrayDeque<>();
+
+ // PROPOSALs we delay forwarding to the ZK server until sync is done.
+ Deque<PacketInFlight> delayedProposals = new ArrayDeque<>();
+
+ // COMMITs we delay forwarding to the ZK server until sync is done.
+ Deque<Long> delayedCommits = new ArrayDeque<>();
+
+ void syncSnapshot() {
+ syncSnapshot = true;
+ }
+
+ void noSnapshot() {
+ willSnapshot = false;
+ }
+
+ void propose(PacketInFlight pif) {
+ proposals.add(pif);
+ delayedProposals.add(pif);
+ }
+
+ PacketInFlight nextProposal() {
+ return proposals.peekFirst();
+ }
+
+ void commit() {
+ PacketInFlight packet = proposals.remove();
+ if (willSnapshot) {
+ zk.processTxn(packet.hdr, packet.rec);
+ delayedProposals.remove();
+ } else {
+ delayedCommits.add(packet.hdr.getZxid());
+ }
+ }
+
+ void writeState() throws IOException, InterruptedException {
+ // Ensure all received transaction PROPOSALs are written before we ACK the NEWLEADER,
+ // since this allows the leader to apply those transactions to its served state:
+ if (willSnapshot) {
+ zk.takeSnapshot(syncSnapshot); // either, the snapshot contains the transactions,
+ willSnapshot = false; // but anything after this needs to go to the transaction log; or
+ }
+
+ self.setCurrentEpoch(newEpoch);
+ sock.setSoTimeout(self.tickTime * self.syncLimit);
+ self.setSyncMode(QuorumPeer.SyncMode.NONE);
+ zk.startupWithoutServing();
+
+ // if we're a follower, we need to ensure the transactions are safely logged before ACK'ing.
+ if (zk instanceof FollowerZooKeeperServer) {
+ FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+ // The leader expects the NEWLEADER ACK to precede all the PROPOSAL ACKs, so we only write them first.
+ fzk.syncProcessor.setDelayForwarding(true);
+ for (PacketInFlight p : delayedProposals) {
+ fzk.logRequest(p.hdr, p.rec, p.digest);
+ }
+ delayedProposals.clear();
+ fzk.syncProcessor.syncFlush();
+ }
+ }
+
+ void flushAcks() throws InterruptedException {
+ if (zk instanceof FollowerZooKeeperServer) {
+ // The NEWLEADER is ACK'ed, and we can now ACK the PROPOSALs we wrote in writeState.
+ FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+ fzk.syncProcessor.setDelayForwarding(false);
+ fzk.syncProcessor.syncFlush(); // Ensure these are all ACK'ed before the UPTODATE ACK.
+ }
+ }
+
+ void applyDelayedPackets() {
+ // Any delayed packets must now be applied: all PROPOSALs first, then any COMMITs.
+ if (zk instanceof FollowerZooKeeperServer) {
+ FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
+ for (PacketInFlight p : delayedProposals) {
+ fzk.logRequest(p.hdr, p.rec, p.digest);
+ }
+ for (Long zxid : delayedCommits) {
+ fzk.commit(zxid);
+ }
+ } else if (zk instanceof ObserverZooKeeperServer) {
+ ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
+ for (PacketInFlight p : delayedProposals) {
+ Long zxid = delayedCommits.peekFirst();
+ if (p.hdr.getZxid() != zxid) {
+ // log warning message if there is no matching commit
+ // old leader send outstanding proposal to observer
+ LOG.warn(
+ "Committing 0x{}, but next proposal is 0x{}",
+ Long.toHexString(zxid),
+ Long.toHexString(p.hdr.getZxid()));
+ continue;
+ }
+ delayedCommits.remove();
+ Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
+ request.setTxnDigest(p.digest);
+ ozk.commitRequest(request);
+ }
+ } else {
+ // New server type need to handle in-flight packets
+ throw new UnsupportedOperationException("Unknown server type");
+ }
+ }
+
+ }
+
+ SyncHelper helper = new SyncHelper();
+ QuorumPacket qp = new QuorumPacket();
readPacket(qp);
- Deque<Long> packetsCommitted = new ArrayDeque<>();
- Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
- Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
self.setSyncMode(QuorumPeer.SyncMode.DIFF);
if (zk.shouldForceWriteInitialSnapshotAfterLeaderElection()) {
LOG.info("Forcing a snapshot write as part of upgrading from an older Zookeeper. This should only happen while upgrading.");
- snapshotNeeded = true;
- syncSnapshot = true;
+ helper.syncSnapshot();
} else {
- snapshotNeeded = false;
+ helper.noSnapshot();
}
} else if (qp.getType() == Leader.SNAP) {
self.setSyncMode(QuorumPeer.SyncMode.SNAP);
LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
// The leader is going to dump the database
- // db is clear as part of deserializeSnapshot()
zk.getZKDatabase().deserializeSnapshot(leaderIs);
// ZOOKEEPER-2819: overwrite config node content extracted
// from leader snapshot with local config, to avoid potential
@@ -593,20 +692,18 @@ public class Learner {
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
- // immediately persist the latest snapshot when there is txn log gap
- syncSnapshot = true;
+ // Immediately persist the latest snapshot when there is txn log gap
+ helper.syncSnapshot();
} else if (qp.getType() == Leader.TRUNC) {
- //we need to truncate the log to the lastzxid of the leader
+ // We need to truncate the log to the lastZxid of the leader
self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
- // not able to truncate the log
LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
-
} else {
LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
@@ -614,17 +711,10 @@ public class Learner {
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
zk.createSessionTracker();
- long lastQueued = 0;
- // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
- // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
- // we need to make sure that we don't take the snapshot twice.
- boolean isPreZAB1_0 = true;
- //If we are not going to take the snapshot be sure the transactions are not applied in memory
- // but written out to the transaction log
- boolean writeToTxnLog = !snapshotNeeded;
- TxnLogEntry logEntry;
// we are now going to start getting transactions to apply followed by an UPTODATE
+ long lastQueued = 0;
+ TxnLogEntry logEntry;
outerLoop:
while (self.isRunning()) {
readPacket(qp);
@@ -648,13 +738,11 @@ public class Learner {
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData(), UTF_8));
self.setLastSeenQuorumVerifier(qv, true);
}
-
- packetsNotLogged.add(pif);
- packetsNotCommitted.add(pif);
+ helper.propose(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
- pif = packetsNotCommitted.peekFirst();
+ pif = helper.nextProposal();
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn(
"Committing 0x{}, but next proposal is 0x{}",
@@ -662,43 +750,24 @@ public class Learner {
Long.toHexString(pif.hdr.getZxid()));
} else {
if (qp.getType() == Leader.COMMITANDACTIVATE) {
- QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
- boolean majorChange = self.processReconfig(
- qv,
- ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
- true);
- if (majorChange) {
- throw new Exception("changes proposed in reconfig");
- }
- }
- if (!writeToTxnLog) {
- zk.processTxn(pif.hdr, pif.rec);
- packetsNotLogged.remove();
- packetsNotCommitted.remove();
- } else {
- packetsNotCommitted.remove();
- packetsCommitted.add(qp.getZxid());
+ tryReconfig(pif, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid());
}
+ helper.commit();
}
break;
case Leader.INFORM:
case Leader.INFORMANDACTIVATE:
PacketInFlight packet = new PacketInFlight();
-
if (qp.getType() == Leader.INFORMANDACTIVATE) {
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
- byte[] remainingdata = new byte[buffer.remaining()];
- buffer.get(remainingdata);
- logEntry = SerializeUtils.deserializeTxn(remainingdata);
+ byte[] remainingData = new byte[buffer.remaining()];
+ buffer.get(remainingData);
+ logEntry = SerializeUtils.deserializeTxn(remainingData);
packet.hdr = logEntry.getHeader();
packet.rec = logEntry.getTxn();
packet.digest = logEntry.getDigest();
- QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData(), UTF_8));
- boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
- if (majorChange) {
- throw new Exception("changes proposed in reconfig");
- }
+ tryReconfig(packet, suggestedLeaderId, qp.getZxid());
} else {
logEntry = SerializeUtils.deserializeTxn(qp.getData());
packet.rec = logEntry.getTxn();
@@ -713,14 +782,8 @@ public class Learner {
}
lastQueued = packet.hdr.getZxid();
}
- if (!writeToTxnLog) {
- // Apply to db directly if we haven't taken the snapshot
- zk.processTxn(packet.hdr, packet.rec);
- } else {
- packetsNotLogged.add(packet);
- packetsCommitted.add(qp.getZxid());
- }
-
+ helper.propose(packet);
+ helper.commit();
break;
case Leader.UPTODATE:
LOG.info("Learner received UPTODATE message");
@@ -730,15 +793,11 @@ public class Learner {
throw new Exception("changes proposed in reconfig");
}
}
- if (isPreZAB1_0) {
- zk.takeSnapshot(syncSnapshot);
- self.setCurrentEpoch(newEpoch);
- }
+ helper.flushAcks();
self.setZooKeeperServer(zk);
self.adminServer.setZooKeeperServer(zk);
break outerLoop;
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
- // means this is Zab 1.0
LOG.info("Learner received NEWLEADER message");
if (qp.getData() != null && qp.getData().length > 1) {
try {
@@ -750,40 +809,13 @@ public class Learner {
}
}
- if (snapshotNeeded) {
- zk.takeSnapshot(syncSnapshot);
- }
-
- self.setCurrentEpoch(newEpoch);
- writeToTxnLog = true;
- //Anything after this needs to go to the transaction log, not applied directly in memory
- isPreZAB1_0 = false;
-
- // ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
- sock.setSoTimeout(self.tickTime * self.syncLimit);
- self.setSyncMode(QuorumPeer.SyncMode.NONE);
- zk.startupWithoutServing();
- if (zk instanceof FollowerZooKeeperServer) {
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- fzk.syncProcessor.setDelayForwarding(true);
- for (PacketInFlight p : packetsNotLogged) {
- fzk.logRequest(p.hdr, p.rec, p.digest);
- }
- packetsNotLogged.clear();
- fzk.syncProcessor.syncFlush();
- }
-
+ helper.writeState();
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
-
- if (zk instanceof FollowerZooKeeperServer) {
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- fzk.syncProcessor.setDelayForwarding(false);
- fzk.syncProcessor.syncFlush();
- }
break;
}
}
}
+ QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
zk.startServing();
@@ -796,40 +828,14 @@ public class Learner {
*/
self.updateElectionVote(newEpoch);
- // We need to log the stuff that came in between the snapshot and the uptodate
- if (zk instanceof FollowerZooKeeperServer) {
- FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
- for (PacketInFlight p : packetsNotLogged) {
- fzk.logRequest(p.hdr, p.rec, p.digest);
- }
- for (Long zxid : packetsCommitted) {
- fzk.commit(zxid);
- }
- } else if (zk instanceof ObserverZooKeeperServer) {
- // Similar to follower, we need to log requests between the snapshot
- // and UPTODATE
- ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
- for (PacketInFlight p : packetsNotLogged) {
- Long zxid = packetsCommitted.peekFirst();
- if (p.hdr.getZxid() != zxid) {
- // log warning message if there is no matching commit
- // old leader send outstanding proposal to observer
- LOG.warn(
- "Committing 0x{}, but next proposal is 0x{}",
- Long.toHexString(zxid),
- Long.toHexString(p.hdr.getZxid()));
- continue;
- }
- packetsCommitted.remove();
- Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null);
- request.setTxn(p.rec);
- request.setHdr(p.hdr);
- request.setTxnDigest(p.digest);
- ozk.commitRequest(request);
- }
- } else {
- // New server type need to handle in-flight packets
- throw new UnsupportedOperationException("Unknown server type");
+ helper.applyDelayedPackets();
+ }
+
+ private void tryReconfig(PacketInFlight pif, long newLeader, long zxid) throws Exception {
+ QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
+ boolean majorChange = self.processReconfig(qv, newLeader, zxid, true);
+ if (majorChange) {
+ throw new Exception("changes proposed in reconfig");
}
}
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
index c1dc5cf2b8c..8ea94fd4daf 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
@@ -1,4 +1,3 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,6 +18,9 @@
package org.apache.zookeeper.server.quorum;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.DataTreeBean;
import org.apache.zookeeper.server.ServerCnxn;
@@ -27,10 +29,6 @@ import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServerBean;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-
/**
* Parent class for all ZooKeeperServers for Learners
*/
@@ -73,7 +71,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
*/
@Override
public long getServerId() {
- return self.getId();
+ return self.getMyId();
}
@Override
@@ -82,7 +80,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
this,
getZKDatabase().getSessionWithTimeOuts(),
this.tickTime,
- self.getId(),
+ self.getMyId(),
self.areLocalSessionsEnabled(),
getZooKeeperServerListener());
}
@@ -157,8 +155,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
- }
- else {
+ } else {
LOG.info("Shutting down");
try {
if (syncProcessor != null) {
@@ -170,8 +167,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
// that contains entries we have already written to our transaction log.
syncProcessor.shutdown();
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e);
}
}
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
index 37ca16ed52b..1a44a98e6e7 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java
@@ -1,4 +1,3 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,6 +18,9 @@
package org.apache.zookeeper.server.quorum;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiConsumer;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
@@ -28,10 +30,6 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.BiConsumer;
-
/**
* A ZooKeeperServer for the Observer node type. Not much is different, but
* we anticipate specializing the request processors in the future.
@@ -50,7 +48,7 @@ public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
/*
* Pending sync requests
- */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<Request>();
+ */ ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<>();
ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, self.clientPortListenBacklog, zkDb, self);
@@ -109,9 +107,6 @@ public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
- else {
- syncProcessor = null;
- }
}
/*
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index f6fc87d7716..f6fc87d7716 100644
--- a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
index b74ca0d716b..a96a395b03b 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
@@ -1,4 +1,3 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,6 +18,10 @@
package org.apache.zookeeper.server.quorum;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.DataTreeBean;
@@ -32,11 +35,6 @@ import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerBean;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
/**
* A ZooKeeperServer which comes into play when peer is partitioned from the
* majority. Handles read-only clients, but drops connections from not-read-only
@@ -90,7 +88,7 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
public void createSessionTracker() {
sessionTracker = new LearnerSessionTracker(
this, getZKDatabase().getSessionWithTimeOuts(),
- this.tickTime, self.getId(), self.areLocalSessionsEnabled(),
+ this.tickTime, self.getMyId(), self.areLocalSessionsEnabled(),
getZooKeeperServerListener());
}
@@ -188,16 +186,14 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
*/
@Override
public long getServerId() {
- return self.getId();
+ return self.getMyId();
}
@Override
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
- super.shutdown(fullyShutDown);
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
- }
- else {
+ } else {
shutdown = true;
unregisterJMX(this);
diff --git a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
index ec4c326e9aa..d65ead216f0 100644
--- a/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
+++ b/zookeeper-server/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
@@ -1,4 +1,3 @@
-// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,6 +18,9 @@
package org.apache.zookeeper.server.quorum;
+import java.io.Flushable;
+import java.io.IOException;
+import java.net.Socket;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
@@ -26,15 +28,11 @@ import org.apache.zookeeper.server.ServerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Flushable;
-import java.io.IOException;
-import java.net.Socket;
-
public class SendAckRequestProcessor implements RequestProcessor, Flushable {
private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);
- Learner learner;
+ final Learner learner;
SendAckRequestProcessor(Learner peer) {
this.learner = peer;