summaryrefslogtreecommitdiffstats
path: root/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
blob: cd515383950d43f6aa2723264ddc0427ad23de16 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;

import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.log.LogLevel;
import com.yahoo.log.LogSetup;
import com.yahoo.log.event.Event;
import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.TimingValues;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionAndUrlDownload;
import com.yahoo.yolean.system.CatchSignals;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

import static com.yahoo.vespa.config.proxy.Mode.ModeName.DEFAULT;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * A proxy server that handles RPC config requests. The proxy can run in two modes:
 * 'default' and 'memorycache', where the last one will not get config from an upstream
 * config source, but will serve config from memory cache only.
 *
 * @author hmusum
 */
public class ProxyServer implements Runnable {

    private static final int DEFAULT_RPC_PORT = 19090;
    private static final int JRT_TRANSPORT_THREADS = 4;
    static final String DEFAULT_PROXY_CONFIG_SOURCES = "tcp/localhost:19070";

    private final static Logger log = Logger.getLogger(ProxyServer.class.getName());
    private final AtomicBoolean signalCaught = new AtomicBoolean(false);

    // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
    private final Supervisor supervisor = new Supervisor(new Transport(JRT_TRANSPORT_THREADS));
    private ScheduledFuture<?> delayedResponseScheduler;

    private final ConfigProxyRpcServer rpcServer;
    final DelayedResponses delayedResponses;
    private ConfigSourceSet configSource;

    private volatile ConfigSourceClient configClient;

    private final TimingValues timingValues;
    private final MemoryCache memoryCache;
    private static final double timingValuesRatio = 0.8;
    private final static TimingValues defaultTimingValues;
    private final FileDistributionAndUrlDownload fileDistributionAndUrlDownload;

    private volatile Mode mode = new Mode(DEFAULT);

    static {
        // Proxy should time out before clients upon subscription.
        TimingValues tv = new TimingValues();
        tv.setUnconfiguredDelay((long)(tv.getUnconfiguredDelay()* timingValuesRatio)).
                setConfiguredErrorDelay((long)(tv.getConfiguredErrorDelay()* timingValuesRatio)).
                setSubscribeTimeout((long)(tv.getSubscribeTimeout()* timingValuesRatio)).
                setConfiguredErrorTimeout(-1);  // Never cache errors
        defaultTimingValues = tv;
    }

