diff options
Diffstat (limited to 'zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java')
-rw-r--r-- | zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java | 353 |
1 files changed, 353 insertions, 0 deletions
diff --git a/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java new file mode 100644 index 00000000000..e03e0b07944 --- /dev/null +++ b/zookeeper-server/zookeeper-server-3.8.1/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.Flushable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.common.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This RequestProcessor logs requests to disk. It batches the requests to do + * the io efficiently. The request is not passed to the next RequestProcessor + * until its log has been synced to disk. + * + * SyncRequestProcessor is used in 3 different cases + * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which + * send ack back to itself. + * 2. Follower - Sync request to disk and forward request to + * SendAckRequestProcessor which send the packets to leader. + * SendAckRequestProcessor is flushable which allow us to force + * push packets to leader. + * 3. Observer - Sync committed request to disk (received as INFORM packet). + * It never send ack back to the leader, so the nextProcessor will + * be null. This change the semantic of txnlog on the observer + * since it only contains committed txns. + */ +public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class); + + private static final Request REQUEST_OF_DEATH = Request.requestOfDeath; + + private static class FlushRequest extends Request { + private final CountDownLatch latch = new CountDownLatch(1); + public FlushRequest() { + super(null, 0, 0, 0, null, null); + } + } + + private static final Request turnForwardingDelayOn = new Request(null, 0, 0, 0, null, null); + private static final Request turnForwardingDelayOff = new Request(null, 0, 0, 0, null, null); + + private static class DelayingProcessor implements RequestProcessor, Flushable { + private final RequestProcessor next; + private Queue<Request> delayed = null; + private DelayingProcessor(RequestProcessor next) { + this.next = next; + } + @Override + public void flush() throws IOException { + if (delayed == null && next instanceof Flushable) { + ((Flushable) next).flush(); + } + } + @Override + public void processRequest(Request request) throws RequestProcessorException { + if (delayed == null) { + next.processRequest(request); + } else { + delayed.add(request); + } + } + @Override + public void shutdown() { + next.shutdown(); + } + private void close() { + if (delayed == null) { + delayed = new ArrayDeque<>(); + } + } + private void open() throws RequestProcessorException { + if (delayed != null) { + for (Request request : delayed) { + next.processRequest(request); + } + delayed = null; + } + } + } + + /** The number of log entries to log before starting a snapshot */ + private static int snapCount = ZooKeeperServer.getSnapCount(); + + /** + * The total size of log entries before starting a snapshot + */ + private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes(); + + /** + * Random numbers used to vary snapshot timing + */ + private int randRoll; + private long randSize; + + private final BlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>(); + + private final Semaphore snapThreadMutex = new Semaphore(1); + + private final ZooKeeperServer zks; + + private final DelayingProcessor nextProcessor; + + /** + * Transactions that have been written and are waiting to be flushed to + * disk. Basically this is the list of SyncItems whose callbacks will be + * invoked after flush returns successfully. + */ + private final Queue<Request> toFlush; + private long lastFlushTime; + + public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { + super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener()); + this.zks = zks; + this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor); + this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize()); + } + + /** + * used by tests to check for changing + * snapcounts + * @param count + */ + public static void setSnapCount(int count) { + snapCount = count; + } + + /** + * used by tests to get the snapcount + * @return the snapcount + */ + public static int getSnapCount() { + return snapCount; + } + + private long getRemainingDelay() { + long flushDelay = zks.getFlushDelay(); + long duration = Time.currentElapsedTime() - lastFlushTime; + if (duration < flushDelay) { + return flushDelay - duration; + } + return 0; + } + + /** If both flushDelay and maxMaxBatchSize are set (bigger than 0), flush + * whenever either condition is hit. If only one or the other is + * set, flush only when the relevant condition is hit. + */ + private boolean shouldFlush() { + long flushDelay = zks.getFlushDelay(); + long maxBatchSize = zks.getMaxBatchSize(); + if ((flushDelay > 0) && (getRemainingDelay() == 0)) { + return true; + } + return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize); + } + + /** + * used by tests to check for changing + * snapcounts + * @param size + */ + public static void setSnapSizeInBytes(long size) { + snapSizeInBytes = size; + } + + private boolean shouldSnapshot() { + int logCount = zks.getZKDatabase().getTxnCount(); + long logSize = zks.getZKDatabase().getTxnSize(); + return (logCount > (snapCount / 2 + randRoll)) + || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize)); + } + + private void resetSnapshotStats() { + randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2); + randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2)); + } + + @Override + public void run() { + try { + // we do this in an attempt to ensure that not all of the servers + // in the ensemble take a snapshot at the same time + resetSnapshotStats(); + lastFlushTime = Time.currentElapsedTime(); + while (true) { + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size()); + + long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay()); + Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS); + if (si == null) { + /* We timed out looking for more writes to batch, go ahead and flush immediately */ + flush(); + si = queuedRequests.take(); + } + + if (si == REQUEST_OF_DEATH) { + break; + } + + if (si == turnForwardingDelayOn) { + nextProcessor.close(); + continue; + } + if (si == turnForwardingDelayOff) { + nextProcessor.open(); + continue; + } + + if (si instanceof FlushRequest) { + flush(); + ((FlushRequest) si).latch.countDown(); + continue; + } + + long startProcessTime = Time.currentElapsedTime(); + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime); + + // track the number of records written to the log + if (!si.isThrottled() && zks.getZKDatabase().append(si)) { + if (shouldSnapshot()) { + resetSnapshotStats(); + // roll the log + zks.getZKDatabase().rollLog(); + // take a snapshot + if (!snapThreadMutex.tryAcquire()) { + LOG.warn("Too busy to snap, skipping"); + } else { + new ZooKeeperThread("Snapshot Thread") { + public void run() { + try { + zks.takeSnapshot(); + } catch (Exception e) { + LOG.warn("Unexpected exception", e); + } finally { + snapThreadMutex.release(); + } + } + }.start(); + } + } + } else if (toFlush.isEmpty()) { + // optimization for read heavy workloads + // iff this is a read or a throttled request(which doesn't need to be written to the disk), + // and there are no pending flushes (writes), then just pass this to the next processor + if (nextProcessor != null) { + nextProcessor.processRequest(si); + nextProcessor.flush(); + } + continue; + } + toFlush.add(si); + if (shouldFlush()) { + flush(); + } + ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); + } + } catch (Throwable t) { + handleException(this.getName(), t); + } + LOG.info("SyncRequestProcessor exited!"); + } + + /** Flushes all pending writes, and waits for this to complete. */ + public void syncFlush() throws InterruptedException { + FlushRequest marker = new FlushRequest(); + queuedRequests.add(marker); + marker.latch.await(); + } + + public void setDelayForwarding(boolean delayForwarding) { + queuedRequests.add(delayForwarding ? turnForwardingDelayOn : turnForwardingDelayOff); + } + + private void flush() throws IOException, RequestProcessorException { + if (this.toFlush.isEmpty()) { + return; + } + + ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size()); + + long flushStartTime = Time.currentElapsedTime(); + zks.getZKDatabase().commit(); + ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime); + + if (this.nextProcessor == null) { + this.toFlush.clear(); + } else { + while (!this.toFlush.isEmpty()) { + final Request i = this.toFlush.remove(); + long latency = Time.currentElapsedTime() - i.syncQueueStartTime; + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); + this.nextProcessor.processRequest(i); + } + nextProcessor.flush(); + } + lastFlushTime = Time.currentElapsedTime(); + } + + public void shutdown() { + LOG.info("Shutting down"); + queuedRequests.add(REQUEST_OF_DEATH); + try { + this.join(); + this.flush(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while wating for {} to finish", this); + Thread.currentThread().interrupt(); + } catch (IOException e) { + LOG.warn("Got IO exception during shutdown"); + } catch (RequestProcessorException e) { + LOG.warn("Got request processor exception during shutdown"); + } + if (nextProcessor != null) { + nextProcessor.shutdown(); + } + } + + public void processRequest(final Request request) { + Objects.requireNonNull(request, "Request cannot be null"); + + request.syncQueueStartTime = Time.currentElapsedTime(); + queuedRequests.add(request); + ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1); + } + +} |