// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi.messagebus; import com.yahoo.document.BucketId; import com.yahoo.document.BucketIdFactory; import com.yahoo.document.select.parser.ParseException; import com.yahoo.documentapi.AckToken; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorDataQueue; import com.yahoo.documentapi.VisitorIterator; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorResponse; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage; import com.yahoo.documentapi.messagebus.protocol.CreateVisitorReply; import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.VisitorInfoMessage; import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply; import java.util.List; import java.util.logging.Level; import com.yahoo.messagebus.DestinationSession; import com.yahoo.messagebus.DestinationSessionParams; import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.ErrorCode; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.MessageBus; import com.yahoo.messagebus.MessageHandler; import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.ReplyHandler; import com.yahoo.messagebus.Result; import com.yahoo.messagebus.SourceSession; import com.yahoo.messagebus.SourceSessionParams; import com.yahoo.messagebus.Trace; import com.yahoo.messagebus.routing.RoutingTable; import com.yahoo.vdslib.VisitorStatistics; import com.yahoo.vdslib.state.ClusterState; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; /** *

* A visitor session that tracks the progress of, and potentially receives data from,
* a visitor using a MessageBus source and destination session. The source session
* is used to initiate visiting by sending create visitor messages to storage and
* the destination session is used for receiving progress. If the visitor is not
* set up to send data to a remote destination, data will also be received through
* the destination session.
* <p>
* Create the visitor session by calling the
* DocumentAccess.createVisitorSession method.

*/
public class MessageBusVisitorSession implements VisitorSession {

    /**
     * Abstract away notion of source session into a generic Sender
     * interface to allow easy mocking.
     */
    public interface Sender {
        /** Sends the given message and returns the result of the send attempt. */
        Result send(Message msg);
        /** Returns the sender's current pending count. */
        int getPendingCount();
        /** Destroys this sender. */
        void destroy();
    }

    /** Factory for {@link Sender} instances bound to a reply handler and a set of visitor parameters. */
    public interface SenderFactory {
        Sender createSender(ReplyHandler replyHandler, VisitorParameters visitorParameters);
    }

    /**
     * Abstract away notion of destination session into a generic Receiver
     * interface to allow easy mocking.
     * The implementation must be thread safe since reply() can be invoked
     * from an arbitrary thread.
     */
    public interface Receiver {
        /** Sends the given reply back through this receiver. */
        void reply(Reply reply);
        /** Destroys this receiver. */
        void destroy();
        /**
         * Get connection spec that can be used by other clients to send
         * messages to this Receiver.
         * @return connection spec
         */
        String getConnectionSpec();
    }

    /** Factory for {@link Receiver} instances bound to a message handler and a session name. */
    public interface ReceiverFactory {
        Receiver createReceiver(MessageHandler messageHandler, String sessionName);
    }

    /** Abstraction over task execution, offering immediate submission and delayed scheduling. */
    public interface AsyncTaskExecutor {
        void submitTask(Runnable event);
        void scheduleTask(Runnable event, long delay, TimeUnit unit);
    }

    /** Source of monotonic time in nanoseconds. */
    public interface Clock {
        long monotonicNanoTime();
    }

    /** Simple holder pairing a visitor iterator with its progress token. */
    public static class VisitingProgress {
        private final VisitorIterator iterator;
        private final ProgressToken token;

        public VisitingProgress(VisitorIterator iterator, ProgressToken token) {
            this.iterator = iterator;
            this.token = token;
        }

        public VisitorIterator getIterator() {
            return iterator;
        }

        public ProgressToken getToken() {
            return token;
        }
    }

    /**
     * Session states. ABORTED, FAILED and TIMED_OUT are flagged as failure
     * states; NOT_STARTED, WORKING and COMPLETED are not.
     */
    public enum State {
        NOT_STARTED(false),
        WORKING(false),
        COMPLETED(false),
        ABORTED(true),
        FAILED(true),
        TIMED_OUT(true);

        private final boolean failure;

        State(boolean failure) {
            this.failure = failure;
        }

        /** @return true iff this state is one of the failure states */
        public boolean isFailure() {
            return failure;
        }
    }

    /** A {@link State} together with a textual description of why the state was entered. */
    public class StateDescription {
        private final State state;
        private final String description;

        public StateDescription(State state, String description) {
            this.state = state;
            this.description = description;
        }

        /** Creates a state description with an empty description string. */
        public StateDescription(State state) {
            this.state = state;
            this.description
= "";
        }

        public State getState() {
            return state;
        }

        public String getDescription() {
            return description;
        }

        /**
         * Maps the state onto a control handler completion code.
         *
         * @return the completion code for COMPLETED, ABORTED, FAILED or TIMED_OUT
         * @throws IllegalStateException for any other state, as those have no completion code
         */
        VisitorControlHandler.CompletionCode toCompletionCode() {
            return switch (state) {
                case COMPLETED -> VisitorControlHandler.CompletionCode.SUCCESS;
                case ABORTED -> VisitorControlHandler.CompletionCode.ABORTED;
                case FAILED -> VisitorControlHandler.CompletionCode.FAILURE;
                case TIMED_OUT -> VisitorControlHandler.CompletionCode.TIMEOUT;
                default -> throw new IllegalStateException("Current state did not have a valid value: " + state);
            };
        }

        /** @return true iff the wrapped state is a failure state */
        public boolean failed() {
            return state.isFailure();
        }

        public String toString() {
            return state + ": " + description;
        }
    }

    /**
     * Message bus implementations of interfaces
     */
    public static class MessageBusSender implements Sender {
        private final SourceSession sourceSession;

        public MessageBusSender(SourceSession sourceSession) {
            this.sourceSession = sourceSession;
        }

        @Override
        public Result send(Message msg) {
            return sourceSession.send(msg);
        }

        @Override
        public int getPendingCount() {
            return sourceSession.getPendingCount();
        }

