diff options
author | Harald Musum <musum@verizonmedia.com> | 2020-04-26 21:43:17 +0200 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2020-04-26 21:43:17 +0200 |
commit | c6f4c3c58e85b059caeb33b6654e713977408b3d (patch) | |
tree | d14ad211e23894131ee65586e26d6db1abeb477d | |
parent | e426febfaeb0840dbc148026f394e4cfc1f0c71a (diff) |
Use fewer threads to get next generation of config
Check for next config generation in one thread.
Simplify Subscriber.
Use a ConcurrentHashMap for active subscribers.
Reduces the number of threads used by config proxy from ~230 to ~30
6 files changed, 111 insertions, 134 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java index 9fb78e7e812..e9110a0bc78 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java @@ -24,8 +24,6 @@ interface ConfigSourceClient { List<String> getSourceConnections(); - void updateSubscribers(RawConfig config); - DelayedResponses delayedResponses(); } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java index df0b274f32b..2e55b9a6f86 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java @@ -60,9 +60,6 @@ class MemoryCacheConfigClient implements ConfigSourceClient { } @Override - public void updateSubscribers(RawConfig config) {} - - @Override public DelayedResponses delayedResponses() { return delayedResponses; } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java index 12fd95b8503..ba58c369afd 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -10,6 +10,10 @@ import com.yahoo.jrt.Spec; import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Target; import com.yahoo.jrt.Transport; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import com.yahoo.vespa.config.ConfigCacheKey; import com.yahoo.vespa.config.RawConfig; @@ -17,15 +21,14 @@ import com.yahoo.vespa.config.TimingValues; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.concurrent.DelayQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.logging.Logger; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; /** @@ -33,7 +36,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; * * @author hmusum */ -class RpcConfigSourceClient implements ConfigSourceClient { +class RpcConfigSourceClient implements ConfigSourceClient, Runnable { private final static Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName()); private static final double timingValuesRatio = 0.8; @@ -42,16 +45,18 @@ class RpcConfigSourceClient implements ConfigSourceClient { private final RpcServer rpcServer; private final ConfigSourceSet configSourceSet; - private final HashMap<ConfigCacheKey, Subscriber> activeSubscribers = new HashMap<>(); - private final Object activeSubscribersLock = new Object(); + private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>(); private final MemoryCache memoryCache; private final DelayedResponses delayedResponses; private final static TimingValues timingValues; - private final ExecutorService exec; + private final ScheduledExecutorService nextConfigScheduler = + Executors.newScheduledThreadPool(1, new DaemonThreadFactory("next config")); + private ScheduledFuture<?> nextConfigFuture; private final JRTConfigRequester requester; // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory()); - private ScheduledFuture<?> delayedResponseScheduler; + private final ScheduledExecutorService delayedResponsesScheduler = + Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses")); + private ScheduledFuture<?> delayedResponsesFuture; static { // Proxy should time out before clients upon subscription. @@ -69,14 +74,10 @@ class RpcConfigSourceClient implements ConfigSourceClient { this.memoryCache = memoryCache; this.delayedResponses = new DelayedResponses(); checkConfigSources(); - exec = Executors.newCachedThreadPool(new DaemonThreadFactory("subscriber-")); + nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS); requester = JRTConfigRequester.create(configSourceSet, timingValues); - // Wait for 5 seconds initially, then run every second - delayedResponseScheduler = scheduler.scheduleAtFixedRate( - new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer), - 5, - 1, - SECONDS); + DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer); + delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS); } /** @@ -153,30 +154,36 @@ class RpcConfigSourceClient implements ConfigSourceClient { } private void subscribeToConfig(RawConfig input, ConfigCacheKey configCacheKey) { - synchronized (activeSubscribersLock) { - if (activeSubscribers.containsKey(configCacheKey)) { - log.log(Level.FINE, () -> "Already a subscriber running for: " + configCacheKey); - } else { - log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey); - UpstreamConfigSubscriber subscriber = - new UpstreamConfigSubscriber(input, this, configSourceSet, timingValues, requester, memoryCache); - try { - subscriber.subscribe(); - activeSubscribers.put(configCacheKey, subscriber); - exec.execute(subscriber); - } catch (ConfigurationRuntimeException e) { - log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber"); - subscriber.cancel(); - } - } + if (activeSubscribers.containsKey(configCacheKey)) return; + + log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey); + var subscriber = new Subscriber(input, configSourceSet, timingValues, requester); + try { + subscriber.subscribe(); + activeSubscribers.put(configCacheKey, subscriber); + } catch (ConfigurationRuntimeException e) { + log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber"); + subscriber.cancel(); } } @Override + public void run() { + activeSubscribers.values().forEach(subscriber -> { + if (!subscriber.isClosed()) { + Optional<RawConfig> config = subscriber.nextGeneration(); + config.ifPresent(this::updateWithNewConfig); + } + }); + } + + @Override public void cancel() { shutdownSourceConnections(); - delayedResponseScheduler.cancel(true); - scheduler.shutdown(); + delayedResponsesFuture.cancel(true); + delayedResponsesScheduler.shutdown(); + nextConfigFuture.cancel(true); + nextConfigScheduler.shutdown(); } /** @@ -184,13 +191,9 @@ class RpcConfigSourceClient implements ConfigSourceClient { */ @Override public void shutdownSourceConnections() { - synchronized (activeSubscribersLock) { - for (Subscriber subscriber : activeSubscribers.values()) { - subscriber.cancel(); - } - activeSubscribers.clear(); - } - exec.shutdown(); + activeSubscribers.values().forEach(Subscriber::cancel); + activeSubscribers.clear(); + nextConfigScheduler.shutdown(); requester.close(); } @@ -250,4 +253,12 @@ class RpcConfigSourceClient implements ConfigSourceClient { return delayedResponses; } + private void updateWithNewConfig(RawConfig newConfig) { + log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey() + + "', generation=" + newConfig.getGeneration() + + ", payload=" + newConfig.getPayload()); + memoryCache.update(newConfig); + updateSubscribers(newConfig); + } + } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java index 74c99b7670b..f96d6470679 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java @@ -1,12 +1,68 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.config.subscription.impl.GenericConfigHandle; +import com.yahoo.config.subscription.impl.GenericConfigSubscriber; +import com.yahoo.config.subscription.impl.JRTConfigRequester; + +import java.util.Optional; +import java.util.logging.Level; +import com.yahoo.vespa.config.ConfigKey; +import com.yahoo.vespa.config.RawConfig; +import com.yahoo.vespa.config.TimingValues; +import com.yahoo.yolean.Exceptions; + +import java.util.Map; +import java.util.logging.Logger; + /** - * Interface for subscribing to config from upstream config sources. - * * @author hmusum */ -public interface Subscriber extends Runnable { +public class Subscriber { + + private final static Logger log = Logger.getLogger(Subscriber.class.getName()); + + private final RawConfig config; + private final ConfigSourceSet configSourceSet; + private final TimingValues timingValues; + private final GenericConfigSubscriber subscriber; + private GenericConfigHandle handle; + + Subscriber(RawConfig config, ConfigSourceSet configSourceSet, TimingValues timingValues, JRTConfigRequester requester) { + this.config = config; + this.configSourceSet = configSourceSet; + this.timingValues = timingValues; + this.subscriber = new GenericConfigSubscriber(Map.of(configSourceSet, requester)); + } + + void subscribe() { + ConfigKey<?> key = config.getKey(); + handle = subscriber.subscribe(new ConfigKey<>(key.getName(), key.getConfigId(), key.getNamespace()), + config.getDefContent(), configSourceSet, timingValues); + } + + public Optional<RawConfig> nextGeneration() { + if (subscriber.nextGeneration(0)) { + try { + return Optional.of(handle.getRawConfig()); + } catch (Exception e) { // To avoid thread throwing exception and loop never running this again + log.log(Level.WARNING, "Got exception: " + Exceptions.toMessageString(e)); + } catch (Throwable e) { + com.yahoo.protect.Process.logAndDie("Got error, exiting: " + Exceptions.toMessageString(e)); + } + } + return Optional.empty(); + } + + public void cancel() { + if (subscriber != null) { + subscriber.close(); + } + } + + boolean isClosed() { + return subscriber.isClosed(); + } - void cancel(); } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java deleted file mode 100644 index a24407588be..00000000000 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.proxy; - -import com.yahoo.config.subscription.ConfigSourceSet; -import com.yahoo.config.subscription.impl.GenericConfigHandle; -import com.yahoo.config.subscription.impl.GenericConfigSubscriber; -import com.yahoo.config.subscription.impl.JRTConfigRequester; -import java.util.logging.Level; -import com.yahoo.vespa.config.ConfigKey; -import com.yahoo.vespa.config.RawConfig; -import com.yahoo.vespa.config.TimingValues; -import com.yahoo.yolean.Exceptions; - -import java.util.Map; -import java.util.logging.Logger; - -/** - * @author hmusum - */ -public class UpstreamConfigSubscriber implements Subscriber { - - private final static Logger log = Logger.getLogger(UpstreamConfigSubscriber.class.getName()); - - private final RawConfig config; - private final ConfigSourceClient configSourceClient; - private final ConfigSourceSet configSourceSet; - private final TimingValues timingValues; - private final JRTConfigRequester requester; - private final MemoryCache memoryCache; - private GenericConfigSubscriber subscriber; - private GenericConfigHandle handle; - - UpstreamConfigSubscriber(RawConfig config, ConfigSourceClient configSourceClient, ConfigSourceSet configSourceSet, - TimingValues timingValues, JRTConfigRequester requester, - MemoryCache memoryCache) { - this.config = config; - this.configSourceClient = configSourceClient; - this.configSourceSet = configSourceSet; - this.timingValues = timingValues; - this.requester = requester; - this.memoryCache = memoryCache; - } - - void subscribe() { - subscriber = new GenericConfigSubscriber(Map.of(configSourceSet, requester)); - ConfigKey<?> key = config.getKey(); - handle = subscriber.subscribe(new ConfigKey<>(key.getName(), key.getConfigId(), key.getNamespace()), - config.getDefContent(), configSourceSet, timingValues); - } - - @Override - public void run() { - do { - if (! subscriber.nextGeneration()) continue; - - try { - updateWithNewConfig(handle); - } catch (Exception e) { // To avoid thread throwing exception and loop never running this again - log.log(Level.WARNING, "Got exception: " + Exceptions.toMessageString(e)); - } catch (Throwable e) { - com.yahoo.protect.Process.logAndDie("Got error, exiting: " + Exceptions.toMessageString(e)); - } - } while (!subscriber.isClosed()); - } - - private void updateWithNewConfig(GenericConfigHandle handle) { - RawConfig newConfig = handle.getRawConfig(); - log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey() + - "', generation=" + newConfig.getGeneration() + - ", payload=" + newConfig.getPayload()); - memoryCache.update(newConfig); - configSourceClient.updateSubscribers(newConfig); - } - - @Override - public void cancel() { - if (subscriber != null) { - subscriber.close(); - } - } - -} diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java index 06e55eef4fa..2126d673da0 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java @@ -54,9 +54,6 @@ public class MockConfigSourceClient implements ConfigSourceClient{ } @Override - public void updateSubscribers(RawConfig config) { } - - @Override public DelayedResponses delayedResponses() { return delayedResponses; } } |