summaryrefslogtreecommitdiffstats
path: root/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/IncompleteResultsThrottler.java
blob: 8721bfb53fa0fb779c6140f56f5249e63ce5a296 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.client.core.operationProcessor;

import com.yahoo.vespa.http.client.core.ThrottlePolicy;

import java.util.Random;

/**
 * Adjusts in-flight operations based on throughput. It will walk the graph and try to find
 * local optimum.
 *
 * It looks at the throughput, adjust max in-flight based on the previous throughput and settings.
 *
 * In the beginning it moves faster, and then stabilizes.
 *
 * It will wait a bit after adjusting before it starts to sample, since there is a latency between adjustment
 * and result.
 *
 * There are several mechanisms to reduce impact of several clients running in parallel. The window size has a
 * random part, and the wait time before sampling after adjustment has a random part as well.
 *
 * To avoid running wild with large values of max-in flight, it is tuned to stay on the smaller part, and
 * rather reduce max-in flight than to have a too large value.
 *
 * In case the where the queue is moved to minimum size, it will now and then increase queue size to get
 * more sample data and possibly grow size.
 *
 * Class is fully thread safe, i.e. all public methods are thread safe.
 *
 * @author dybis
 */
public class IncompleteResultsThrottler {
    private final ConcurrentDocumentOperationBlocker blocker = new ConcurrentDocumentOperationBlocker();
    private final int maxInFlightValue;
    private final int minInFlightValue;
    private final Random random = new Random();
    private final ThrottlePolicy policy;

    // 9-11 seconds with some randomness to avoid fully synchronous feeders.
    public final long phaseSizeMs = 9000 + (random.nextInt() % 2000);
    private final Clock clock;

    private final Object monitor = new Object();
    private long sampleStartTimeMs = 0;
    private int previousNumOk = 0;
    private int previousMaxInFlight = 0;
    private int stabilizingPhasesLeft = 0;
    private int adjustCycleCount = 0;
    private int maxInFlightNow;
    private int numOk = 0;
    private int minWindowSizeCounter = 0;
    private int minPermitsAvailable = 0;

    protected static int INITIAL_MAX_IN_FLIGHT_VALUE = 200;
    protected static int SECOND_MAX_IN_FLIGHT_VALUE = 270;
    private StringBuilder debugMessage = new StringBuilder();

    /**
     * Creates the throttler.
     * @param minInFlightValue the throttler will never throttle beyond this limit.
     * @param maxInFlightValue the throttler will never throttle above this limit. If zero, no limit.
     * @param clock use to calculate window size. Can be null if minWindowSize and maxInFlightValue are equal.
     * @param policy is the algorithm for finding next value of the number of in-flight documents operations.
     */
    public IncompleteResultsThrottler(int minInFlightValue, int maxInFlightValue, Clock clock, ThrottlePolicy policy) {
        this.maxInFlightValue = maxInFlightValue == 0 ? Integer.MAX_VALUE : maxInFlightValue;
        this.minInFlightValue = minInFlightValue == 0 ? this.maxInFlightValue : minInFlightValue;
        this.policy = policy;
        this.clock = clock;
        if (minInFlightValue != maxInFlightValue) {
            this.sampleStartTimeMs = clock.getTimeMillis();
        }
        setNewSemaphoreSize(INITIAL_MAX_IN_FLIGHT_VALUE);
    }

    public int availableCapacity() {
        return blocker.availablePermits();
    }

    public void operationStart() {
        try {
            blocker.startOperation();
        } catch (InterruptedException e) {
            // Ignore
        }
        if (maxInFlightValue != minInFlightValue) {
            synchronized (monitor) {
                adjustThrottling();
            }
        }
    }

    public String getDebugMessage() {
        synchronized (monitor) {
            return debugMessage.toString();
        }
    }

    public interface Clock {
        long getTimeMillis();
    }

    public void resultReady(boolean success) {
        blocker.operationDone();
        if (!success) {
            return;
        }
        synchronized (monitor) {
            numOk++;
            minPermitsAvailable = Math.min(minPermitsAvailable, blocker.availablePermits());
        }
    }

    // Only for testing
    protected int waitingThreads() {
        synchronized (monitor) {
            return maxInFlightNow - blocker.availablePermits();
        }
    }

    private double getCeilingDifferencePerformance(int adjustCycle) {
        // We want larger adjustments in the early phase.
        if (adjustCycle > 10) {
            return 0.7;
        }
        return 1.2;
    }

    private void adjustCycle() {
        adjustCycleCount++;
        stabilizingPhasesLeft = adjustCycleCount < 5 ? 1 : 2 + random.nextInt() % 2;

        double maxPerformanceChange = getCeilingDifferencePerformance(adjustCycleCount);
        boolean messagesQueued = minPermitsAvailable < 2;

        int newMaxInFlight = policy.calcNewMaxInFlight(
                maxPerformanceChange, numOk, previousNumOk, previousMaxInFlight, maxInFlightNow, messagesQueued);
        debugMessage = new StringBuilder();
        debugMessage.append("previousMaxInFlight: " + previousMaxInFlight
                + " maxInFlightNow: " + maxInFlightNow
                + " numOk: " + numOk + " " + " previousOk: " + previousNumOk
                + " new size is: " + newMaxInFlight);
        previousMaxInFlight = maxInFlightNow;
        previousNumOk = numOk;

        setNewSemaphoreSize(adjustCycleCount == 1 ? SECOND_MAX_IN_FLIGHT_VALUE : newMaxInFlight);
    }

    private void adjustThrottling() {
        if (clock.getTimeMillis() < sampleStartTimeMs + phaseSizeMs) {
            return;
        }
        sampleStartTimeMs += phaseSizeMs;

        if (stabilizingPhasesLeft-- == 0) {
            adjustCycle();
        }
        numOk = 0;
        this.minPermitsAvailable = maxInFlightNow;
    }

    private int tryBoostingSizeIfMinValueOverSeveralCycles(final int size) {
        if (size <= minInFlightValue) {
            minWindowSizeCounter++;
        } else {
            minWindowSizeCounter = 0;
        }
        if (minWindowSizeCounter == 4) {
            debugMessage.append(" (inc max in flight to get more data)");
            minWindowSizeCounter = 0;
            return size + 10;
        }
        return size;

    }

    private void setNewSemaphoreSize(final int size) {
        maxInFlightNow =
                Math.max(minInFlightValue, Math.min(
                        tryBoostingSizeIfMinValueOverSeveralCycles(size), maxInFlightValue));
        blocker.setMaxConcurrency(maxInFlightNow);
    }
}