aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
blob: 18fa7d94117b55cc3cccf6a09c6a18c6c2ee751c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client.impl;

import ai.vespa.feed.client.DocumentId;
import ai.vespa.feed.client.FeedClient;
import ai.vespa.feed.client.FeedClient.CircuitBreaker;
import ai.vespa.feed.client.FeedException;
import ai.vespa.feed.client.HttpResponse;
import ai.vespa.feed.client.OperationStats;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

class HttpRequestStrategyTest {

    /**
     * Feeds {@code 1 << 16} operations through an {@link HttpRequestStrategy} backed by a cluster which
     * answers every request with a 200 after a short, random delay, and then verifies that the statistics
     * gathered by the {@link BenchmarkingCluster} account for every single operation.
     *
     * A helper thread prints the number of inflight operations once per second while feeding is ongoing.
     * All helper resources are released in a {@code finally} block, so a failure while feeding does not
     * leave the monitor thread or the scheduler thread running.
     */
    @Test
    void testConcurrency() {
        int documents = 1 << 16;
        HttpRequest request = new HttpRequest("PUT", "/", null, null, null);
        HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        // Each dispatched request is completed with the canned response after a delay of 0-19 ms.
        Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS));
        CountDownLatch latch = new CountDownLatch(1);
        long startNanos;
        try {
            HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123")))
                                                                                    .setConnectionsPerEndpoint(1 << 10)
                                                                                    .setMaxStreamPerConnection(1 << 12),
                                                                   cluster);
            // Progress monitor: prints inflight count every second until the latch is released.
            new Thread(() -> {
                try {
                    while ( ! latch.await(1, TimeUnit.SECONDS))
                        System.err.println(cluster.stats().inflight());
                }
                catch (InterruptedException ignored) { } // Interrupted: just let the monitor thread end.
            }).start();
            startNanos = System.nanoTime();
            for (int i = 0; i < documents; i++)
                strategy.enqueue(DocumentId.of("ns", "type", Integer.toString(i)), request);

            strategy.await();
        }
        finally {
            // Always stop the monitor thread and the scheduler, even when feeding fails.
            latch.countDown();
            executor.shutdown();
            cluster.close();
        }
        OperationStats stats = cluster.stats();
        long successes = stats.responsesByCode().get(200);
        System.err.println(successes + " successes in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
        System.err.println(stats);

        assertEquals(documents, stats.requests());
        assertEquals(documents, stats.responses());
        assertEquals(documents, stats.responsesByCode().get(200));
        assertEquals(0, stats.inflight());
        assertEquals(0, stats.exceptions());
        assertEquals(0, stats.bytesSent()); // Presumably because the request was created without a body.
        assertEquals(2 * documents, stats.bytesReceived()); // Each response body is the two bytes "{}".
    }

    /**
     * Drives a single-connection {@link HttpRequestStrategy} against a {@link MockCluster} whose behaviour
     * is swapped between steps, and checks after each step how many requests were dispatched in total.
     *
     * The steps cover: a runtime exception (one request), an IOException (two requests), a success,
     * throttled (429) responses combined with serialisation of operations to the same document id,
     * server errors (500) for a retriable and a non-retriable operation, and a 400 response.
     * Finally the circuit breaker state is checked against a manually advanced clock, and the
     * accumulated statistics are verified.
     *
     * The retry strategy used here allows a single retry, and only for {@code OperationType.PUT};
     * the assertions below show HTTP "POST" requests being retried and HTTP "PUT" requests not,
     * so presumably HTTP POST maps to the PUT operation type — confirm against HttpRequestStrategy.
     */
    @Test
    void testRetries() throws ExecutionException, InterruptedException {
        int minStreams = 16; // Hard limit for minimum number of streams per connection.
        MockCluster cluster = new MockCluster();
        // Manually controlled clock, in milliseconds, read by the circuit breaker.
        AtomicLong now = new AtomicLong(0);
        CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
        HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123")))
                                                                                .setRetryStrategy(new FeedClient.RetryStrategy() {
                                                                                    @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
                                                                                    @Override public int retries() { return 1; }
                                                                                })
                                                                                .setCircuitBreaker(breaker)
                                                                                .setConnectionsPerEndpoint(1)
                                                                                .setMaxStreamPerConnection(minStreams),
                                                               new BenchmarkingCluster(cluster));
        // Snapshot of the stats before any operation, used for the since(...) checks at the end.
        OperationStats initial = strategy.stats();

        DocumentId id1 = DocumentId.of("ns", "type", "1");
        DocumentId id2 = DocumentId.of("ns", "type", "2");
        HttpRequest request = new HttpRequest("POST", "/", null, null, null);

        // Runtime exception is not retried.
        cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
        ExecutionException expected = assertThrows(ExecutionException.class,
                                                   () -> strategy.enqueue(id1, request).get());
        assertTrue(expected.getCause() instanceof FeedException);
        assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage());
        assertEquals(1, strategy.stats().requests());

        // IOException is retried.
        cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me")));
        expected = assertThrows(ExecutionException.class,
                                () -> strategy.enqueue(id1, request).get());
        assertEquals("retry me", expected.getCause().getCause().getMessage());
        assertEquals(3, strategy.stats().requests()); // Two more: the original attempt plus one retry.

        // Successful response is returned
        HttpResponse success = HttpResponse.of(200, null);
        cluster.expect((__, vessel) -> vessel.complete(success));
        assertEquals(success, strategy.enqueue(id1, request).get());
        assertEquals(4, strategy.stats().requests());

        // Throttled requests are retried. Concurrent operations to same ID (only) are serialised.
        now.set(2000);
        HttpResponse throttled = HttpResponse.of(429, null);
        AtomicInteger count = new AtomicInteger(3);
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>();
        // The first two dispatches of 'request' are throttled; the third is captured and left pending,
        // to be completed by the test below. Any other request succeeds immediately.
        cluster.expect((req, vessel) -> {
            if (req == request) {
                if (count.decrementAndGet() > 0)
                    vessel.complete(throttled);
                else {
                    completion.set(vessel);
                    latch.countDown();
                }
            }
            else vessel.complete(success);
        });
        CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
        CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null));
        assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, null)).get());
        latch.await(); // Wait until the third attempt of 'delayed' has been dispatched and captured.
        assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
        now.set(4000);
        assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
        completion.get().complete(success);
        assertEquals(success, delayed.get());
        assertEquals(success, serialised.get()); // Dispatched only after 'delayed' completed; request number 9.

        // Some error responses are retried.
        HttpResponse serverError = HttpResponse.of(500, null);
        cluster.expect((__, vessel) -> vessel.complete(serverError));
        assertEquals(serverError, strategy.enqueue(id1, request).get());
        assertEquals(11, strategy.stats().requests());
        assertEquals(CLOSED, breaker.state()); // Circuit still closed: the mock clock has not advanced since these failures began.

        // Error responses are not retried when not of appropriate type.
        cluster.expect((__, vessel) -> vessel.complete(serverError));
        assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)).get());
        assertEquals(12, strategy.stats().requests());

        // Some error responses are not retried.
        HttpResponse badRequest = HttpResponse.of(400, null);
        cluster.expect((__, vessel) -> vessel.complete(badRequest));
        assertEquals(badRequest, strategy.enqueue(id1, request).get());
        assertEquals(13, strategy.stats().requests());

        // Circuit breaker opens some time after starting to fail.
        // NOTE(review): presumably the two durations given to the breaker above are the thresholds at play —
        // 2 s of failure exceeds the 1 s one, and 601 s exceeds the 10 minute one; confirm against GracePeriodCircuitBreaker.
        now.set(6000);
        assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests.
        now.set(605000);
        assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests.

        strategy.destroy();
        OperationStats stats = strategy.stats();
        // Expected response counts per HTTP status code, summed over all the steps above.
        Map<Integer, Long> codes = new HashMap<>();
        codes.put(200, 4L);
        codes.put(400, 1L);
        codes.put(429, 2L);
        codes.put(500, 3L);
        assertEquals(codes, stats.responsesByCode());
        assertEquals(3, stats.exceptions()); // One RuntimeException, and two attempts failing with IOException.

        // Stats relative to the initial snapshot equal the totals; stats relative to themselves are zero.
        assertEquals(stats, stats.since(initial));
        assertEquals(0, stats.since(stats).averageLatencyMillis());
        assertEquals(0, stats.since(stats).requests());
        assertEquals(0, stats.since(stats).bytesSent());
    }

    /**
     * Verifies that all operations complete when the dispatch of one request throws.
     *
     * Several operations are enqueued against a {@link MockCluster} which reacts differently per request
     * instance: one blocks and then throws from within dispatch, one fails with an IOException after
     * synchronising with the test thread, one fails with a RuntimeException, and the plain {@code request}
     * is never completed by the mock at all. Two phasers coordinate the mock with the test thread so that
     * the intermediate state can be asserted before the throwing dispatch is released.
     * Afterwards every outstanding operation, and a newly enqueued one, must complete exceptionally.
     */
    @Test
    void testShutdown() {
        MockCluster cluster = new MockCluster();
        AtomicLong now = new AtomicLong(0);
        CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
        HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123")))
                                                                                .setRetryStrategy(new FeedClient.RetryStrategy() {
                                                                                    @Override public int retries() { return 1; }
                                                                                })
                                                                                .setCircuitBreaker(breaker)
                                                                                .setConnectionsPerEndpoint(1),
                                                               new BenchmarkingCluster(cluster));

        DocumentId id1 = DocumentId.of("ns", "type", "1");
        DocumentId id2 = DocumentId.of("ns", "type", "2");
        DocumentId id3 = DocumentId.of("ns", "type", "3");
        DocumentId id4 = DocumentId.of("ns", "type", "4");
        DocumentId id5 = DocumentId.of("ns", "type", "5");
        // Four distinct but equal-looking requests; the mock below tells them apart by reference identity.
        HttpRequest failing = new HttpRequest("POST", "/", null, null, null);
        HttpRequest partial = new HttpRequest("POST", "/", null, null, null);
        HttpRequest request = new HttpRequest("POST", "/", null, null, null);
        HttpRequest blocking = new HttpRequest("POST", "/", null, null, null);

        // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight.
        Phaser phaser = new Phaser(2);
        Phaser blocker = new Phaser(2);
        cluster.expect((req, vessel) -> {
            if (req == blocking) {
                phaser.arriveAndAwaitAdvance();  // Synchronise with test main thread, and then ...
                blocker.arriveAndAwaitAdvance(); // ... block dispatch thread, so we get something in the queue.
                throw new RuntimeException("armageddon"); // Dispatch thread should die, tearing down everything.
            }
            else if (req == partial) {
                phaser.arriveAndAwaitAdvance();  // Let test thread enqueue more ops before failing (and retrying) this.
                vessel.completeExceptionally(new IOException("failed"));
            }
            else if (req == failing) {
                System.err.println("failing");
                vessel.completeExceptionally(new RuntimeException("fatal"));
            }
            // Any other request (i.e., 'request') is accepted, but its vessel is never completed here.
        });
        // inflight completes dispatch, but causes no response.
        CompletableFuture<HttpResponse> inflight = strategy.enqueue(id1, request);
        // serialised 1 and 2 are waiting for the above inflight to complete.
        CompletableFuture<HttpResponse> serialised1 = strategy.enqueue(id1, request);
        CompletableFuture<HttpResponse> serialised2 = strategy.enqueue(id1, request);
        CompletableFuture<HttpResponse> retried = strategy.enqueue(id2, partial);
        CompletableFuture<HttpResponse> failed = strategy.enqueue(id3, failing);
        CompletableFuture<HttpResponse> blocked = strategy.enqueue(id4, blocking);
        CompletableFuture<HttpResponse> delayed = strategy.enqueue(id5, request);
        phaser.arriveAndAwaitAdvance(); // retried is allowed to dispatch, and will be retried async.
        // failed immediately fails, and lets us assert the above retry is indeed enqueued.
        assertEquals("ai.vespa.feed.client.FeedException: java.lang.RuntimeException: fatal",
                     assertThrows(ExecutionException.class, failed::get).getMessage());
        phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread.

        // Current state: inflight is "inflight to cluster", serialised1/2 are waiting completion of it;
        //                blocked is blocking dispatch, delayed is enqueued, waiting for dispatch;
        //                failed has completed exceptionally; retried has failed once, and has a retry in the dispatch queue.
        assertFalse(inflight.isDone());
        assertFalse(serialised1.isDone());
        assertFalse(serialised2.isDone());
        assertTrue(failed.isDone());
        assertFalse(retried.isDone());
        assertFalse(blocked.isDone());
        assertFalse(delayed.isDone());

        // Kill dispatch thread, and see that all enqueued operations, and new ones, complete.
        blocker.arriveAndAwaitAdvance();
        assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
                     assertThrows(ExecutionException.class, inflight::get).getMessage());
        assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
                     assertThrows(ExecutionException.class, serialised1::get).getMessage());
        assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
                     assertThrows(ExecutionException.class, serialised2::get).getMessage());
        assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
                     assertThrows(ExecutionException.class, blocked::get).getMessage());
        assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
                     assertThrows(ExecutionException.class, delayed::get).getMessage());
        // The operation awaiting its retry reports the IOException from its first attempt, not the abort.
        assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed",
                     assertThrows(ExecutionException.class, retried::get).getMessage());
        // An operation enqueued after the shutdown is aborted as well.
        assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
                     assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage());
    }

    /**
     * Scriptable {@link Cluster} for tests: every dispatched request is forwarded to whichever
     * handler was most recently installed through {@link #expect}.
     */
    static class MockCluster implements Cluster {

        /** The currently installed dispatch handler; empty until {@link #expect} has been called. */
        final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();

        /** Replaces the handler which receives all subsequently dispatched requests. */
        void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) {
            dispatch.set(expected);
        }

        /** Hands the request, and the future to complete for it, to the current handler. */
        @Override
        public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
            BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> handler = dispatch.get();
            handler.accept(request, vessel);
        }

    }

}