aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
blob: 7ead0c4a37fe5e9b99156d84b9410ec388f7785f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.client.core.operationProcessor;

import com.google.common.collect.ArrayListMultimap;
import com.yahoo.vespa.http.client.FeedClient;
import com.yahoo.vespa.http.client.FeedEndpointException;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.config.Cluster;
import com.yahoo.vespa.http.client.config.SessionParams;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.communication.EndpointIOException;
import com.yahoo.vespa.http.client.core.EndpointResult;
import com.yahoo.vespa.http.client.core.Exceptions;
import com.yahoo.vespa.http.client.core.communication.ClusterConnection;

import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Merges several endpointResult into one Result and does the callback.
 *
 * <p>Every document is posted to all configured clusters. The per-cluster {@link EndpointResult}s are
 * collected until one has arrived from each cluster; they are then merged into a single {@link Result}
 * and handed to the {@link FeedClient.ResultCallback}. Failed operations that look transient are
 * rescheduled on the timeout executor, up to the configured maximum number of retries.
 *
 * <p>When the V3 protocol is enabled, operations on the same document id are serialized. A new
 * operation is held back while an earlier one on the same id is still in flight.
 *
 * <p>Mutable bookkeeping state is guarded by {@code monitor}. Result callbacks, throttler updates and
 * cluster posts are performed outside the lock.
 *
 * @author dybis
 */
public class OperationProcessor {

    private static final Logger log = Logger.getLogger(OperationProcessor.class.getName());
    /** In-flight operations keyed by operation id; insertion-ordered so the oldest is first. Guarded by monitor. */
    private final Map<String, DocumentSendInfo> docSendInfoByOperationId = new LinkedHashMap<>();
    /** Operations held back because another operation on the same document id is in flight. Guarded by monitor. */
    private final ArrayListMultimap<String, Document> blockedDocumentsByDocumentId = ArrayListMultimap.create();
    /** Document ids with an operation currently in flight. Guarded by monitor. */
    private final Set<String> inflightDocumentIds = new HashSet<>();
    /** Number of clusters every operation is sent to; a result is complete when this many details arrived. */
    private final int numDestinations;
    private final FeedClient.ResultCallback resultCallback;
    private final Object monitor = new Object();
    private final IncompleteResultsThrottler incompleteResultsThrottler;
    // Position in the array is cluster ID.
    private final List<ClusterConnection> clusters = new ArrayList<>();
    private final ScheduledThreadPoolExecutor timeoutExecutor;
    private final OperationStats operationStats;
    private final int maxRetries;
    private final long minTimeBetweenRetriesMs;
    private final Random random = new SecureRandom();
    private final int traceEveryXOperation;
    private final boolean blockOperationsToSameDocument;
    /** Position within the current tracing period, always in [0, traceEveryXOperation). Guarded by monitor. */
    private int traceCounter = 0;
    private final boolean traceToStderr;
    private final ThreadGroup ioThreadGroup;
    private final String clientId = new BigInteger(130, random).toString(32);

