aboutsummaryrefslogtreecommitdiffstats
path: root/config/src/main/java/com/yahoo/config/subscription/impl/JRTConfigRequester.java
blob: d5d21119d3b0f5f6b2955ab01df98c43f86fa7aa (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.config.subscription.impl;

import com.yahoo.config.ConfigInstance;
import com.yahoo.config.ConfigurationRuntimeException;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.RequestWaiter;
import com.yahoo.text.internal.SnippetGenerator;
import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
import com.yahoo.vespa.config.ErrorCode;
import com.yahoo.vespa.config.TimingValues;
import com.yahoo.vespa.config.protocol.JRTClientConfigRequest;
import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory;
import com.yahoo.vespa.config.protocol.Trace;
import com.yahoo.yolean.Exceptions;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import static com.yahoo.jrt.ErrorCode.CONNECTION;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINEST;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;

/**
 * Requests configs using RPC, and acts as the callback target.
 * It uses the {@link JRTConfigSubscription} and {@link JRTClientConfigRequest}
 * as context, and puts the request objects on a queue on the subscription,
 * for handling by the user thread.
 *
 * <p>Every response received for a subscription that is still open causes a follow-up request
 * to be scheduled: after a (randomized) fixed delay when the response was valid, and after an
 * exponentially growing, capped and randomized delay when it was not.
 *
 * @author Vegard Havdal
 */
public class JRTConfigRequester implements RequestWaiter {

    private static final Logger log = Logger.getLogger(JRTConfigRequester.class.getName());
    public static final ConfigSourceSet defaultSourceSet = ConfigSourceSet.createDefault();
    private static final JRTManagedConnectionPools managedPool = new JRTManagedConnectionPools();
    private static final int TRACELEVEL = 6;
    private static final Duration delayBetweenWarnings = Duration.ofSeconds(60);
    static final float randomFraction = 0.2f;
    /* Time to be added to server timeout to create client timeout. This is the time allowed for the server to respond after serverTimeout has elapsed. */
    private static final Duration additionalTimeForClientTimeout = Duration.ofSeconds(10);
    /* The failure counter stops growing at this value, which bounds the exponential backoff. */
    private static final int maxCountedFailures = 10;
    /* Upper bound, in milliseconds, for the delay before retrying a failed request (before randomization). */
    private static final long maxFailedRequestDelayMillis = 60_000;
    /*
     * Largest exponent used when computing the backoff multiplier. 2^16 times any positive fixed delay
     * (in ms) already exceeds maxFailedRequestDelayMillis, so larger exponents could never change the result.
     */
    private static final int maxBackoffExponent = 16;

    private final TimingValues timingValues;
    private final ScheduledThreadPoolExecutor scheduler;

    private final ConnectionPool connectionPool;
    private final ConfigSourceSet configSourceSet;

    private Instant timeForLastLogWarning;
    private int failures = 0;
    private volatile boolean closed = false;

    /**
     * Returns a new requester
     *
     * @param configSourceSet the source set to release from the managed pools on {@link #close()},
     *                        or null if there is nothing to release
     * @param scheduler       the executor used to schedule follow-up requests
     * @param connectionPool  the connectionPool this requester should use
     * @param timingValues    timeouts and delays used when sending JRT config requests
     */
    JRTConfigRequester(ConfigSourceSet configSourceSet, ScheduledThreadPoolExecutor scheduler,
                       ConnectionPool connectionPool, TimingValues timingValues) {
        this.configSourceSet = configSourceSet;
        this.scheduler = scheduler;
        this.connectionPool = connectionPool;
        this.timingValues = timingValues;
        // Adjust so that we wait 5 seconds with logging warning in case there are some errors just when starting up
        timeForLastLogWarning = Instant.now().minus(delayBetweenWarnings).plus(Duration.ofSeconds(5));
    }

    /**
     * Only for testing. Creates a requester with no config source set and its own single-threaded scheduler.
     *
     * @param connectionPool the connectionPool this requester should use
     * @param timingValues   timeouts and delays used when sending JRT config requests
     */
    public JRTConfigRequester(ConnectionPool connectionPool, TimingValues timingValues) {
        this(null, new ScheduledThreadPoolExecutor(1), connectionPool, timingValues);
    }

    /**
     * Acquires a requester for the given source set from the managed connection pools.
     *
     * @param sourceSet    the config sources to request config from
     * @param timingValues timeouts and delays used when sending JRT config requests
     * @return the requester handed out by the managed pools
     */
    public static JRTConfigRequester create(ConfigSourceSet sourceSet, TimingValues timingValues) {
        return managedPool.acquire(sourceSet, timingValues);
    }

    /**
     * Requests the config for the {@link com.yahoo.config.ConfigInstance} on the given {@link ConfigSubscription}
     *
     * @param sub a subscription
     * @throws ConfigurationRuntimeException if the parameters of the created request do not validate
     */
    public <T extends ConfigInstance> void request(JRTConfigSubscription<T> sub) {
        JRTClientConfigRequest req = JRTConfigRequestFactory.createFromSub(sub);
        doRequest(sub, req);
    }

    /**
     * Sends the given request asynchronously on the pool's current connection, with this object as callback target.
     *
     * @throws ConfigurationRuntimeException if the request parameters do not validate
     */
    private <T extends ConfigInstance> void doRequest(JRTConfigSubscription<T> sub, JRTClientConfigRequest req) {
        Connection connection = connectionPool.getCurrent();
        Request request = req.getRequest();
        // The context is what handleRequestDone() uses to find the subscription and connection again
        request.setContext(new RequestContext(sub, req, connection));
        if (!req.validateParameters())
            throw new ConfigurationRuntimeException("Error in parameters for config request: " + req);

        Duration jrtClientTimeout = getClientTimeout(req);
        log.log(FINE, () -> "Requesting config for " + sub + " on connection " + connection
                + " with client timeout " + jrtClientTimeout +
                (log.isLoggable(FINEST) ? (",defcontent=" + req.getDefContent().asString()) : ""));
        connection.invokeAsync(request, jrtClientTimeout, this);
    }

    /**
     * Callback invoked when a request is done. Runtime exceptions raised while handling the response
     * are stored on the subscription, or logged if the subscription could not be determined.
     *
     * @param req the finished request, whose context was set by this class when the request was sent
     */
    @SuppressWarnings("unchecked")
    @Override
    public void handleRequestDone(Request req) {
        JRTConfigSubscription<ConfigInstance> sub = null;
        try {
            RequestContext context = (RequestContext) req.getContext();
            sub = context.sub;
            doHandle(sub, context.jrtReq, context.connection);
        } catch (RuntimeException e) {
            if (sub != null) {
                // Sets this field, it will get thrown from the user thread
                sub.setException(e);
            } else {
                // Very unlikely
                log.log(SEVERE, "Failed to get subscription object from JRT config callback: " +
                        Exceptions.toMessageString(e));
            }
        }
    }

    /** Validates the response and dispatches to the success or failure handling. Does nothing for a closed subscription. */
    private void doHandle(JRTConfigSubscription<ConfigInstance> sub, JRTClientConfigRequest jrtReq, Connection connection) {
        if (sub.isClosed()) return; // Avoid error messages etc. after closing

        boolean validResponse = jrtReq.validateResponse();
        log.log(FINE, () -> "Response " + (validResponse ? "valid" : "invalid") + ". Req: " + jrtReq + "\nSpec: " + connection);
        Trace trace = jrtReq.getResponseTrace();
        trace.trace(TRACELEVEL, "JRTConfigRequester.doHandle()");
        log.log(FINEST, () -> trace.toString());
        if (validResponse)
            handleOKRequest(jrtReq, sub);
        else
            handleFailedRequest(jrtReq, sub, connection);
    }

    /**
     * Logs a failed request, unless this requester is closed. Connection errors are logged at FINE,
     * "application not loaded" at INFO, and all other errors at WARNING at most once per
     * {@link #delayBetweenWarnings} (FINE in between).
     */
    private void logFailingRequest(JRTClientConfigRequest jrtReq, Connection connection) {
        if (closed) return;

        int errorCode = jrtReq.errorCode();
        if (errorCode == CONNECTION) {
            log.log(FINE, () -> "Request failed: " + jrtReq.errorMessage() +
                    "\nConnection spec: " + connection);
            return;
        }
        if (errorCode == ErrorCode.APPLICATION_NOT_LOADED) {
            log.log(INFO, () -> "Request failed: " + jrtReq.errorMessage() +
                    "\nConnection spec: " + connection);
            return;
        }

        boolean warningIsDue = timeForLastLogWarning.isBefore(Instant.now().minus(delayBetweenWarnings));
        if (warningIsDue) {
            log.log(WARNING, describeFailure(jrtReq, connection));
            timeForLastLogWarning = Instant.now();
        } else {
            log.log(FINE, () -> describeFailure(jrtReq, connection));
        }
    }

    /** Returns the log message used for errors that are neither connection errors nor "application not loaded". */
    private static String describeFailure(JRTClientConfigRequest jrtReq, Connection connection) {
        return "Request failed: " + ErrorCode.getName(jrtReq.errorCode()) +
                ". Connection spec: " + connection.getAddress() +
                ", error message: " + jrtReq.errorMessage();
    }

    /**
     * Handles an invalid response: logs it, switches connection, increases the failure count (up to
     * {@link #maxCountedFailures}) and schedules a new request after a backoff delay.
     */
    private void handleFailedRequest(JRTClientConfigRequest jrtReq, JRTConfigSubscription<ConfigInstance> sub, Connection connection) {
        logFailingRequest(jrtReq, connection);

        connectionPool.switchConnection(connection);
        if (failures < maxCountedFailures)
            failures++;
        long delay = calculateFailedRequestDelay(failures, timingValues);
        log.log(FINE, () -> "Request for config '" + jrtReq.getShortDescription() + "' failed with error code " +
                jrtReq.errorCode() + " (" + jrtReq.errorMessage() + "), scheduling new request in " + delay + " ms");
        scheduleNextRequest(jrtReq, sub, delay, calculateErrorTimeout());
    }

    /**
     * Returns the delay, in milliseconds, to wait before retrying after the given number of consecutive failures.
     * The delay is the fixed delay doubled once per failure, limited to the range from the fixed delay to
     * 60 seconds, and then randomized by {@link #randomFraction}.
     *
     * @param failures     number of consecutive failures; negative values give the fixed delay
     * @param timingValues supplies the fixed delay and the randomization
     * @return the randomized delay in milliseconds
     */
    static long calculateFailedRequestDelay(int failures, TimingValues timingValues) {
        long fixedDelay = timingValues.getFixedDelay();
        // 2^failures, computed with a bounded shift so that a large failure count cannot overflow and
        // wrap the delay around to a small or negative value. A negative count gives a multiplier of 0,
        // which the lower bound below turns into the fixed delay.
        long multiplier = (failures < 0) ? 0 : 1L << Math.min(failures, maxBackoffExponent);
        long delay = fixedDelay * multiplier;
        delay = Math.max(fixedDelay, Math.min(maxFailedRequestDelayMillis, delay)); // between timingValues.getFixedDelay() and 60 seconds
        return timingValues.getPlusMinusFractionRandom(delay, randomFraction);
    }

    /** Returns the randomized timeout to use for the request following a failed one. */
    private long calculateErrorTimeout() {
        return timingValues.getPlusMinusFractionRandom(timingValues.getErrorTimeout(), randomFraction);
    }

    /**
     * Handles a valid response: resets the failure count, updates the subscription if the generation
     * has changed, warns if config changed without a generation change, and schedules the next request.
     */
    private void handleOKRequest(JRTClientConfigRequest jrtReq, JRTConfigSubscription<ConfigInstance> sub) {
        failures = 0;
        sub.setLastCallBackOKTS(Instant.now());
        log.log(FINE, () -> "OK response received in handleOkRequest: " + jrtReq);
        if (jrtReq.hasUpdatedGeneration()) {
            sub.updateConfig(jrtReq);
        } else if (jrtReq.hasUpdatedConfig()) {
            // Only a snippet of the payload is logged, to keep the message size reasonable
            SnippetGenerator generator = new SnippetGenerator();
            int sizeHint = 500;
            String config = jrtReq.getNewPayload().toString();
            log.log(Level.WARNING, "Config " + jrtReq.getConfigKey() + " has changed without a change in config generation: generation " +
                    jrtReq.getNewGeneration() + ", config: " + generator.makeSnippet(config, sizeHint) +
                    ". This might happen when a newly upgraded config server responds with different config when bootstrapping  " +
                    " (changes to code generating config that are different between versions) or non-deterministic config generation" +
                    " (e.g. when using collections with non-deterministic iteration order)");
        }
        scheduleNextRequest(jrtReq, sub, calculateSuccessDelay(), calculateSuccessTimeout());
    }

    /** Returns the randomized timeout to use for the request following a successful one. */
    private long calculateSuccessTimeout() {
        return timingValues.getPlusMinusFractionRandom(timingValues.getSuccessTimeout(), randomFraction);
    }

    /** Returns the randomized delay to wait before sending the request following a successful one. */
    private long calculateSuccessDelay() {
        return timingValues.getPlusMinusFractionRandom(timingValues.getFixedDelay(), randomFraction);
    }

    /**
     * Creates the follow-up of the given request and schedules it for sending.
     *
     * @param delay   milliseconds to wait before sending; negative values are treated as 0
     * @param timeout the timeout passed on to the follow-up request
     */
    private void scheduleNextRequest(JRTClientConfigRequest jrtReq, JRTConfigSubscription<?> sub, long delay, long timeout) {
        long delayBeforeSendingRequest = Math.max(0, delay);
        JRTClientConfigRequest jrtReqNew = jrtReq.nextRequest(timeout);
        log.log(FINEST, () -> timingValues.toString());
        log.log(FINE, () -> "Scheduling new request " + delayBeforeSendingRequest + " millis from now for " + jrtReqNew.getConfigKey());
        scheduler.schedule(new GetConfigTask(jrtReqNew, sub), delayBeforeSendingRequest, TimeUnit.MILLISECONDS);
    }

    /**
     * Task that can be scheduled in a timer for executing a getConfig request
     */
    private class GetConfigTask implements Runnable {
        private final JRTClientConfigRequest jrtReq;
        private final JRTConfigSubscription<?> sub;

        GetConfigTask(JRTClientConfigRequest jrtReq, JRTConfigSubscription<?> sub) {
            this.jrtReq = jrtReq;
            this.sub = sub;
        }

        @Override
        public void run() {
            try {
                doRequest(sub, jrtReq);
            } catch (RuntimeException e) {
                // An exception escaping from here would only end up in the future returned by the scheduler,
                // which nobody reads, and the subscription would silently stop being refreshed.
                if (closed || sub.isClosed()) {
                    log.log(FINE, () -> "Ignoring failure to send config request after close: " +
                            Exceptions.toMessageString(e));
                    return;
                }
                // Same reporting as in handleRequestDone(): the exception gets thrown from the user thread
                sub.setException(e);
            }
        }
    }

    /**
     * Marks this requester as closed, which silences logging of failed requests, and releases the
     * config source set from the managed pools if this requester has one.
     */
    // NOTE(review): the scheduler is not shut down here; for the test-only constructor, which creates
    // its own scheduler, that looks like it is left to the caller - confirm.
    public void close() {
        closed = true;
        if (configSourceSet != null) {
            managedPool.release(configSourceSet);
        }
    }

    /** The state attached to an outgoing request, read back in {@link #handleRequestDone(Request)}. */
    @SuppressWarnings("rawtypes")
    private static class RequestContext {
        final JRTConfigSubscription sub;
        final JRTClientConfigRequest jrtReq;
        final Connection connection;

        private RequestContext(JRTConfigSubscription sub, JRTClientConfigRequest jrtReq, Connection connection) {
            this.sub = sub;
            this.jrtReq = jrtReq;
            this.connection = connection;
        }
    }

    /** Returns the current count of consecutive failed requests, which never exceeds 10. */
    int getFailures() { return failures; }

    // TODO: Should be package private, used in integrationtest.rb in system tests
    public ConnectionPool getConnectionPool() {
        return connectionPool;
    }

    /** Returns the timeout of the given request (in milliseconds) plus {@link #additionalTimeForClientTimeout}. */
    private Duration getClientTimeout(JRTClientConfigRequest request) {
        return Duration.ofMillis(request.getTimeout()).plus(additionalTimeForClientTimeout);
    }
}