/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.zookeeper.server; import java.io.Flushable; import java.io.IOException; import java.util.ArrayDeque; import java.util.Objects; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.common.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This RequestProcessor logs requests to disk. It batches the requests to do * the io efficiently. The request is not passed to the next RequestProcessor * until its log has been synced to disk. * * SyncRequestProcessor is used in 3 different cases * 1. Leader - Sync request to disk and forward it to AckRequestProcessor which * send ack back to itself. * 2. Follower - Sync request to disk and forward request to * SendAckRequestProcessor which send the packets to leader. * SendAckRequestProcessor is flushable which allow us to force * push packets to leader. * 3. Observer - Sync committed request to disk (received as INFORM packet). * It never send ack back to the leader, so the nextProcessor will * be null. This change the semantic of txnlog on the observer * since it only contains committed txns. */ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class); private static final Request REQUEST_OF_DEATH = Request.requestOfDeath; private static class FlushRequest extends Request { private final CountDownLatch latch = new CountDownLatch(1); public FlushRequest() { super(null, 0, 0, 0, null, null); } } private static final Request TURN_FORWARDING_DELAY_ON_REQUEST = new Request(null, 0, 0, 0, null, null); private static final Request TURN_FORWARDING_DELAY_OFF_REQUEST = new Request(null, 0, 0, 0, null, null); private static class DelayingProcessor implements RequestProcessor, Flushable { private final RequestProcessor next; private Queue delayed = null; private DelayingProcessor(RequestProcessor next) { this.next = next; } @Override public void flush() throws IOException { if (delayed == null && next instanceof Flushable) { ((Flushable) next).flush(); } } @Override public void processRequest(Request request) throws RequestProcessorException { if (delayed == null) { next.processRequest(request); } else { delayed.add(request); } } @Override public void shutdown() { next.shutdown(); } private void startDelaying() { if (delayed == null) { delayed = new ArrayDeque<>(); } } private void flushAndStopDelaying() throws RequestProcessorException { if (delayed != null) { for (Request request : delayed) { next.processRequest(request); } delayed = null; } } } /** The number of log entries to log before starting a snapshot */ private static int snapCount = ZooKeeperServer.getSnapCount(); /** * The total size of log entries before starting a snapshot */ private static long snapSizeInBytes = ZooKeeperServer.getSnapSizeInBytes(); /** * Random numbers used to vary snapshot timing */ private int randRoll; private long randSize; private final BlockingQueue queuedRequests = new LinkedBlockingQueue<>(); private final Semaphore snapThreadMutex = new Semaphore(1); private final ZooKeeperServer zks; private final DelayingProcessor nextProcessor; /** * Transactions that have been written and are waiting to be flushed to * disk. Basically this is the list of SyncItems whose callbacks will be * invoked after flush returns successfully. */ private final Queue toFlush; private long lastFlushTime; public SyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) { super("SyncThread:" + zks.getServerId(), zks.getZooKeeperServerListener()); this.zks = zks; this.nextProcessor = nextProcessor == null ? null : new DelayingProcessor(nextProcessor); this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize()); } /** * used by tests to check for changing * snapcounts * @param count */ public static void setSnapCount(int count) { snapCount = count; } /** * used by tests to get the snapcount * @return the snapcount */ public static int getSnapCount() { return snapCount; } private long getRemainingDelay() { long flushDelay = zks.getFlushDelay(); long duration = Time.currentElapsedTime() - lastFlushTime; if (duration < flushDelay) { return flushDelay - duration; } return 0; } /** If both flushDelay and maxMaxBatchSize are set (bigger than 0), flush * whenever either condition is hit. If only one or the other is * set, flush only when the relevant condition is hit. */ private boolean shouldFlush() { long flushDelay = zks.getFlushDelay(); long maxBatchSize = zks.getMaxBatchSize(); if ((flushDelay > 0) && (getRemainingDelay() == 0)) { return true; } return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize); } /** * used by tests to check for changing * snapcounts * @param size */ public static void setSnapSizeInBytes(long size) { snapSizeInBytes = size; } private boolean shouldSnapshot() { int logCount = zks.getZKDatabase().getTxnCount(); long logSize = zks.getZKDatabase().getTxnSize(); return (logCount > (snapCount / 2 + randRoll)) || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize)); } private void resetSnapshotStats() { randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2); randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2)); } @Override public void run() { try { // we do this in an attempt to ensure that not all of the servers // in the ensemble take a snapshot at the same time resetSnapshotStats(); lastFlushTime = Time.currentElapsedTime(); while (true) { ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size()); long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay()); Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS); if (si == null) { /* We timed out looking for more writes to batch, go ahead and flush immediately */ flush(); si = queuedRequests.take(); } if (si == REQUEST_OF_DEATH) { break; } if (si == TURN_FORWARDING_DELAY_ON_REQUEST) { nextProcessor.startDelaying(); continue; } if (si == TURN_FORWARDING_DELAY_OFF_REQUEST) { nextProcessor.flushAndStopDelaying(); continue; } if (si instanceof FlushRequest) { flush(); ((FlushRequest) si).latch.countDown(); continue; } long startProcessTime = Time.currentElapsedTime(); ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime); // track the number of records written to the log if (!si.isThrottled() && zks.getZKDatabase().append(si)) { if (shouldSnapshot()) { resetSnapshotStats(); // roll the log zks.getZKDatabase().rollLog(); // take a snapshot if (!snapThreadMutex.tryAcquire()) { LOG.warn("Too busy to snap, skipping"); } else { new ZooKeeperThread("Snapshot Thread") { public void run() { try { zks.takeSnapshot(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { snapThreadMutex.release(); } } }.start(); } } } else if (toFlush.isEmpty()) { // optimization for read heavy workloads // iff this is a read or a throttled request(which doesn't need to be written to the disk), // and there are no pending flushes (writes), then just pass this to the next processor if (nextProcessor != null) { nextProcessor.processRequest(si); nextProcessor.flush(); } continue; } toFlush.add(si); if (shouldFlush()) { flush(); } ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); } } catch (Throwable t) { handleException(this.getName(), t); } LOG.info("SyncRequestProcessor exited!"); } /** Flushes all pending writes, and waits for this to complete. */ public void syncFlush() throws InterruptedException { FlushRequest marker = new FlushRequest(); queuedRequests.add(marker); marker.latch.await(); } public void setDelayForwarding(boolean delayForwarding) { queuedRequests.add(delayForwarding ? TURN_FORWARDING_DELAY_ON_REQUEST : TURN_FORWARDING_DELAY_OFF_REQUEST); } private void flush() throws IOException, RequestProcessorException { if (this.toFlush.isEmpty()) { return; } ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size()); long flushStartTime = Time.currentElapsedTime(); zks.getZKDatabase().commit(); ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime); if (this.nextProcessor == null) { this.toFlush.clear(); } else { while (!this.toFlush.isEmpty()) { final Request i = this.toFlush.remove(); long latency = Time.currentElapsedTime() - i.syncQueueStartTime; ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency); this.nextProcessor.processRequest(i); } nextProcessor.flush(); } lastFlushTime = Time.currentElapsedTime(); } public void shutdown() { LOG.info("Shutting down"); queuedRequests.add(REQUEST_OF_DEATH); try { this.join(); this.flush(); } catch (InterruptedException e) { LOG.warn("Interrupted while wating for {} to finish", this); Thread.currentThread().interrupt(); } catch (IOException e) { LOG.warn("Got IO exception during shutdown"); } catch (RequestProcessorException e) { LOG.warn("Got request processor exception during shutdown"); } if (nextProcessor != null) { nextProcessor.shutdown(); } } public void processRequest(final Request request) { Objects.requireNonNull(request, "Request cannot be null"); request.syncQueueStartTime = Time.currentElapsedTime(); queuedRequests.add(request); ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1); } }