summaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2020-04-26 21:43:17 +0200
committerHarald Musum <musum@verizonmedia.com>2020-04-26 21:43:17 +0200
commitc6f4c3c58e85b059caeb33b6654e713977408b3d (patch)
treed14ad211e23894131ee65586e26d6db1abeb477d /config-proxy
parente426febfaeb0840dbc148026f394e4cfc1f0c71a (diff)
Use fewer threads to get next generation of config
Check for next config generation in one thread. Simplify Subscriber. Use a ConcurrentHashMap for active subscribers. Reduces the number of threads used by config proxy from ~230 to ~30
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java2
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java3
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java91
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java64
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java82
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java3
6 files changed, 111 insertions, 134 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
index 9fb78e7e812..e9110a0bc78 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
@@ -24,8 +24,6 @@ interface ConfigSourceClient {
List<String> getSourceConnections();
- void updateSubscribers(RawConfig config);
-
DelayedResponses delayedResponses();
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
index df0b274f32b..2e55b9a6f86 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
@@ -60,9 +60,6 @@ class MemoryCacheConfigClient implements ConfigSourceClient {
}
@Override
- public void updateSubscribers(RawConfig config) {}
-
- @Override
public DelayedResponses delayedResponses() {
return delayedResponses;
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
index 12fd95b8503..ba58c369afd 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
@@ -10,6 +10,10 @@ import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import com.yahoo.vespa.config.ConfigCacheKey;
import com.yahoo.vespa.config.RawConfig;
@@ -17,15 +21,14 @@ import com.yahoo.vespa.config.TimingValues;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.concurrent.DelayQueue;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.logging.Logger;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
@@ -33,7 +36,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
*
* @author hmusum
*/
-class RpcConfigSourceClient implements ConfigSourceClient {
+class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
private final static Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName());
private static final double timingValuesRatio = 0.8;
@@ -42,16 +45,18 @@ class RpcConfigSourceClient implements ConfigSourceClient {
private final RpcServer rpcServer;
private final ConfigSourceSet configSourceSet;
- private final HashMap<ConfigCacheKey, Subscriber> activeSubscribers = new HashMap<>();
- private final Object activeSubscribersLock = new Object();
+ private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>();
private final MemoryCache memoryCache;
private final DelayedResponses delayedResponses;
private final static TimingValues timingValues;
- private final ExecutorService exec;
+ private final ScheduledExecutorService nextConfigScheduler =
+ Executors.newScheduledThreadPool(1, new DaemonThreadFactory("next config"));
+ private ScheduledFuture<?> nextConfigFuture;
private final JRTConfigRequester requester;
// Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
- private ScheduledFuture<?> delayedResponseScheduler;
+ private final ScheduledExecutorService delayedResponsesScheduler =
+ Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses"));
+ private ScheduledFuture<?> delayedResponsesFuture;
static {
// Proxy should time out before clients upon subscription.
@@ -69,14 +74,10 @@ class RpcConfigSourceClient implements ConfigSourceClient {
this.memoryCache = memoryCache;
this.delayedResponses = new DelayedResponses();
checkConfigSources();
- exec = Executors.newCachedThreadPool(new DaemonThreadFactory("subscriber-"));
+ nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS);
requester = JRTConfigRequester.create(configSourceSet, timingValues);
- // Wait for 5 seconds initially, then run every second
- delayedResponseScheduler = scheduler.scheduleAtFixedRate(
- new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer),
- 5,
- 1,
- SECONDS);
+ DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer);
+ delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS);
}
/**
@@ -153,30 +154,36 @@ class RpcConfigSourceClient implements ConfigSourceClient {
}
private void subscribeToConfig(RawConfig input, ConfigCacheKey configCacheKey) {
- synchronized (activeSubscribersLock) {
- if (activeSubscribers.containsKey(configCacheKey)) {
- log.log(Level.FINE, () -> "Already a subscriber running for: " + configCacheKey);
- } else {
- log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey);
- UpstreamConfigSubscriber subscriber =
- new UpstreamConfigSubscriber(input, this, configSourceSet, timingValues, requester, memoryCache);
- try {
- subscriber.subscribe();
- activeSubscribers.put(configCacheKey, subscriber);
- exec.execute(subscriber);
- } catch (ConfigurationRuntimeException e) {
- log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber");
- subscriber.cancel();
- }
- }
+ if (activeSubscribers.containsKey(configCacheKey)) return;
+
+ log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey);
+ var subscriber = new Subscriber(input, configSourceSet, timingValues, requester);
+ try {
+ subscriber.subscribe();
+ activeSubscribers.put(configCacheKey, subscriber);
+ } catch (ConfigurationRuntimeException e) {
+ log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber");
+ subscriber.cancel();
}
}
@Override
+ public void run() {
+ activeSubscribers.values().forEach(subscriber -> {
+ if (!subscriber.isClosed()) {
+ Optional<RawConfig> config = subscriber.nextGeneration();
+ config.ifPresent(this::updateWithNewConfig);
+ }
+ });
+ }
+
+ @Override
public void cancel() {
shutdownSourceConnections();
- delayedResponseScheduler.cancel(true);
- scheduler.shutdown();
+ delayedResponsesFuture.cancel(true);
+ delayedResponsesScheduler.shutdown();
+ nextConfigFuture.cancel(true);
+ nextConfigScheduler.shutdown();
}
/**
@@ -184,13 +191,9 @@ class RpcConfigSourceClient implements ConfigSourceClient {
*/
@Override
public void shutdownSourceConnections() {
- synchronized (activeSubscribersLock) {
- for (Subscriber subscriber : activeSubscribers.values()) {
- subscriber.cancel();
- }
- activeSubscribers.clear();
- }
- exec.shutdown();
+ activeSubscribers.values().forEach(Subscriber::cancel);
+ activeSubscribers.clear();
+ nextConfigScheduler.shutdown();
requester.close();
}
@@ -250,4 +253,12 @@ class RpcConfigSourceClient implements ConfigSourceClient {
return delayedResponses;
}
+ private void updateWithNewConfig(RawConfig newConfig) {
+ log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey() +
+ "', generation=" + newConfig.getGeneration() +
+ ", payload=" + newConfig.getPayload());
+ memoryCache.update(newConfig);
+ updateSubscribers(newConfig);
+ }
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java
index 74c99b7670b..f96d6470679 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java
@@ -1,12 +1,68 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
+import com.yahoo.config.subscription.ConfigSourceSet;
+import com.yahoo.config.subscription.impl.GenericConfigHandle;
+import com.yahoo.config.subscription.impl.GenericConfigSubscriber;
+import com.yahoo.config.subscription.impl.JRTConfigRequester;
+
+import java.util.Optional;
+import java.util.logging.Level;
+import com.yahoo.vespa.config.ConfigKey;
+import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.TimingValues;
+import com.yahoo.yolean.Exceptions;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
/**
- * Interface for subscribing to config from upstream config sources.
- *
* @author hmusum
*/
-public interface Subscriber extends Runnable {
+public class Subscriber {
+
+ private final static Logger log = Logger.getLogger(Subscriber.class.getName());
+
+ private final RawConfig config;
+ private final ConfigSourceSet configSourceSet;
+ private final TimingValues timingValues;
+ private final GenericConfigSubscriber subscriber;
+ private GenericConfigHandle handle;
+
+ Subscriber(RawConfig config, ConfigSourceSet configSourceSet, TimingValues timingValues, JRTConfigRequester requester) {
+ this.config = config;
+ this.configSourceSet = configSourceSet;
+ this.timingValues = timingValues;
+ this.subscriber = new GenericConfigSubscriber(Map.of(configSourceSet, requester));
+ }
+
+ void subscribe() {
+ ConfigKey<?> key = config.getKey();
+ handle = subscriber.subscribe(new ConfigKey<>(key.getName(), key.getConfigId(), key.getNamespace()),
+ config.getDefContent(), configSourceSet, timingValues);
+ }
+
+ public Optional<RawConfig> nextGeneration() {
+ if (subscriber.nextGeneration(0)) {
+ try {
+ return Optional.of(handle.getRawConfig());
+ } catch (Exception e) { // To avoid thread throwing exception and loop never running this again
+ log.log(Level.WARNING, "Got exception: " + Exceptions.toMessageString(e));
+ } catch (Throwable e) {
+ com.yahoo.protect.Process.logAndDie("Got error, exiting: " + Exceptions.toMessageString(e));
+ }
+ }
+ return Optional.empty();
+ }
+
+ public void cancel() {
+ if (subscriber != null) {
+ subscriber.close();
+ }
+ }
+
+ boolean isClosed() {
+ return subscriber.isClosed();
+ }
- void cancel();
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java
deleted file mode 100644
index a24407588be..00000000000
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.config.proxy;
-
-import com.yahoo.config.subscription.ConfigSourceSet;
-import com.yahoo.config.subscription.impl.GenericConfigHandle;
-import com.yahoo.config.subscription.impl.GenericConfigSubscriber;
-import com.yahoo.config.subscription.impl.JRTConfigRequester;
-import java.util.logging.Level;
-import com.yahoo.vespa.config.ConfigKey;
-import com.yahoo.vespa.config.RawConfig;
-import com.yahoo.vespa.config.TimingValues;
-import com.yahoo.yolean.Exceptions;
-
-import java.util.Map;
-import java.util.logging.Logger;
-
-/**
- * @author hmusum
- */
-public class UpstreamConfigSubscriber implements Subscriber {
-
- private final static Logger log = Logger.getLogger(UpstreamConfigSubscriber.class.getName());
-
- private final RawConfig config;
- private final ConfigSourceClient configSourceClient;
- private final ConfigSourceSet configSourceSet;
- private final TimingValues timingValues;
- private final JRTConfigRequester requester;
- private final MemoryCache memoryCache;
- private GenericConfigSubscriber subscriber;
- private GenericConfigHandle handle;
-
- UpstreamConfigSubscriber(RawConfig config, ConfigSourceClient configSourceClient, ConfigSourceSet configSourceSet,
- TimingValues timingValues, JRTConfigRequester requester,
- MemoryCache memoryCache) {
- this.config = config;
- this.configSourceClient = configSourceClient;
- this.configSourceSet = configSourceSet;
- this.timingValues = timingValues;
- this.requester = requester;
- this.memoryCache = memoryCache;
- }
-
- void subscribe() {
- subscriber = new GenericConfigSubscriber(Map.of(configSourceSet, requester));
- ConfigKey<?> key = config.getKey();
- handle = subscriber.subscribe(new ConfigKey<>(key.getName(), key.getConfigId(), key.getNamespace()),
- config.getDefContent(), configSourceSet, timingValues);
- }
-
- @Override
- public void run() {
- do {
- if (! subscriber.nextGeneration()) continue;
-
- try {
- updateWithNewConfig(handle);
- } catch (Exception e) { // To avoid thread throwing exception and loop never running this again
- log.log(Level.WARNING, "Got exception: " + Exceptions.toMessageString(e));
- } catch (Throwable e) {
- com.yahoo.protect.Process.logAndDie("Got error, exiting: " + Exceptions.toMessageString(e));
- }
- } while (!subscriber.isClosed());
- }
-
- private void updateWithNewConfig(GenericConfigHandle handle) {
- RawConfig newConfig = handle.getRawConfig();
- log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey() +
- "', generation=" + newConfig.getGeneration() +
- ", payload=" + newConfig.getPayload());
- memoryCache.update(newConfig);
- configSourceClient.updateSubscribers(newConfig);
- }
-
- @Override
- public void cancel() {
- if (subscriber != null) {
- subscriber.close();
- }
- }
-
-}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java
index 06e55eef4fa..2126d673da0 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java
@@ -54,9 +54,6 @@ public class MockConfigSourceClient implements ConfigSourceClient{
}
@Override
- public void updateSubscribers(RawConfig config) { }
-
- @Override
public DelayedResponses delayedResponses() { return delayedResponses; }
}