aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
blob: 1c729008e2cc4740ac20108abd42ac62edfa23ff (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus;

import com.yahoo.document.BucketId;
import com.yahoo.document.BucketIdFactory;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.AckToken;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorDataQueue;
import com.yahoo.documentapi.VisitorIterator;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorResponse;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage;
import com.yahoo.documentapi.messagebus.protocol.CreateVisitorReply;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.VisitorInfoMessage;
import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply;
import java.util.logging.Level;
import com.yahoo.messagebus.DestinationSession;
import com.yahoo.messagebus.DestinationSessionParams;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.ErrorCode;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.MessageHandler;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.Result;
import com.yahoo.messagebus.SourceSession;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.Trace;
import com.yahoo.messagebus.routing.RoutingTable;
import com.yahoo.vdslib.VisitorStatistics;
import com.yahoo.vdslib.state.ClusterState;

import java.util.Arrays;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

/**
 * <p>
 * A visitor session for tracking progress for and potentially receiving data from
 * a visitor using a MessageBus source and destination session. The source session
 * is used to initiate visiting by sending create visitor messages to storage and
 * the destination session is used for receiving progress. If the visitor is not
 * set up to send data to a remote destination, data will also be received through
 * the destination session.
 * </p>
 * <p>
 * Create the visitor session by calling the
 * <code>DocumentAccess.createVisitorSession</code> method.
 * </p>
 */
public class MessageBusVisitorSession implements VisitorSession {
    /**
     * Abstract away notion of source session into a generic Sender
     * interface to allow easy mocking.
     */
    public interface Sender {
        /**
         * Sends the given message.
         *
         * @param msg message to send
         * @return the result of the send attempt
         */
        Result send(Message msg);

        /**
         * @return the number of pending messages as tracked by the sender itself
         */
        int getPendingCount();

        /**
         * Releases the sender. The message bus backed implementation in this
         * file destroys its underlying source session.
         */
        void destroy();
    }

    /**
     * Factory for {@link Sender} instances, letting the session be constructed
     * with something other than a real message bus source session.
     */
    public interface SenderFactory {
        /**
         * @param replyHandler handler that is to receive replies for sent messages
         * @param visitorParameters parameters of the visitor session the sender is created for
         * @return a new sender
         */
        Sender createSender(ReplyHandler replyHandler, VisitorParameters visitorParameters);
    }

    /**
     * Abstract away notion of destination session into a generic Receiver
     * interface to allow easy mocking.
     * The implementation must be thread safe since reply() can be invoked
     * from an arbitrary thread.
     */
    public interface Receiver {
        /**
         * Sends a reply for a message previously received through this receiver.
         *
         * @param reply reply to send back
         */
        void reply(Reply reply);

        /**
         * Releases the receiver. The message bus backed implementation in this
         * file destroys its underlying destination session.
         */
        void destroy();

        /**
         * Get connection spec that can be used by other clients to send
         * messages to this Receiver.
         * @return connection spec
         */
        String getConnectionSpec();
    }

    /**
     * Factory for {@link Receiver} instances, letting the session be constructed
     * with something other than a real message bus destination session.
     */
    public interface ReceiverFactory {
        /**
         * @param messageHandler handler that is to receive incoming messages
         * @param sessionName name to register the receiver under
         * @return a new receiver
         */
        Receiver createReceiver(MessageHandler messageHandler, String sessionName);
    }

    /**
     * Abstraction over the executor used to run the session's internal tasks
     * asynchronously.
     */
    public interface AsyncTaskExecutor {
        /**
         * Submits a task for asynchronous execution. Callers in this file
         * handle {@link RejectedExecutionException} being thrown from this call.
         *
         * @param event task to run
         */
        void submitTask(Runnable event);

        /**
         * Schedules a task for execution after the given delay.
         *
         * @param event task to run
         * @param delay how long to wait before running the task
         * @param unit time unit of {@code delay}
         */
        void scheduleTask(Runnable event, long delay, TimeUnit unit);
    }

    /**
     * Time source used by the session; can be supplied through the
     * constructor in place of the default {@link RealClock}.
     */
    public interface Clock {
        /**
         * @return the current reading of the clock in nanoseconds; the default
         *         implementation returns {@link System#nanoTime()}
         */
        long monotonicNanoTime();
    }

    /**
     * Immutable holder for a visitor iterator together with a progress token.
     */
    public static class VisitingProgress {
        private final VisitorIterator iterator;
        private final ProgressToken token;

        /**
         * @param iterator iterator to hold
         * @param token progress token to hold
         */
        public VisitingProgress(VisitorIterator iterator, ProgressToken token) {
            this.token = token;
            this.iterator = iterator;
        }

        /** @return the iterator given at construction time */
        public VisitorIterator getIterator() { return iterator; }

        /** @return the progress token given at construction time */
        public ProgressToken getToken() { return token; }
    }

    /**
     * States a visitor session can be in. Each state carries a flag telling
     * whether it represents a failure.
     */
    public enum State {
        NOT_STARTED(false),
        WORKING(false),
        COMPLETED(false),
        ABORTED(true),
        FAILED(true),
        TIMED_OUT(true);

        /** True for the states that count as failures. */
        private final boolean failureState;

        State(boolean failureState) { this.failureState = failureState; }

        /** @return true iff this state is one of ABORTED, FAILED or TIMED_OUT */
        public boolean isFailure() { return failureState; }
    }

    /**
     * A session state paired with a textual description of it.
     */
    public class StateDescription {
        private final State state;
        private final String description;

        /**
         * @param state the state
         * @param description text describing the state
         */
        public StateDescription(State state, String description) {
            this.state = state;
            this.description = description;
        }

        /**
         * Creates a state description with an empty description text.
         *
         * @param state the state
         */
        public StateDescription(State state) {
            this(state, "");
        }

        public State getState() { return state; }

        public String getDescription() { return description; }

        /**
         * Maps the state onto the completion code reported to the control handler.
         *
         * @return the completion code matching the state
         * @throws IllegalStateException if the state is NOT_STARTED or WORKING,
         *         which have no corresponding completion code
         */
        VisitorControlHandler.CompletionCode toCompletionCode() {
            switch (state) {
                case COMPLETED:
                    return VisitorControlHandler.CompletionCode.SUCCESS;
                case ABORTED:
                    return VisitorControlHandler.CompletionCode.ABORTED;
                case FAILED:
                    return VisitorControlHandler.CompletionCode.FAILURE;
                case TIMED_OUT:
                    return VisitorControlHandler.CompletionCode.TIMEOUT;
                default:
                    throw new IllegalStateException("Current state did not have a valid value: " + state);
            }
        }

        /** @return true iff the state is a failure state */
        public boolean failed() { return state.isFailure(); }

        @Override
        public String toString() {
            return state + ": " + description;
        }
    }

    /**
     * Message bus implementations of interfaces
     */

    /**
     * {@link Sender} that forwards every call to a message bus source session.
     */
    public static class MessageBusSender implements Sender {
        private final SourceSession session;

        /**
         * @param sourceSession source session to delegate to
         */
        public MessageBusSender(SourceSession sourceSession) {
            this.session = sourceSession;
        }

        @Override
        public Result send(Message msg) { return session.send(msg); }

        @Override
        public int getPendingCount() { return session.getPendingCount(); }

        @Override
        public void destroy() { session.destroy(); }
    }

    /**
     * {@link SenderFactory} creating senders backed by source sessions of a
     * given message bus instance.
     */
    public static class MessageBusSenderFactory implements SenderFactory {

        private final MessageBus messageBus;

        /**
         * @param messageBus message bus instance to create source sessions on
         */
        public MessageBusSenderFactory(MessageBus messageBus) {
            this.messageBus = messageBus;
        }

        /**
         * Builds source session parameters, using the throttle policy from the
         * visitor parameters if one is set and a new dynamic throttle policy
         * otherwise.
         */
        private SourceSessionParams createSourceSessionParams(VisitorParameters visitorParameters) {
            final SourceSessionParams result = new SourceSessionParams();
            final boolean hasExplicitPolicy = visitorParameters.getThrottlePolicy() != null;
            if (!hasExplicitPolicy) {
                result.setThrottlePolicy(new DynamicThrottlePolicy());
            } else {
                result.setThrottlePolicy(visitorParameters.getThrottlePolicy());
            }
            return result;
        }

        @Override
        public Sender createSender(ReplyHandler replyHandler, VisitorParameters visitorParameters) {
            final SourceSessionParams sourceParams = createSourceSessionParams(visitorParameters);
            final SourceSession session = messageBus.createSourceSession(replyHandler, sourceParams);
            return new MessageBusSender(session);
        }
    }

    /**
     * {@link Receiver} that forwards every call to a message bus destination session.
     */
    public static class MessageBusReceiver implements Receiver {
        private final DestinationSession session;

        /**
         * @param destinationSession destination session to delegate to
         */
        public MessageBusReceiver(DestinationSession destinationSession) {
            this.session = destinationSession;
        }

        @Override
        public void reply(Reply reply) { session.reply(reply); }

        @Override
        public void destroy() { session.destroy(); }

        @Override
        public String getConnectionSpec() { return session.getConnectionSpec(); }
    }

    /**
     * {@link ReceiverFactory} creating receivers backed by destination
     * sessions of a given message bus instance.
     */
    public static class MessageBusReceiverFactory implements ReceiverFactory {
        private final MessageBus messageBus;

        /**
         * @param messageBus message bus instance to create destination sessions on
         */
        public MessageBusReceiverFactory(MessageBus messageBus) {
            this.messageBus = messageBus;
        }

        /**
         * Builds destination session parameters carrying the given name and
         * message handler, with name broadcasting switched off.
         */
        private DestinationSessionParams createDestinationParams(MessageHandler messageHandler, String visitorName) {
            final DestinationSessionParams sessionParams = new DestinationSessionParams();
            sessionParams.setName(visitorName);
            sessionParams.setBroadcastName(false);
            sessionParams.setMessageHandler(messageHandler);
            return sessionParams;
        }

        @Override
        public Receiver createReceiver(MessageHandler messageHandler, String sessionName) {
            final DestinationSessionParams sessionParams = createDestinationParams(messageHandler, sessionName);
            final DestinationSession session = messageBus.createDestinationSession(sessionParams);
            return new MessageBusReceiver(session);
        }
    }

    /**
     * {@link AsyncTaskExecutor} that hands tasks to a
     * {@link ScheduledExecutorService}. The futures returned by the executor
     * service are discarded.
     */
    public static class ThreadAsyncTaskExecutor implements AsyncTaskExecutor {
        private final ScheduledExecutorService scheduler;

        /**
         * @param executor executor service that will run the tasks
         */
        public ThreadAsyncTaskExecutor(ScheduledExecutorService executor) {
            this.scheduler = executor;
        }

        @Override
        public void submitTask(Runnable task) { scheduler.submit(task); }

        @Override
        public void scheduleTask(Runnable task, long delay, TimeUnit unit) { scheduler.schedule(task, delay, unit); }
    }

    /** {@link Clock} implementation backed by {@link System#nanoTime()}. */
    public static class RealClock implements Clock {
        @Override
        public long monotonicNanoTime() {
            return System.nanoTime();
        }
    }

    private static final Logger log = Logger.getLogger(MessageBusVisitorSession.class.getName());

    // Counter shared by all sessions; used to give each session name a distinct number.
    private static final AtomicLong sessionCounter = new AtomicLong(0);

    /** Returns the next session number; the first call returns 1. */
    private static long getNextSessionId() {
        return sessionCounter.incrementAndGet();
    }

    /**
     * Builds a session name of the form
     * {@code visitor-<session number>-<current time in milliseconds>}.
     */
    private static String createSessionName() {
        return "visitor-" + getNextSessionId() + '-' + System.currentTimeMillis();
    }

    // Parameters as given by the caller (not copied). The constructor may fill
    // in a route and default data/control handlers on this object.
    private final VisitorParameters params;
    // Used for sending create visitor messages.
    private final Sender sender;
    // Used for replying to incoming messages; also provides the connection spec
    // given to created visitors.
    private final Receiver receiver;
    private final AsyncTaskExecutor taskExecutor;
    // Iterator and progress token. The token also serves as the lock object
    // guarding session state (see start() and the task/handler code).
    private final VisitingProgress progress;
    private final VisitorStatistics statistics;
    private final String sessionName = createSessionName();
    // Where visitors send their data: the remote data handler if no local data
    // handler is set, otherwise this session's own connection spec.
    private final String dataDestination;
    private final Clock clock;
    private final Object replyTrackingMonitor = new Object();
    private StateDescription state;
    // Incremented for every created visitor id.
    private long visitorCounter = 0;
    // Clock reading taken when start() is called.
    private long startTimeNanos = 0;
    private long scheduledHandleReplyTasks = 0; // Must be protected by replyTrackingMonitor
    private boolean scheduledSendCreateVisitors = false;
    private boolean done = false;
    private boolean destroying = false; // For testing and sanity checking
    private final Object completionMonitor = new Object();
    // Created with the trace level of the visitor parameters.
    private final Trace trace;
    /**
     * We keep our own track of pending messages since the sender's pending
     * count cannot be relied on in an async task execution context. This
     * because it is decremented before the message is actually processed.
     */
    private int pendingMessageCount = 0;

    /**
     * Creates a session that uses a {@link RealClock} as its time source.
     * Otherwise identical to the constructor taking an explicit clock.
     *
     * @throws ParseException propagated from the constructor taking an explicit clock
     */
    public MessageBusVisitorSession(VisitorParameters visitorParameters,
                                    AsyncTaskExecutor taskExecutor,
                                    SenderFactory senderFactory,
                                    ReceiverFactory receiverFactory,
                                    RoutingTable routingTable)
            throws ParseException
    {
        this(visitorParameters,
             taskExecutor,
             senderFactory,
             receiverFactory,
             routingTable,
             new RealClock());
    }

    public MessageBusVisitorSession(VisitorParameters visitorParameters,
                                    AsyncTaskExecutor taskExecutor,
                                    SenderFactory senderFactory,
                                    ReceiverFactory receiverFactory,
                                    RoutingTable routingTable,
                                    Clock clock)
            throws ParseException
    {
        this.params = visitorParameters; // TODO(vekterli): make copy? legacy impl does not copy
        initializeRoute(routingTable);
        this.sender = senderFactory.createSender(createReplyHandler(), this.params);
        this.receiver = receiverFactory.createReceiver(createMessageHandler(), sessionName);
        this.taskExecutor = taskExecutor;
        this.progress = createVisitingProgress(params);
        this.statistics = new VisitorStatistics();
        this.state = new StateDescription(State.NOT_STARTED);
        this.clock = clock;
        initializeHandlers();
        trace = new Trace(visitorParameters.getTraceLevel());
        dataDestination = (params.getLocalDataHandler() == null
                ? params.getRemoteDataHandler()
                : receiver.getConnectionSpec());

        validateSessionParameters();

        // If we're already done, no need to do anything at all!
        if (progress.getIterator().isDone()) {
            markSessionCompleted();
        }
    }

    /**
     * Creates a session wired to a real message bus instance: tasks run on the
     * given executor service, sessions are created on the given message bus and
     * the routing table is the one registered for the document protocol.
     *
     * @param mbus message bus instance to create sessions on
     * @param scheduledExecutorService executor service running the session's tasks
     * @param params visitor parameters
     * @return the new session
     * @throws ParseException propagated from the session constructor
     */
    public static MessageBusVisitorSession createForMessageBus(final MessageBus mbus,
                                                               final ScheduledExecutorService scheduledExecutorService,
                                                               final VisitorParameters params) throws ParseException {
        return new MessageBusVisitorSession(
                params,
                new ThreadAsyncTaskExecutor(scheduledExecutorService),
                new MessageBusSenderFactory(mbus),
                new MessageBusReceiverFactory(mbus),
                mbus.getRoutingTable(DocumentProtocol.NAME));
    }

    /**
     * Verifies that a data destination has been resolved.
     *
     * @throws IllegalStateException if the data destination is null
     */
    private void validateSessionParameters() {
        if (dataDestination != null) {
            return;
        }
        throw new IllegalStateException("No data destination specified");
    }

    /**
     * Starts the session: records the start time and, unless the iterator is
     * already done, moves to the WORKING state and submits the first task for
     * sending create visitor messages. Runs under the progress token lock.
     */
    public void start() {
        synchronized (progress.getToken()) {
            startTimeNanos = clock.monotonicNanoTime();
            if (!progress.getIterator().isDone()) {
                transitionTo(new StateDescription(State.WORKING));
                final long messageTimeoutMs = computeBoundedMessageTimeoutMillis(0);
                taskExecutor.submitTask(new SendCreateVisitorsTask(messageTimeoutMs));
            } else {
                log.log(Level.FINE, () -> sessionName + ": progress token indicates " +
                        "session is done before it could even start; no-op");
            }
        }
    }

    /**
     * Replaces the current state with the given one, unless the current state
     * is a failure state, in which case nothing happens.
     */
    private void updateStateUnlessAlreadyFailed(StateDescription newState) {
        if (state.failed()) {
            return; // an existing failure state is never overridden here
        }
        state = newState;
    }

    /**
     * Attempt to transition to a new state. Depending on the current state,
     * some transitions may be disallowed, such as transitioning from FAILED
     * to COMPLETED, since failures take precedence. The rules per target
     * state are:
     * <ul>
     *   <li>WORKING: asserts that the current state is NOT_STARTED, then
     *       replaces the state.</li>
     *   <li>ABORTED: always replaces the current state, including an earlier
     *       failure state and its description.</li>
     *   <li>COMPLETED, FAILED, TIMED_OUT: replace the current state only if it
     *       is not already a failure state. This conserves the textual
     *       description of the first failure (which most likely is the most
     *       useful one for the end-user).</li>
     *   <li>Any other target state is reported through
     *       <code>Process.logAndDie</code>.</li>
     * </ul>
     *
     * @param newState State to attempt to transition to.
     * @return State which is current after the transition. If transition was
     *   successful, will be equal to newState.
     */
    private StateDescription transitionTo(StateDescription newState) {
        log.log(Level.FINE, () -> sessionName + ": attempting transition to state " + newState);
        switch (newState.getState()) {
            case WORKING:
                assert(state.getState() == State.NOT_STARTED);
                state = newState;
                break;
            case ABORTED:
                // Unconditional: an abort overrides whatever state we are in.
                state = newState;
                break;
            case COMPLETED:
            case FAILED:
            case TIMED_OUT:
                updateStateUnlessAlreadyFailed(newState);
                break;
            default:
                com.yahoo.protect.Process.logAndDie("Invalid target transition state: " + newState);
        }
        log.log(Level.FINE, () -> "Session '" + sessionName + "' is now in state " +  state);
        return state;
    }

    /**
     * @return true iff the count of scheduled reply handling tasks is non-zero
     */
    private boolean hasScheduledHandleReplyTask() {
        // A critical section is used rather than an AtomicLong because it makes
        // happens-before relationships, memory visibility and the sequencing of
        // events across threads considerably easier to reason about.
        final boolean anyScheduled;
        synchronized (replyTrackingMonitor) {
            anyScheduled = (scheduledHandleReplyTasks != 0);
        }
        return anyScheduled;
    }

    /** Adds one to the count of scheduled reply handling tasks, under the reply tracking monitor. */
    private void incrementScheduledHandleReplyTasks() {
        synchronized (replyTrackingMonitor) {
            scheduledHandleReplyTasks += 1;
        }
    }

    /**
     * Subtracts one from the count of scheduled reply handling tasks, under the
     * reply tracking monitor. Asserts that the count is positive beforehand.
     */
    private void decrementScheduleHandleReplyTasks() {
        synchronized (replyTrackingMonitor) {
            assert(scheduledHandleReplyTasks > 0);
            scheduledHandleReplyTasks -= 1;
        }
    }

    /**
     * Creates the handler invoked for replies to sent messages. The handler
     * does no processing itself; it bumps the count of scheduled reply tasks
     * and submits a HandleReplyTask to the task executor. If the executor
     * rejects the task, the count is restored and the session is moved to the
     * FAILED state and marked completed (unless already done).
     *
     * @return the reply handler to pass to the sender factory
     */
    private ReplyHandler createReplyHandler() {
        return (reply) -> {
            // Generally, handleReply will run in the context of the
            // underlying transport layer's processing thread(s), so we
            // schedule our own reply handling task to avoid blocking it.
            try {
                // Make concurrent reply handling visible in sender thread, if it's active.
                // See SendCreateVisitorsTask.run() for a rationale.
                // The increment must happen before the task is submitted.
                incrementScheduledHandleReplyTasks();
                taskExecutor.submitTask(new HandleReplyTask(reply));
            } catch (RejectedExecutionException e) {
                // The task never got scheduled, so undo the increment above.
                decrementScheduleHandleReplyTasks();
                 // We cannot reliably handle reply tasks failing to be submitted, since
                 // the reply task performs all our internal state handling logic. As such,
                 // we just immediately go into a failure destruction mode as soon as this
                 // happens, in which we do not wait for any active messages to be replied
                 // to.
                log.log(Level.WARNING, "Visitor session '" + sessionName +
                        "': failed to submit reply task to executor service! " +
                        "Session cannot reliably continue; terminating it early.", e);

                // State changes are done under the progress token lock.
                synchronized (progress.getToken()) {
                    transitionTo(new StateDescription(State.FAILED, "Failed to submit reply task to executor service: " + e.getMessage()));
                    if (!done) {
                        markSessionCompleted();
                    }
                }
            }
        };
    }

    /**
     * Creates the handler invoked for incoming messages. Each message is
     * wrapped in a HandleMessageTask and submitted to the task executor. If
     * the executor rejects the task, the message is answered right away with
     * a reply carrying an ERROR_ABORTED error.
     *
     * @return the message handler to pass to the receiver factory
     */
    private MessageHandler createMessageHandler() {
        return (message) -> {
            try {
                taskExecutor.submitTask(new HandleMessageTask(message));
            } catch (RejectedExecutionException rejected) {
                final Reply abortedReply = ((DocumentMessage) message).createReply();
                message.swapState(abortedReply);
                final Error abortError = new Error(DocumentProtocol.ERROR_ABORTED,
                                                   "Visitor session has been aborted");
                abortedReply.addError(abortError);
                receiver.reply(abortedReply);
            }
        };
    }

    /**
     * Ensures the visitor parameters carry a route. A route that is set and
     * has hops is left alone; otherwise the storage cluster route is looked up
     * in the given routing table and stored in the parameters.
     */
    private void initializeRoute(RoutingTable routingTable) {
        final boolean routeGiven = params.getRoute() != null && params.getRoute().hasHops();
        if (routeGiven) {
            return;
        }
        // Nothing usable was specified through the parameters; fall back to mbus config.
        params.setRoute(getClusterRoute(routingTable));
        log.log(Level.FINE, () -> "No route specified; resolved implicit " +
                "storage cluster: " + params.getRoute().toString());
    }

    /**
     * Finds the single route in the routing table whose name starts with
     * {@code storage/cluster.}.
     *
     * @param routingTable table to search
     * @return name of the one matching route
     * @throws IllegalArgumentException if there is no matching route, or more than one
     */
    private String getClusterRoute(RoutingTable routingTable) throws IllegalArgumentException {
        String clusterRoute = null;
        final RoutingTable.RouteIterator routes = routingTable.getRouteIterator();
        while (routes.isValid()) {
            final String candidate = routes.getName();
            if (candidate.startsWith("storage/cluster.")) {
                if (clusterRoute != null) {
                    throw new IllegalArgumentException(
                            "There are multiple storage clusters in your application, " +
                                    "please specify which one to visit.");
                }
                clusterRoute = candidate;
            }
            routes.next();
        }
        if (clusterRoute == null) {
            throw new IllegalArgumentException("No storage cluster found in your application.");
        }
        return clusterRoute;
    }

    /**
     * Invoked by the constructor to make sure the parameters have usable
     * data and control handlers bound to this session:
     * an existing local data handler is reset; if there is neither a local
     * nor a remote data handler, a new VisitorDataQueue is installed as the
     * local one. An existing control handler is reset, a missing one is
     * replaced by a new VisitorControlHandler.
     */
    private void initializeHandlers() {
        if (params.getLocalDataHandler() == null) {
            if (params.getRemoteDataHandler() == null) {
                params.setLocalDataHandler(new VisitorDataQueue());
                params.getLocalDataHandler().setSession(this);
            }
        } else {
            params.getLocalDataHandler().reset();
            params.getLocalDataHandler().setSession(this);
        }

        if (params.getControlHandler() == null) {
            params.setControlHandler(new VisitorControlHandler());
        } else {
            params.getControlHandler().reset();
        }
        params.getControlHandler().setSession(this);
    }

    /**
     * Builds the iterator/token pair for the session. The resume token from
     * the parameters is used when present, otherwise a fresh token is created.
     * The iterator is created from the explicit bucket set in the parameters
     * when that set is non-empty, and from the document selection otherwise.
     *
     * @param params visitor parameters to read token, buckets and selection from
     * @return the visiting progress holding the iterator and the token
     * @throws ParseException propagated from creating the iterator from the
     *                        document selection
     */
    private VisitingProgress createVisitingProgress(VisitorParameters params)
            throws ParseException
    {
        final ProgressToken token = (params.getResumeToken() != null)
                ? params.getResumeToken()
                : new ProgressToken();

        final boolean explicitBucketsGiven = params.getBucketsToVisit() != null
                && !params.getBucketsToVisit().isEmpty();

        final VisitorIterator iterator;
        if (explicitBucketsGiven) {
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, "parameters specify explicit bucket set " +
                        "to visit; using it rather than document selection (" +
                        params.getBucketsToVisit().size() + " buckets given)");
            }
            // An explicitly given bucket set overrides the document selection as
            // the source of target buckets. This was primarily used by the defunct
            // synchronization functionality, but is trivial to keep supporting.
            iterator = VisitorIterator.createFromExplicitBucketSet(
                    params.getBucketsToVisit(),
                    1,
                    token);
        } else {
            // Start out with 1 distribution bit. This will almost certainly trigger
            // an ERROR_WRONG_DISTRIBUTION reply immediately, giving us a fresh system
            // state from the start. Since no buckets should ever return with an OK
            // result in that case, the iterator treats it as a special case and just
            // resets its entire internal state using the new distribution bit count
            // rather than doing any splitting.
            final BucketIdFactory idFactory = new BucketIdFactory();
            iterator = VisitorIterator.createFromDocumentSelection(
                    params.getDocumentSelection(),
                    idFactory,
                    1,
                    token,
                    params.getSlices(),
                    params.getSliceId());
        }
        return new VisitingProgress(iterator, token);
    }

    private class SendCreateVisitorsTask implements Runnable {
        // All private methods in this task must be protected by a lock around
        // the progress token!

        private final long messageTimeoutMs;

        SendCreateVisitorsTask(long messageTimeoutMs) {
            this.messageTimeoutMs = messageTimeoutMs;
        }

        private String getNextVisitorId() {
            StringBuilder sb = new StringBuilder();
            ++visitorCounter;
            sb.append(sessionName).append('-').append(visitorCounter);
            return sb.toString();
        }

        @SuppressWarnings("removal") // TODO: Remove on Vespa 9
        private CreateVisitorMessage createMessage(VisitorIterator.BucketProgress bucket) {
            CreateVisitorMessage msg = new CreateVisitorMessage(
                    params.getVisitorLibrary(),
                    getNextVisitorId(),
                    receiver.getConnectionSpec(),
                    dataDestination);

            msg.getTrace().setLevel(params.getTraceLevel());
            msg.setTimeRemaining(messageTimeoutMs);
            msg.setBuckets(Arrays.asList(bucket.getSuperbucket(), bucket.getProgress()));
            msg.setDocumentSelection(params.getDocumentSelection());
            msg.setBucketSpace(params.getBucketSpace());
            msg.setFromTimestamp(params.getFromTimestamp());
            msg.setToTimestamp(params.getToTimestamp());
            msg.setMaxPendingReplyCount(params.getMaxPending());
            msg.setFieldSet(params.fieldSet());
            msg.setVisitInconsistentBuckets(params.visitInconsistentBuckets());
            msg.setVisitRemoves(params.visitRemoves());
            msg.setParameters(params.getLibraryParameters());
            msg.setRoute(params.getRoute());
            msg.setMaxBucketsPerVisitor(params.getMaxBucketsPerVisitor());
            msg.setPriority(params.getPriority()); // TODO: remove on Vespa 9

            msg.setRetryEnabled(false);

            return msg;
        }

        /**
         * Sends CreateVisitor messages for as many buckets as the iterator hands out,
         * stopping when a send is rejected or a reply handling task has been scheduled.
         * Runs entirely under the progress token lock. An exception during sending
         * fails the session and triggers completion handling immediately; any other
         * throwable terminates the process.
         */
        public void run() {
            // Must sync around token as legacy API exposes it to handlers
            // and they expect to be able to sync around it.
            synchronized (progress.getToken()) {
                try {
                    scheduledSendCreateVisitors = false;
                    if (done) {
                        return; // Session already closed; we must not touch anything else.
                    }
                    // We both send requests and process replies in the context of a dedicated task executor pool.
                    // However, MessageBus sending and reply receiving happens in the context of entirely
                    // separate threads. If the backend responds very quickly to visitor requests (such as
                    // if buckets are empty), this can leave us in the following awkward position:
                    //
                    //   1. Replies arrive from backend, open up the throttle window, reply handling
                    //      task gets pushed onto executor queue (but not yet executed).
                    //   2. Send loop below continuously gets a free send slot, keeps sending visitors
                    //      and filling up the set of pending buckets in the progress token.
                    //   3. Since visitor session is busy-looping in the send task, reply processing is
                    //      consequently entirely starved until the MessageBus throttle window is bursting
                    //      at the seams. This can effectively nullify the effects of the throttling policy,
                    //      especially if it's dynamic. But a static throttle policy with a sufficiently
                    //      high max window size will also potentially cause a runaway visitor train since
                    //      the active window size keeps getting decreased by backend replies.
                    //
                    // To get around this, we explicitly check for concurrently scheduled message handling
                    // tasks from the transport layer, breaking the loop if at least one handler has been
                    // scheduled. This also has the (positive) effect of draining all reply tasks before we
                    // start sending more work downstream.
                    //
                    // Since visitor session progress is edge-triggered and progresses exclusively by sending
                    // new visitors in reply handling tasks, it's critical that we never end up in a situation
                    // where we have no pending CreateVisitors (or scheduled tasks), or we risk effectively
                    // hanging the session. We must therefore be very careful that we only exit the send loop
                    // if we _know_ we have at least one pending task enqueued that will ensure session progress.
                    //
                    // We're holding the session (token) lock around checking the pending reply tasks count, so
                    // if we observe a change we know that a reply task must have been scheduled and that its
                    // processing must take place sequenced after we have exited the loop, as the reply handling
                    // also takes the session (token) lock. I.e. it should not be possible to end up in a
                    // situation where we stall session progress due to not having any further event edges.
                    while (progress.getIterator().hasNext() && !hasScheduledHandleReplyTask()) {
                        VisitorIterator.BucketProgress bucket = progress.getIterator().getNext();
                        Result result = sender.send(createMessage(bucket));
                        if (result.isAccepted()) {
                            log.log(Level.FINE, () -> sessionName + ": sent CreateVisitor for bucket " +
                                    bucket.getSuperbucket() + " with progress " + bucket.getProgress());
                            ++pendingMessageCount;
                        } else {
                            // Must reinsert bucket without progress into iterator since
                            // we failed to send visitor.
                            progress.getIterator().update(bucket.getSuperbucket(), bucket.getProgress());
                            break;
                        }
                    }
                } catch (Exception e) {
                    String msg = "Got exception of type " + e.getClass().getName() +
                            " with message '" + e.getMessage() +
                            "' while attempting to send visitors";
                    // Pass the exception along so that the stack trace is preserved in the log,
                    // matching how reply processing failures are logged.
                    log.log(Level.WARNING, msg, e);
                    transitionTo(new StateDescription(State.FAILED, msg));
                    // It's likely that the exception caused a failure to send a
                    // visitor message, meaning we won't get a reply task in the
                    // future from which we can execute logic to complete the
                    // session. Thusly, we have to do this here and now.
                    continueVisiting();
                } catch (Throwable t) {
                    // We can't reliably handle this; take a nosedive
                    com.yahoo.protect.Process.logAndDie("Caught unhandled error when trying to send visitors", t);
                }
            }
        }
    }

    /**
     * Drives the session forward: tries to schedule a new send task, and if none
     * was scheduled and visiting has completed, marks the session as completed.
     */
    private void continueVisiting() {
        if (scheduleSendCreateVisitorsIfApplicable()) {
            return; // A send task has been scheduled and will carry the session onwards.
        }
        if (visitingCompleted()) {
            markSessionCompleted();
        }
    }

    /**
     * Finalizes the session: notifies the local data handler (if any) and the
     * control handler, settles the final state and wakes up anyone waiting on
     * the completion monitor.
     */
    private void markSessionCompleted() {
        log.log(Level.FINE, () -> "Visitor session '" + sessionName + "' has completed");

        final boolean hasLocalDataHandler = params.getLocalDataHandler() != null;
        if (hasLocalDataHandler) {
            params.getLocalDataHandler().onDone();
        }

        // Buckets may have been marked as failed (rather than failing the session
        // right away); if so, fail the session now with the first recorded error.
        final ProgressToken token = progress.getToken();
        if (token.containsFailedBuckets()) {
            transitionTo(new StateDescription(State.FAILED, token.getFirstErrorMsg()));
        }

        // NOTE: transitioning to COMPLETED will not override a failure
        // state, so this can be done unconditionally.
        transitionTo(new StateDescription(State.COMPLETED));
        params.getControlHandler().onDone(state.toCompletionCode(), state.getDescription());

        synchronized (completionMonitor) {
            done = true;
            completionMonitor.notifyAll();
        }
    }

    private class HandleReplyTask implements Runnable {
        private final Reply reply;
        HandleReplyTask(Reply reply) {
            this.reply = reply;
        }

        @Override
        public void run() {
            synchronized (progress.getToken()) {
                // Decrement pending replies inside same lock as sender task to ensure that if the sender
                // observes a non-zero number of reply tasks, it's guaranteed that this actually means a
                // task _will_ be run later at some point.
                decrementScheduleHandleReplyTasks();
                try {
                    assert(pendingMessageCount > 0);
                    --pendingMessageCount;
                    if (reply.hasErrors()) {
                        handleErrorReply(reply);
                    } else if (reply instanceof CreateVisitorReply) {
                        handleCreateVisitorReply((CreateVisitorReply)reply);
                    } else {
                        String msg = "Received reply we do not know how to handle: " +
                                reply.getClass().getName();
                        log.log(Level.SEVERE, msg);
                        transitionTo(new StateDescription(State.FAILED, msg));
                    }
                } catch (Exception e) {
                    String msg = "Got exception of type " + e.getClass().getName() +
                            " with message '" + e.getMessage() +
                            "' while processing reply in visitor session";
                    log.log(Level.WARNING, msg, e);
                    transitionTo(new StateDescription(State.FAILED, msg));
                } catch (Throwable t) {
                    // We can't reliably handle this; take a nosedive
                    com.yahoo.protect.Process.logAndDie("Caught unhandled error when running reply task", t);
                } finally {
                    continueVisiting();
                }
            }
        }
    }

    private class HandleMessageTask implements Runnable {
        private final Message message;

        private HandleMessageTask(Message message) {
            this.message = message;
        }

        @Override
        public void run() {
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, "Visitor session " + sessionName + ": Received message " + message);
            }
            try {
                if (message instanceof VisitorInfoMessage) {
                    handleVisitorInfoMessage((VisitorInfoMessage)message); // always replies
                } else {
                    handleDocumentMessage((DocumentMessage)message); // always replies on error
                }
            } catch (Throwable t) {
                com.yahoo.protect.Process.logAndDie("Caught unhandled error when processing message", t);
            }
        }
    }

    /**
     * Logs an exception thrown while processing an incoming message and adds a
     * matching error to the reply. Unless buckets are skipped on fatal errors,
     * the session is also transitioned to the failed state.
     *
     * @param reply the reply that receives the error
     * @param e     the exception that was caught
     * @param what  name of what was being processed, used in the error text
     */
    private void handleMessageProcessingException(Reply reply, Exception e, String what) {
        final String description = formatProcessingException(e, what);
        log.log(Level.SEVERE, formatIdentifyingVisitorErrorString(description), e);

        final int code;
        synchronized (progress.getToken()) {
            if (params.skipBucketsOnFatalErrors()) {
                code = DocumentProtocol.ERROR_UNPARSEABLE;
            } else {
                code = ErrorCode.APP_FATAL_ERROR;
                transitionTo(new StateDescription(State.FAILED, description));
            }
        }
        reply.addError(new Error(code, description));
    }

    /**
     * Describes an exception caught during message processing.
     *
     * @param e               the exception that was caught
     * @param whileProcessing name of what was being processed
     * @return a text holding the exception type, its message and the processing context
     */
    private String formatProcessingException(Exception e, String whileProcessing) {
        final String exceptionType = e.getClass().getName();
        return "Got exception of type " + exceptionType
                + " with message '" + e.getMessage()
                + "' while processing " + whileProcessing;
    }

    /**
     * Prefixes an error description with the session name and document selection.
     *
     * @param details the error description to prefix
     * @return the description, preceded by text identifying this visitor
     */
    private String formatIdentifyingVisitorErrorString(String details) {
        final String template = "Visitor %s (selection '%s'): %s";
        return String.format(template, sessionName, params.getDocumentSelection(), details);
    }

    /**
     * Handles a VisitorInfo message: forwards any error text and the current
     * progress to the control handler, and always sends a reply.
     *
     * NOTE: this is not called with the token lock held; the lock is taken
     * here where needed.
     */
    private void handleVisitorInfoMessage(VisitorInfoMessage msg) {
        final Reply reply = msg.createReply();
        msg.swapState(reply);

        if (log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, "Visitor session " + sessionName +
                    ": Received VisitorInfo with " +
                    msg.getFinishedBuckets().size() + " finished buckets");
        }

        try {
            final boolean carriesError = msg.getErrorMessage().length() != 0;
            if (carriesError) {
                params.getControlHandler().onVisitorError(msg.getErrorMessage());
            }
            synchronized (progress.getToken()) {
                // NOTE: control handlers are expected to sync on the token themselves
                // when accessing it, but recursive locking is fine, and always holding
                // the lock here makes it harder to get wrong.
                if (isDone()) {
                    reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "Visitor has been shut down"));
                } else {
                    params.getControlHandler().onProgress(progress.getToken());
                }
            }
        } catch (Exception failure) {
            handleMessageProcessingException(reply, failure, "VisitorInfoMessage");
        } finally {
            receiver.reply(reply);
        }
    }

    /**
     * Hands a document message over to the local data handler together with an
     * ack token wrapping the reply. Replies immediately with an error if there
     * is no local data handler or if the handler throws.
     */
    private void handleDocumentMessage(DocumentMessage msg) {
        final Reply reply = msg.createReply();
        msg.swapState(reply);

        final boolean hasLocalDataHandler = params.getLocalDataHandler() != null;
        if (!hasLocalDataHandler) {
            log.log(Level.SEVERE, sessionName + ": Got visitor data back to client with no local data destination.");
            reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "Visitor data with no local data destination"));
            receiver.reply(reply);
            return;
        }

        try {
            params.getLocalDataHandler().onMessage(msg, new AckToken(reply));
        } catch (Exception failure) {
            handleMessageProcessingException(reply, failure, "DocumentMessage");
            // The AckToken may never have been registered, so reply right away
            receiver.reply(reply);
        }
    }

    /**
     * Tells whether the first error of the reply is fatal to visiting.
     * Timeouts, missing buckets and wrong distribution are never fatal; for
     * all other codes the error's own fatal flag decides.
     */
    private boolean isFatalError(Reply reply) {
        final Error firstError = reply.getError(0);
        final int code = firstError.getCode();
        final boolean neverFatal = code == ErrorCode.TIMEOUT
                || code == DocumentProtocol.ERROR_BUCKET_NOT_FOUND
                || code == DocumentProtocol.ERROR_WRONG_DISTRIBUTION;
        if (neverFatal) {
            return false;
        }
        return firstError.isFatal();
    }

    /**
     * Decides whether the first error of the reply shall be passed on to
     * visitor error reporting. Errors for buckets that are not found or have
     * been deleted are left out, to avoid spamming handlers and output with
     * errors for things that happen naturally in the system.
     * @return true if the error should be reported, false if it is exempt
     */
    private boolean shouldReportError(Reply reply) {
        final int code = reply.getError(0).getCode();
        return code != DocumentProtocol.ERROR_BUCKET_NOT_FOUND
                && code != DocumentProtocol.ERROR_BUCKET_DELETED;
    }

    /**
     * Renders an error as its protocol error name followed by a colon and the
     * error's message.
     */
    private static String getErrorMessage(Error r) {
        final StringBuilder text = new StringBuilder();
        text.append(DocumentProtocol.getErrorName(r.getCode()));
        text.append(": ");
        text.append(r.getMessage());
        return text.toString();
    }

    /**
     * Tells whether the first error of the reply has the given error code.
     */
    private static boolean isErrorOfType(Reply reply, int errorCode) {
        final Error firstError = reply.getError(0);
        return errorCode == firstError.getCode();
    }

    /**
     * Passes an error message on to the session's control handler.
     *
     * @param message the error text to report
     */
    private void reportVisitorError(String message) {
        params.getControlHandler()
              .onVisitorError(message);
    }

    /**
     * Handles a reply carrying at least one error. The bucket is first reset
     * to the progress it had when the visitor was sent. A fatal error then
     * either marks the bucket as failed (when skipping buckets on fatal errors)
     * or fails the whole session. Remaining errors lead to a cluster state
     * update (wrong distribution) or to a delayed rescheduling of sending.
     */
    private void handleErrorReply(Reply reply) {
        final CreateVisitorMessage sentMessage = (CreateVisitorMessage) reply.getMessage();
        // Roll bucket progress back to what it was before sending.
        final BucketId superbucket = sentMessage.getBuckets().get(0);
        final BucketId progressBeforeSend = sentMessage.getBuckets().get(1);
        progress.getIterator().update(superbucket, progressBeforeSend);

        final String errorText = getErrorMessage(reply.getError(0));
        log.log(Level.FINE, () -> sessionName + ": received error reply for bucket " +
                superbucket + " with message '" + errorText + "'");

        if (isFatalError(reply)) {
            if (!params.skipBucketsOnFatalErrors()) {
                reportVisitorError(errorText);
                transitionTo(new StateDescription(State.FAILED, errorText));
                return; // no additional visitors will be scheduled post-failure
            }
            markBucketProgressAsFailed(superbucket, progressBeforeSend, errorText);
        }

        if (isErrorOfType(reply, DocumentProtocol.ERROR_WRONG_DISTRIBUTION)) {
            handleWrongDistributionReply((WrongDistributionReply) reply);
            return;
        }

        if (shouldReportError(reply)) {
            reportVisitorError(errorText);
        }
        // Wait 100ms before new visitor task is executed. Will prevent
        // visitors from being scheduled from caller.
        scheduleSendCreateVisitorsIfApplicable(100, TimeUnit.MILLISECONDS);
    }

    /**
     * Records the bucket as failed in the progress token and then marks it as
     * finished in the iterator.
     *
     * @param bucket      the superbucket that failed
     * @param subProgress the progress within the bucket at the time of failure
     * @param message     the error message to store with the failed bucket
     */
    private void markBucketProgressAsFailed(BucketId bucket, BucketId subProgress, String message) {
        final ProgressToken token = progress.getToken();
        token.addFailedBucket(bucket, subProgress, message);
        progress.getIterator().update(bucket, ProgressToken.FINISHED_BUCKET);
    }

    /**
     * Tells whether a max total hits limit is set (i.e. is not -1) and the
     * number of documents returned so far has reached it.
     */
    private boolean enoughHitsReceived() {
        if (params.getMaxTotalHits() == -1) {
            return false; // no limit configured
        }
        return statistics.getDocumentsReturned() >= params.getMaxTotalHits();
    }

    /**
     * A session counts as completed only when no visitors are active (i.e. no
     * replies are being waited for) AND at least one of the following holds:
     *   - All buckets have been visited.
     *   - Visiting has failed fatally (or has been aborted).
     *   - A sufficient number of documents (set via visitor parameters) has
     *     been received from the buckets already visited.
     * @return true if visiting has completed, false otherwise
     */
    private boolean visitingCompleted() {
        if (pendingMessageCount != 0) {
            return false;
        }
        return progress.getIterator().isDone()
                || state.failed()
                || enoughHitsReceived();
    }

    /**
     * @return the configured message timeout, but at least 1 ms; 5 minutes if
     *         the configured timeout is infinite
     */
    private long messageTimeoutMillis() {
        if (isInfiniteTimeout(params.getTimeoutMs())) {
            return 5 * 60 * 1000;
        }
        return Math.max(1, params.getTimeoutMs());
    }

    /**
     * @return the session timeout in milliseconds, as given by the visitor parameters
     */
    private long sessionTimeoutMillis() {
        final long configuredTimeout = params.getSessionTimeoutMs();
        return configuredTimeout;
    }

    /**
     * @return milliseconds elapsed on the monotonic clock since the recorded start time
     */
    private long elapsedTimeMillis() {
        final long elapsedNanos = clock.monotonicNanoTime() - startTimeNanos;
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }

    /**
     * A negative timeout value means that no timeout applies.
     *
     * @param timeoutMillis the timeout to inspect, in milliseconds
     * @return true if the timeout is negative
     */
    private static boolean isInfiniteTimeout(long timeoutMillis) {
        return 0L > timeoutMillis;
    }

    /**
     * Computes the timeout for the next message: the message timeout, capped by
     * the time left of the session timeout (which is never taken as less than
     * 1 ms). Without a session timeout, the message timeout is used as is.
     *
     * @param elapsedMs milliseconds elapsed of the session so far
     * @return the message timeout to use, in milliseconds
     */
    private long computeBoundedMessageTimeoutMillis(long elapsedMs) {
        final long perMessageTimeout = messageTimeoutMillis();
        if (isInfiniteTimeout(sessionTimeoutMillis())) {
            return perMessageTimeout;
        }
        final long sessionTimeLeft = Math.max(1, sessionTimeoutMillis() - elapsedMs);
        return Math.min(sessionTimeLeft, perMessageTimeout);
    }

    /**
     * Schedules a new SendCreateVisitors task with the given delay iff there
     * are still buckets to visit, visiting has not failed fatally and no such
     * task is already scheduled. If the session timeout has expired, the
     * session is first transitioned to the timed out state.
     *
     * @return whether a task was scheduled by this call
     */
    private boolean scheduleSendCreateVisitorsIfApplicable(long delay, TimeUnit unit) {
        final long elapsed = elapsedTimeMillis();
        final long sessionTimeout = sessionTimeoutMillis();
        final boolean sessionExpired = !isInfiniteTimeout(sessionTimeout) && (elapsed >= sessionTimeout);
        if (sessionExpired) {
            transitionTo(new StateDescription(State.TIMED_OUT, String.format("Session timeout of %d ms expired", sessionTimeout)));
        }

        final boolean shouldSchedule = mayScheduleCreateVisitorsTask() && !visitingCompleted();
        if (!shouldSchedule) {
            return false;
        }

        final long boundedTimeout = computeBoundedMessageTimeoutMillis(elapsed);
        taskExecutor.scheduleTask(new SendCreateVisitorsTask(boundedTimeout), delay, unit);
        scheduledSendCreateVisitors = true;
        return true;
    }

    /**
     * A send task may be scheduled only when none is scheduled already, the
     * iterator has more buckets, the session has not failed and the hit limit
     * has not been reached.
     */
    private boolean mayScheduleCreateVisitorsTask() {
        return !scheduledSendCreateVisitors
                && progress.getIterator().hasNext()
                && !state.failed()
                && !enoughHitsReceived();
    }

    /**
     * Same as the delayed variant, with no delay.
     *
     * @return whether a task was scheduled by this call
     */
    private boolean scheduleSendCreateVisitorsIfApplicable() {
        final long noDelay = 0;
        return scheduleSendCreateVisitorsIfApplicable(noDelay, TimeUnit.MILLISECONDS);
    }

    /**
     * Handles a successful CreateVisitor reply: updates bucket progress, passes
     * progress and accumulated statistics on to the control handler, and keeps
     * the reply's trace unless it is empty or the session trace is full.
     */
    private void handleCreateVisitorReply(CreateVisitorReply reply) {
        final CreateVisitorMessage request = (CreateVisitorMessage) reply.getMessage();
        final BucketId visitedSuperbucket = request.getBuckets().get(0);
        final BucketId lastVisited = reply.getLastBucket();

        log.log(Level.FINE, () -> sessionName + ": received CreateVisitorReply for bucket " +
                visitedSuperbucket + " with progress " + lastVisited);

        progress.getIterator().update(visitedSuperbucket, lastVisited);
        params.getControlHandler().onProgress(progress.getToken());

        statistics.add(reply.getVisitorStatistics());
        params.getControlHandler().onVisitorStatistics(statistics);

        // Sessions may live for a long time, so the number of collected traces is
        // capped to guard against exhausting memory when tracing is enabled.
        final boolean replyHasTrace = !reply.getTrace().getRoot().isEmpty();
        if (replyHasTrace && (trace.getRoot().getNumChildren() < 1000)) {
            trace.getRoot().addChild(reply.getTrace().getRoot());
        }
    }

    /**
     * Handles a wrong distribution reply by parsing the cluster state it
     * carries and, if the distribution bit count differs from that of the
     * iterator, updating the iterator. Fails the session if the cluster state
     * cannot be processed.
     *
     * @param reply the reply carrying the new system state string
     */
    private void handleWrongDistributionReply(WrongDistributionReply reply) {
        try {
            ClusterState newState = new ClusterState(reply.getSystemState());
            int stateBits = newState.getDistributionBitCount();
            if (stateBits != progress.getIterator().getDistributionBitCount()) {
                log.log(Level.FINE, () -> "System state changed; now at " +
                        stateBits + " distribution bits");
                // Update the internal state of the visitor iterator. If we're increasing
                // the number of distribution bits, this may lead to splitting of pending
                // buckets. If we're decreasing, it may lead to merging of pending buckets
                // and potential loss of sub-bucket progress. In either way, the iterator
                // will not let any new buckets out before all active buckets have been
                // updated.
                progress.getIterator().setDistributionBitCount(stateBits);
            }
        } catch (Exception e) {
            // Include the exception itself so that the cause and stack trace
            // of the failure are not lost from the log.
            log.log(Level.SEVERE, "Failed to parse new system state string: "
                    + reply.getSystemState(), e);
            transitionTo(new StateDescription(State.FAILED, "Failed to parse cluster state '"
                    + reply.getSystemState() + "'"));
        }
    }

    /**
     * @return the name of this visitor session
     */
    public String getSessionName() {
        return this.sessionName;
    }

    /**
     * Reads the done flag while holding the progress token lock.
     */
    @Override
    public boolean isDone() {
        final boolean finished;
        synchronized (progress.getToken()) {
            finished = done;
        }
        return finished;
    }

    /**
     * @return the progress token of this session
     */
    @Override
    public ProgressToken getProgress() {
        final ProgressToken token = progress.getToken();
        return token;
    }

    /**
     * @return the trace object of this session
     */
    @Override
    public Trace getTrace() {
        return this.trace;
    }

    /**
     * Delegates waiting to the control handler.
     *
     * @param timeoutMs the timeout passed on to the control handler
     * @return whatever the control handler returns from its wait
     * @throws InterruptedException if thrown by the control handler while waiting
     */
    @Override
    public boolean waitUntilDone(long timeoutMs) throws InterruptedException {
        final boolean completed = params.getControlHandler().waitUntilDone(timeoutMs);
        return completed;
    }

    /**
     * Acknowledges a received message by sending the reply wrapped in the token.
     *
     * @param token the ack token whose ack object is the reply to send
     */
    @Override
    public void ack(AckToken token) {
        if (log.isLoggable(Level.FINE)) {
            final String logLine = "Visitor session " + sessionName +
                    ": Sending ack " + token.ackObject;
            log.log(Level.FINE, logLine);
        }
        // No lock is taken here; replying is expected to be thread safe in itself
        final Reply ackReply = (Reply) token.ackObject;
        receiver.reply(ackReply);
    }

    /**
     * Transitions the session to the aborted state while holding the progress
     * token lock.
     */
    @Override
    public void abort() {
        synchronized (progress.getToken()) {
            final StateDescription aborted = new StateDescription(State.ABORTED, "Visitor aborted by user");
            transitionTo(aborted);
        }
    }

    /**
     * Fetches the next response from the local data handler.
     *
     * @throws IllegalStateException if the session has no local data handler
     */
    @Override
    public VisitorResponse getNext() {
        if (params.getLocalDataHandler() != null) {
            return params.getLocalDataHandler().getNext();
        }
        throw new IllegalStateException("Data has been routed to external source for this visitor");
    }

    /**
     * Fetches the next response from the local data handler, passing the
     * given timeout on to it.
     *
     * @param timeoutMilliseconds the timeout handed to the local data handler
     * @throws IllegalStateException if the session has no local data handler
     * @throws InterruptedException  if thrown by the local data handler
     */
    @Override
    public VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException {
        if (params.getLocalDataHandler() != null) {
            return params.getLocalDataHandler().getNext(timeoutMilliseconds);
        }
        throw new IllegalStateException("Data has been routed to external source for this visitor");
    }

    /**
     * Intended for unit tests only; external parties should not use this.
     * @return true if destroy() has been--or is being--invoked.
     */
    public boolean isDestroying() {
        final boolean beingDestroyed;
        synchronized (completionMonitor) {
            beingDestroyed = destroying;
        }
        return beingDestroyed;
    }

    /**
     * Synchronously destroys the session. A session that has not yet completed
     * is first aborted so that no new visitors are sent; the call then blocks
     * until the session is marked as done, or the calling thread is
     * interrupted. The sender and receiver are destroyed in either case, each
     * one independently of whether destroying the other failed. If the wait was
     * interrupted, the thread's interrupt status is restored before returning.
     */
    @Override
    public void destroy() {
        log.log(Level.FINE, () -> sessionName + ": synchronous destroy() called");
        boolean interrupted = false;
        try {
            synchronized (progress.getToken()) {
                synchronized (completionMonitor) {
                    // If we are destroying the session before it has completed (e.g. because
                    // waitUntilDone timed out or an interactive visiting was interrupted)
                    // set us to aborted state so that we'll cease sending new visitors.
                    if (!done) {
                        transitionTo(new StateDescription(State.ABORTED, "Session explicitly destroyed before completion"));
                    }
                }
            }
            synchronized (completionMonitor) {
                assert(!destroying) : "Attempted to destroy VisitorSession more than once";
                destroying = true;
                while (!done) {
                    completionMonitor.wait();
                }
            }
        } catch (InterruptedException e) {
            interrupted = true;
            log.log(Level.WARNING, "Interrupted waiting for visitor session to be destroyed");
        } finally {
            // Destroy each interface separately, so that a failure to destroy
            // the sender does not leave the receiver behind.
            try {
                sender.destroy();
            } catch (Exception e) {
                log.log(Level.SEVERE, "Caught exception destroying communication interfaces", e);
            }
            try {
                receiver.destroy();
            } catch (Exception e) {
                log.log(Level.SEVERE, "Caught exception destroying communication interfaces", e);
            }
            log.log(Level.FINE, () -> sessionName + ": synchronous destroy() done");
            if (interrupted) {
                // Restore the interrupt status only after the interfaces have been
                // destroyed, so that their teardown is not cut short by it, while
                // callers can still observe that the thread was interrupted.
                Thread.currentThread().interrupt();
            }
        }
    }

}