    /**
     * Creates a processor feeding to every cluster in {@code sessionParams}.
     *
     * @param incompleteResultsThrottler notified when operations start and when results are ready
     * @param resultCallback receives the merged result for every operation
     * @param sessionParams cluster, connection and feed configuration
     * @param timeoutExecutor used for scheduling retries; shut down by {@link #close()}
     * @throws IllegalArgumentException if there are no clusters, or a cluster has no endpoints
     */
    public OperationProcessor(
            IncompleteResultsThrottler incompleteResultsThrottler,
            FeedClient.ResultCallback resultCallback,
            SessionParams sessionParams,
            ScheduledThreadPoolExecutor timeoutExecutor) {
        if (sessionParams.getClusters().isEmpty()) {
            throw new IllegalArgumentException("Cannot feed to 0 clusters.");
        }
        for (Cluster cluster : sessionParams.getClusters()) {
            if (cluster.getEndpoints().isEmpty()) {
                throw new IllegalArgumentException("Cannot feed to empty cluster.");
            }
        }

        // Initialize all configuration before handing 'this' to ClusterConnection below: a connection
        // may call back into this object (e.g. resultReceived -> retriedThis) and must not observe
        // uninitialized fields such as maxRetries.
        this.numDestinations = sessionParams.getClusters().size();
        this.resultCallback = resultCallback;
        this.incompleteResultsThrottler = incompleteResultsThrottler;
        this.timeoutExecutor = timeoutExecutor;
        this.blockOperationsToSameDocument = sessionParams.getConnectionParams().isEnableV3Protocol();
        this.ioThreadGroup = new ThreadGroup("operationprocessor");
        this.maxRetries = sessionParams.getConnectionParams().getMaxRetries();
        this.minTimeBetweenRetriesMs = sessionParams.getConnectionParams().getMinTimeBetweenRetriesMs();
        this.traceEveryXOperation = sessionParams.getConnectionParams().getTraceEveryXOperation();
        this.traceToStderr = sessionParams.getConnectionParams().getPrintTraceToStdErr();

        // The client queue is split evenly between the clusters.
        int queueSizePerCluster = sessionParams.getClientQueueSize() / numDestinations;
        for (int i = 0; i < numDestinations; i++) {
            clusters.add(new ClusterConnection(
                    this,
                    sessionParams.getFeedParams(),
                    sessionParams.getConnectionParams(),
                    sessionParams.getErrorReport(),
                    sessionParams.getClusters().get(i),
                    i,
                    queueSizePerCluster,
                    timeoutExecutor));
        }
        this.operationStats = new OperationStats(sessionParams, clusters, incompleteResultsThrottler);
    }

    /** Returns the thread group intended for this processor's I/O threads. */
    public ThreadGroup getIoThreadGroup() {
        return ioThreadGroup;
    }

    /**
     * Returns the number of operations that have been sent but have not yet received a result from
     * every cluster. Operations blocked behind another operation on the same document are not counted.
     */
    public int getIncompleteResultQueueSize() {
        synchronized (monitor) {
            return docSendInfoByOperationId.size();
        }
    }

    /** Returns the id of the oldest operation to be sent. */
    public Optional<String> oldestIncompleteResultId() {
        synchronized (monitor) {
            return docSendInfoByOperationId.isEmpty()
                    ? Optional.empty()
                    : Optional.of(docSendInfoByOperationId.keySet().iterator().next());
        }
    }

    /** Returns the random identifier generated for this processor instance. */
    public String getClientId() {
        return clientId;
    }

    /**
     * Decides whether a failed result should be retried, and if so schedules the retry.
     * Must be called with {@code monitor} held.
     *
     * @return true if a retry was scheduled (the result should then be ignored), false otherwise
     */
    private boolean retriedThis(EndpointResult endpointResult, DocumentSendInfo documentSendInfo, int clusterId) {
        final Result.Detail detail = endpointResult.getDetail();
        // If success, no retries to do.
        if (detail.getResultType() == Result.ResultType.OPERATION_EXECUTED) {
            return false;
        }

        int retries = documentSendInfo.incRetries(clusterId, detail);
        if (retries > maxRetries) {
            return false;
        }

        String exceptionMessage = detail.getException() == null ? "" : detail.getException().getMessage();
        if (exceptionMessage == null) {
            exceptionMessage = "";
        }
        // TODO: Return proper error code in structured data in next version of internal API.
        // Error codes from messagebus/src/cpp/messagebus/errorcode.h
        boolean retryThisOperation =
                detail.getResultType() == Result.ResultType.TRANSITIVE_ERROR ||
                exceptionMessage.contains("SEND_QUEUE_CLOSED") ||
                exceptionMessage.contains("ILLEGAL_ROUTE") ||
                exceptionMessage.contains("NO_SERVICES_FOR_ROUTE") ||
                exceptionMessage.contains("NETWORK_ERROR") ||
                exceptionMessage.contains("SEQUENCE_ERROR") ||
                exceptionMessage.contains("NETWORK_SHUTDOWN") ||
                exceptionMessage.contains("TIMEOUT");

        if (retryThisOperation) {
            // Add up to 1/3 random jitter so retries from many operations do not arrive in lock-step.
            int waitTime = (int) (minTimeBetweenRetriesMs * (1 + random.nextDouble() / 3));
            // Lazy message: this runs under the monitor, so avoid building the string when FINEST is off.
            log.finest(() -> "Retrying due to " + detail + " attempt " + retries
                    + " in " + waitTime + " ms.");
            timeoutExecutor.schedule(
                    () -> postToCluster(clusters.get(clusterId), documentSendInfo.getDocument()),
                    waitTime,
                    TimeUnit.MILLISECONDS);
            return true;
        }

        return false;
    }