        @Override
        public void destroy() {
            sourceSession.destroy();
        }
    }

    /** Creates {@link MessageBusSender} instances backed by source sessions of a given MessageBus. */
    public static class MessageBusSenderFactory implements SenderFactory {
        private final MessageBus messageBus;

        public MessageBusSenderFactory(MessageBus messageBus) {
            this.messageBus = messageBus;
        }

        /**
         * Builds source session parameters, using the throttle policy from the
         * visitor parameters if one is set and a new DynamicThrottlePolicy otherwise.
         */
        private SourceSessionParams createSourceSessionParams(VisitorParameters visitorParameters) {
            SourceSessionParams sourceParams = new SourceSessionParams();

            if (visitorParameters.getThrottlePolicy() != null) {
                sourceParams.setThrottlePolicy(visitorParameters.getThrottlePolicy());
            } else {
                sourceParams.setThrottlePolicy(new DynamicThrottlePolicy());
            }

            return sourceParams;
        }

        @Override
        public Sender createSender(ReplyHandler replyHandler, VisitorParameters visitorParameters) {
            SourceSessionParams sessionParams = createSourceSessionParams(visitorParameters);
            return new MessageBusSender(messageBus.createSourceSession(replyHandler, sessionParams));
        }
    }

    public static class
MessageBusReceiver implements Receiver {
        private final DestinationSession destinationSession;

        public MessageBusReceiver(DestinationSession destinationSession) {
            this.destinationSession = destinationSession;
        }

        @Override
        public void reply(Reply reply) {
            destinationSession.reply(reply);
        }

        @Override
        public void destroy() {
            destinationSession.destroy();
        }

        @Override
        public String getConnectionSpec() {
            return destinationSession.getConnectionSpec();
        }
    }

    /** Creates {@link MessageBusReceiver} instances backed by destination sessions of a given MessageBus. */
    public static class MessageBusReceiverFactory implements ReceiverFactory {
        private final MessageBus messageBus;

        public MessageBusReceiverFactory(MessageBus messageBus) {
            this.messageBus = messageBus;
        }

        /**
         * Builds destination session parameters with the given name and handler;
         * name broadcasting is switched off.
         */
        private DestinationSessionParams createDestinationParams(MessageHandler messageHandler, String visitorName) {
            DestinationSessionParams destparams = new DestinationSessionParams();
            destparams.setName(visitorName);
            destparams.setBroadcastName(false);
            destparams.setMessageHandler(messageHandler);
            return destparams;
        }

        @Override
        public Receiver createReceiver(MessageHandler messageHandler, String sessionName) {
            DestinationSessionParams destinationParams = createDestinationParams(messageHandler, sessionName);
            return new MessageBusReceiver(messageBus.createDestinationSession(destinationParams));
        }
    }

    /** {@link AsyncTaskExecutor} that forwards tasks to a ScheduledExecutorService. */
    public static class ThreadAsyncTaskExecutor implements AsyncTaskExecutor {
        private final ScheduledExecutorService executor;

        public ThreadAsyncTaskExecutor(ScheduledExecutorService executor) {
            this.executor = executor;
        }

        @Override
        public void submitTask(Runnable task) {
            executor.submit(task);
        }

        @Override
        public void scheduleTask(Runnable task, long delay, TimeUnit unit) {
            executor.schedule(task, delay, unit);
        }
    }

    /** {@link Clock} implementation that reads System.nanoTime(). */
    public static class RealClock implements Clock {
        @Override
        public long monotonicNanoTime() {
            return System.nanoTime();
        }
    }

    private static final Logger log = Logger.getLogger(MessageBusVisitorSession.class.getName());

    // Process-wide counter used to give each session a distinct name.
    private static final AtomicLong sessionCounter = new AtomicLong(0);

    private static long getNextSessionId() {
return sessionCounter.incrementAndGet();
    }

    /** Builds a session name of the form "visitor-&lt;session id&gt;-&lt;current time millis&gt;". */
    private static String createSessionName() {
        return "visitor-" + getNextSessionId() + '-' + System.currentTimeMillis();
    }

    private final VisitorParameters params;
    private final Sender sender;
    private final Receiver receiver;
    private final AsyncTaskExecutor taskExecutor;
    private final VisitingProgress progress;
    private final VisitorStatistics statistics;
    private final String sessionName = createSessionName();
    private final String dataDestination;
    private final Clock clock;
    private final Object replyTrackingMonitor = new Object();
    private StateDescription state;
    private long visitorCounter = 0;
    private long startTimeNanos = 0;
    private long scheduledHandleReplyTasks = 0; // Must be protected by replyTrackingMonitor
    private boolean scheduledSendCreateVisitors = false;
    private boolean done = false;
    private boolean destroying = false; // For testing and sanity checking
    private final Object completionMonitor = new Object();
    private final Trace trace;
    /**
     * We keep our own track of pending messages since the sender's pending
     * count cannot be relied on in an async task execution context. This
     * because it is decremented before the message is actually processed.
     */
    private int pendingMessageCount = 0;

    /** Creates a session that uses a {@link RealClock} for time keeping. */
    public MessageBusVisitorSession(VisitorParameters visitorParameters,
                                    AsyncTaskExecutor taskExecutor,
                                    SenderFactory senderFactory,
                                    ReceiverFactory receiverFactory,
                                    RoutingTable routingTable)
            throws ParseException
    {
        this(visitorParameters, taskExecutor, senderFactory, receiverFactory, routingTable, new RealClock());
    }

