aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2021-12-11 22:16:02 +0100
committerHarald Musum <musum@yahooinc.com>2021-12-11 22:16:02 +0100
commit683632cd79a0cebeec7649dcf6361d27dfc283d8 (patch)
treef45fddb31b3aca60dfb725ba5b719b2458ba270b /config-proxy
parent60b142c007083c773e910b44cc57d65e7f2c9274 (diff)
Synchronize access to subscribers map
subscribeToConfig() may be called form different threads and subscribers map therefore needs locking to work properly
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java36
1 files changed, 23 insertions, 13 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
index 362ca1164b8..eb1a3e32471 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
@@ -17,6 +17,7 @@ import com.yahoo.vespa.config.TimingValues;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -45,7 +46,8 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
private final ResponseHandler responseHandler;
private final ConfigSourceSet configSourceSet;
- private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>();
+ private final Object subscribersLock = new Object();
+ private final Map<ConfigCacheKey, Subscriber> subscribers = new ConcurrentHashMap<>();
private final MemoryCache memoryCache;
private final DelayedResponses delayedResponses;
private final ScheduledExecutorService nextConfigScheduler =
@@ -139,22 +141,28 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
}
private void subscribeToConfig(RawConfig input, ConfigCacheKey configCacheKey) {
- if (activeSubscribers.containsKey(configCacheKey)) return;
+ synchronized (subscribersLock) {
+ if (subscribers.containsKey(configCacheKey)) return;
- log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey);
- var subscriber = new Subscriber(input, configSourceSet, timingValues, requester);
- try {
- subscriber.subscribe();
- activeSubscribers.put(configCacheKey, subscriber);
- } catch (ConfigurationRuntimeException e) {
- log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber");
- subscriber.cancel();
+ log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey);
+ var subscriber = new Subscriber(input, configSourceSet, timingValues, requester);
+ try {
+ subscriber.subscribe();
+ subscribers.put(configCacheKey, subscriber);
+ } catch (ConfigurationRuntimeException e) {
+ log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber");
+ subscriber.cancel();
+ }
}
}
@Override
public void run() {
- activeSubscribers.values().forEach(subscriber -> {
+ Collection<Subscriber> s;
+ synchronized (subscribersLock) {
+ s = List.copyOf(subscribers.values());
+ }
+ s.forEach(subscriber -> {
if (!subscriber.isClosed()) {
Optional<RawConfig> config = subscriber.nextGeneration();
config.ifPresent(this::updateWithNewConfig);
@@ -180,8 +188,10 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
@Override
public void shutdownSourceConnections() {
log.log(Level.FINE, "Subscriber::cancel");
- activeSubscribers.values().forEach(Subscriber::cancel);
- activeSubscribers.clear();
+ synchronized (subscribers) {
+ subscribers.values().forEach(Subscriber::cancel);
+ subscribers.clear();
+ }
log.log(Level.FINE, "nextConfigFuture.cancel");
nextConfigFuture.cancel(true);
log.log(Level.FINE, "nextConfigScheduler.shutdownNow");