diff options
Diffstat (limited to 'config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java')
-rw-r--r-- | config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java | 123 |
1 files changed, 67 insertions, 56 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java index 2a33e8c6928..ba58c369afd 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -10,22 +10,25 @@ import com.yahoo.jrt.Spec; import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Target; import com.yahoo.jrt.Transport; -import com.yahoo.log.LogLevel; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; import com.yahoo.vespa.config.ConfigCacheKey; import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.TimingValues; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.concurrent.DelayQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.logging.Logger; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; /** @@ -33,7 +36,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; * * @author hmusum */ -class RpcConfigSourceClient implements ConfigSourceClient { +class RpcConfigSourceClient implements ConfigSourceClient, Runnable { private final static Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName()); private static final double timingValuesRatio = 0.8; @@ -42,16 +45,18 @@ class RpcConfigSourceClient implements ConfigSourceClient { private final RpcServer rpcServer; private final ConfigSourceSet configSourceSet; - private final HashMap<ConfigCacheKey, Subscriber> activeSubscribers = new HashMap<>(); - private final Object activeSubscribersLock = new Object(); + private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>(); private final MemoryCache memoryCache; private final DelayedResponses delayedResponses; private final static TimingValues timingValues; - private final ExecutorService exec; + private final ScheduledExecutorService nextConfigScheduler = + Executors.newScheduledThreadPool(1, new DaemonThreadFactory("next config")); + private ScheduledFuture<?> nextConfigFuture; private final JRTConfigRequester requester; // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory()); - private ScheduledFuture<?> delayedResponseScheduler; + private final ScheduledExecutorService delayedResponsesScheduler = + Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses")); + private ScheduledFuture<?> delayedResponsesFuture; static { // Proxy should time out before clients upon subscription. @@ -69,14 +74,10 @@ class RpcConfigSourceClient implements ConfigSourceClient { this.memoryCache = memoryCache; this.delayedResponses = new DelayedResponses(); checkConfigSources(); - exec = Executors.newCachedThreadPool(new DaemonThreadFactory("subscriber-")); + nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS); requester = JRTConfigRequester.create(configSourceSet, timingValues); - // Wait for 5 seconds initially, then run every second - delayedResponseScheduler = scheduler.scheduleAtFixedRate( - new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer), - 5, - 1, - SECONDS); + DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer); + delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS); } /** @@ -84,7 +85,7 @@ class RpcConfigSourceClient implements ConfigSourceClient { */ private void checkConfigSources() { if (configSourceSet == null || configSourceSet.getSources() == null || configSourceSet.getSources().size() == 0) { - log.log(LogLevel.WARNING, "No config sources defined, could not check connection"); + log.log(Level.WARNING, "No config sources defined, could not check connection"); } else { Request req = new Request("ping"); for (String configSource : configSourceSet.getSources()) { @@ -92,15 +93,15 @@ class RpcConfigSourceClient implements ConfigSourceClient { Target target = supervisor.connect(spec); target.invokeSync(req, 30.0); if (target.isValid()) { - log.log(LogLevel.DEBUG, () -> "Created connection to config source at " + spec.toString()); + log.log(Level.FINE, () -> "Created connection to config source at " + spec.toString()); return; } else { - log.log(LogLevel.INFO, "Could not connect to config source at " + spec.toString()); + log.log(Level.INFO, "Could not connect to config source at " + spec.toString()); } target.close(); } String extra = ""; - log.log(LogLevel.INFO, "Could not connect to any config source in set " + configSourceSet.toString() + + log.log(Level.INFO, "Could not connect to any config source in set " + configSourceSet.toString() + ", please make sure config server(s) are running. " + extra); } } @@ -132,11 +133,11 @@ class RpcConfigSourceClient implements ConfigSourceClient { RawConfig ret = null; if (cachedConfig != null) { - log.log(LogLevel.DEBUG, () -> "Found config " + configCacheKey + " in cache, generation=" + cachedConfig.getGeneration() + + log.log(Level.FINE, () -> "Found config " + configCacheKey + " in cache, generation=" + cachedConfig.getGeneration() + ",configmd5=" + cachedConfig.getConfigMd5()); - log.log(LogLevel.SPAM, () -> "input config=" + input + ",cached config=" + cachedConfig); + log.log(Level.FINEST, () -> "input config=" + input + ",cached config=" + cachedConfig); if (ProxyServer.configOrGenerationHasChanged(cachedConfig, request)) { - log.log(LogLevel.SPAM, () -> "Cached config is not equal to requested, will return it"); + log.log(Level.FINEST, () -> "Cached config is not equal to requested, will return it"); if (delayedResponses.remove(delayedResponse)) { // unless another thread already did it ret = cachedConfig; @@ -153,30 +154,36 @@ class RpcConfigSourceClient implements ConfigSourceClient { } private void subscribeToConfig(RawConfig input, ConfigCacheKey configCacheKey) { - synchronized (activeSubscribersLock) { - if (activeSubscribers.containsKey(configCacheKey)) { - log.log(LogLevel.DEBUG, () -> "Already a subscriber running for: " + configCacheKey); - } else { - log.log(LogLevel.DEBUG, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey); - UpstreamConfigSubscriber subscriber = - new UpstreamConfigSubscriber(input, this, configSourceSet, timingValues, requester, memoryCache); - try { - subscriber.subscribe(); - activeSubscribers.put(configCacheKey, subscriber); - exec.execute(subscriber); - } catch (ConfigurationRuntimeException e) { - log.log(LogLevel.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber"); - subscriber.cancel(); - } - } + if (activeSubscribers.containsKey(configCacheKey)) return; + + log.log(Level.FINE, () -> "Could not find good config in cache, creating subscriber for: " + configCacheKey); + var subscriber = new Subscriber(input, configSourceSet, timingValues, requester); + try { + subscriber.subscribe(); + activeSubscribers.put(configCacheKey, subscriber); + } catch (ConfigurationRuntimeException e) { + log.log(Level.INFO, "Subscribe for '" + configCacheKey + "' failed, closing subscriber"); + subscriber.cancel(); } } @Override + public void run() { + activeSubscribers.values().forEach(subscriber -> { + if (!subscriber.isClosed()) { + Optional<RawConfig> config = subscriber.nextGeneration(); + config.ifPresent(this::updateWithNewConfig); + } + }); + } + + @Override public void cancel() { shutdownSourceConnections(); - delayedResponseScheduler.cancel(true); - scheduler.shutdown(); + delayedResponsesFuture.cancel(true); + delayedResponsesScheduler.shutdown(); + nextConfigFuture.cancel(true); + nextConfigScheduler.shutdown(); } /** @@ -184,13 +191,9 @@ class RpcConfigSourceClient implements ConfigSourceClient { */ @Override public void shutdownSourceConnections() { - synchronized (activeSubscribersLock) { - for (Subscriber subscriber : activeSubscribers.values()) { - subscriber.cancel(); - } - activeSubscribers.clear(); - } - exec.shutdown(); + activeSubscribers.values().forEach(Subscriber::cancel); + activeSubscribers.clear(); + nextConfigScheduler.shutdown(); requester.close(); } @@ -215,14 +218,14 @@ class RpcConfigSourceClient implements ConfigSourceClient { * @param config new config */ public void updateSubscribers(RawConfig config) { - log.log(LogLevel.DEBUG, () -> "Config updated for " + config.getKey() + "," + config.getGeneration()); + log.log(Level.FINE, () -> "Config updated for " + config.getKey() + "," + config.getGeneration()); DelayQueue<DelayedResponse> responseDelayQueue = delayedResponses.responses(); - log.log(LogLevel.SPAM, () -> "Delayed response queue: " + responseDelayQueue); + log.log(Level.FINEST, () -> "Delayed response queue: " + responseDelayQueue); if (responseDelayQueue.size() == 0) { - log.log(LogLevel.DEBUG, () -> "There exists no matching element on delayed response queue for " + config.getKey()); + log.log(Level.FINE, () -> "There exists no matching element on delayed response queue for " + config.getKey()); return; } else { - log.log(LogLevel.DEBUG, () -> "Delayed response queue has " + responseDelayQueue.size() + " elements"); + log.log(Level.FINE, () -> "Delayed response queue has " + responseDelayQueue.size() + " elements"); } boolean found = false; for (DelayedResponse response : responseDelayQueue.toArray(new DelayedResponse[0])) { @@ -232,17 +235,17 @@ class RpcConfigSourceClient implements ConfigSourceClient { && (config.getGeneration() >= request.getRequestGeneration() || config.getGeneration() == 0)) { if (delayedResponses.remove(response)) { found = true; - log.log(LogLevel.DEBUG, () -> "Call returnOkResponse for " + config.getKey() + "," + config.getGeneration()); + log.log(Level.FINE, () -> "Call returnOkResponse for " + config.getKey() + "," + config.getGeneration()); rpcServer.returnOkResponse(request, config); } else { - log.log(LogLevel.INFO, "Could not remove " + config.getKey() + " from delayedResponses queue, already removed"); + log.log(Level.INFO, "Could not remove " + config.getKey() + " from delayedResponses queue, already removed"); } } } if (!found) { - log.log(LogLevel.DEBUG, () -> "Found no recipient for " + config.getKey() + " in delayed response queue"); + log.log(Level.FINE, () -> "Found no recipient for " + config.getKey() + " in delayed response queue"); } - log.log(LogLevel.DEBUG, () -> "Finished updating config for " + config.getKey() + "," + config.getGeneration()); + log.log(Level.FINE, () -> "Finished updating config for " + config.getKey() + "," + config.getGeneration()); } @Override @@ -250,4 +253,12 @@ class RpcConfigSourceClient implements ConfigSourceClient { return delayedResponses; } + private void updateWithNewConfig(RawConfig newConfig) { + log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey() + + "', generation=" + newConfig.getGeneration() + + ", payload=" + newConfig.getPayload()); + memoryCache.update(newConfig); + updateSubscribers(newConfig); + } + } |