aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java123
1 files changed, 67 insertions, 56 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
index 2a33e8c6928..ba58c369afd 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
@@ -10,22 +10,25 @@ import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
-import com.yahoo.log.LogLevel;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
import com.yahoo.vespa.config.ConfigCacheKey;
import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.TimingValues;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.logging.Logger;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
@@ -33,7 +36,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
*
* @author hmusum
*/
-class RpcConfigSourceClient implements ConfigSourceClient {
+class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
private final static Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName());
private static final double timingValuesRatio = 0.8;
@@ -42,16 +45,18 @@ class RpcConfigSourceClient implements ConfigSourceClient {
private final RpcServer rpcServer;
private final ConfigSourceSet configSourceSet;
- private final HashMap<ConfigCacheKey, Subscriber> activeSubscribers = new HashMap<>();
- private final Object activeSubscribersLock = new Object();
+ private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>();
private final MemoryCache memoryCache;
private final DelayedResponses delayedResponses;
private final static TimingValues timingValues;
- private final ExecutorService exec;
+ private final ScheduledExecutorService nextConfigScheduler =
+ Executors.newScheduledThreadPool(1, new DaemonThreadFactory("next config"));
+ private ScheduledFuture<?> nextConfigFuture;
private final JRTConfigRequester requester;
// Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
- private ScheduledFuture<?> delayedResponseScheduler;
+ private final ScheduledExecutorService delayedResponsesScheduler =
+ Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses"));
+ private ScheduledFuture<?> delayedResponsesFuture;
static {
// Proxy should time out before clients upon subscription.
@@ -69,14 +74,10 @@ class RpcConfigSourceClient implements ConfigSourceClient {
this.memoryCache = memoryCache;
this.delayedResponses = new DelayedResponses();
checkConfigSources();
- exec = Executors.newCachedThreadPool(new DaemonThreadFactory("subscriber-"));
+ nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS);
requester = JRTConfigRequester.create(configSourceSet, timingValues);
- // Wait for 5 seconds initially, then run every second
- delayedResponseScheduler = scheduler.scheduleAtFixedRate(
- new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer),
- 5,
- 1,
- SECONDS);
+ DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer);
+ delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS);
}
/**
@@ -84,7 +85,7 @@ class RpcConfigSourceClient implements ConfigSourceClient {
*/
private void checkConfigSources() {
if (configSourceSet == null || configSourceSet.getSources() == null || configSourceSet.getSources().size() == 0) {
- log.log(LogLevel.WARNING, "No config sources defined, could not check connection");
+ log.log(Level.WARNING, "No config sources defined, could not check connection");
} else {
Request req = new Request("ping");
for (String configSource : configSourceSet.getSources()) {
@@ -92,15 +93,15 @@ class RpcConfigSourceClient implements ConfigSourceClient {
Target target = supervisor.connect(spec);
target.invokeSync(req, 30.0);
if (target.isValid()) {
- log.log(LogLevel.DEBUG, () -> "Created connection to config source at " + spec.toString());
+ log.log(Level.FINE, () -> "Created connection to config source at " + spec.toString());
return;
} else {
- log.log(LogLevel.INFO, "Could not connect to config source at " + spec.toString());
+ log.log(Level.INFO, "Could not connect to config source at " + spec.toString());
}
target.close();
}
String extra = "";
- log.log(LogLevel.INFO, "Could not connect to any config source in set " + configSourceSet.toString() +
+ log.log(Level.INFO, "Could not connect to any config source in set " + configSourceSet.toString() +
", please make sure config server(s) are running. " + extra);
}
}
@@ -132,11 +133,11 @@ class RpcConfigSourceClient implements ConfigSourceClient {
RawConfig ret = null;
if (cachedConfig != null) {
- log.log(LogLevel.DEBUG, () -> "Found config " + configCacheKey + " in cache, generation=" + cachedConfig.getGeneration() +
+ log.log(Level.FINE, () -> "Found config " + configCacheKey + " in cache, generation=" + cachedConfig.getGeneration() +
",configmd5=" + cachedConfig.getConfigMd5());
- log.log(LogLevel.SPAM, () -> "input config=" + input + ",cached config=" + cachedConfig);
+ log.log(Level.FINEST, () -> "input config=" + input + ",cached config=" + cachedConfig);
if (ProxyServer.configOrGenerationHasChanged(cachedConfig, request)) {
- log.log(LogLevel.SPAM, () -> "Cached config is not equal to requested, will return it");
+ log.log(Level.FINEST, () -> "Cached config is not equal to requested, will return it");
if (delayedResponses.remove(delayedResponse)) {
// unless another thread already did it
ret = cachedConfig;
@@ -153,30 +154,36 @@ class RpcConfigSourceClient implements ConfigSourceClient {
}
private void subscribeToConfig(RawConfig input, ConfigCacheKey configCacheKey) {
- synchronized (activeSubscribersLock) {
- if (activeSubscribers.containsKey(configCacheKey)) {
- log.log(LogLevel.DEBUG, () -> "Already a subscriber running for: " + configCacheKey);
- } else {
- log.log(LogLevel.DEBUG, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey);
- UpstreamConfigSubscriber subscriber =
- new UpstreamConfigSubscriber(input, this, configSourceSet, timingValues, requester, memoryCache);
- try {
- subscriber.subscribe();
- activeSubscribers.put(configCacheKey, subscriber);
- exec.execute(subscriber);
- } catch (ConfigurationRuntimeException e) {
- log.log(LogLevel.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber");
- subscriber.cancel();
- }
- }
+ if (activeSubscribers.containsKey(configCacheKey)) return;
+
+ log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey);
+ var subscriber = new Subscriber(input, configSourceSet, timingValues, requester);
+ try {
+ subscriber.subscribe();
+ activeSubscribers.put(configCacheKey, subscriber);
+ } catch (ConfigurationRuntimeException e) {
+ log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber");
+ subscriber.cancel();
}
}
@Override
+ public void run() {
+ activeSubscribers.values().forEach(subscriber -> {
+ if (!subscriber.isClosed()) {
+ Optional<RawConfig> config = subscriber.nextGeneration();
+ config.ifPresent(this::updateWithNewConfig);
+ }
+ });
+ }
+
+ @Override
public void cancel() {
shutdownSourceConnections();
- delayedResponseScheduler.cancel(true);
- scheduler.shutdown();
+ delayedResponsesFuture.cancel(true);
+ delayedResponsesScheduler.shutdown();
+ nextConfigFuture.cancel(true);
+ nextConfigScheduler.shutdown();
}
/**
@@ -184,13 +191,9 @@ class RpcConfigSourceClient implements ConfigSourceClient {
*/
@Override
public void shutdownSourceConnections() {
- synchronized (activeSubscribersLock) {
- for (Subscriber subscriber : activeSubscribers.values()) {
- subscriber.cancel();
- }
- activeSubscribers.clear();
- }
- exec.shutdown();
+ activeSubscribers.values().forEach(Subscriber::cancel);
+ activeSubscribers.clear();
+ nextConfigScheduler.shutdown();
requester.close();
}
@@ -215,14 +218,14 @@ class RpcConfigSourceClient implements ConfigSourceClient {
* @param config new config
*/
public void updateSubscribers(RawConfig config) {
- log.log(LogLevel.DEBUG, () -> "Config updated for " + config.getKey() + "," + config.getGeneration());
+ log.log(Level.FINE, () -> "Config updated for " + config.getKey() + "," + config.getGeneration());
DelayQueue<DelayedResponse> responseDelayQueue = delayedResponses.responses();
- log.log(LogLevel.SPAM, () -> "Delayed response queue: " + responseDelayQueue);
+ log.log(Level.FINEST, () -> "Delayed response queue: " + responseDelayQueue);
if (responseDelayQueue.size() == 0) {
- log.log(LogLevel.DEBUG, () -> "There exists no matching element on delayed response queue for " + config.getKey());
+ log.log(Level.FINE, () -> "There exists no matching element on delayed response queue for " + config.getKey());
return;
} else {
- log.log(LogLevel.DEBUG, () -> "Delayed response queue has " + responseDelayQueue.size() + " elements");
+ log.log(Level.FINE, () -> "Delayed response queue has " + responseDelayQueue.size() + " elements");
}
boolean found = false;
for (DelayedResponse response : responseDelayQueue.toArray(new DelayedResponse[0])) {
@@ -232,17 +235,17 @@ class RpcConfigSourceClient implements ConfigSourceClient {
&& (config.getGeneration() >= request.getRequestGeneration() || config.getGeneration() == 0)) {
if (delayedResponses.remove(response)) {
found = true;
- log.log(LogLevel.DEBUG, () -> "Call returnOkResponse for " + config.getKey() + "," + config.getGeneration());
+ log.log(Level.FINE, () -> "Call returnOkResponse for " + config.getKey() + "," + config.getGeneration());
rpcServer.returnOkResponse(request, config);
} else {
- log.log(LogLevel.INFO, "Could not remove " + config.getKey() + " from delayedResponses queue, already removed");
+ log.log(Level.INFO, "Could not remove " + config.getKey() + " from delayedResponses queue, already removed");
}
}
}
if (!found) {
- log.log(LogLevel.DEBUG, () -> "Found no recipient for " + config.getKey() + " in delayed response queue");
+ log.log(Level.FINE, () -> "Found no recipient for " + config.getKey() + " in delayed response queue");
}
- log.log(LogLevel.DEBUG, () -> "Finished updating config for " + config.getKey() + "," + config.getGeneration());
+ log.log(Level.FINE, () -> "Finished updating config for " + config.getKey() + "," + config.getGeneration());
}
@Override
@@ -250,4 +253,12 @@ class RpcConfigSourceClient implements ConfigSourceClient {
return delayedResponses;
}
+ private void updateWithNewConfig(RawConfig newConfig) {
+ log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey() +
+ "', generation=" + newConfig.getGeneration() +
+ ", payload=" + newConfig.getPayload());
+ memoryCache.update(newConfig);
+ updateSubscribers(newConfig);
+ }
+
}