diff options
author | Harald Musum <musum@yahooinc.com> | 2021-12-11 22:16:02 +0100 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2021-12-11 22:16:02 +0100 |
commit | 683632cd79a0cebeec7649dcf6361d27dfc283d8 (patch) | |
tree | f45fddb31b3aca60dfb725ba5b719b2458ba270b | |
parent | 60b142c007083c773e910b44cc57d65e7f2c9274 (diff) |
Synchronize access to subscribers map
subscribeToConfig() may be called form different threads and
subscribers map therefore needs locking to work properly
-rw-r--r-- | config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java | 36 |
1 files changed, 23 insertions, 13 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java index 362ca1164b8..eb1a3e32471 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -17,6 +17,7 @@ import com.yahoo.vespa.config.TimingValues; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,7 +46,8 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { private final ResponseHandler responseHandler; private final ConfigSourceSet configSourceSet; - private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>(); + private final Object subscribersLock = new Object(); + private final Map<ConfigCacheKey, Subscriber> subscribers = new ConcurrentHashMap<>(); private final MemoryCache memoryCache; private final DelayedResponses delayedResponses; private final ScheduledExecutorService nextConfigScheduler = @@ -139,22 +141,28 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { } private void subscribeToConfig(RawConfig input, ConfigCacheKey configCacheKey) { - if (activeSubscribers.containsKey(configCacheKey)) return; + synchronized (subscribersLock) { + if (subscribers.containsKey(configCacheKey)) return; - log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey); - var subscriber = new Subscriber(input, configSourceSet, timingValues, requester); - try { - subscriber.subscribe(); - activeSubscribers.put(configCacheKey, subscriber); - } catch (ConfigurationRuntimeException e) { - log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber"); - subscriber.cancel(); + log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey); + var subscriber = new Subscriber(input, configSourceSet, timingValues, requester); + try { + subscriber.subscribe(); + subscribers.put(configCacheKey, subscriber); + } catch (ConfigurationRuntimeException e) { + log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber"); + subscriber.cancel(); + } } } @Override public void run() { - activeSubscribers.values().forEach(subscriber -> { + Collection<Subscriber> s; + synchronized (subscribersLock) { + s = List.copyOf(subscribers.values()); + } + s.forEach(subscriber -> { if (!subscriber.isClosed()) { Optional<RawConfig> config = subscriber.nextGeneration(); config.ifPresent(this::updateWithNewConfig); @@ -180,8 +188,10 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { @Override public void shutdownSourceConnections() { log.log(Level.FINE, "Subscriber::cancel"); - activeSubscribers.values().forEach(Subscriber::cancel); - activeSubscribers.clear(); + synchronized (subscribers) { + subscribers.values().forEach(Subscriber::cancel); + subscribers.clear(); + } log.log(Level.FINE, "nextConfigFuture.cancel"); nextConfigFuture.cancel(true); log.log(Level.FINE, "nextConfigScheduler.shutdownNow"); |