    ProxyServer(Spec spec, ConfigSourceSet source, TimingValues timingValues,
                MemoryCache memoryCache, ConfigSourceClient configClient) {
        this.delayedResponses = new DelayedResponses();
        this.configSource = source;
        log.log(LogLevel.DEBUG, "Using config source '" + source);
        this.timingValues = timingValues;
        this.memoryCache = memoryCache;
        this.rpcServer = createRpcServer(spec);
        this.configClient = createClient(rpcServer, delayedResponses, source, timingValues, memoryCache, configClient);
        this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source);
    }

    public void run() {
        if (rpcServer != null) {
            Thread t = new Thread(rpcServer);
            t.setName("RpcServer");
            t.start();
        }
        // Wait for 5 seconds initially, then run every second
        delayedResponseScheduler = scheduler.scheduleAtFixedRate(new DelayedResponseHandler(delayedResponses,
                                                                                            memoryCache,
                                                                                            rpcServer),
                                                                 5, 1, SECONDS);
    }

    RawConfig resolveConfig(JRTServerConfigRequest req) {
        // Calling getConfig() will either return with an answer immediately or
        // create a background thread that retrieves config from the server and
        // calls updateSubscribers when new config is returned from the config source.
        // In the last case the method below will return null.
        return configClient.getConfig(RawConfig.createFromServerRequest(req), req);
    }

    static boolean configOrGenerationHasChanged(RawConfig config, JRTServerConfigRequest request) {
        return (config != null && ( ! config.hasEqualConfig(request) || config.hasNewerGeneration(request)));
    }

    Mode getMode() {
        return mode;
    }

    void setMode(String modeName) {
        if (modeName.equals(this.mode.name())) return;

        String oldMode = this.mode.name();
        switch (mode.getMode()) {
            case MEMORYCACHE:
                configClient.shutdownSourceConnections();
                configClient = new MemoryCacheConfigClient(memoryCache);
                this.mode = new Mode(modeName);
                break;
            case DEFAULT:
                flush();
                configClient = createRpcClient();
                this.mode = new Mode(modeName);
                break;
            default:
                throw new IllegalArgumentException("Cannot set invalid mode '" + modeName + "'");
        }
        log.log(LogLevel.INFO, "Switching from '" + oldMode + "' mode to '" + modeName.toLowerCase() + "' mode");
    }

    private ConfigSourceClient createClient(RpcServer rpcServer, DelayedResponses delayedResponses,
                                            ConfigSourceSet source, TimingValues timingValues,
                                            MemoryCache memoryCache, ConfigSourceClient client) {
        return (client == null)
                ? new RpcConfigSourceClient(rpcServer, source, memoryCache, timingValues, delayedResponses)
                : client;
    }

    private ConfigProxyRpcServer createRpcServer(Spec spec) {
        return  (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this'
    }

    private RpcConfigSourceClient createRpcClient() {
        return new RpcConfigSourceClient(rpcServer, configSource, memoryCache, timingValues, delayedResponses);
    }

    private void setupSignalHandler() {
        CatchSignals.setup(signalCaught); // catch termination and interrupt signals
    }

    private void waitForShutdown() {
        synchronized (signalCaught) {
            while (!signalCaught.get()) {
                try {
                    signalCaught.wait();
                } catch (InterruptedException e) {
                    // empty
                }
            }
        }
        stop();
        System.exit(0);
    }

    public static void main(String[] args) {
        /* Initialize the log handler */
        LogSetup.clearHandlers();
        LogSetup.initVespaLogging("configproxy");

        Properties properties = getSystemProperties();

        int port = DEFAULT_RPC_PORT;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        Event.started("configproxy");

        ConfigSourceSet configSources = new ConfigSourceSet(properties.configSources);
        ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources,
                                                  defaultTimingValues(), new MemoryCache(), null);
        // catch termination and interrupt signal
        proxyServer.setupSignalHandler();
        Thread proxyserverThread = new Thread(proxyServer);
        proxyserverThread.setName("configproxy");
        proxyserverThread.start();
        proxyServer.waitForShutdown();
    }

    static Properties getSystemProperties() {
        final String[] inputConfigSources = System.getProperty("proxyconfigsources", DEFAULT_PROXY_CONFIG_SOURCES).split(",");
        return new Properties(inputConfigSources);
    }

    static class Properties {
        final String[] configSources;

        Properties(String[] configSources) {
            this.configSources = configSources;
        }
    }

    static TimingValues defaultTimingValues() {
        return defaultTimingValues;
    }

    TimingValues getTimingValues() {
        return timingValues;
    }

    // Cancels all config instances and flushes the cache. When this method returns,
    // the cache will not be updated again before someone calls getConfig().
    private synchronized void flush() {
        memoryCache.clear();
        configClient.cancel();
    }

    void stop() {
        Event.stopping("configproxy", "shutdown");
        if (rpcServer != null) rpcServer.shutdown();
        if (delayedResponseScheduler != null) delayedResponseScheduler.cancel(true);
        flush();
        fileDistributionAndUrlDownload.close();
    }

    MemoryCache getMemoryCache() {
        return memoryCache;
    }

    String getActiveSourceConnection() {
        return configClient.getActiveSourceConnection();
    }

    List<String> getSourceConnections() {
        return configClient.getSourceConnections();
    }

    void updateSourceConnections(List<String> sources) {
        configSource = new ConfigSourceSet(sources);
        flush();
        configClient = createRpcClient();
    }

}