aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/ContentPolicy.java
blob: 61d2873b8743231c04717e2f60d912c18b848f1c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;

import com.yahoo.concurrent.CopyOnWriteHashMap;
import com.yahoo.document.BucketId;
import com.yahoo.document.BucketIdFactory;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.ErrorCode;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingContext;
import com.yahoo.messagebus.routing.RoutingNodeIterator;
import com.yahoo.messagebus.routing.VerbatimDirective;
import com.yahoo.vdslib.distribution.Distribution;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.config.content.DistributionConfig;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * Routing policy to determine which distributor in a content cluster to send data to.
 * The policy is configured through key=value parameters separated by semicolons (";"), which control which cluster to send to:
 *
 * cluster=[clusterName] (mandatory; the name of the content cluster to route to)
 * config=[config] (optional; a comma-separated list of config servers to use, for talking to clusters not defined in this Vespa application)
 * clusterconfigid=[id] (optional; use the given config id for distribution instead of the default)
 *
 * @author Haakon Humberset
 */
public class ContentPolicy extends SlobrokPolicy {

    private static final Logger log = Logger.getLogger(ContentPolicy.class.getName());
    public static final String owningBucketStates = "uim";
    private static final String upStates = "ui";

    /** Generates the slobrok host pattern used to look up distributors of a given cluster. */
    static class SlobrokHostPatternGenerator {

        private final String prefix;

        SlobrokHostPatternGenerator(String clusterName) {
            prefix = "storage/cluster." + clusterName + "/distributor/";
        }

        /**
         * Returns the slobrok pattern matching the session(s) that are valid targets for this request.
         *
         * @param distributor the distributor index to target, or null to match any distributor
         */
        String getDistributorHostPattern(Integer distributor) {
            String index = (distributor == null) ? "*" : distributor.toString();
            return prefix + index + "/default";
        }

    }

    /**
     * Helper class to match a host pattern with the node to use. Keeps a snapshot of distributors
     * seen "up" in the last cluster state and prefers those; falls back to a totally random target
     * when too few nodes are known to be up.
     */
    public abstract static class HostFetcher {

        /**
         * Snapshot of candidate distributor indices plus the total distributor count the snapshot
         * was derived from. The candidate list is replaced wholesale on every mutation.
         */
        private static class Targets {
            // Candidate distributor indices; always an immutable list swapped atomically.
            private final AtomicReference<List<Integer>> list = new AtomicReference<>();
            // Total distributor count in the cluster state; clamped to >= 1 in the constructor.
            final int total;
            Targets() {
                this(List.of(), 0);
            }
            Targets(List<Integer> list, int total) {
                this.list.set(List.copyOf(list));
                // Clamp to 1 so the empty default Targets() fails the up-percentage check in
                // getRandomTargetSpec (0 >= percent * 1 is false for any positive percent).
                this.total = Math.max(1, total);
            }
            // Picks a random candidate. NOTE(review): throws if the snapshot is empty — callers are
            // expected to guard via the size()-based loop condition in getRandomTargetSpec.
            Integer get(Random randomizer) {
                List<Integer> snapshot = list.get();
                return snapshot.get(randomizer.nextInt(snapshot.size()));
            }
            // Removes a candidate (e.g. one not found in slobrok). Synchronized so two concurrent
            // removes cannot both read the same snapshot and lose one of the removals.
            synchronized void remove(Integer v) {
                List<Integer> snapshot = list.get();
                if (snapshot.contains(v)) {
                    list.set(snapshot.stream().filter((item) -> !v.equals(item)).toList());
                }
            }
            int size() {
                return list.get().size();
            }
        }

        // Minimum percentage of distributors that must be seen up before we prefer known-good nodes.
        private final int requiredUpPercentageToSendToKnownGoodNodes;
        // Latest valid-target snapshot, derived from the last cluster state passed to updateValidTargets.
        private final AtomicReference<Targets> validTargets = new AtomicReference<>(new Targets());
        protected final Random randomizer = new Random(12345); // Use same randomizer each time to make unit testing easy.