    /**
     * Creates a session from explicit collaborators. The given parameters object
     * is used directly (not copied) and may be modified: a route and default
     * handlers are set on it when missing. If the visitor iterator is already
     * done after setup, the session is marked completed right away.
     *
     * @throws ParseException declared for the creation of the visiting progress
     * @throws IllegalStateException if no data destination could be determined
     */
    public MessageBusVisitorSession(VisitorParameters visitorParameters,
                                    AsyncTaskExecutor taskExecutor,
                                    SenderFactory senderFactory,
                                    ReceiverFactory receiverFactory,
                                    RoutingTable routingTable,
                                    Clock clock)
            throws ParseException
    {
        this.params = visitorParameters; // TODO(vekterli): make copy?
// legacy impl does not copy
        initializeRoute(routingTable);
        this.sender = senderFactory.createSender(createReplyHandler(), this.params);
        this.receiver = receiverFactory.createReceiver(createMessageHandler(), sessionName);
        this.taskExecutor = taskExecutor;
        this.progress = createVisitingProgress(params);
        this.statistics = new VisitorStatistics();
        this.state = new StateDescription(State.NOT_STARTED);
        this.clock = clock;
        initializeHandlers();
        trace = new Trace(visitorParameters.getTraceLevel());
        // With a local data handler, data is sent to this session's own receiver;
        // otherwise the configured remote data handler is the destination.
        dataDestination = (params.getLocalDataHandler() == null
                ? params.getRemoteDataHandler()
                : receiver.getConnectionSpec());

        validateSessionParameters();

        // If we're already done, no need to do anything at all!
        if (progress.getIterator().isDone()) {
            markSessionCompleted();
        }
    }

    /**
     * Convenience factory wiring the session to a real MessageBus and executor
     * service, using the routing table of the document protocol.
     */
    public static MessageBusVisitorSession createForMessageBus(final MessageBus mbus,
                                                               final ScheduledExecutorService scheduledExecutorService,
                                                               final VisitorParameters params)
            throws ParseException
    {
        final AsyncTaskExecutor executor = new ThreadAsyncTaskExecutor(scheduledExecutorService);
        final MessageBusSenderFactory senderFactory = new MessageBusSenderFactory(mbus);
        final MessageBusReceiverFactory receiverFactory = new MessageBusReceiverFactory(mbus);
        final RoutingTable table = mbus.getRoutingTable(DocumentProtocol.NAME);

        return new MessageBusVisitorSession(params, executor, senderFactory, receiverFactory, table);
    }

    /** @throws IllegalStateException if no data destination has been resolved */
    private void validateSessionParameters() {
        if (dataDestination == null) {
            throw new IllegalStateException("No data destination specified");
        }
    }

    /**
     * Starts the session: records the start time, transitions to WORKING and
     * submits the first send task. Does nothing beyond recording the start time
     * if the iterator is already done.
     */
    public void start() {
        synchronized (progress.getToken()) {
            this.startTimeNanos = clock.monotonicNanoTime();
            if (progress.getIterator().isDone()) {
                log.log(Level.FINE, () -> sessionName + ": progress token indicates " +
                        "session is done before it could even start; no-op");
                return;
            }
            transitionTo(new StateDescription(State.WORKING));
            taskExecutor.submitTask(new SendCreateVisitorsTask(computeBoundedMessageTimeoutMillis(0)));
        }
    }

    private void
updateStateUnlessAlreadyFailed(StateDescription newState) {
        if (!state.failed()) {
            state = newState;
        } // else: don't override existing failure state
    }

    /**
     * Attempt to transition to a new state. Depending on the current state,
     * some transitions may be disallowed, such as transitioning from ABORTED
     * to COMPLETED, since failures take precedence. Transitioning multiple
     * times to the same state is a no-op in order to conserve the textual
     * description given by the first transition to said state (which most
     * likely is the most useful one for the end-user).
     *
     * @param newState State to attempt to transition to.
     * @return State which is current after the transition. If transition was
     *   successful, will be equal to newState.
     */
    private StateDescription transitionTo(StateDescription newState) {
        log.log(Level.FINE, () -> sessionName + ": attempting transition to state " + newState);
        switch (newState.getState()) {
            case WORKING:
                assert(state.getState() == State.NOT_STARTED);
                state = newState;
                break;
            case ABORTED:
                // Unlike the other target states, ABORTED is assigned unconditionally.
                state = newState;
                break;
            case COMPLETED:
            case FAILED:
            case TIMED_OUT:
                updateStateUnlessAlreadyFailed(newState);
                break;
            default:
                com.yahoo.protect.Process.logAndDie("Invalid target transition state: " + newState);
        }
        log.log(Level.FINE, () -> "Session '" + sessionName + "' is now in state " + state);
        return state;
    }

    /** @return true iff at least one reply handling task is currently counted as scheduled */
    private boolean hasScheduledHandleReplyTask() {
        // This is synchronized instead of an AtomicLong simply because it makes it considerably
        // easier to reason about happens-before relationships, memory visibility and sequencing
        // of events across threads when an actual critical section is involved.
synchronized (replyTrackingMonitor) {
            return scheduledHandleReplyTasks != 0;
        }
    }

    private void incrementScheduledHandleReplyTasks() {
        synchronized (replyTrackingMonitor) {
            ++scheduledHandleReplyTasks;
        }
    }

    private void decrementScheduleHandleReplyTasks() {
        synchronized (replyTrackingMonitor) {
            assert(scheduledHandleReplyTasks > 0);
            --scheduledHandleReplyTasks;
        }
    }

    /**
     * Creates the reply handler handed to the sender. Each reply is wrapped in a
     * HandleReplyTask and submitted to the task executor; if submission is
     * rejected, the session is failed and marked completed immediately.
     */
    private ReplyHandler createReplyHandler() {
        return (reply) -> {
            // Generally, handleReply will run in the context of the
            // underlying transport layer's processing thread(s), so we
            // schedule our own reply handling task to avoid blocking it.
            try {
                // Make concurrent reply handling visible in sender thread, if it's active.
                // See SendCreateVisitorsTask.run() for a rationale.
                incrementScheduledHandleReplyTasks();
                taskExecutor.submitTask(new HandleReplyTask(reply));
            } catch (RejectedExecutionException e) {
                decrementScheduleHandleReplyTasks();
                // We cannot reliably handle reply tasks failing to be submitted, since
                // the reply task performs all our internal state handling logic. As such,
                // we just immediately go into a failure destruction mode as soon as this
                // happens, in which we do not wait for any active messages to be replied
                // to.
                log.log(Level.WARNING, "Visitor session '" + sessionName +
                        "': failed to submit reply task to executor service! " +
                        "Session cannot reliably continue; terminating it early.", e);

                synchronized (progress.getToken()) {
                    transitionTo(new StateDescription(State.FAILED, "Failed to submit reply task to executor service: " + e.getMessage()));
                    if (!done) {
                        markSessionCompleted();
                    }
                }
            }
        };
    }

