aboutsummaryrefslogtreecommitdiffstats
path: root/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/DelayedConfigResponses.java
blob: 0b54a09d963a8807ea5ed52d20d5e7d9fb945f12 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.server.rpc;

import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.TargetWatcher;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.server.GetConfigContext;
import com.yahoo.vespa.config.server.monitoring.MetricUpdater;
import com.yahoo.vespa.config.server.monitoring.Metrics;
import com.yahoo.vespa.config.server.tenant.TenantRepository;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Takes care of <i>delayed responses</i> in the config server.
 * A delayed response is a response sent at request (server) timeout
 * for a config which has not changed since the request was initiated.
 *
 * @author hmusum
 */
public class DelayedConfigResponses {
    private static final Logger log = Logger.getLogger(DelayedConfigResponses.class.getName());
    private final RpcServer rpcServer;

    private final ScheduledExecutorService executorService;
    private final boolean useJrtWatcher;

    private final Map<ApplicationId, MetricUpdater> metrics = new ConcurrentHashMap<>();
    
    /* Requests that resolve to config that has not changed are put on this queue. When activating
       config, all requests on this queue are reprocessed as if they were a new request */
    private final Map<ApplicationId, BlockingQueue<DelayedConfigResponse>> delayedResponses =
            new ConcurrentHashMap<>();
            
    DelayedConfigResponses(RpcServer rpcServer, int numTimerThreads) {
        this(rpcServer, numTimerThreads, true);
    }

    // Since JRT does not allow adding watcher for "fake" requests, we must be able to disable it for unit tests :(
    DelayedConfigResponses(RpcServer rpcServer, int numTimerThreads, boolean useJrtWatcher) {
        this.rpcServer = rpcServer;
        ScheduledThreadPoolExecutor executor =
                new ScheduledThreadPoolExecutor(numTimerThreads,
                                                ThreadFactoryFactory.getDaemonThreadFactory("delayed config responses"));
        executor.setRemoveOnCancelPolicy(true);
        this.executorService = executor;
        this.useJrtWatcher = useJrtWatcher;
    }

    List<DelayedConfigResponse> allDelayedResponses() {
        List<DelayedConfigResponse> responses = new ArrayList<>();
        for (Map.Entry<ApplicationId, BlockingQueue<DelayedConfigResponse>> entry : delayedResponses.entrySet()) {
            responses.addAll(entry.getValue());
        }
        return responses;
    }

    /**
     * The run method of this class is run by a Timer when the timeout expires.
     * The timer associated with this response must be cancelled first.
     */
    class DelayedConfigResponse implements Runnable, TargetWatcher {

        final JRTServerConfigRequest request;
        private final BlockingQueue<DelayedConfigResponse> delayedResponsesQueue;
        private final ApplicationId app;
        private ScheduledFuture<?> future;

        DelayedConfigResponse(JRTServerConfigRequest req, BlockingQueue<DelayedConfigResponse> delayedResponsesQueue, ApplicationId app) {
            this.request = req;
            this.delayedResponsesQueue = delayedResponsesQueue;
            this.app = app;
        }

