aboutsummaryrefslogtreecommitdiffstats
path: root/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java')
-rw-r--r--zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java353
1 files changed, 0 insertions, 353 deletions
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
deleted file mode 100644
index cf7f4c44015..00000000000
--- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zookeeper.server;
-
-import java.io.Flushable;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import org.apache.zookeeper.common.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This RequestProcessor logs requests to disk. It batches the requests to do
- * the io efficiently. The request is not passed to the next RequestProcessor
- * until its log has been synced to disk.
- *
- * SyncRequestProcessor is used in 3 different cases
- * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which
- * send ack back to itself.
- * 2. Follower - Sync request to disk and forward request to
- * SendAckRequestProcessor which send the packets to leader.
- * SendAckRequestProcessor is flushable which allow us to force
- * push packets to leader.
- * 3. Observer - Sync committed request to disk (received as INFORM packet).
- * It never send ack back to the leader, so the nextProcessor will
- * be null. This change the semantic of txnlog on the observer
- * since it only contains committed txns.
- */
-public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
-
- private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
-
- private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;
-
- private static class FlushRequest extends Request {
- private final CountDownLatch latch = new CountDownLatch(1);
- public FlushRequest() {
- super(null, 0, 0, 0, null, null);
- }
- }
-
- private static final Request TURN_FORWARDING_DELAY_ON_REQUEST = new Request(null, 0, 0, 0, null, null);
- private static final Request TURN_FORWARDING_DELAY_OFF_REQUEST = new Request(null, 0, 0, 0, null, null);
-
- private static class DelayingProcessor implements RequestProcessor, Flushable {
- private final RequestProcessor next;
- private Queue<Request> delayed = null;
- private DelayingProcessor(RequestProcessor next) {
- this.next = next;
- }
- @Override
- public void flush() throws IOException {
- if (delayed == null && next instanceof Flushable) {
- ((Flushable) next).flush();
- }
- }
- @Override
- public void processRequest(Request request) throws RequestProcessorException {
- if (delayed == null) {
- next.processRequest(request);
- } else {
- delayed.add(request);
- }
- }
- @Override
- public void shutdown() {
- next.shutdown();
- }
- private void startDelaying() {
- if (delayed == null) {
- delayed = new ArrayDeque<>();
- }
- }
- private void flushAndStopDelaying() throws RequestProcessorException {
- if (delayed != null) {
- for (Request request : delayed) {
- next.processRequest(request);
- }
- delayed = null;
- }
- }
- }
-
- /** The number of log entries to log before starting a snapshot */
- private static int snapCount = ZooKeeperServer.getSnapCount();
-
- /**
- * The total size of log entries before starting a snapshot
- */
- private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes();
-
- /**
- * Random numbers used to vary snapshot timing
- */
- private int randRoll;
- private long randSize;
-
- private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<>();
-
- private final Semaphore snapThreadMutex = new Semaphore(1);
-
- private final ZooKeeperServer zks;
-
- private final DelayingProcessor nextProcessor;
-
- /**
- * Transactions that have been written and are waiting to be flushed to
- * disk. Basically this is the list of SyncItems whose callbacks will be
- * invoked after flush returns successfully.
- */
- private final Queue<Request> toFlush;
- private long lastFlushTime;
-
- public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
- super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener());
- this.zks = zks;
- this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor);
- this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
- }
-
- /**
- * used by tests to check for changing
- * snapcounts
- * @param count
- */
- public static void setSnapCount(int count) {
- snapCount = count;
- }
-
- /**
- * used by tests to get the snapcount
- * @return the snapcount
- */
- public static int getSnapCount() {
- return snapCount;
- }
-
- private long getRemainingDelay() {
- long flushDelay = zks.getFlushDelay();
- long duration = Time.currentElapsedTime() - lastFlushTime;
- if (duration < flushDelay) {
- return flushDelay - duration;
- }
- return 0;
- }
-
- /** If both flushDelay and maxMaxBatchSize are set (bigger than 0), flush
- * whenever either condition is hit. If only one or the other is
- * set, flush only when the relevant condition is hit.
- */
- private boolean shouldFlush() {
- long flushDelay = zks.getFlushDelay();
- long maxBatchSize = zks.getMaxBatchSize();
- if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
- return true;
- }
- return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
- }
-
- /**
- * used by tests to check for changing
- * snapcounts
- * @param size
- */
- public static void setSnapSizeInBytes(long size) {
- snapSizeInBytes = size;
- }
-
- private boolean shouldSnapshot() {
- int logCount = zks.getZKDatabase().getTxnCount();
- long logSize = zks.getZKDatabase().getTxnSize();
- return (logCount > (snapCount / 2 + randRoll))
- || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
- }
-
- private void resetSnapshotStats() {
- randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
- randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
- }
-
- @Override
- public void run() {
- try {
- // we do this in an attempt to ensure that not all of the servers
- // in the ensemble take a snapshot at the same time
- resetSnapshotStats();
- lastFlushTime = Time.currentElapsedTime();
- while (true) {
- ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
-
- long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
- Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
- if (si == null) {
- /* We timed out looking for more writes to batch, go ahead and flush immediately */
- flush();
- si = queuedRequests.take();
- }
-
- if (si == REQUEST_OF_DEATH) {
- break;
- }
-
- if (si == TURN_FORWARDING_DELAY_ON_REQUEST) {
- nextProcessor.startDelaying();
- continue;
- }
- if (si == TURN_FORWARDING_DELAY_OFF_REQUEST) {
- nextProcessor.flushAndStopDelaying();
- continue;
- }
-
- if (si instanceof FlushRequest) {
- flush();
- ((FlushRequest) si).latch.countDown();
- continue;
- }
-
- long startProcessTime = Time.currentElapsedTime();
- ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
-
- // track the number of records written to the log
- if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
- if (shouldSnapshot()) {
- resetSnapshotStats();
- // roll the log
- zks.getZKDatabase().rollLog();
- // take a snapshot
- if (!snapThreadMutex.tryAcquire()) {
- LOG.warn("Too busy to snap, skipping");
- } else {
- new ZooKeeperThread("Snapshot Thread") {
- public void run() {
- try {
- zks.takeSnapshot();
- } catch (Exception e) {
- LOG.warn("Unexpected exception", e);
- } finally {
- snapThreadMutex.release();
- }
- }
- }.start();
- }
- }
- } else if (toFlush.isEmpty()) {
- // optimization for read heavy workloads
- // iff this is a read or a throttled request(which doesn't need to be written to the disk),
- // and there are no pending flushes (writes), then just pass this to the next processor
- if (nextProcessor != null) {
- nextProcessor.processRequest(si);
- nextProcessor.flush();
- }
- continue;
- }
- toFlush.add(si);
- if (shouldFlush()) {
- flush();
- }
- ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
- }
- } catch (Throwable t) {
- handleException(this.getName(), t);
- }
- LOG.info("SyncRequestProcessor exited!");
- }
-
- /** Flushes all pending writes, and waits for this to complete. */
- public void syncFlush() throws InterruptedException {
- FlushRequest marker = new FlushRequest();
- queuedRequests.add(marker);
- marker.latch.await();
- }
-
- public void setDelayForwarding(boolean delayForwarding) {
- queuedRequests.add(delayForwarding ? TURN_FORWARDING_DELAY_ON_REQUEST : TURN_FORWARDING_DELAY_OFF_REQUEST);
- }
-
- private void flush() throws IOException, RequestProcessorException {
- if (this.toFlush.isEmpty()) {
- return;
- }
-
- ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
-
- long flushStartTime = Time.currentElapsedTime();
- zks.getZKDatabase().commit();
- ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
-
- if (this.nextProcessor == null) {
- this.toFlush.clear();
- } else {
- while (!this.toFlush.isEmpty()) {
- final Request i = this.toFlush.remove();
- long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
- ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
- this.nextProcessor.processRequest(i);
- }
- nextProcessor.flush();
- }
- lastFlushTime = Time.currentElapsedTime();
- }
-
- public void shutdown() {
- LOG.info("Shutting down");
- queuedRequests.add(REQUEST_OF_DEATH);
- try {
- this.join();
- this.flush();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while wating for {} to finish", this);
- Thread.currentThread().interrupt();
- } catch (IOException e) {
- LOG.warn("Got IO exception during shutdown");
- } catch (RequestProcessorException e) {
- LOG.warn("Got request processor exception during shutdown");
- }
- if (nextProcessor != null) {
- nextProcessor.shutdown();
- }
- }
-
- public void processRequest(final Request request) {
- Objects.requireNonNull(request, "Request cannot be null");
-
- request.syncQueueStartTime = Time.currentElapsedTime();
- queuedRequests.add(request);
- ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1);
- }
-
-}