    /**
     * Records one cluster's result for an operation.
     *
     * @return the merged result once every cluster has reported, or null if the result is still
     *         incomplete, was retried, was a duplicate, or belonged to an unknown/finished operation
     */
    private Result process(EndpointResult endpointResult, int clusterId) {
        Result result;
        Document blockedDocumentToSend = null;
        synchronized (monitor) {
            if (!docSendInfoByOperationId.containsKey(endpointResult.getOperationId())) {
                log.finer(() -> "Received out-of-order or too late result, discarding: " + endpointResult);
                return null;
            }
            DocumentSendInfo documentSendInfo = docSendInfoByOperationId.get(endpointResult.getOperationId());

            if (retriedThis(endpointResult, documentSendInfo, clusterId)) {
                return null;
            }

            if (!documentSendInfo.addIfNotAlreadyThere(endpointResult.getDetail(), clusterId)) {
                // Duplicate message, we have seen this operation before.
                return null;
            }

            // Is this the last operation we are waiting for?
            if (documentSendInfo.detailCount() != numDestinations) {
                return null;
            }

            result = documentSendInfo.createResult();
            docSendInfoByOperationId.remove(endpointResult.getOperationId());

            String documentId = documentSendInfo.getDocument().getDocumentId();
            // If another operation on this document is waiting, keep the id marked as in flight and
            // send the next blocked operation; otherwise release the document id.
            List<Document> blockedDocuments = blockedDocumentsByDocumentId.get(documentId);
            if (blockedDocuments.isEmpty()) {
                inflightDocumentIds.remove(documentId);
            } else {
                blockedDocumentToSend = blockedDocuments.remove(0);
            }
        }
        // Sent outside the lock, since posting may call back into this object.
        if (blockedDocumentToSend != null) {
            sendToClusters(blockedDocumentToSend);
        }
        return result;
    }

    /**
     * Receives one cluster's result for an operation. When the operation is complete, notifies the
     * throttler and the result callback, and prints any local trace to stderr if configured.
     */
    public void resultReceived(EndpointResult endpointResult, int clusterId) {
        Result result = process(endpointResult, clusterId);

        if (result != null) {
            incompleteResultsThrottler.resultReady(result.isSuccess());
            resultCallback.onCompletion(result.getDocumentId(), result);
            if (traceToStderr && result.hasLocalTrace()) {
                System.err.println(result.toString());
            }
        }
    }

    /** Forwards an endpoint-level failure to the result callback. */
    public void onEndpointError(FeedEndpointException e) {
        resultCallback.onEndpointException(e);
    }

    /**
     * Closes every cluster connection, continuing past failures.
     *
     * @return the exceptions thrown while closing, in cluster order; empty if all closed cleanly
     */
    public List<Exception> closeClusters() {
        List<Exception> exceptions = new ArrayList<>();
        // first, close cluster sessions and allow connections to drain normally
        for (ClusterConnection cluster : clusters) {
            try {
                cluster.close();
            } catch (Exception e) {
                exceptions.add(e);
            }
        }
        return exceptions;
    }

