aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueue.java
blob: 1dd8b3bf3ec320066555a2337cea77d51d238c7d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.http.client.core.communication;

import com.yahoo.vespa.http.client.FeedEndpointException;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import com.yahoo.vespa.http.client.core.EndpointResult;
import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
 * The shared queue of operation results.
 * This is multithread safe: all access to the in-flight bookkeeping goes through methods
 * synchronized on this instance.
 *
 * For each operation sent, a timeout task is scheduled on the supplied timer. The task is cancelled
 * when a result (or failure) for the operation is registered; if it fires first, a transient error
 * result is handed to the {@link OperationProcessor} instead.
 *
 * @author Einar M R Rosenvinge
 */
class EndpointResultQueue {

    private static final Logger log = Logger.getLogger(EndpointResultQueue.class.getName());
    private final OperationProcessor operationProcessor;

    /** The currently in flight operations: operation id to the pending timeout task of that operation */
    private final Map<String, ScheduledFuture<?>> futureByOperation = new HashMap<>();

    private final Endpoint endpoint;
    private final int clusterId;
    private final ScheduledThreadPoolExecutor timer;
    private final long totalTimeoutMs;

    /**
     * Creates a result queue.
     *
     * @param operationProcessor receiver of all results, including the synthetic timeout and failure results
     * @param endpoint the endpoint used in log messages and in synthetic error results
     * @param clusterId the cluster id passed on with synthetic timeout and failure results
     * @param timer the executor the timeout tasks are scheduled on
     * @param totalTimeoutMs the delay in milliseconds from an operation is sent until its timeout task runs
     */
    EndpointResultQueue(OperationProcessor operationProcessor,
                        Endpoint endpoint,
                        int clusterId,
                        ScheduledThreadPoolExecutor timer,
                        long totalTimeoutMs) {
        this.operationProcessor = operationProcessor;
        this.endpoint = endpoint;
        this.clusterId = clusterId;
        this.timer = timer;
        this.totalTimeoutMs = totalTimeoutMs;
    }

    /**
     * Registers that an operation has been sent and schedules its timeout task.
     *
     * @param operationId the id of the operation which was sent
     */
    public synchronized void operationSent(String operationId) {
        DocumentTimerTask task = new DocumentTimerTask(operationId);
        ScheduledFuture<?> future = timer.schedule(task, totalTimeoutMs, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> replaced = futureByOperation.put(operationId, future);
        if (replaced != null) {
            // The same id was already in flight: without cancelling, the stale timer would later
            // fire, remove the entry registered just above and report a premature timeout.
            replaced.cancel(false);
        }
    }

    /**
     * Passes on the given result like {@link #resultReceived(EndpointResult, int)},
     * but does not log a warning if the operation is not registered as in flight.
     *
     * @param result the result to pass on to the operation processor
     * @param clusterId the cluster id to pass on with the result
     */
    public synchronized void failOperation(EndpointResult result, int clusterId) {
        resultReceived(result, clusterId, false);
    }

    /**
     * Passes on the given result to the operation processor and cancels the timeout task of the operation.
     * A warning is logged if the operation is not registered as in flight.
     *
     * @param result the result to pass on to the operation processor
     * @param clusterId the cluster id to pass on with the result
     */
    public synchronized void resultReceived(EndpointResult result, int clusterId) {
        resultReceived(result, clusterId, true);
    }

    /** Passes on an endpoint error to the operation processor. Does not touch the in flight operations. */
    void onEndpointError(FeedEndpointException e) {
        operationProcessor.onEndpointError(e);
    }

    private synchronized void resultReceived(EndpointResult result, int clusterId, boolean duplicateGivesWarning) {
        // The result is passed on unconditionally, also when we have no record of the operation
        operationProcessor.resultReceived(result, clusterId);

        ScheduledFuture<?> future = futureByOperation.remove(result.getOperationId());
        if (future == null) {
            if (duplicateGivesWarning) {
                log.warning("Result for ID '" + result.getOperationId() + "' received from '" + endpoint +
                            "', but we have no record of a sent operation. Either something is wrong on the server side " +
                            "(bad VIP usage?), or we have somehow received duplicate results, " +
                            "or operation was received _after_ client-side timeout.");
            }
            return;
        }
        future.cancel(false);
    }

    /** Called only from ScheduledThreadPoolExecutor thread in DocumentTimerTask.run(), see below */
    private synchronized void timeout(String operationId) {
        ScheduledFuture<?> future = futureByOperation.remove(operationId);
        if (future == null) {
            log.finer("Timeout of operation '" + operationId + "', but operation " +
                      "not found in map. Result was probably received just-in-time from server, while timeout " +
                      "task could not be cancelled.");
            return;
        }
        EndpointResult endpointResult = EndPointResultFactory.createTransientError(
                endpoint, operationId, new RuntimeException("Timed out waiting for reply from server."));
        operationProcessor.resultReceived(endpointResult, clusterId);
    }

    /** Returns the number of operations currently registered as in flight */
    public synchronized int getPendingSize() {
        return futureByOperation.size();
    }

    /**
     * Fails all operations currently in flight: cancels their timeout tasks, passes an error result
     * for each of them to the operation processor, and clears the in flight bookkeeping.
     *
     * @param exception the exception to attach to each error result
     */
    public synchronized void failPending(Exception exception) {
        for (Map.Entry<String, ScheduledFuture<?>> pending : futureByOperation.entrySet()) {
            pending.getValue().cancel(false);
            failedOperationId(pending.getKey(), exception);
        }
        futureByOperation.clear();
    }

    /** Passes an error result for the given operation id to the operation processor */
    private synchronized void failedOperationId(String operationId, Exception exception) {
        EndpointResult endpointResult = EndPointResultFactory.createError(endpoint, operationId, exception);
        operationProcessor.resultReceived(endpointResult, clusterId);
    }

    /** The task scheduled per sent operation: reports a timeout for its operation when run */
    private class DocumentTimerTask implements Runnable {

        private final String operationId;

        private DocumentTimerTask(String operationId) {
            this.operationId = operationId;
        }

        @Override
        public void run() {
            timeout(operationId);
        }

    }

}