        protected HostFetcher(int percent) {
            requiredUpPercentageToSendToKnownGoodNodes = percent;
        }

        /** Recomputes the valid-target snapshot from the distributors marked up ('u'/'i') in the given state. */
        void updateValidTargets(ClusterState state) {
            List<Integer> validRandomTargets = new ArrayList<>();
            for (int i=0; i<state.getNodeCount(NodeType.DISTRIBUTOR); ++i) {
                if (state.getNodeState(new Node(NodeType.DISTRIBUTOR, i)).getState().oneOf(upStates)) validRandomTargets.add(i);
            }
            validTargets.set(new Targets(validRandomTargets, state.getNodeCount(NodeType.DISTRIBUTOR)));
        }
        /** Returns the session spec for the given distributor (any distributor when null), or null if none found. */
        public abstract String getTargetSpec(Integer distributor, RoutingContext context);
        /**
         * Returns a spec for a randomly chosen distributor. Prefers nodes seen up in the last cluster
         * state while enough of them are up; each candidate not found in slobrok is removed from the
         * snapshot, and once too few remain we fall back to a wildcard (fully random) lookup.
         */
        String getRandomTargetSpec(RoutingContext context) {
            Targets targets = validTargets.get();
            // Try to use list of random targets, if at least X % of the nodes are up
            while (100 * targets.size() >= requiredUpPercentageToSendToKnownGoodNodes * targets.total)
            {
                Integer distributor = targets.get(randomizer);
                String targetSpec = getTargetSpec(distributor, context);
                if (targetSpec != null) {
                    context.trace(3, "Sending to random node seen up in cluster state");
                    return targetSpec;
                }
                targets.remove(distributor);
            }
            context.trace(3, "Too few nodes seen up in state. Sending totally random.");
            return getTargetSpec(null, context);
        }
        public void close() {}
    }

    /** Host fetcher which resolves distributor sessions through a slobrok mirror lookup. */
    public static class SlobrokHostFetcher extends HostFetcher {
        private final SlobrokHostPatternGenerator patternGenerator;
        private final SlobrokPolicy policy;

        SlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
            super(percent);
            this.patternGenerator = patternGenerator;
            this.policy = policy;
        }

        /** Looks up all slobrok entries matching the given host pattern. */
        private List<Mirror.Entry> getEntries(String hostPattern, RoutingContext context) {
            return policy.lookup(context, hostPattern);
        }

        /** Appends the session suffix to a registered slobrok name. */
        private String convertSlobrokNameToSessionName(String slobrokName) { return slobrokName + "/default"; }

        public IMirror getMirror(RoutingContext context) { return context.getMirror(); }

