From 683632cd79a0cebeec7649dcf6361d27dfc283d8 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Sat, 11 Dec 2021 22:16:02 +0100 Subject: Synchronize access to subscribers map subscribeToConfig() may be called form different threads and subscribers map therefore needs locking to work properly --- .../vespa/config/proxy/RpcConfigSourceClient.java | 36 ++++++++++++++-------- 1 file changed, 23 insertions(+), 13 deletions(-) (limited to 'config-proxy') diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java index 362ca1164b8..eb1a3e32471 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -17,6 +17,7 @@ import com.yahoo.vespa.config.TimingValues; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,7 +46,8 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { private final ResponseHandler responseHandler; private final ConfigSourceSet configSourceSet; - private final Map activeSubscribers = new ConcurrentHashMap<>(); + private final Object subscribersLock = new Object(); + private final Map subscribers = new ConcurrentHashMap<>(); private final MemoryCache memoryCache; private final DelayedResponses delayedResponses; private final ScheduledExecutorService nextConfigScheduler = @@ -139,22 +141,28 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { } private void subscribeToConfig(RawConfig input, ConfigCacheKey configCacheKey) { - if (activeSubscribers.containsKey(configCacheKey)) return; + synchronized (subscribersLock) { + if (subscribers.containsKey(configCacheKey)) return; - log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey); - var subscriber = new Subscriber(input, configSourceSet, timingValues, requester); - try { - subscriber.subscribe(); - activeSubscribers.put(configCacheKey, subscriber); - } catch (ConfigurationRuntimeException e) { - log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber"); - subscriber.cancel(); + log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey); + var subscriber = new Subscriber(input, configSourceSet, timingValues, requester); + try { + subscriber.subscribe(); + subscribers.put(configCacheKey, subscriber); + } catch (ConfigurationRuntimeException e) { + log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber"); + subscriber.cancel(); + } } } @Override public void run() { - activeSubscribers.values().forEach(subscriber -> { + Collection s; + synchronized (subscribersLock) { + s = List.copyOf(subscribers.values()); + } + s.forEach(subscriber -> { if (!subscriber.isClosed()) { Optional config = subscriber.nextGeneration(); config.ifPresent(this::updateWithNewConfig); @@ -180,8 +188,10 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { @Override public void shutdownSourceConnections() { log.log(Level.FINE, "Subscriber::cancel"); - activeSubscribers.values().forEach(Subscriber::cancel); - activeSubscribers.clear(); + synchronized (subscribers) { + subscribers.values().forEach(Subscriber::cancel); + subscribers.clear(); + } log.log(Level.FINE, "nextConfigFuture.cancel"); nextConfigFuture.cancel(true); log.log(Level.FINE, "nextConfigScheduler.shutdownNow"); -- cgit v1.2.3