diff options
Diffstat (limited to 'config-proxy/src/main')
-rw-r--r-- | config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java | 100 |
1 files changed, 49 insertions, 51 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java index 71f1571b9c8..a2373111bac 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -1,4 +1,4 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; import com.yahoo.concurrent.DaemonThreadFactory; @@ -11,6 +11,7 @@ import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Target; import com.yahoo.jrt.Transport; import com.yahoo.vespa.config.ConfigCacheKey; +import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.TimingValues; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; @@ -37,8 +38,8 @@ import static java.util.concurrent.TimeUnit.SECONDS; */ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { - private final static Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName()); - private static final double timingValuesRatio = 0.8; + private static final Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName()); + private static final TimingValues timingValues = createTimingValues(); private final Supervisor supervisor = new Supervisor(new Transport("config-source-client")); @@ -47,7 +48,6 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>(); private final MemoryCache memoryCache; private final DelayedResponses delayedResponses; - private final static TimingValues timingValues; private final ScheduledExecutorService nextConfigScheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory("next config")); private final ScheduledFuture<?> nextConfigFuture; @@ -57,16 +57,6 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses")); private final ScheduledFuture<?> delayedResponsesFuture; - static { - // Proxy should time out before clients upon subscription. - TimingValues tv = new TimingValues(); - tv.setUnconfiguredDelay((long)(tv.getUnconfiguredDelay()* timingValuesRatio)). - setConfiguredErrorDelay((long)(tv.getConfiguredErrorDelay()* timingValuesRatio)). - setSubscribeTimeout((long)(tv.getSubscribeTimeout()* timingValuesRatio)). - setConfiguredErrorTimeout(-1); // Never cache errors - timingValues = tv; - } - RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet, MemoryCache memoryCache) { this.rpcServer = rpcServer; this.configSourceSet = configSourceSet; @@ -74,35 +64,31 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { this.delayedResponses = new DelayedResponses(); checkConfigSources(); nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS); - requester = JRTConfigRequester.create(configSourceSet, timingValues); + this.requester = JRTConfigRequester.create(configSourceSet, timingValues); DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer); - delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS); + this.delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS); } /** * Checks if config sources are available */ private void checkConfigSources() { - if (configSourceSet == null || configSourceSet.getSources() == null || configSourceSet.getSources().size() == 0) { - log.log(Level.WARNING, "No config sources defined, could not check connection"); - } else { - Request req = new Request("ping"); - for (String configSource : configSourceSet.getSources()) { - Spec spec = new Spec(configSource); - Target target = supervisor.connect(spec); - target.invokeSync(req, 30.0); - if (target.isValid()) { - log.log(Level.FINE, () -> "Created connection to config source at " + spec.toString()); - return; - } else { - log.log(Level.INFO, "Could not connect to config source at " + spec.toString()); - } - target.close(); - } - String extra = ""; - log.log(Level.INFO, "Could not connect to any config source in set " + configSourceSet.toString() + - ", please make sure config server(s) are running. " + extra); + if (configSourceSet == null || configSourceSet.getSources() == null || configSourceSet.getSources().size() == 0) + throw new IllegalArgumentException("No config sources defined, could not check connection"); + + Request req = new Request("ping"); + for (String configSource : configSourceSet.getSources()) { + Spec spec = new Spec(configSource); + Target target = supervisor.connect(spec); + target.invokeSync(req, 30.0); + if (target.isValid()) + return; + + log.log(Level.INFO, "Could not connect to config source at " + spec.toString()); + target.close(); } + log.log(Level.INFO, "Could not connect to any config source in set " + configSourceSet.toString() + + ", please make sure config server(s) are running."); } /** @@ -126,7 +112,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { DelayedResponse delayedResponse = new DelayedResponse(request); delayedResponses.add(delayedResponse); - final ConfigCacheKey configCacheKey = new ConfigCacheKey(input.getKey(), input.getDefMd5()); + ConfigCacheKey configCacheKey = new ConfigCacheKey(input.getKey(), input.getDefMd5()); RawConfig cachedConfig = memoryCache.get(configCacheKey); boolean needToGetConfig = true; @@ -219,40 +205,41 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { } /** - * This method will be called when a response with changed config is received from upstream - * (content or generation has changed) or the server timeout has elapsed. + * Updates subscribers with new config. This method will be called when a response with changed config is + * received from upstream (content or generation has changed) or the server timeout has elapsed. * * @param config new config */ public void updateSubscribers(RawConfig config) { - log.log(Level.FINE, () -> "Config updated for " + config.getKey() + "," + config.getGeneration()); + ConfigKey<?> key = config.getKey(); + long generation = config.getGeneration(); + log.log(Level.FINE, () -> "Config updated for " + key + "," + generation); DelayQueue<DelayedResponse> responseDelayQueue = delayedResponses.responses(); + if (responseDelayQueue.size() == 0) return; + + log.log(Level.FINE, () -> "Delayed response queue has " + responseDelayQueue.size() + " elements"); log.log(Level.FINEST, () -> "Delayed response queue: " + responseDelayQueue); - if (responseDelayQueue.size() == 0) { - log.log(Level.FINE, () -> "There exists no matching element on delayed response queue for " + config.getKey()); - return; - } else { - log.log(Level.FINE, () -> "Delayed response queue has " + responseDelayQueue.size() + " elements"); - } boolean found = false; for (DelayedResponse response : responseDelayQueue.toArray(new DelayedResponse[0])) { JRTServerConfigRequest request = response.getRequest(); - if (request.getConfigKey().equals(config.getKey()) + if (request.getConfigKey().equals(key) // Generation 0 is special, used when returning empty sentinel config - && (config.getGeneration() >= request.getRequestGeneration() || config.getGeneration() == 0)) { + && (generation >= request.getRequestGeneration() || generation == 0)) { if (delayedResponses.remove(response)) { found = true; - log.log(Level.FINE, () -> "Call returnOkResponse for " + config.getKey() + "," + config.getGeneration()); + log.log(Level.FINE, () -> "Call returnOkResponse for " + key + "," + generation); + if (config.getPayload().getData().getByteLength() == 0) + log.log(Level.WARNING, () -> "Call returnOkResponse for " + key + "," + generation + " with empty config"); rpcServer.returnOkResponse(request, config); } else { - log.log(Level.INFO, "Could not remove " + config.getKey() + " from delayedResponses queue, already removed"); + log.log(Level.INFO, "Could not remove " + key + " from delayedResponses queue, already removed"); } } } if (!found) { - log.log(Level.FINE, () -> "Found no recipient for " + config.getKey() + " in delayed response queue"); + log.log(Level.FINE, () -> "Found no recipient for " + key + " in delayed response queue"); } - log.log(Level.FINE, () -> "Finished updating config for " + config.getKey() + "," + config.getGeneration()); + log.log(Level.FINE, () -> "Finished updating config for " + key + "," + generation); } @Override @@ -268,4 +255,15 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { updateSubscribers(newConfig); } + private static TimingValues createTimingValues() { + // Proxy should time out before clients upon subscription. + double timingValuesRatio = 0.8; + TimingValues tv = new TimingValues(); + tv.setUnconfiguredDelay((long) (tv.getUnconfiguredDelay() * timingValuesRatio)). + setConfiguredErrorDelay((long) (tv.getConfiguredErrorDelay() * timingValuesRatio)). + setSubscribeTimeout((long) (tv.getSubscribeTimeout() * timingValuesRatio)). + setConfiguredErrorTimeout(-1); // Never cache errors + return tv; + } + } |