aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2021-12-12 09:23:54 +0100
committerHarald Musum <musum@yahooinc.com>2021-12-12 09:23:54 +0100
commit415b8df2bab22b597cdaefb8133c009eff026dd1 (patch)
treee384e5cb346e544974972f06242775349c03d5a5 /config-proxy
parentbc117aa5fd1aaa54ae8c86c103899bc81b5d481f (diff)
parentc2523f853ce2190dd50caf1f5a661baf50475db3 (diff)
Merge branch 'master' into revert-20366-revert-20350-hmusum/config-subscription-refactoring-part-5
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java37
1 files changed, 24 insertions, 13 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
index e6a3d66f340..56fdae477b2 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
@@ -17,6 +17,7 @@ import com.yahoo.vespa.config.TimingValues;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -45,7 +46,8 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
private final ResponseHandler responseHandler;
private final ConfigSourceSet configSourceSet;
- private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>();
+ private final Object subscribersLock = new Object();
+ private final Map<ConfigCacheKey, Subscriber> subscribers = new ConcurrentHashMap<>();
private final MemoryCache memoryCache;
private final DelayedResponses delayedResponses;
private final ScheduledExecutorService nextConfigScheduler =
@@ -139,22 +141,29 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
}
private void subscribeToConfig(RawConfig input, ConfigCacheKey configCacheKey) {
- if (activeSubscribers.containsKey(configCacheKey)) return;
+ synchronized (subscribersLock) {
+ if (subscribers.containsKey(configCacheKey)) return;
- log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey);
- var subscriber = new Subscriber(input, timingValues, requesters.getRequester(configSourceSet, timingValues));
- try {
- subscriber.subscribe();
- activeSubscribers.put(configCacheKey, subscriber);
- } catch (ConfigurationRuntimeException e) {
- log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber");
- subscriber.cancel();
+ log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey);
+ var subscriber = new Subscriber(input, timingValues, requesters
+ .getRequester(configSourceSet, timingValues));
+ try {
+ subscriber.subscribe();
+ subscribers.put(configCacheKey, subscriber);
+ } catch (ConfigurationRuntimeException e) {
+ log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber");
+ subscriber.cancel();
+ }
}
}
@Override
public void run() {
- activeSubscribers.values().forEach(subscriber -> {
+ Collection<Subscriber> s;
+ synchronized (subscribersLock) {
+ s = List.copyOf(subscribers.values());
+ }
+ s.forEach(subscriber -> {
if (!subscriber.isClosed()) {
Optional<RawConfig> config = subscriber.nextGeneration();
config.ifPresent(this::updateWithNewConfig);
@@ -180,8 +189,10 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
@Override
public void shutdownSourceConnections() {
log.log(Level.FINE, "Subscriber::cancel");
- activeSubscribers.values().forEach(Subscriber::cancel);
- activeSubscribers.clear();
+ synchronized (subscribers) {
+ subscribers.values().forEach(Subscriber::cancel);
+ subscribers.clear();
+ }
log.log(Level.FINE, "nextConfigFuture.cancel");
nextConfigFuture.cancel(true);
log.log(Level.FINE, "nextConfigScheduler.shutdownNow");