Diffstat (limited to 'zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java')
-rw-r--r--  zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java  353
1 file changed, 353 insertions, 0 deletions
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
new file mode 100644
index 00000000000..e03e0b07944
--- /dev/null
+++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.common.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This RequestProcessor logs requests to disk. It batches the requests to do
+ * the I/O efficiently. A request is not passed to the next RequestProcessor
+ * until its log entry has been synced to disk.
+ *
+ * SyncRequestProcessor is used in 3 different cases:
+ * 1. Leader - Syncs the request to disk and forwards it to AckRequestProcessor,
+ * which sends the ack back to the leader itself.
+ * 2. Follower - Syncs the request to disk and forwards it to
+ * SendAckRequestProcessor, which sends the ack packets to the leader.
+ * SendAckRequestProcessor is Flushable, which allows us to force
+ * pending packets out to the leader.
+ * 3. Observer - Syncs the committed request to disk (received as an INFORM
+ * packet). It never sends an ack back to the leader, so the
+ * nextProcessor will be null. This changes the semantics of the
+ * txnlog on the observer, since it only contains committed txns.
+ */
+public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
+
+ private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;
+
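+ // Marker request used by syncFlush(): once the sync thread dequeues it,
+ // pending writes are flushed and the latch is released, so the caller can
+ // block until the flush has completed.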
+ private static class FlushRequest extends Request {
+ private final CountDownLatch latch = new CountDownLatch(1);
+ public FlushRequest() {
+ super(null, 0, 0, 0, null, null);
+ }
+ }
+
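+ // Marker requests enqueued by setDelayForwarding(); when the sync thread
+ // dequeues one of them it closes or re-opens the DelayingProcessor, so the
+ // toggle is ordered with respect to requests already in the queue.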
+ private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null);
+ private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null);
+
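+ /**
+ * Wraps the next RequestProcessor. While "closed", synced requests are
+ * buffered in the delayed queue instead of being forwarded; open() replays
+ * them to the next processor in order and resumes normal forwarding.
+ * flush() is only propagated downstream while forwarding is not delayed.
+ */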
+ private static class DelayingProcessor implements RequestProcessor, Flushable {
+ private final RequestProcessor next;
+ private Queue<Request> delayed = null;
+ private DelayingProcessor(RequestProcessor next) {
+ this.next = next;
+ }
+ @Override
+ public void flush() throws IOException {
+ if (delayed == null && next instanceof Flushable) {
+ ((Flushable) next).flush();
+ }
+ }
+ @Override
+ public void processRequest(Request request) throws RequestProcessorException {
+ if (delayed == null) {
+ next.processRequest(request);
+ } else {
+ delayed.add(request);
+ }
+ }
+ @Override
+ public void shutdown() {
+ next.shutdown();
+ }
+ private void close() {
+ if (delayed == null) {
+ delayed = new ArrayDeque<>();
+ }
+ }
+ private void open() throws RequestProcessorException {
+ if (delayed != null) {
+ for (Request request : delayed) {
+ next.processRequest(request);
+ }
+ delayed = null;
+ }
+ }
+ }
+
+ /** The number of log entries to log before starting a snapshot */
+ private static int snapCount = ZooKeeperServer.getSnapCount();
+
+ /**
+ * The total size of log entries before starting a snapshot
+ */
+ private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes();
+
+ /**
+ * Random numbers used to vary snapshot timing
+ */
+ private int randRoll;
+ private long randSize;
+
+ private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
+
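+ /** Ensures at most one snapshot thread runs at a time; if one is still running, the next snapshot is simply skipped. */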
+ private final Semaphore snapThreadMutex = new Semaphore(1);
+
+ private final ZooKeeperServer zks;
+
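+ /** The next processor, wrapped so that forwarding to it can be delayed; null when there is no downstream processor (e.g. on observers). */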
+ private final DelayingProcessor nextProcessor;
+
+ /**
+ * Transactions that have been written and are waiting to be flushed to
+ * disk. These requests are forwarded to the next processor once the
+ * flush returns successfully.
+ */
+ private final Queue<Request> toFlush;
+ private long lastFlushTime;
+
+ public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
+ super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());
+ this.zks = zks;
+ this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor);
+ this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
+ }
+
+ /**
+ * Used by tests to change the snapcount.
+ * @param count the number of log entries between snapshots
+ */
+ public static void setSnapCount(int count) {
+ snapCount = count;
+ }
+
+ /**
+ * used by tests to get the snapcount
+ * @return the snapcount
+ */
+ public static int getSnapCount() {
+ return snapCount;
+ }
+
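+ /** Milliseconds remaining until the configured flush delay elapses, or 0 if it has already elapsed (or the delay is disabled). */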
+ private long getRemainingDelay() {
+ long flushDelay = zks.getFlushDelay();
+ long duration = Time.currentElapsedTime() - lastFlushTime;
+ if (duration < flushDelay) {
+ return flushDelay - duration;
+ }
+ return 0;
+ }
+
+ /** If both flushDelay and maxBatchSize are set (greater than 0), flush
+ * whenever either condition is hit. If only one or the other is
+ * set, flush only when the relevant condition is hit.
+ */
+ private boolean shouldFlush() {
+ long flushDelay = zks.getFlushDelay();
+ long maxBatchSize = zks.getMaxBatchSize();
+ if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
+ return true;
+ }
+ return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
+ }
+
+ /**
+ * Used by tests to change the snap size in bytes.
+ * @param size the total log size (in bytes) between snapshots
+ */
+ public static void setSnapSizeInBytes(long size) {
+ snapSizeInBytes = size;
+ }
+
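+ /**
+ * A snapshot is due once the txn count or the txn log size exceeds half of
+ * the configured threshold plus a random offset (see resetSnapshotStats),
+ * so that the servers in an ensemble do not all snapshot at the same point.
+ */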
+ private boolean shouldSnapshot() {
+ int logCount = zks.getZKDatabase().getTxnCount();
+ long logSize = zks.getZKDatabase().getTxnSize();
+ return (logCount > (snapCount / 2 + randRoll))
+ || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
+ }
+
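+ /** Re-randomizes the offsets used by shouldSnapshot(), picking values in [0, snapCount / 2) and [0, snapSizeInBytes / 2). */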
+ private void resetSnapshotStats() {
+ randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
+ randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
+ }
+
+ @Override
+ public void run() {
+ try {
+ // we do this in an attempt to ensure that not all of the servers
+ // in the ensemble take a snapshot at the same time
+ resetSnapshotStats();
+ lastFlushTime = Time.currentElapsedTime();
+ while (true) {
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
+
+ long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
+ Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
+ if (si == null) {
+ /* We timed out looking for more writes to batch, go ahead and flush immediately */
+ flush();
+ si = queuedRequests.take();
+ }
+
+ if (si == REQUEST_OF_DEATH) {
+ break;
+ }
+
+ if (si == turnForwardingDelayOn) {
+ nextProcessor.close();
+ continue;
+ }
+ if (si == turnForwardingDelayOff) {
+ nextProcessor.open();
+ continue;
+ }
+
+ if (si instanceof FlushRequest) {
+ flush();
+ ((FlushRequest) si).latch.countDown();
+ continue;
+ }
+
+ long startProcessTime = Time.currentElapsedTime();
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
+
+ // track the number of records written to the log
+ if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
+ if (shouldSnapshot()) {
+ resetSnapshotStats();
+ // roll the log
+ zks.getZKDatabase().rollLog();
+ // take a snapshot
+ if (!snapThreadMutex.tryAcquire()) {
+ LOG.warn("Too busy to snap, skipping");
+ } else {
+ new ZooKeeperThread("Snapshot Thread") {
+ public void run() {
+ try {
+ zks.takeSnapshot();
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception", e);
+ } finally {
+ snapThreadMutex.release();
+ }
+ }
+ }.start();
+ }
+ }
+ } else if (toFlush.isEmpty()) {
+ // optimization for read-heavy workloads
+ // iff this is a read or a throttled request (which doesn't need to be written to disk),
+ // and there are no pending flushes (writes), then just pass it to the next processor
+ if (nextProcessor != null) {
+ nextProcessor.processRequest(si);
+ nextProcessor.flush();
+ }
+ continue;
+ }
+ toFlush.add(si);
+ if (shouldFlush()) {
+ flush();
+ }
+ ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
+ }
+ } catch (Throwable t) {
+ handleException(this.getName(), t);
+ }
+ LOG.info("SyncRequestProcessor exited!");
+ }
+
+ /** Flushes all pending writes, and waits for this to complete. */
+ public void syncFlush() throws InterruptedException {
+ FlushRequest marker = new FlushRequest();
+ queuedRequests.add(marker);
+ marker.latch.await();
+ }
+
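+ /**
+ * Toggles delayed forwarding to the next processor. The toggle is applied
+ * by the sync thread via a marker request, so it takes effect in order with
+ * the requests already queued; while forwarding is delayed, synced requests
+ * are buffered and replayed once forwarding is re-enabled.
+ */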
+ public void setDelayForwarding(boolean delayForwarding) {
+ queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff);
+ }
+
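+ /**
+ * Commits the txn log, then hands every flushed request to the next
+ * processor and flushes that processor as well, so that pending ack packets
+ * can be pushed out (e.g. to the leader on a follower).
+ */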
+ private void flush() throws IOException, RequestProcessorException {
+ if (this.toFlush.isEmpty()) {
+ return;
+ }
+
+ ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
+
+ long flushStartTime = Time.currentElapsedTime();
+ zks.getZKDatabase().commit();
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
+
+ if (this.nextProcessor == null) {
+ this.toFlush.clear();
+ } else {
+ while (!this.toFlush.isEmpty()) {
+ final Request i = this.toFlush.remove();
+ long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
+ this.nextProcessor.processRequest(i);
+ }
+ nextProcessor.flush();
+ }
+ lastFlushTime = Time.currentElapsedTime();
+ }
+
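+ /**
+ * Enqueues the request of death, waits for the sync thread to exit, flushes
+ * any remaining requests, and then shuts down the next processor.
+ */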
+ public void shutdown() {
+ LOG.info("Shutting down");
+ queuedRequests.add(REQUEST_OF_DEATH);
+ try {
+ this.join();
+ this.flush();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while wating for {} to finish", this);
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ LOG.warn("Got IO exception during shutdown");
+ } catch (RequestProcessorException e) {
+ LOG.warn("Got request processor exception during shutdown");
+ }
+ if (nextProcessor != null) {
+ nextProcessor.shutdown();
+ }
+ }
+
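+ /** Queues the request for the sync thread; logging, flushing and forwarding all happen asynchronously in run(). */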
+ public void processRequest(final Request request) {
+ Objects.requireNonNull(request, "Request cannot be null");
+
+ request.syncQueueStartTime = Time.currentElapsedTime();
+ queuedRequests.add(request);
+ ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
+ }
+
+}