aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2021-10-05 09:07:22 +0200
committerHarald Musum <musum@yahooinc.com>2021-10-05 09:07:22 +0200
commitf9b6a26a7872f06ec98166b3b610a16ebd0764e6 (patch)
tree2ded2fcc3c4218bb39ba2a67611cd40372220aea /config-proxy
parentcd818461f360b504f0e4d2c5e3c99722a9b5508f (diff)
Throw if no config sources defined, log if empty config
Other changes are non-functional
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java100
1 files changed, 49 insertions, 51 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
index 71f1571b9c8..a2373111bac 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
@@ -1,4 +1,4 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
import com.yahoo.concurrent.DaemonThreadFactory;
@@ -11,6 +11,7 @@ import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
import com.yahoo.vespa.config.ConfigCacheKey;
+import com.yahoo.vespa.config.ConfigKey;
import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.TimingValues;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
@@ -37,8 +38,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
*/
class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
- private final static Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName());
- private static final double timingValuesRatio = 0.8;
+ private static final Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName());
+ private static final TimingValues timingValues = createTimingValues();
private final Supervisor supervisor = new Supervisor(new Transport("config-source-client"));
@@ -47,7 +48,6 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>();
private final MemoryCache memoryCache;
private final DelayedResponses delayedResponses;
- private final static TimingValues timingValues;
private final ScheduledExecutorService nextConfigScheduler =
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("next config"));
private final ScheduledFuture<?> nextConfigFuture;
@@ -57,16 +57,6 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses"));
private final ScheduledFuture<?> delayedResponsesFuture;
- static {
- // Proxy should time out before clients upon subscription.
- TimingValues tv = new TimingValues();
- tv.setUnconfiguredDelay((long)(tv.getUnconfiguredDelay()* timingValuesRatio)).
- setConfiguredErrorDelay((long)(tv.getConfiguredErrorDelay()* timingValuesRatio)).
- setSubscribeTimeout((long)(tv.getSubscribeTimeout()* timingValuesRatio)).
- setConfiguredErrorTimeout(-1); // Never cache errors
- timingValues = tv;
- }
-
RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet, MemoryCache memoryCache) {
this.rpcServer = rpcServer;
this.configSourceSet = configSourceSet;
@@ -74,35 +64,31 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
this.delayedResponses = new DelayedResponses();
checkConfigSources();
nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS);
- requester = JRTConfigRequester.create(configSourceSet, timingValues);
+ this.requester = JRTConfigRequester.create(configSourceSet, timingValues);
DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer);
- delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS);
+ this.delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS);
}
/**
* Checks if config sources are available
*/
private void checkConfigSources() {
- if (configSourceSet == null || configSourceSet.getSources() == null || configSourceSet.getSources().size() == 0) {
- log.log(Level.WARNING, "No config sources defined, could not check connection");
- } else {
- Request req = new Request("ping");
- for (String configSource : configSourceSet.getSources()) {
- Spec spec = new Spec(configSource);
- Target target = supervisor.connect(spec);
- target.invokeSync(req, 30.0);
- if (target.isValid()) {
- log.log(Level.FINE, () -> "Created connection to config source at " + spec.toString());
- return;
- } else {
- log.log(Level.INFO, "Could not connect to config source at " + spec.toString());
- }
- target.close();
- }
- String extra = "";
- log.log(Level.INFO, "Could not connect to any config source in set " + configSourceSet.toString() +
- ", please make sure config server(s) are running. " + extra);
+ if (configSourceSet == null || configSourceSet.getSources() == null || configSourceSet.getSources().size() == 0)
+ throw new IllegalArgumentException("No config sources defined, could not check connection");
+
+ Request req = new Request("ping");
+ for (String configSource : configSourceSet.getSources()) {
+ Spec spec = new Spec(configSource);
+ Target target = supervisor.connect(spec);
+ target.invokeSync(req, 30.0);
+ if (target.isValid())
+ return;
+
+ log.log(Level.INFO, "Could not connect to config source at " + spec.toString());
+ target.close();
}
+ log.log(Level.INFO, "Could not connect to any config source in set " + configSourceSet.toString() +
+ ", please make sure config server(s) are running.");
}
/**
@@ -126,7 +112,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
DelayedResponse delayedResponse = new DelayedResponse(request);
delayedResponses.add(delayedResponse);
- final ConfigCacheKey configCacheKey = new ConfigCacheKey(input.getKey(), input.getDefMd5());
+ ConfigCacheKey configCacheKey = new ConfigCacheKey(input.getKey(), input.getDefMd5());
RawConfig cachedConfig = memoryCache.get(configCacheKey);
boolean needToGetConfig = true;
@@ -219,40 +205,41 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
}
/**
- * This method will be called when a response with changed config is received from upstream
- * (content or generation has changed) or the server timeout has elapsed.
+ * Updates subscribers with new config. This method will be called when a response with changed config is
+ * received from upstream (content or generation has changed) or the server timeout has elapsed.
*
* @param config new config
*/
public void updateSubscribers(RawConfig config) {
- log.log(Level.FINE, () -> "Config updated for " + config.getKey() + "," + config.getGeneration());
+ ConfigKey<?> key = config.getKey();
+ long generation = config.getGeneration();
+ log.log(Level.FINE, () -> "Config updated for " + key + "," + generation);
DelayQueue<DelayedResponse> responseDelayQueue = delayedResponses.responses();
+ if (responseDelayQueue.size() == 0) return;
+
+ log.log(Level.FINE, () -> "Delayed response queue has " + responseDelayQueue.size() + " elements");
log.log(Level.FINEST, () -> "Delayed response queue: " + responseDelayQueue);
- if (responseDelayQueue.size() == 0) {
- log.log(Level.FINE, () -> "There exists no matching element on delayed response queue for " + config.getKey());
- return;
- } else {
- log.log(Level.FINE, () -> "Delayed response queue has " + responseDelayQueue.size() + " elements");
- }
boolean found = false;
for (DelayedResponse response : responseDelayQueue.toArray(new DelayedResponse[0])) {
JRTServerConfigRequest request = response.getRequest();
- if (request.getConfigKey().equals(config.getKey())
+ if (request.getConfigKey().equals(key)
// Generation 0 is special, used when returning empty sentinel config
- && (config.getGeneration() >= request.getRequestGeneration() || config.getGeneration() == 0)) {
+ && (generation >= request.getRequestGeneration() || generation == 0)) {
if (delayedResponses.remove(response)) {
found = true;
- log.log(Level.FINE, () -> "Call returnOkResponse for " + config.getKey() + "," + config.getGeneration());
+ log.log(Level.FINE, () -> "Call returnOkResponse for " + key + "," + generation);
+ if (config.getPayload().getData().getByteLength() == 0)
+ log.log(Level.WARNING, () -> "Call returnOkResponse for " + key + "," + generation + " with empty config");
rpcServer.returnOkResponse(request, config);
} else {
- log.log(Level.INFO, "Could not remove " + config.getKey() + " from delayedResponses queue, already removed");
+ log.log(Level.INFO, "Could not remove " + key + " from delayedResponses queue, already removed");
}
}
}
if (!found) {
- log.log(Level.FINE, () -> "Found no recipient for " + config.getKey() + " in delayed response queue");
+ log.log(Level.FINE, () -> "Found no recipient for " + key + " in delayed response queue");
}
- log.log(Level.FINE, () -> "Finished updating config for " + config.getKey() + "," + config.getGeneration());
+ log.log(Level.FINE, () -> "Finished updating config for " + key + "," + generation);
}
@Override
@@ -268,4 +255,15 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
updateSubscribers(newConfig);
}
+ private static TimingValues createTimingValues() {
+ // Proxy should time out before clients upon subscription.
+ double timingValuesRatio = 0.8;
+ TimingValues tv = new TimingValues();
+ tv.setUnconfiguredDelay((long) (tv.getUnconfiguredDelay() * timingValuesRatio)).
+ setConfiguredErrorDelay((long) (tv.getConfiguredErrorDelay() * timingValuesRatio)).
+ setSubscribeTimeout((long) (tv.getSubscribeTimeout() * timingValuesRatio)).
+ setConfiguredErrorTimeout(-1); // Never cache errors
+ return tv;
+ }
+
}