    /**
     * Creates the message handler handed to the receiver. Each message is wrapped
     * in a HandleMessageTask and submitted to the task executor; if submission is
     * rejected, the message is answered directly with an ERROR_ABORTED reply.
     */
    private MessageHandler createMessageHandler() {
        return (message) -> {
            try {
                taskExecutor.submitTask(new HandleMessageTask(message));
            } catch (RejectedExecutionException e) {
                Reply reply = ((DocumentMessage)message).createReply();
                message.swapState(reply);
                reply.addError(new Error(
                        DocumentProtocol.ERROR_ABORTED,
                        "Visitor session has been aborted"));
                receiver.reply(reply);
            }
        };
    }

    /** Sets a route on the visitor parameters if none (or one without hops) is present. */
    private void initializeRoute(RoutingTable routingTable) {
        // If no cluster route has been set by user arguments, attempt to retrieve it from mbus config.
        if (params.getRoute() == null || !params.getRoute().hasHops()) {
            params.setRoute(getClusterRoute(routingTable));
            log.log(Level.FINE, () -> "No route specified; resolved implicit " +
                    "storage cluster: " + params.getRoute().toString());
        }
    }

    /**
     * Finds the single route in the routing table whose name starts with
     * "storage/cluster.".
     *
     * @return the name of that route
     * @throws IllegalArgumentException if there is no such route, or more than one
     */
    private String getClusterRoute(RoutingTable routingTable) throws IllegalArgumentException{
        String route = null;
        for (RoutingTable.RouteIterator it = routingTable.getRouteIterator();
             it.isValid(); it.next()) {
            String str = it.getName();
            if (str.startsWith("storage/cluster.")) {
                if (route != null) {
                    throw new IllegalArgumentException(
                            "There are multiple storage clusters in your application, " +
                            "please specify which one to visit.");
                }
                route = str;
            }
        }
        if (route == null) {
            throw new IllegalArgumentException("No storage cluster found in your application.");
        }
        return route;
    }

    /**
     * Called from the constructor to ensure control and data handlers
     * are set and initialized.
*/
    private void initializeHandlers() {
        if (this.params.getLocalDataHandler() != null) {
            this.params.getLocalDataHandler().reset();
            this.params.getLocalDataHandler().setSession(this);
        } else if (this.params.getRemoteDataHandler() == null) {
            // Neither a local nor a remote data handler is given; fall back to a local queue.
            this.params.setLocalDataHandler(new VisitorDataQueue());
            this.params.getLocalDataHandler().setSession(this);
        }

        if (params.getControlHandler() != null) {
            params.getControlHandler().reset();
        } else {
            params.setControlHandler(new VisitorControlHandler());
        }
        params.getControlHandler().setSession(this);
    }

