summaryrefslogtreecommitdiffstats
path: root/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java')
-rw-r--r--zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java83
1 files changed, 83 insertions, 0 deletions
diff --git a/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java b/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
new file mode 100644
index 00000000000..d65ead216f0
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.9.1/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.net.Socket;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SendAckRequestProcessor implements RequestProcessor, Flushable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SendAckRequestProcessor.class);
+
+ final Learner learner;
+
+ SendAckRequestProcessor(Learner peer) {
+ this.learner = peer;
+ }
+
+ public void processRequest(Request si) {
+ if (si.type != OpCode.sync) {
+ QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
+ try {
+ si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
+
+ learner.writePacket(qp, false);
+ } catch (IOException e) {
+ LOG.warn("Closing connection to leader, exception during packet send", e);
+ try {
+ if (!learner.sock.isClosed()) {
+ learner.sock.close();
+ }
+ } catch (IOException e1) {
+ // Nothing to do, we are shutting things down, so an exception here is irrelevant
+ LOG.debug("Ignoring error closing the connection", e1);
+ }
+ }
+ }
+ }
+
+ public void flush() throws IOException {
+ try {
+ learner.writePacket(null, true);
+ } catch (IOException e) {
+ LOG.warn("Closing connection to leader, exception during packet send", e);
+ try {
+ Socket socket = learner.sock;
+ if (socket != null && !socket.isClosed()) {
+ learner.sock.close();
+ }
+ } catch (IOException e1) {
+ // Nothing to do, we are shutting things down, so an exception here is irrelevant
+ LOG.debug("Ignoring error closing the connection", e1);
+ }
+ }
+ }
+
+ public void shutdown() {
+ // Nothing needed
+ }
+
+}