    /**
     * Sends a document operation to all clusters. When operations on the same document are serialized
     * and another operation on this document id is in flight, the operation is queued instead and sent
     * once the earlier one completes.
     */
    public void sendDocument(Document document) {
        incompleteResultsThrottler.operationStart();

        synchronized (monitor) {
            if (blockOperationsToSameDocument && inflightDocumentIds.contains(document.getDocumentId())) {
                blockedDocumentsByDocumentId.put(document.getDocumentId(), document);
                return;
            }
            inflightDocumentIds.add(document.getDocumentId());
        }

        sendToClusters(document);
    }

    /** Registers the operation as in flight (deciding whether to trace it) and posts it to every cluster. */
    private void sendToClusters(Document document) {

        synchronized (monitor) {
            // Trace every traceEveryXOperation-th operation, starting with the first. The counter is kept
            // in [0, traceEveryXOperation) so it can never overflow.
            boolean traceThisDoc = false;
            if (traceEveryXOperation > 0) {
                traceThisDoc = traceCounter == 0;
                traceCounter = (traceCounter + 1) % traceEveryXOperation;
            }
            docSendInfoByOperationId.put(document.getOperationId(), new DocumentSendInfo(document, traceThisDoc));
        }

        for (ClusterConnection clusterConnection : clusters) {
            postToCluster(clusterConnection, document);
        }
    }

    /** Posts to one cluster, converting an I/O failure into an error result for that cluster. */
    private void postToCluster(ClusterConnection clusterConnection, Document document) {
        try {
            clusterConnection.post(document);
        } catch (EndpointIOException eio) {
            resultReceived(EndPointResultFactory.createError(eio.getEndpoint(),
                                                             document.getOperationId(),
                                                             eio),
                                                             clusterConnection.getClusterId());
        }
    }

    /** Returns operation statistics as a JSON string. */
    public String getStatsAsJson() {
        return operationStats.getStatsAsJson();
    }

    /**
     * Closes all cluster connections and shuts down the timeout executor.
     *
     * @throws RuntimeException if anything failed; a single failure is rethrown as-is (checked ones
     *         wrapped), and multiple failures are summarized with the first one as the cause
     */
    public void close() {
        List<Exception> exceptions = closeClusters();
        try {
            closeExecutor();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers higher up the stack can still observe it.
            Thread.currentThread().interrupt();
            exceptions.add(e);
        }

        if (exceptions.isEmpty()) {
            return;
        }
        if (exceptions.size() == 1) {
            if (exceptions.get(0) instanceof RuntimeException) {
                throw (RuntimeException) exceptions.get(0);
            } else {
                throw new RuntimeException(exceptions.get(0));
            }
        }

        StringBuilder b = new StringBuilder();
        b.append("Exception thrown while closing one or more clusters: ");
        for (int i = 0; i < exceptions.size(); i++) {
            Exception e = exceptions.get(i);
            b.append(Exceptions.toMessageString(e));
            if (i != (exceptions.size() - 1)) {
                b.append(", ");
            }
        }
        throw new RuntimeException(b.toString(), exceptions.get(0));
    }

    /**
     * Shuts down the timeout executor, discarding pending retries, and waits up to 300 seconds for
     * running tasks to finish.
     *
     * @throws InterruptedException if interrupted while waiting
     * @throws RuntimeException if the executor does not terminate in time
     */
    private void closeExecutor() throws InterruptedException {
        log.log(Level.FINE, "Shutting down timeout executor.");
        timeoutExecutor.shutdownNow();

        log.log(Level.FINE, "Awaiting termination of already running timeout tasks.");
        if (! timeoutExecutor.awaitTermination(300, TimeUnit.SECONDS)) {
            log.severe("Did not manage to shut down the executors within 300 secs, system stuck?");
            throw new RuntimeException("Did not manage to shut down retry threads. Please report problem.");
        }
    }
}