        /**
         * Resolves the session spec for the given distributor, or a random match when distributor
         * is null. Returns null when nothing matches, or when a specific distributor unexpectedly
         * resolves to more than one entry.
         */
        @Override
        public String getTargetSpec(Integer distributor, RoutingContext context) {
            List<Mirror.Entry> entries = getEntries(patternGenerator.getDistributorHostPattern(distributor), context);
            if (entries.isEmpty()) return null;
            if (distributor == null) {
                // Wildcard lookup: any matching distributor session will do.
                Mirror.Entry chosen = entries.get(randomizer.nextInt(entries.size()));
                return convertSlobrokNameToSessionName(chosen.getSpecString());
            }
            if (entries.size() != 1) {
                // A specific distributor index should map to exactly one session.
                log.log(Level.WARNING, "Got " + entries.size() + " matches for a distributor.");
                return null;
            }
            return convertSlobrokNameToSessionName(entries.get(0).getSpecString());
        }
    }

    /**
     * Slobrok host fetcher that caches resolved distributor specs per slobrok update generation,
     * avoiding repeated mirror lookups while the mirror contents are unchanged.
     */
    static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher {

        /**
         * Distributor index to resolved RPC spec cache for a single given Slobrok
         * update generation. Uses a thread safe COW map which will grow until stable.
         */
        private static class GenerationCache {
            // The slobrok mirror generation this cache is valid for.
            private final int generation;
            private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>();

            GenerationCache(int generation) {
                this.generation = generation;
            }

            public int generation() { return this.generation; }

            // Returns the cached spec for the index, or null if not yet resolved this generation.
            public String get(Integer index) {
                return targets.get(index);
            }
            public void put(Integer index, String target) {
                targets.put(index, target);
            }
        }

        // Cache for the most recently observed generation; null until the first lookup.
        private final AtomicReference<GenerationCache> generationCache = new AtomicReference<>(null);

        TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, SlobrokPolicy policy, int percent) {
            super(patternGenerator, policy, percent);
        }

        /**
         * Returns the spec for the given distributor, caching per-index results for the current
         * slobrok generation. Wildcard (null distributor) lookups bypass the cache entirely.
         */
        @Override
        public String getTargetSpec(Integer distributor, RoutingContext context) {
            GenerationCache cache = generationCache.get();
            int currentGeneration = getMirror(context).updates();
            // The below code might race with other threads during a generation change. That is OK, as the cache
            // is thread safe and will quickly converge to a stable state for the new generation.
            if (cache == null || currentGeneration != cache.generation()) {
                cache = new GenerationCache(currentGeneration);
                generationCache.set(cache);
            }
            if (distributor != null) {
                return cachingGetTargetSpec(distributor, context, cache);
            }
            // Wildcard lookup case. Must not be cached.
            return super.getTargetSpec(null, context);
        }

        // Returns the cached spec for the distributor, resolving and caching it on a miss.
        private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) {
            String cachedTarget = cache.get(distributor);
            if (cachedTarget != null) {
                return cachedTarget;
            }
            // Mirror _may_ be at a higher version if we race with generation read, but that is OK since
            // we'll either way get the most up-to-date mapping and the cache will be invalidated on the
            // next invocation.
            String resolvedTarget = super.getTargetSpec(distributor, context);
            cache.put(distributor, resolvedTarget);
            return resolvedTarget;
        }

    }

    /**
     * Tracks "instability" across nodes based on number of failures received versus some
     * implementation-specific limit.
     *
     * Implementations must be thread-safe.
     *
     * TODO should ideally be protected, but there's a package mismatch between policy classes and its tests
     */
    public interface InstabilityChecker {
        /** Returns whether the given node has accumulated more failures than the implementation's limit allows. */
        boolean tooManyFailures(int nodeIndex);
        /** Records one observed failure against the given node. */
        void addFailure(Integer calculatedDistributor);
    }

    /**
     * Class that tracks a failure count of a given type per node.
     *
     * Thread-safe, as required by the {@code InstabilityChecker} contract: although the backing
     * CopyOnWriteArrayList is itself thread-safe per operation, the check-then-reset and the
     * grow-then-increment sequences below are multi-step read-modify-writes, so both methods are
     * synchronized to keep them atomic (previously concurrent callers could lose increments or
     * race the size check against the grow loop).
     */
    public static class PerNodeCountingInstabilityChecker implements InstabilityChecker {
        private final List<Integer> nodeFailures = new CopyOnWriteArrayList<>();
        private final int failureLimit;

        public PerNodeCountingInstabilityChecker(int failureLimit) {
            this.failureLimit = failureLimit;
        }

        /**
         * Returns true if the node has strictly more than failureLimit recorded failures,
         * resetting its count to zero in that case so the next window starts fresh.
         */
        @Override
        public synchronized boolean tooManyFailures(int nodeIndex) {
            if (nodeFailures.size() > nodeIndex && nodeFailures.get(nodeIndex) > failureLimit) {
                nodeFailures.set(nodeIndex, 0);
                return true;
            } else {
                return false;
            }
        }

        /** Records one failure for the given node, growing the counter list on demand. */
        @Override
        public synchronized void addFailure(Integer calculatedDistributor) {
            while (nodeFailures.size() <= calculatedDistributor) {
                nodeFailures.add(0);
            }
            nodeFailures.set(calculatedDistributor, nodeFailures.get(calculatedDistributor) + 1);
        }
    }

    /** Parses the semicolon-separated parameter string and exposes the values the policy needs. */
    public static class Parameters {

        protected final String clusterName;
        protected final String distributionConfigId;
        protected final DistributionConfig distributionConfig;
        protected final SlobrokHostPatternGenerator slobrokHostPatternGenerator;

        public Parameters(Map<String, String> params) {
            this(params, null);
        }

        private Parameters(Map<String, String> params, DistributionConfig config) {
            String cluster = params.get("cluster");
            if (cluster == null) {
                throw new IllegalArgumentException("Required parameter 'cluster', the name of the content cluster, not set");
            }
            clusterName = cluster;
            distributionConfig = config;
            if (distributionConfig != null && distributionConfig.cluster(clusterName) == null) {
                throw new IllegalArgumentException("Distribution config for cluster '" + clusterName + "' not found");
            }
            distributionConfigId = params.get("clusterconfigid"); // TODO jonmv: remove
            slobrokHostPatternGenerator = createPatternGenerator();
        }

        /** Returns the config id to use for distribution config; falls back to the cluster name when not overridden. */
        private String getDistributionConfigId() {
            return (distributionConfigId != null) ? distributionConfigId : clusterName;
        }

        public String getClusterName() {
            return clusterName;
        }

        public SlobrokHostPatternGenerator createPatternGenerator() {
            return new SlobrokHostPatternGenerator(getClusterName());
        }

        public HostFetcher createHostFetcher(SlobrokPolicy policy, int percent) {
            return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy, percent);
        }

        public Distribution createDistribution(SlobrokPolicy policy) {
            if (distributionConfig == null) {
                return new Distribution(getDistributionConfigId());
            }
            return new Distribution(distributionConfig.cluster(clusterName));
        }

        public InstabilityChecker createInstabilityChecker() {
            return new PerNodeCountingInstabilityChecker(getAttemptRandomOnFailuresLimit());
        }

        /**
         * Number of failures (of any kind) from a node after which we try a random other node,
         * just to probe whether the failures were caused by the node itself being bad (which is
         * hard to detect from the failures alone).
         */
        int getAttemptRandomOnFailuresLimit() { return 5; }

        /**
         * If we receive more than this number of wrong distribution replies carrying old cluster
         * states, we discard the currently cached state and accept the old one. This guards us
         * against cluster state version resets.
         */
        int maxOldClusterStatesSeenBeforeThrowingCachedState() { return 20; }

        /**
         * When updating good nodes from new cluster states: if more than this percentage of nodes
         * are up, we send to known-up nodes instead of totally random ones (to avoid hitting
         * thrashing bad nodes still present in slobrok).
         */
        int getRequiredUpPercentageToSendToKnownGoodNodes() { return 60; }
    }

    /** Helper class to get the bucket identifier of a message. */
    public static class BucketIdCalculator {
        private static final BucketIdFactory factory = new BucketIdFactory();

        /** Returns the bucket the given message addresses, or null for unsupported message types. */
        private BucketId getBucketId(Message msg) {
            return switch (msg.getType()) {
                case DocumentProtocol.MESSAGE_PUTDOCUMENT    -> factory.getBucketId(((PutDocumentMessage) msg).getDocumentPut().getDocument().getId());
                case DocumentProtocol.MESSAGE_GETDOCUMENT    -> factory.getBucketId(((GetDocumentMessage) msg).getDocumentId());
                case DocumentProtocol.MESSAGE_REMOVEDOCUMENT -> factory.getBucketId(((RemoveDocumentMessage) msg).getDocumentId());
                case DocumentProtocol.MESSAGE_UPDATEDOCUMENT -> factory.getBucketId(((UpdateDocumentMessage) msg).getDocumentUpdate().getId());
                case DocumentProtocol.MESSAGE_GETBUCKETLIST  -> ((GetBucketListMessage) msg).getBucketId();
                case DocumentProtocol.MESSAGE_STATBUCKET     -> ((StatBucketMessage) msg).getBucketId();
                case DocumentProtocol.MESSAGE_CREATEVISITOR  -> ((CreateVisitorMessage) msg).getBuckets().get(0);
                case DocumentProtocol.MESSAGE_REMOVELOCATION -> ((RemoveLocationMessage) msg).getBucketId();
                default -> {
                    log.log(Level.SEVERE, "Message type '" + msg.getType() + "' not supported.");
                    yield null;
                }
            };
        }

        /**
         * Computes the bucket id for the message in the routing context. When no usable bucket id
         * can be derived (null or raw id 0), a fatal error reply is set on the context and the
         * (unusable) id is returned so the caller can bail out.
         */
        BucketId handleBucketIdCalculation(RoutingContext context) {
            BucketId id = getBucketId(context.getMessage());
            if (id != null && id.getRawId() != 0) {
                return id;
            }
            Reply reply = new EmptyReply();
            reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "No bucket id available in message."));
            context.setReply(reply);
            return id;
        }
    }

    /** Class handling the logic of picking a distributor */
    public static class DistributorSelectionLogic {
        /** Message context class. Contains data we want to inspect about a request at reply time. */
        private static class MessageContext {
            final Integer calculatedDistributor;
            final ClusterState usedState;

            MessageContext(ClusterState usedState) {
                this(usedState, null);
            }
            MessageContext(ClusterState usedState, Integer calculatedDistributor) {
                this.calculatedDistributor = calculatedDistributor;
                this.usedState = usedState;
            }

            public String toString() {
                return "Context(Distributor " + calculatedDistributor +
                       ", state version " + usedState.getVersion() + ")";
            }
        }

        private final HostFetcher hostFetcher;
        private final Distribution distribution;
        private final InstabilityChecker persistentFailureChecker;
        private final AtomicReference<ClusterState> safeCachedClusterState = new AtomicReference<>(null);
        private final AtomicInteger oldClusterVersionGottenCount = new AtomicInteger(0);
        private final int maxOldClusterVersionBeforeSendingRandom; // Reset cluster version protection

        DistributorSelectionLogic(Parameters params, SlobrokPolicy policy) {
            try {
                hostFetcher = params.createHostFetcher(policy, params.getRequiredUpPercentageToSendToKnownGoodNodes());
                distribution = params.createDistribution(policy);
                persistentFailureChecker = params.createInstabilityChecker();
                maxOldClusterVersionBeforeSendingRandom = params.maxOldClusterStatesSeenBeforeThrowingCachedState();
            } catch (Throwable e) {
                destroy();
                throw e;
            }
        }

        public void destroy() {
            if (hostFetcher != null) {
                hostFetcher.close();
            }
            if (distribution != null) {
                distribution.close();
            }
        }

        String getTargetSpec(RoutingContext context, BucketId bucketId) {
            String sendRandomReason = null;
            ClusterState cachedClusterState = safeCachedClusterState.get();

            if (cachedClusterState != null) { // If we have a cached cluster state (regular case), we use that to calculate correct node.
                try{
                    Integer target = distribution.getIdealDistributorNode(cachedClusterState, bucketId, owningBucketStates);
                    // If we have had too many failures towards existing node, reset failure count and send to random
                    if (persistentFailureChecker.tooManyFailures(target)) {
                        sendRandomReason = "Too many failures detected versus distributor " + target + ". Sending to random instead of using cached state.";
                        target = null;
                    }
                    // If we have found a target, and the target exists in slobrok, send to it.
                    if (target != null) {
                        context.setContext(new MessageContext(cachedClusterState, target));
                        String targetSpec = hostFetcher.getTargetSpec(target, context);
                        if (targetSpec != null) {
                            if (context.shouldTrace(1)) {
                                context.trace(1, "Using distributor " + target + " for " +
                                        bucketId + " as our state version is " + cachedClusterState.getVersion());
                            }
                            return targetSpec;
                        } else {
                            sendRandomReason = "Want to use distributor " + target + " but it is not in slobrok. Sending to random.";
                            log.log(Level.FINE, "Target distributor is not in slobrok");
                        }
                    } else {
                        context.setContext(new MessageContext(cachedClusterState));
                    }
                } catch (Distribution.TooFewBucketBitsInUseException e) {
                    Reply reply = new WrongDistributionReply(cachedClusterState.toString(true));
                    reply.addError(new Error(DocumentProtocol.ERROR_WRONG_DISTRIBUTION,
                                             "Too few distribution bits used for given cluster state"));
                    context.setReply(reply);
                    return null;
                } catch (Distribution.NoDistributorsAvailableException e) {
                    log.log(Level.FINE, "No distributors available; clearing cluster state");
                    safeCachedClusterState.set(null);
                    sendRandomReason = "No distributors available. Sending to random distributor.";
                    context.setContext(createRandomDistributorTargetContext());
                }
            } else {
                context.setContext(createRandomDistributorTargetContext());
                sendRandomReason = "No cluster state cached. Sending to random distributor.";
            }
            if (context.shouldTrace(1)) {
                context.trace(1, sendRandomReason != null ? sendRandomReason : "Sending to random distributor for unknown reason");
            }
            return hostFetcher.getRandomTargetSpec(context);
        }

        private static MessageContext createRandomDistributorTargetContext() {
            return new MessageContext(null);
        }

        private static Optional<ClusterState> clusterStateFromReply(final WrongDistributionReply reply) {
            try {
                return Optional.of(new ClusterState(reply.getSystemState()));
            } catch (Exception e) {
                reply.getTrace().trace(1, "Error when parsing system state string " + reply.getSystemState());
                return Optional.empty();
            }
        }

        void handleWrongDistribution(WrongDistributionReply reply, RoutingContext routingContext) {
            final MessageContext context = (MessageContext) routingContext.getContext();
            final Optional<ClusterState> replyState = clusterStateFromReply(reply);
            if (!replyState.isPresent()) {
                return;
            }
            final ClusterState newState = replyState.get();
            resetCachedStateIfClusterStateVersionLikelyRolledBack(newState);
            markReplyAsImmediateRetryIfNewStateObserved(reply, context, newState);

            if (context.calculatedDistributor == null) {
                traceReplyFromRandomDistributor(reply, newState);
            } else {
                traceReplyFromSpecificDistributor(reply, context, newState);
            }
            updateCachedRoutingStateFromWrongDistribution(context, newState);
        }

        private void updateCachedRoutingStateFromWrongDistribution(MessageContext context, ClusterState newState) {
            ClusterState cachedClusterState = safeCachedClusterState.get();
            if (cachedClusterState == null || newState.getVersion() >= cachedClusterState.getVersion()) {
                safeCachedClusterState.set(newState);
                if (newState.getClusterState().equals(State.UP)) {
                    hostFetcher.updateValidTargets(newState);
                }
            } else if (newState.getVersion() + 2000000000 < cachedClusterState.getVersion()) {
                safeCachedClusterState.set(null);
            } else if (context.calculatedDistributor != null) {
                persistentFailureChecker.addFailure(context.calculatedDistributor);
            }
        }

        private void traceReplyFromSpecificDistributor(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
            if (context.usedState == null) {
                String msg = "Used state must be set as distributor is calculated. Bug.";
                reply.getTrace().trace(1, msg);
                log.log(Level.SEVERE, msg);
            } else if (newState.getVersion() == context.usedState.getVersion()) {
                String msg = "Message sent to distributor " + context.calculatedDistributor +
                             " retrieved cluster state version " + newState.getVersion() +
                             " which was the state we used to calculate distributor as target last time.";
                reply.getTrace().trace(1, msg);
                // Client load can be rejected towards distributors even with a matching cluster state version.
                // This usually happens during a node fail-over transition, where the target distributor will
                // reject an operation bound to a particular bucket if it does not own the bucket in _both_
                // the current and the next (transition target) state. Since it can happen during normal operation
                // and will happen per client operation, we keep this as debug level to prevent spamming the logs.
                log.log(Level.FINE, msg);
            } else if (newState.getVersion() > context.usedState.getVersion()) {
                if (reply.getTrace().shouldTrace(1)) {
                    reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
                            " updated cluster state from version " + context.usedState.getVersion() +
                            " to " + newState.getVersion());
                }
            } else {
                if (reply.getTrace().shouldTrace(1)) {
                    reply.getTrace().trace(1, "Message sent to distributor " + context.calculatedDistributor +
                            " returned older cluster state version " + newState.getVersion());
                }
            }
        }

        private void resetCachedStateIfClusterStateVersionLikelyRolledBack(ClusterState newState) {
            ClusterState cachedClusterState = safeCachedClusterState.get();
            if (cachedClusterState != null && cachedClusterState.getVersion() > newState.getVersion()) {
                if (oldClusterVersionGottenCount.incrementAndGet() >= maxOldClusterVersionBeforeSendingRandom) {
                    oldClusterVersionGottenCount.set(0);
                    safeCachedClusterState.set(null);
                }
            }
        }

        private void markReplyAsImmediateRetryIfNewStateObserved(WrongDistributionReply reply, MessageContext context, ClusterState newState) {
            if (context.usedState != null && newState.getVersion() <= context.usedState.getVersion()) {
                if (reply.getRetryDelay() <= 0.0) {
                    reply.setRetryDelay(-1);
                }
            } else {
                if (reply.getRetryDelay() <= 0.0) {
                    reply.setRetryDelay(0);
                }
            }
        }

        private void traceReplyFromRandomDistributor(WrongDistributionReply reply, ClusterState newState) {
            if (!reply.getTrace().shouldTrace(1)) {
                return;
            }
            ClusterState cachedClusterState = safeCachedClusterState.get();
            if (cachedClusterState == null) {
                reply.getTrace().trace(1, "Message sent to * with no previous state, received version " + newState.getVersion());
            } else if (newState.getVersion() == cachedClusterState.getVersion()) {
                reply.getTrace().trace(1, "Message sent to * found that cluster state version " + newState.getVersion() + " was correct.");
            } else if (newState.getVersion() > cachedClusterState.getVersion()) {
                reply.getTrace().trace(1, "Message sent to * updated cluster state to version " + newState.getVersion());
            } else {
                reply.getTrace().trace(1, "Message sent to * retrieved older cluster state version " + newState.getVersion());
            }
        }

        /**
         * Returns whether a given error Reply should be counted towards potentially ignoring the cached
         * cluster state and triggering a random send (and thus likely WrongDistributionReply with the
         * current cluster state). Certain error codes may be used frequently by the content layer for
         * purposes that do _not_ indicate that a change in cluster state may have happened, and should
         * therefore not be counted for this purpose:
         *  - ERROR_TEST_AND_SET_CONDITION_FAILED: may happen for any mutating operation that has an
         *    associated TaS condition. Technically an APP_FATAL_ERROR since resending doesn't make sense.
         *  - ERROR_BUSY: may happen for concurrent mutations and if distributors are in the process of
         *    changing bucket ownership and the grace period hasn't passed yet.
         */
        private static boolean shouldCountAsErrorForRandomSendTrigger(Reply reply) {
            if (reply.getNumErrors() != 1) {
                // Bug fix: was `!reply.hasErrors()`, which inverted the intent — a reply with
                // more than one error was NOT counted, while an error-free reply WAS. Per the
                // comment's intent: count any reply with > 1 error, never an error-free one.
                return reply.hasErrors(); // For simplicity, count any reply with > 1 error.
            }
            var error = reply.getError(0);
            switch (error.getCode()) {
                // TODO this feels like a layering violation, but we use DocumentProtocol directly in other places in this policy anyway...
                case DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED:
                case DocumentProtocol.ERROR_BUSY:
                    return false;
                default: return true;
            }
        }

        /**
         * Handles a non-wrong-distribution error reply: records a failure against the
         * distributor the message was calculated for (unless the error code is one that does
         * not hint at a cluster state change), and traces the failure at level 1.
         * Replies that were not sent to a calculated distributor are ignored.
         */
        void handleErrorReply(Reply reply, Object untypedContext) {
            MessageContext ctx = (MessageContext) untypedContext;
            if (ctx.calculatedDistributor == null) {
                return; // Nothing to attribute the failure to.
            }
            if (shouldCountAsErrorForRandomSendTrigger(reply)) {
                persistentFailureChecker.addFailure(ctx.calculatedDistributor);
            }
            if (reply.getTrace().shouldTrace(1)) {
                reply.getTrace().trace(1, "Failed with " + ctx.toString());
            }
        }
    }

    // Resolves which bucket a message targets (may set an error reply on failure).
    private final BucketIdCalculator bucketIdCalculator = new BucketIdCalculator();
    // Maps a bucket to a concrete distributor target spec, tracking cluster state.
    private final DistributorSelectionLogic distributorSelectionLogic;
    // Immutable configuration for this policy instance (cluster name, slobrok config, etc. as parsed).
    private final Parameters parameters;

    /**
     * Constructor used in production.
     *
     * @param param  raw policy parameter string from the route, parsed into a parameter map
     * @param config distribution config for the target content cluster
     */
    public ContentPolicy(String param, DistributionConfig config) {
        this(new Parameters(parse(param), config));
    }

    /**
     * Detailed constructor used by tests to inject pre-built parameters, allowing the
     * pieces that need overriding to be overridden.
     */
    public ContentPolicy(Parameters p) {
        parameters = p;
        distributorSelectionLogic = new DistributorSelectionLogic(p, this);
    }

    /**
     * Resolves a single distributor target for the message in {@code context} and adds it as a
     * child route. If bucket calculation or target resolution already produced a reply, returns
     * without selecting; if no target could be resolved, sets a NO_ADDRESS_FOR_SERVICE error.
     */
    @Override
    public void select(RoutingContext context) {
        if (context.shouldTrace(1)) {
            context.trace(1, "Selecting route");
        }

        BucketId bucketId = bucketIdCalculator.handleBucketIdCalculation(context);
        if (context.hasReply()) {
            return; // Bucket calculation failed and replied on our behalf.
        }

        String targetSpec = distributorSelectionLogic.getTargetSpec(context, bucketId);
        if (context.hasReply()) {
            return; // Selection logic already produced a reply.
        }

        if (targetSpec == null) {
            context.setError(ErrorCode.NO_ADDRESS_FOR_SERVICE,
                             "Could not resolve any distributors to send to in cluster " + parameters.clusterName);
            return;
        }

        // Rewrite the first hop of the route to point at the resolved distributor.
        Route route = new Route(context.getRoute());
        route.setHop(0, new Hop().addDirective(new VerbatimDirective(targetSpec)));
        context.addChild(route);
    }

    /**
     * Merges child replies back into the routing context. Wrong-distribution replies feed the
     * cluster state logic, other error replies feed failure tracking, and successful write
     * replies may have their modification timestamp traced at level 9.
     */
    @Override
    public void merge(RoutingContext context) {
        // Prefer a reply from a child node; fall back to one already set on the context.
        RoutingNodeIterator children = context.getChildIterator();
        Reply reply;
        if (children.hasReply()) {
            reply = children.removeReply();
        } else {
            reply = context.getReply();
        }
        if (reply == null) {
            reply = new EmptyReply();
            reply.addError(new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE,
                    "No reply in any children, nor in the routing context: " + context));
        }

        if (reply instanceof WrongDistributionReply) {
            distributorSelectionLogic.handleWrongDistribution((WrongDistributionReply) reply, context);
        } else if (reply.hasErrors()) {
            distributorSelectionLogic.handleErrorReply(reply, context.getContext());
        } else if (reply instanceof WriteDocumentReply && context.shouldTrace(9)) {
            context.trace(9, "Modification timestamp: " + ((WriteDocumentReply) reply).getHighestModificationTimestamp());
        }
        context.setReply(reply);
    }

    @Override
    public void destroy() {
        // Delegate teardown to the selection logic, which owns the resources needing cleanup.
        distributorSelectionLogic.destroy();
    }
}