    /**
     * Creates the iterator/token pair for this session. The resume token from
     * the parameters is used as the progress token if present, otherwise a new
     * token is created. The iterator is created from the explicit bucket set if
     * one is given, otherwise from the document selection.
     */
    private VisitingProgress createVisitingProgress(VisitorParameters params)
            throws ParseException
    {
        ProgressToken progressToken;
        if (params.getResumeToken() != null) {
            progressToken = params.getResumeToken();
        } else {
            progressToken = new ProgressToken();
        }
        VisitorIterator visitorIterator;

        if (params.getBucketsToVisit() == null || params.getBucketsToVisit().isEmpty()) {
            // Use 1 distribution bit as a starting point. This will almost certainly
            // trigger a ERROR_WRONG_DISTRIBUTION reply immediately, meaning that we'll
            // get a fresh system state from the start. Since no buckets should ever
            // return with a OK result in such a case, we recognize this as a special
            // case in the iterator and simply reset its entire internal state using
            // the new db count rather than doing any splitting.
            BucketIdFactory bucketIdFactory = new BucketIdFactory();
            visitorIterator = VisitorIterator.createFromDocumentSelection(
                    params.getDocumentSelection(),
                    bucketIdFactory,
                    1,
                    progressToken,
                    params.getSlices(),
                    params.getSliceId());
        } else {
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, "parameters specify explicit bucket set " +
                        "to visit; using it rather than document selection (" +
                        params.getBucketsToVisit().size() + " buckets given)");
            }
            // Allow override of target buckets iff an explicit set of buckets
            // to visit is given by the visitor parameters.
This was primarily // used for the defunct synchronization functionality, but since it's // so easy to support, don't deprecate it just yet. visitorIterator = VisitorIterator.createFromExplicitBucketSet( params.getBucketsToVisit(), 1, progressToken); } return new VisitingProgress(visitorIterator, progressToken); } private class SendCreateVisitorsTask implements Runnable { // All private methods in this task must be protected by a lock around // the progress token! private final long messageTimeoutMs; SendCreateVisitorsTask(long messageTimeoutMs) { this.messageTimeoutMs = messageTimeoutMs; } private String getNextVisitorId() { StringBuilder sb = new StringBuilder(); ++visitorCounter; sb.append(sessionName).append('-').append(visitorCounter); return sb.toString(); } @SuppressWarnings("removal") // TODO: Remove on Vespa 9 private CreateVisitorMessage createMessage(VisitorIterator.BucketProgress bucket) { CreateVisitorMessage msg = new CreateVisitorMessage( params.getVisitorLibrary(), getNextVisitorId(), receiver.getConnectionSpec(), dataDestination); msg.getTrace().setLevel(params.getTraceLevel()); msg.setTimeRemaining(messageTimeoutMs); msg.setBuckets(List.of(bucket.getSuperbucket(), bucket.getProgress())); msg.setDocumentSelection(params.getDocumentSelection()); msg.setBucketSpace(params.getBucketSpace()); msg.setFromTimestamp(params.getFromTimestamp()); msg.setToTimestamp(params.getToTimestamp()); msg.setMaxPendingReplyCount(params.getMaxPending()); msg.setFieldSet(params.fieldSet()); msg.setVisitInconsistentBuckets(params.visitInconsistentBuckets()); msg.setVisitRemoves(params.visitRemoves()); msg.setParameters(params.getLibraryParameters()); msg.setRoute(params.getRoute()); msg.setMaxBucketsPerVisitor(params.getMaxBucketsPerVisitor()); msg.setPriority(params.getPriority()); // TODO: remove on Vespa 9 msg.setRetryEnabled(false); return msg; } public void run() { // Must sync around token as legacy API exposes it to handlers // and they expect to be able to sync 
around it. synchronized (progress.getToken()) { try { scheduledSendCreateVisitors = false; if (done) { return; // Session already closed; we must not touch anything else. } // We both send requests and process replies in the context of a dedicated task executor pool. // However, MessageBus sending and reply receiving happens in the context of entirely // separate threads. If the backend responds very quickly to visitor requests (such as // if buckets are empty), this can leave us in the following awkward position: // // 1. Replies arrive from backend, open up the throttle window, reply handling // task gets pushed onto executor queue (but not yet executed). // 2. Send loop below continuously get a free send slot, keeps sending visitors // and filling up the set of pending buckets in the progress token. // 3. Since visitor session is busy-looping in the send task, reply processing is // consequently entirely starved until the MessageBus throttle window is bursting // at the seams. This can effectively nullify the effects of the throttling policy, // especially if it's dynamic. But a static throttle policy with a sufficiently // high max window size will also potentially cause a runaway visitor train since // the active window size keeps getting decreased by backend replies. // // To get around this, we explicitly check for concurrently scheduled message handling // tasks from the transport layer, breaking the loop if at least one handler has been // scheduled. This also has the (positive) effect of draining all reply tasks before we // start sending more work downstream. // // Since visitor session progress is edge-triggered and progresses exclusively by sending // new visitors in reply handling tasks, it's critical that we never end up in a situation // where we have no pending CreateVisitors (or scheduled tasks), or we risk effectively // hanging the session. 
We must therefore be very careful that we only exit the send loop // if we _know_ we have at least one pending task enqueued that will ensure session progress. // // We're holding the session (token) lock around checking the pending reply tasks count, so // if we observe a change we know that a reply task must have been scheduled and that its // processing must take place sequenced after we have exited the loop, as the reply handling // also takes the session (token) lock. I.e. it should not be possible to end up in a // situation where we stall session progress due to not having any further event edges. while (progress.getIterator().hasNext() && !hasScheduledHandleReplyTask()) { VisitorIterator.BucketProgress bucket = progress.getIterator().getNext(); Result result = sender.send(createMessage(bucket)); if (result.isAccepted()) { log.log(Level.FINE, () -> sessionName + ": sent CreateVisitor for bucket " + bucket.getSuperbucket() + " with progress " + bucket.getProgress()); ++pendingMessageCount; } else { // Must reinsert bucket without progress into iterator since // we failed to send visitor. progress.getIterator().update(bucket.getSuperbucket(), bucket.getProgress()); break; } } } catch (Exception e) { String msg = "Got exception of type " + e.getClass().getName() + " with message '" + e.getMessage() + "' while attempting to send visitors"; log.log(Level.WARNING, msg); transitionTo(new StateDescription(State.FAILED, msg)); // It's likely that the exception caused a failure to send a // visitor message, meaning we won't get a reply task in the // future from which we can execute logic to complete the // session. Thusly, we have to do this here and now. continueVisiting(); } catch (Throwable t) { // We can't reliably handle this; take a nosedive com.yahoo.protect.Process.logAndDie("Caught unhandled error when trying to send visitors", t); } } } } private void continueVisiting() { if ( ! 
scheduleSendCreateVisitorsIfApplicable() && visitingCompleted()) {
            markSessionCompleted();
        }
    }

    /**
     * Finalizes the session: notifies the local data handler (if any) and the
     * control handler that visiting is done, settles the final state, sets the
     * done flag and wakes up all threads waiting on the completion monitor.
     */
    private void markSessionCompleted() {
        // 'done' is only ever written when token mutex is held, so safe to check
        // outside of completionMonitor lock.
        log.log(Level.FINE, () -> "Visitor session '" + sessionName + "' has completed");
        if (params.getLocalDataHandler() != null) {
            params.getLocalDataHandler().onDone();
        }
        // If skipFatalErrors is set and a fatal error did occur, fail
        // the session now with the first encountered error message.
        if (progress.getToken().containsFailedBuckets()) {
            transitionTo(new StateDescription(State.FAILED, progress.getToken().getFirstErrorMsg()));
        }
        // NOTE: transitioning to COMPLETED will not override a failure
        // state, so it's safe to always do this.
        transitionTo(new StateDescription(State.COMPLETED));
        params.getControlHandler().onDone(state.toCompletionCode(), state.getDescription());
        synchronized (completionMonitor) {
            done = true;
            completionMonitor.notifyAll();
        }
    }

    /**
     * Task that processes a single reply to a previously sent message while
     * holding the progress token lock, and afterwards lets the session continue
     * (or complete) visiting.
     */
    private class HandleReplyTask implements Runnable {
        private final Reply reply;

        HandleReplyTask(Reply reply) {
            this.reply = reply;
        }

        @Override
        public void run() {
            synchronized (progress.getToken()) {
                // Decrement pending replies inside same lock as sender task to ensure that if the sender
                // observes a non-zero number of reply tasks, it's guaranteed that this actually means a
                // task _will_ be run later at some point.
decrementScheduleHandleReplyTasks();
                try {
                    assert(pendingMessageCount > 0);
                    --pendingMessageCount;
                    if (reply.hasErrors()) {
                        handleErrorReply(reply);
                    } else if (reply instanceof CreateVisitorReply) {
                        handleCreateVisitorReply((CreateVisitorReply)reply);
                    } else {
                        String msg = "Received reply we do not know how to handle: " +
                                reply.getClass().getName();
                        log.log(Level.SEVERE, msg);
                        transitionTo(new StateDescription(State.FAILED, msg));
                    }
                } catch (Exception e) {
                    String msg = "Got exception of type " + e.getClass().getName() +
                            " with message '" + e.getMessage() +
                            "' while processing reply in visitor session";
                    log.log(Level.WARNING, msg, e);
                    transitionTo(new StateDescription(State.FAILED, msg));
                } catch (Throwable t) {
                    // We can't reliably handle this; take a nosedive
                    com.yahoo.protect.Process.logAndDie("Caught unhandled error when running reply task", t);
                } finally {
                    // Runs on every path out of the try block, including after the catch (Exception) branch.
                    continueVisiting();
                }
            }
        }
    }

    /**
     * Task that processes a single message received through the receiver,
     * dispatching on whether it is a VisitorInfoMessage or another document message.
     */
    private class HandleMessageTask implements Runnable {
        private final Message message;

        private HandleMessageTask(Message message) {
            this.message = message;
        }

        @Override
        public void run() {
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, "Visitor session " + sessionName + ": Received message " + message);
            }
            try {
                if (message instanceof VisitorInfoMessage) {
                    handleVisitorInfoMessage((VisitorInfoMessage)message); // always replies
                } else {
                    handleDocumentMessage((DocumentMessage)message); // always replies on error
                }
            } catch (Throwable t) {
                com.yahoo.protect.Process.logAndDie("Caught unhandled error when processing message", t);
            }
        }
    }

    /**
     * Logs an exception raised while processing an incoming message and adds a
     * matching error to the reply. Unless buckets are skipped on fatal errors,
     * the session is transitioned to FAILED and the error is APP_FATAL_ERROR.
     *
     * @param reply reply to add the error to
     * @param e the exception that was raised
     * @param what name of what was being processed, used in the error text
     */
    private void handleMessageProcessingException(Reply reply, Exception e, String what) {
        String errorDesc = formatProcessingException(e, what);
        String fullMsg = formatIdentifyingVisitorErrorString(errorDesc);
        log.log(Level.SEVERE, fullMsg, e);
        int errorCode;
        synchronized (progress.getToken()) {
            if (!params.skipBucketsOnFatalErrors()) {
                errorCode = ErrorCode.APP_FATAL_ERROR;
                transitionTo(new StateDescription(State.FAILED, errorDesc));
            } else {
                errorCode =
DocumentProtocol.ERROR_UNPARSEABLE;
            }
        }
        reply.addError(new Error(errorCode, errorDesc));
    }

    /** Formats the type and message of an exception together with what was being processed. */
    private String formatProcessingException(Exception e, String whileProcessing) {
        return String.format(
                "Got exception of type %s with message '%s' while processing %s",
                e.getClass().getName(),
                e.getMessage(),
                whileProcessing);
    }

    /** Prefixes the given details with the session name and document selection of this visitor. */
    private String formatIdentifyingVisitorErrorString(String details) {
        return String.format(
                "Visitor %s (selection '%s'): %s",
                sessionName,
                params.getDocumentSelection(),
                details);
    }

    /**
     * Handles a VisitorInfoMessage: forwards any error message to the control
     * handler and reports progress unless the session is done, in which case an
     * error is added to the reply. The reply is always sent.
     *
     * NOTE: not called from within lock, function must take lock itself
     */
    private void handleVisitorInfoMessage(VisitorInfoMessage msg) {
        Reply reply = msg.createReply();
        msg.swapState(reply);

        if (log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, "Visitor session " + sessionName +
                    ": Received VisitorInfo with " +
                    msg.getFinishedBuckets().size() + " finished buckets");
        }

        try {
            if (!msg.getErrorMessage().isEmpty()) {
                params.getControlHandler().onVisitorError(msg.getErrorMessage());
            }
            synchronized (progress.getToken()) {
                // NOTE: control handlers shall sync on token themselves if
                // they want to access it, but recursive locking is OK and by
                // always locking we make screwing it up harder.
if (!isDone()) {
                    params.getControlHandler().onProgress(progress.getToken());
                } else {
                    reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "Visitor has been shut down"));
                }
            }
        } catch (Exception e) {
            handleMessageProcessingException(reply, e, "VisitorInfoMessage");
        } finally {
            receiver.reply(reply);
        }
    }

    /**
     * Handles a document message by passing it to the local data handler along
     * with an AckToken wrapping the reply. This method itself replies only when
     * there is no local data handler or when the handler throws.
     */
    private void handleDocumentMessage(DocumentMessage msg) {
        Reply reply = msg.createReply();
        msg.swapState(reply);

        if (params.getLocalDataHandler() == null) {
            log.log(Level.SEVERE, sessionName + ": Got visitor data back to client with no local data destination.");
            reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "Visitor data with no local data destination"));
            receiver.reply(reply);
            return;
        }
        try {
            params.getLocalDataHandler().onMessage(msg, new AckToken(reply));
        } catch (Exception e) {
            handleMessageProcessingException(reply, e, "DocumentMessage");
            // Immediately reply since we cannot count on AckToken being registered
            receiver.reply(reply);
        }
    }

    /**
     * Decides whether the first error of the reply is fatal. TIMEOUT,
     * ERROR_BUCKET_NOT_FOUND and ERROR_WRONG_DISTRIBUTION are never treated as
     * fatal; for all other codes the error's own fatal flag decides.
     */
    private boolean isFatalError(Reply reply) {
        Error error = reply.getError(0);
        switch (error.getCode()) {
            case ErrorCode.TIMEOUT:
            case DocumentProtocol.ERROR_BUCKET_NOT_FOUND:
            case DocumentProtocol.ERROR_WRONG_DISTRIBUTION:
                return false;
        }
        return error.isFatal();
    }

    /**
     * Return whether a (transient) error shall be exempt from visitor
     * error reporting. This is to prevent spamming handlers and output with
     * errors for things that are happening naturally in the system.
* @return true if the error should be reported
     */
    private boolean shouldReportError(Reply reply) {
        Error error = reply.getError(0);
        switch (error.getCode()) {
            case DocumentProtocol.ERROR_BUCKET_NOT_FOUND:
            case DocumentProtocol.ERROR_BUCKET_DELETED:
                return false;
        }
        return true;
    }

    /** Formats an error as "&lt;error name&gt;: &lt;error message&gt;". */
    private static String getErrorMessage(Error r) {
        return DocumentProtocol.getErrorName(r.getCode()) + ": " + r.getMessage();
    }

    /** @return true iff the first error of the reply has the given error code */
    private static boolean isErrorOfType(Reply reply, int errorCode) {
        return reply.getError(0).getCode() == errorCode;
    }

    private void reportVisitorError(String message) {
        params.getControlHandler().onVisitorError(message);
    }

    /**
     * Handles a reply carrying errors. The bucket progress is reset to what it
     * was before sending. A fatal error either marks the bucket as failed (when
     * skipping buckets on fatal errors) or fails the whole session. Otherwise a
     * wrong-distribution reply is handled separately, and any other error is
     * reported if applicable and followed by a delayed attempt at scheduling a
     * new send task.
     */
    private void handleErrorReply(Reply reply) {
        CreateVisitorMessage msg = (CreateVisitorMessage)reply.getMessage();
        // Must reset bucket progress back to what it was before sending.
        BucketId bucket = msg.getBuckets().get(0);
        BucketId subProgress = msg.getBuckets().get(1);
        progress.getIterator().update(bucket, subProgress);

        String message = getErrorMessage(reply.getError(0));
        log.log(Level.FINE, () -> sessionName + ": received error reply for bucket " +
                bucket + " with message '" + message + "'");

        if (isFatalError(reply)) {
            if (params.skipBucketsOnFatalErrors()) {
                markBucketProgressAsFailed(bucket, subProgress, message);
            } else {
                reportVisitorError(message);
                transitionTo(new StateDescription(State.FAILED, message));
                return; // no additional visitors will be scheduled post-failure
            }
        }
        if (isErrorOfType(reply, DocumentProtocol.ERROR_WRONG_DISTRIBUTION)) {
            handleWrongDistributionReply((WrongDistributionReply) reply);
        } else {
            if (shouldReportError(reply)) {
                reportVisitorError(message);
            }
            // Wait 100ms before new visitor task is executed. Will prevent
            // visitors from being scheduled from caller.
scheduleSendCreateVisitorsIfApplicable(100, TimeUnit.MILLISECONDS);
        }
    }

    /** Records the bucket as failed in the progress token and marks it finished in the iterator. */
    private void markBucketProgressAsFailed(BucketId bucket, BucketId subProgress, String message) {
        progress.getToken().addFailedBucket(bucket, subProgress, message);
        progress.getIterator().update(bucket, ProgressToken.FINISHED_BUCKET);
    }

    /** @return true iff a max total hits limit is set (not -1) and the returned document count has reached it */
    private boolean enoughHitsReceived() {
        return params.getMaxTotalHits() != -1
                && (statistics.getDocumentsReturned() >= params.getMaxTotalHits());
    }

    /**
     * A session is considered completed if one or more of the following holds true:
     *   - All buckets have been visited (i.e. no active or pending visitors).
     *   - Visiting has failed fatally (or has been aborted) AND there are no
     *     active visitors remaining. 'Active' here means that we're waiting
     *     for a reply.
     *   - We have received sufficient number of documents (set via visitor
     *     parameters) from the buckets already visited AND there are no
     *     active visitors remaining.
     * @return true if visiting has completed, false otherwise
     */
    private boolean visitingCompleted() {
        return (pendingMessageCount == 0)
                && (progress.getIterator().isDone()
                    || state.failed()
                    || enoughHitsReceived());
    }

    /**
     * @return the configured message timeout, at least 1 ms; or 5 minutes if the
     *         configured timeout is negative (treated as infinite)
     */
    private long messageTimeoutMillis() {
        return !isInfiniteTimeout(params.getTimeoutMs()) ? Math.max(1, params.getTimeoutMs()) : 5 * 60 * 1000;
    }

    private long sessionTimeoutMillis() {
        return params.getSessionTimeoutMs();
    }

    /** @return milliseconds elapsed on the session clock since the recorded start time */
    private long elapsedTimeMillis() {
        return TimeUnit.NANOSECONDS.toMillis(clock.monotonicNanoTime() - startTimeNanos);
    }

    /** A negative timeout value denotes an infinite timeout. */
    private static boolean isInfiniteTimeout(long timeoutMillis) {
        return timeoutMillis < 0;
    }

    /**
     * Computes the timeout to use for a message sent after the given elapsed time.
     * With a finite session timeout this is the smaller of the message timeout and
     * the remaining session time (at least 1 ms); otherwise it is the message timeout.
     */
    private long computeBoundedMessageTimeoutMillis(long elapsedMs) {
        final long messageTimeoutMillis = messageTimeoutMillis();
        return ! isInfiniteTimeout(sessionTimeoutMillis())
                ?
Math.min(Math.max(1, sessionTimeoutMillis() - elapsedMs), messageTimeoutMillis)
                : messageTimeoutMillis;
    }

    /**
     * Schedule a new SendCreateVisitors task iff there are still buckets to
     * visit, the visiting has not failed fatally and we haven't already
     * scheduled such a task. Return whether a visitor was scheduled here.
     */
    private boolean scheduleSendCreateVisitorsIfApplicable(long delay, TimeUnit unit) {
        final long elapsedMillis = elapsedTimeMillis();
        // A finite session timeout that has expired moves the session towards TIMED_OUT
        // before deciding whether anything may be scheduled.
        if (!isInfiniteTimeout(sessionTimeoutMillis()) && (elapsedMillis >= sessionTimeoutMillis())) {
            transitionTo(new StateDescription(State.TIMED_OUT, String.format("Session timeout of %d ms expired", sessionTimeoutMillis())));
        }
        if (!mayScheduleCreateVisitorsTask() || visitingCompleted()) {
            return false;
        }
        final long messageTimeoutMillis = computeBoundedMessageTimeoutMillis(elapsedMillis);
        taskExecutor.scheduleTask(new SendCreateVisitorsTask(messageTimeoutMillis), delay, unit);
        scheduledSendCreateVisitors = true;
        return true;
    }

    private boolean mayScheduleCreateVisitorsTask() {
        return !
(scheduledSendCreateVisitors || !progress.getIterator().hasNext() || state.failed() || enoughHitsReceived()); } private boolean scheduleSendCreateVisitorsIfApplicable() { return scheduleSendCreateVisitorsIfApplicable(0, TimeUnit.MILLISECONDS); } private void handleCreateVisitorReply(CreateVisitorReply reply) { CreateVisitorMessage msg = (CreateVisitorMessage)reply.getMessage(); BucketId superbucket = msg.getBuckets().get(0); BucketId subBucketProgress = reply.getLastBucket(); log.log(Level.FINE, () -> sessionName + ": received CreateVisitorReply for bucket " + superbucket + " with progress " + subBucketProgress); progress.getIterator().update(superbucket, subBucketProgress); params.getControlHandler().onProgress(progress.getToken()); statistics.add(reply.getVisitorStatistics()); params.getControlHandler().onVisitorStatistics(statistics); // A visitor session might be long lived so we need a safeguard against blowing the memory if tracing // has been enabled. if ( ! reply.getTrace().getRoot().isEmpty() && (trace.getRoot().getNumChildren() < 1000)) { trace.getRoot().addChild(reply.getTrace().getRoot()); } } private void handleWrongDistributionReply(WrongDistributionReply reply) { try { ClusterState newState = new ClusterState(reply.getSystemState()); int stateBits = newState.getDistributionBitCount(); if (stateBits != progress.getIterator().getDistributionBitCount()) { log.log(Level.FINE, () -> "System state changed; now at " + stateBits + " distribution bits"); // Update the internal state of the visitor iterator. If we're increasing // the number of distribution bits, this may lead to splitting of pending // buckets. If we're decreasing, it may lead to merging of pending buckets // and potential loss of sub-bucket progress. In either way, the iterator // will not let any new buckets out before all active buckets have been // updated. 
progress.getIterator().setDistributionBitCount(stateBits); } } catch (Exception e) { log.log(Level.SEVERE, "Failed to parse new system state string: " + reply.getSystemState()); transitionTo(new StateDescription(State.FAILED, "Failed to parse cluster state '" + reply.getSystemState() + "'")); } } public String getSessionName() { return sessionName; } @Override public boolean isDone() { synchronized (progress.getToken()) { return done; } } @Override public ProgressToken getProgress() { return progress.getToken(); } @Override public Trace getTrace() { return trace; } @Override public boolean waitUntilDone(long timeoutMs) throws InterruptedException { return params.getControlHandler().waitUntilDone(timeoutMs); } @Override public void ack(AckToken token) { if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "Visitor session " + sessionName + ": Sending ack " + token.ackObject); } // No locking here; replying should be thread safe in itself receiver.reply((Reply)token.ackObject); } @Override public void abort() { synchronized (progress.getToken()) { transitionTo(new StateDescription(State.ABORTED, "Visitor aborted by user")); } } @Override public VisitorResponse getNext() { if (params.getLocalDataHandler() == null) { throw new IllegalStateException("Data has been routed to external source for this visitor"); } return params.getLocalDataHandler().getNext(); } @Override public VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException { if (params.getLocalDataHandler() == null) { throw new IllegalStateException("Data has been routed to external source for this visitor"); } return params.getLocalDataHandler().getNext(timeoutMilliseconds); } /** * For unit test purposes only, not to be used by any external parties. * @return true if destroy() has been--or is being--invoked. 
*/ public boolean isDestroying() { synchronized (completionMonitor) { return destroying; } } @Override public void destroy() { log.log(Level.FINE, () -> sessionName + ": synchronous destroy() called"); try { synchronized (progress.getToken()) { synchronized (completionMonitor) { // If we are destroying the session before it has completed (e.g. because // waitUntilDone timed out or an interactive visiting was interrupted) // set us to aborted state so that we'll cease sending new visitors. if (!done) { transitionTo(new StateDescription(State.ABORTED, "Session explicitly destroyed before completion")); } } } synchronized (completionMonitor) { assert(!destroying) : "Attempted to destroy VisitorSession more than once"; destroying = true; while (!done) { completionMonitor.wait(); } } } catch (InterruptedException e) { log.log(Level.WARNING, "Interrupted waiting for visitor session to be destroyed"); } finally { try { sender.destroy(); receiver.destroy(); } catch (Exception e) { log.log(Level.SEVERE, "Caught exception destroying communication interfaces", e); } log.log(Level.FINE, () -> sessionName + ": synchronous destroy() done"); } } }