summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
blob: 23569af3cdc10543eb3595a37ab569a1aa1348d9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;

import ai.vespa.feed.client.FeedClient.RetryStrategy;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.logging.Logger;

import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.logging.Level.INFO;

/**
 * Controls request execution and retries:
 * <ul>
 *     <li>Retry all IO exceptions; however</li>
 *     <li>stop retrying, and report failure through {@link #hasFailed()}, once the estimated error rate exceeds 10%.</li>
 *     <li>Whenever throttled (429, 503), set the target inflight to 90% of the current inflight,
 *         but never below the minimum; and</li>
 *     <li>on every successful response, increase the target inflight by 0.1, up to the maximum.</li>
 * </ul>
 *
 * All mutable state is guarded by {@link #lock}.
 *
 * @author jonmv
 */
class HttpRequestStrategy implements RequestStrategy {

    private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());

    /** Estimated error rate above which this strategy is marked as failed. */
    private static final double errorThreshold = 0.1;

    /** The most recently enqueued, still incomplete, operation for each document id. Guarded by {@link #lock}. */
    private final Map<DocumentId, CompletableFuture<Void>> inflightById = new HashMap<>();
    private final Object lock = new Object();
    private final RetryStrategy wrapped;
    private final long maxInflight;
    private final long minInflight;

    // The fields below are all guarded by lock.
    private double targetInflight;
    private long inflight = 0;
    private double errorRate = 0;
    private long consecutiveSuccesses = 0;
    private boolean failed = false;

    /**
     * Creates a strategy whose inflight bounds are derived from the connection settings of the given builder.
     * The initial target inflight is the geometric mean of the minimum and maximum inflight.
     *
     * @param builder supplies the wrapped retry strategy, connection count and streams per connection
     */
    HttpRequestStrategy(FeedClientBuilder builder) {
        this.wrapped = builder.retryStrategy;
        this.maxInflight = builder.maxConnections * (long) builder.maxStreamsPerConnection;
        this.minInflight = builder.maxConnections * (long) Math.min(16, builder.maxStreamsPerConnection);
        this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight));
    }

    /**
     * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold,
     * or the user has turned off retries for this type of operation.
     *
     * @param request the request which failed
     * @param thrown the exception the attempt failed with
     * @param attempt the 1-based number of the attempt which failed
     * @return whether the request should be dispatched again
     * @throws IllegalStateException if the HTTP method of the request is not POST, PUT or DELETE
     */
    private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) {
        failure();
        // The statistics in the message are read without holding the lock; they are used for diagnostics only.
        log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request +
                                    ", error rate is " + errorRate + ", " + consecutiveSuccesses + " successes since last error");

        if ( ! (thrown instanceof IOException))
            return false;

        if (attempt > wrapped.retries())
            return false;

        // The HTTP method identifies the operation type, which the wrapped strategy may veto retries for.
        switch (request.getMethod().toUpperCase()) {
            case "POST":   return wrapped.retry(FeedClient.OperationType.put);
            case "PUT":    return wrapped.retry(FeedClient.OperationType.update);
            case "DELETE": return wrapped.retry(FeedClient.OperationType.remove);
            default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod());
        }
    }

    /**
     * Called when a response is successfully obtained. In conjunction with IOException reports, this makes the
     * error rate converge towards the true error rate, at a speed inversely proportional to the target number
     * of inflight requests, per reported success/error, i.e., hopefully at a rate independent of transport width.
     */
    void success() {
        synchronized (lock) {
            errorRate -= errorRate / targetInflight; // Converges towards true error rate, in conjunction with failure updates.
            targetInflight = min(targetInflight + 0.1, maxInflight);
            ++consecutiveSuccesses;
        }
    }

    /**
     * Called whenever a failure to get a successful response is recorded. Marks this strategy
     * as failed once the estimated error rate exceeds the threshold; the flag is never reset.
     */
    void failure() {
        synchronized (lock) {
            errorRate += (1 - errorRate) / targetInflight; // Converges towards true error rate, in conjunction with success updates.
            if (errorRate > errorThreshold)
                failed = true;

            consecutiveSuccesses = 0;
        }
    }

    /**
     * Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502).
     * Throttled requests are retried regardless of the attempt number, and are not counted as failures.
     *
     * @param request the request the response was obtained for
     * @param response the response obtained
     * @param attempt the 1-based number of the attempt which produced the response
     * @return whether the request should be dispatched again
     */
    private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) {
        if (response.getCode() / 100 == 2) {
            success();
            return false;
        }

        log.log(INFO, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + ") on attempt " + attempt +
                            " at " + request + ", " + consecutiveSuccesses + " successes since last error");

        if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight.
            synchronized (lock) {
                targetInflight = max(inflight * 0.9, minInflight);
            }
            return true;
        }

        failure();
        return attempt <= wrapped.retries() && (response.getCode() == 500 || response.getCode() == 502); // Hopefully temporary errors.
    }

    /**
     * Blocks until the number of inflight operations is below the target, then claims a slot. Must hold lock.
     *
     * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted while waiting;
     *                          the interrupt flag of the thread is restored before throwing
     */
    private void acquireSlot() {
        try {
            while (inflight >= targetInflight)
                lock.wait();

            ++inflight;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt for code further up the stack.
            throw new RuntimeException(e);
        }
    }

    /** Gives back a slot, and wakes one waiter for each slot now available below the target. Must hold lock. */
    private void releaseSlot() {
        for (long i = --inflight; i < targetInflight; i++)
            lock.notify();
    }

    /** Returns whether the estimated error rate has, at some point, exceeded the threshold. */
    @Override
    public boolean hasFailed() {
        synchronized (lock) {
            return failed;
        }
    }

    /**
     * Enqueues the given request for dispatch. Operations for the same document id are dispatched one at a time,
     * in the order they were enqueued; the first in such a chain blocks the calling thread until a send slot is
     * available, and the following ones are dispatched as their predecessor completes, without claiming a new slot.
     *
     * @param documentId the id of the document the request concerns
     * @param request the request to dispatch
     * @param dispatch sends the given request, and completes the given future with the outcome
     * @return a future which completes with the final outcome of the operation, after any retries
     */
    @Override
    public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request,
                                                         BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch) {
        CompletableFuture<SimpleHttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries.
        CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client.
        CompletableFuture<Void> blocker = new CompletableFuture<>();              // Blocks the next operation with same doc-id, then triggers it when complete.

        // Get the previous inflight operation for this doc-id, or acquire a send slot.
        CompletableFuture<Void> previous;
        synchronized (lock) {
            previous = inflightById.put(documentId, blocker);
            if (previous == null)
                acquireSlot();
        }
        if (previous == null)   // Send immediately if none inflight ...
            dispatch.accept(request, vessel);
        else                    // ... or send when the previous inflight is done.
            previous.thenRun(() -> dispatch.accept(request, vessel));

        handleAttempt(vessel, dispatch, blocker, request, result, documentId, 1);
        return result;
    }

    /**
     * Handles the result of one attempt at the given operation, retrying if necessary.
     *
     * @param vessel completes with the outcome of this attempt
     * @param dispatch sends the given request, and completes the given future with the outcome
     * @param blocker completed when this operation is done, if another operation for the same doc-id is waiting
     * @param request the request of this operation
     * @param result completed with the final outcome of this operation
     * @param documentId the id of the document this operation concerns
     * @param attempt the 1-based number of this attempt
     */
    private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch,
                               CompletableFuture<Void> blocker, SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result,
                               DocumentId documentId, int attempt) {
        vessel.whenComplete((response, thrown) -> {
            // Retry the operation if it failed with a transient error ...
            // (the failed flag is read under the lock; once set, outcomes are neither retried nor counted)
            if ( ! hasFailed() && (thrown != null ? retry(request, thrown, attempt)
                                                  : retry(request, response, attempt))) {
                CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>();
                dispatch.accept(request, retry);
                handleAttempt(retry, dispatch, blocker, request, result, documentId, attempt + 1);
                return;
            }

            // ... or accept the outcome and mark the operation as complete.
            CompletableFuture<Void> current;
            synchronized (lock) {
                current = inflightById.get(documentId);
                if (current == blocker) {   // Release slot and clear map if no other operations enqueued for this doc-id ...
                    releaseSlot();
                    inflightById.remove(documentId); // Remove the entry, so the map does not grow with every document id seen.
                }
            }
            if (current != blocker)         // ... or trigger sending the next enqueued operation.
                blocker.complete(null);

            if (thrown == null) result.complete(response);
            else result.completeExceptionally(thrown);
        });
    }

}