        @Override
        public synchronized void run() {
            removeFromQueue();
            removeWatcher();
            rpcServer.addToRequestQueue(request, true, null);
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, logPre()+"DelayedConfigResponse. putting on queue: " + request.getShortDescription());
            }
        }

        /**
         * Remove delayed response from its queue
         */
        private void removeFromQueue() {
            delayedResponsesQueue.remove(this);
        }

        JRTServerConfigRequest getRequest() {
            return request;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("Delayed response for ").append(logPre()).append(request.getShortDescription());
            return sb.toString();
        }

        String logPre() {
            return TenantRepository.logPre(app);
        }

        synchronized void cancelAndRemove() {
            removeFromQueue();
            cancel();
        }

        synchronized boolean cancel() {
            removeWatcher();
            if (future == null) {
                throw new IllegalStateException("Cannot cancel a task that has not been scheduled");
            }
            return future.cancel(false);
        }

        synchronized void schedule(long delay) throws InterruptedException {
            delayedResponsesQueue.put(this);
            future = executorService.schedule(this, delay, TimeUnit.MILLISECONDS);
            addWatcher();
        }

        /**
         * Removes this delayed response if target is invalid.
         *
         * @param target a Target that has become invalid (i.e, client has closed connection)
         * @see DelayedConfigResponses
         */
        @Override
        public void notifyTargetInvalid(Target target) {
            cancelAndRemove();
        }

        private void addWatcher() {
            if (useJrtWatcher) {
                request.getRequest().target().addWatcher(this);
            }
        }

        private void removeWatcher() {
            if (useJrtWatcher) {
                request.getRequest().target().removeWatcher(this);
            }
        }
    }

    /**
     * Creates a DelayedConfigResponse object for taking care of requests that should
     * not be responded to right away.  Puts the object on the delayedResponsesQueue.
     *
     * NOTE: This method is called from multiple threads, so everything here needs to be
     * thread safe!
     *
     * @param request a JRTConfigRequest
     */
    final void delayResponse(JRTServerConfigRequest request, GetConfigContext context) {
        if (request.isDelayedResponse()) {
            log.log(Level.FINE, () -> context.logPre()+"Request already delayed");
        } else {            
            createQueueIfNotExists(context);
            BlockingQueue<DelayedConfigResponse> delayedResponsesQueue = delayedResponses.get(context.applicationId());
            DelayedConfigResponse response = new DelayedConfigResponse(request, delayedResponsesQueue, context.applicationId());
            request.setDelayedResponse(true);
            try {
                if (log.isLoggable(Level.FINE)) {
                    log.log(Level.FINE, context.logPre()+"Putting on delayedRequests queue (" + delayedResponsesQueue.size() + " elements): " +
                            response.getRequest().getShortDescription());
                }
                // Config will be resolved in the run() method of DelayedConfigResponse,
                // when the timer expires or config is updated/activated.
                response.schedule(Math.max(0, request.getTimeout()));
                metricDelayedResponses(context.applicationId(), delayedResponsesQueue.size());
            } catch (InterruptedException e) {
                log.log(Level.WARNING, context.logPre()+"Interrupted when putting on delayed requests queue.");
            }
        }
    }

    private synchronized void metricDelayedResponses(ApplicationId app, int elems) {
        metrics.computeIfAbsent(app, key -> rpcServer.metricUpdaterFactory()
                                                     .getOrCreateMetricUpdater(Metrics.createDimensions(key)))
               .setDelayedResponses(elems);
    }

    private synchronized void createQueueIfNotExists(GetConfigContext context) {
        if ( ! delayedResponses.containsKey(context.applicationId())) {
            delayedResponses.put(context.applicationId(), new LinkedBlockingQueue<>());
        }
    }

    void stop() {
        executorService.shutdown();
    }

    /**
     * Drains delayed responses queue and returns responses in an array
     *
     * @return and array of DelayedConfigResponse objects
     */
    List<DelayedConfigResponse> drainQueue(ApplicationId app) {
        ArrayList<DelayedConfigResponse> ret = new ArrayList<>();
        
        if (delayedResponses.containsKey(app)) {
            BlockingQueue<DelayedConfigResponse> queue = delayedResponses.get(app);
            queue.drainTo(ret);
        }
        metrics.remove(app);
        return ret;
    }

    @Override
    public String toString() {
        return "DelayedConfigResponses. Average Size=" + size();
    }

    int size() {
        int totalQueueSize = 0;
        int numQueues = 0;
        for (Map.Entry<ApplicationId, BlockingQueue<DelayedConfigResponse>> e : delayedResponses.entrySet()) {
            numQueues++;
            totalQueueSize+=e.getValue().size();
        }
        return (numQueues > 0) ? (totalQueueSize / numQueues) : 0;
    }
}