diff options
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java')
-rw-r--r-- | zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java | 353 |
1 files changed, 0 insertions, 353 deletions
diff --git a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java deleted file mode 100644 index cf7f4c44015..00000000000 --- a/zookeeper-server/zookeeper-server-3.9.2/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java +++ /dev/null @@ -1,353 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zookeeper.server; - -import java.io.Flushable; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import org.apache.zookeeper.common.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This RequestProcessor logs requests to disk. It batches the requests to do - * the io efficiently. The request is not passed to the next RequestProcessor - * until its log has been synced to disk. - * - * SyncRequestProcessor is used in 3 different cases - * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which - * send ack back to itself. - * 2. Follower - Sync request to disk and forward request to - * SendAckRequestProcessor which send the packets to leader. - * SendAckRequestProcessor is flushable which allow us to force - * push packets to leader. - * 3. Observer - Sync committed request to disk (received as INFORM packet). - * It never send ack back to the leader, so the nextProcessor will - * be null. This change the semantic of txnlog on the observer - * since it only contains committed txns. - */ -public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { - - private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class); - - private static final Request REQUEST_OF_DEATH = Request.requestOfDeath; - - private static class FlushRequest extends Request { - private final CountDownLatch latch = new CountDownLatch(1); - public FlushRequest() { - super(null, 0, 0, 0, null, null); - } - } - - private static final Request TURN_FORWARDING_DELAY_ON_REQUEST = new Request(null, 0, 0, 0, null, null); - private static final Request TURN_FORWARDING_DELAY_OFF_REQUEST = new Request(null, 0, 0, 0, null, null); - - private static class DelayingProcessor implements RequestProcessor, Flushable { - private final RequestProcessor next; - private Queue<Request> delayed = null; - private DelayingProcessor(RequestProcessor next) { - this.next = next; - } - @Override - public void flush() throws IOException { - if (delayed == null && next instanceof Flushable) { - ((Flushable) next).flush(); - } - } - @Override - public void processRequest(Request request) throws RequestProcessorException { - if (delayed == null) { - next.processRequest(request); - } else { - delayed.add(request); - } - } - @Override - public void shutdown() { - next.shutdown(); - } - private void startDelaying() { - if (delayed == null) { - delayed = new ArrayDeque<>(); - } - } - private void flushAndStopDelaying() throws RequestProcessorException { - if (delayed != null) { - for (Request request : delayed) { - next.processRequest(request); - } - delayed = null; - } - } - } - - /** The number of log entries to log before starting a snapshot */ - private static int snapCount = ZooKeeperServer.getSnapCount(); - - /** - * The total size of log entries before starting a snapshot - */ - private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes(); - - /** - * Random numbers used to vary snapshot timing - */ - private int randRoll; - private long randSize; - - private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<>(); - - private final Semaphore snapThreadMutex = new Semaphore(1); - - private final ZooKeeperServer zks; - - private final DelayingProcessor nextProcessor; - - /** - * Transactions that have been written and are waiting to be flushed to - * disk. Basically this is the list of SyncItems whose callbacks will be - * invoked after flush returns successfully. - */ - private final Queue<Request> toFlush; - private long lastFlushTime; - - public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { - super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener()); - this.zks = zks; - this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor); - this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize()); - } - - /** - * used by tests to check for changing - * snapcounts - * @param count - */ - public static void setSnapCount(int count) { - snapCount = count; - } - - /** - * used by tests to get the snapcount - * @return the snapcount - */ - public static int getSnapCount() { - return snapCount; - } - - private long getRemainingDelay() { - long flushDelay = zks.getFlushDelay(); - long duration = Time.currentElapsedTime() - lastFlushTime; - if (duration < flushDelay) { - return flushDelay - duration; - } - return 0; - } - - /** If both flushDelay and maxMaxBatchSize are set (bigger than 0), flush - * whenever either condition is hit. If only one or the other is - * set, flush only when the relevant condition is hit. - */ - private boolean shouldFlush() { - long flushDelay = zks.getFlushDelay(); - long maxBatchSize = zks.getMaxBatchSize(); - if ((flushDelay > 0) && (getRemainingDelay() == 0)) { - return true; - } - return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize); - } - - /** - * used by tests to check for changing - * snapcounts - * @param size - */ - public static void setSnapSizeInBytes(long size) { - snapSizeInBytes = size; - } - - private boolean shouldSnapshot() { - int logCount = zks.getZKDatabase().getTxnCount(); - long logSize = zks.getZKDatabase().getTxnSize(); - return (logCount > (snapCount / 2 + randRoll)) - || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize)); - } - - private void resetSnapshotStats() { - randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2); - randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2)); - } - - @Override - public void run() { - try { - // we do this in an attempt to ensure that not all of the servers - // in the ensemble take a snapshot at the same time - resetSnapshotStats(); - lastFlushTime = Time.currentElapsedTime(); - while (true) { - ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size()); - - long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay()); - Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS); - if (si == null) { - /* We timed out looking for more writes to batch, go ahead and flush immediately */ - flush(); - si = queuedRequests.take(); - } - - if (si == REQUEST_OF_DEATH) { - break; - } - - if (si == TURN_FORWARDING_DELAY_ON_REQUEST) { - nextProcessor.startDelaying(); - continue; - } - if (si == TURN_FORWARDING_DELAY_OFF_REQUEST) { - nextProcessor.flushAndStopDelaying(); - continue; - } - - if (si instanceof FlushRequest) { - flush(); - ((FlushRequest) si).latch.countDown(); - continue; - } - - long startProcessTime = Time.currentElapsedTime(); - ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime); - - // track the number of records written to the log - if (!si.isThrottled() && zks.getZKDatabase().append(si)) { - if (shouldSnapshot()) { - resetSnapshotStats(); - // roll the log - zks.getZKDatabase().rollLog(); - // take a snapshot - if (!snapThreadMutex.tryAcquire()) { - LOG.warn("Too busy to snap, skipping"); - } else { - new ZooKeeperThread("Snapshot Thread") { - public void run() { - try { - zks.takeSnapshot(); - } catch (Exception e) { - LOG.warn("Unexpected exception", e); - } finally { - snapThreadMutex.release(); - } - } - }.start(); - } - } - } else if (toFlush.isEmpty()) { - // optimization for read heavy workloads - // iff this is a read or a throttled request(which doesn't need to be written to the disk), - // and there are no pending flushes (writes), then just pass this to the next processor - if (nextProcessor != null) { - nextProcessor.processRequest(si); - nextProcessor.flush(); - } - continue; - } - toFlush.add(si); - if (shouldFlush()) { - flush(); - } - ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); - } - } catch (Throwable t) { - handleException(this.getName(), t); - } - LOG.info("SyncRequestProcessor exited!"); - } - - /** Flushes all pending writes, and waits for this to complete. */ - public void syncFlush() throws InterruptedException { - FlushRequest marker = new FlushRequest(); - queuedRequests.add(marker); - marker.latch.await(); - } - - public void setDelayForwarding(boolean delayForwarding) { - queuedRequests.add(delayForwarding ? TURN_FORWARDING_DELAY_ON_REQUEST : TURN_FORWARDING_DELAY_OFF_REQUEST); - } - - private void flush() throws IOException, RequestProcessorException { - if (this.toFlush.isEmpty()) { - return; - } - - ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size()); - - long flushStartTime = Time.currentElapsedTime(); - zks.getZKDatabase().commit(); - ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime); - - if (this.nextProcessor == null) { - this.toFlush.clear(); - } else { - while (!this.toFlush.isEmpty()) { - final Request i = this.toFlush.remove(); - long latency = Time.currentElapsedTime() - i.syncQueueStartTime; - ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); - this.nextProcessor.processRequest(i); - } - nextProcessor.flush(); - } - lastFlushTime = Time.currentElapsedTime(); - } - - public void shutdown() { - LOG.info("Shutting down"); - queuedRequests.add(REQUEST_OF_DEATH); - try { - this.join(); - this.flush(); - } catch (InterruptedException e) { - LOG.warn("Interrupted while wating for {} to finish", this); - Thread.currentThread().interrupt(); - } catch (IOException e) { - LOG.warn("Got IO exception during shutdown"); - } catch (RequestProcessorException e) { - LOG.warn("Got request processor exception during shutdown"); - } - if (nextProcessor != null) { - nextProcessor.shutdown(); - } - } - - public void processRequest(final Request request) { - Objects.requireNonNull(request, "Request cannot be null"); - - request.syncQueueStartTime = Time.currentElapsedTime(); - queuedRequests.add(request); - ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1); - } - -} |