diff options
author | Harald Musum <musum@yahooinc.com> | 2021-12-12 09:23:54 +0100 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2021-12-12 09:23:54 +0100 |
commit | 415b8df2bab22b597cdaefb8133c009eff026dd1 (patch) | |
tree | e384e5cb346e544974972f06242775349c03d5a5 /config-proxy/src/main | |
parent | bc117aa5fd1aaa54ae8c86c103899bc81b5d481f (diff) | |
parent | c2523f853ce2190dd50caf1f5a661baf50475db3 (diff) |
Merge branch 'master' into revert-20366-revert-20350-hmusum/config-subscription-refactoring-part-5
Diffstat (limited to 'config-proxy/src/main')
-rw-r--r-- | config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java | 37 |
1 files changed, 24 insertions, 13 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java index e6a3d66f340..56fdae477b2 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -17,6 +17,7 @@ import com.yahoo.vespa.config.TimingValues; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,7 +46,8 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { private final ResponseHandler responseHandler; private final ConfigSourceSet configSourceSet; - private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>(); + private final Object subscribersLock = new Object(); + private final Map<ConfigCacheKey, Subscriber> subscribers = new ConcurrentHashMap<>(); private final MemoryCache memoryCache; private final DelayedResponses delayedResponses; private final ScheduledExecutorService nextConfigScheduler = @@ -139,22 +141,29 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { } private void subscribeToConfig(RawConfig input, ConfigCacheKey configCacheKey) { - if (activeSubscribers.containsKey(configCacheKey)) return; + synchronized (subscribersLock) { + if (subscribers.containsKey(configCacheKey)) return; - log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey); - var subscriber = new Subscriber(input, timingValues, requesters.getRequester(configSourceSet, timingValues)); - try { - subscriber.subscribe(); - activeSubscribers.put(configCacheKey, subscriber); - } catch (ConfigurationRuntimeException e) { - log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber"); - subscriber.cancel(); + log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey); + var subscriber = new Subscriber(input, timingValues, requesters + .getRequester(configSourceSet, timingValues)); + try { + subscriber.subscribe(); + subscribers.put(configCacheKey, subscriber); + } catch (ConfigurationRuntimeException e) { + log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber"); + subscriber.cancel(); + } } } @Override public void run() { - activeSubscribers.values().forEach(subscriber -> { + Collection<Subscriber> s; + synchronized (subscribersLock) { + s = List.copyOf(subscribers.values()); + } + s.forEach(subscriber -> { if (!subscriber.isClosed()) { Optional<RawConfig> config = subscriber.nextGeneration(); config.ifPresent(this::updateWithNewConfig); @@ -180,8 +189,10 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { @Override public void shutdownSourceConnections() { log.log(Level.FINE, "Subscriber::cancel"); - activeSubscribers.values().forEach(Subscriber::cancel); - activeSubscribers.clear(); + synchronized (subscribers) { + subscribers.values().forEach(Subscriber::cancel); + subscribers.clear(); + } log.log(Level.FINE, "nextConfigFuture.cancel"); nextConfigFuture.cancel(true); log.log(Level.FINE, "nextConfigScheduler.shutdownNow"); |