diff options
author | Harald Musum <musum@verizonmedia.com> | 2020-02-21 12:18:04 +0100 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2020-02-21 12:18:04 +0100 |
commit | 8dafc34949693548a5a30566112f306c613b7922 (patch) | |
tree | dabfc8ae1c47e9d9ef2482115956c02cde1de305 /config-proxy | |
parent | 25a3eb4ce927bd5ea1225bfb77b69ce3512b0647 (diff) |
Move delayed responses and scheduler into RpcConfigSourceClient
Diffstat (limited to 'config-proxy')
-rw-r--r-- | config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java | 29 | ||||
-rw-r--r-- | config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java | 23 |
2 files changed, 23 insertions, 29 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index 72c8db28b21..fc1c7f62048 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; -import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.jrt.Spec; import com.yahoo.jrt.Supervisor; @@ -15,14 +14,10 @@ import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionAndUrlDownl import com.yahoo.yolean.system.CatchSignals; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import static com.yahoo.vespa.config.proxy.Mode.ModeName.DEFAULT; -import static java.util.concurrent.TimeUnit.SECONDS; /** * A proxy server that handles RPC config requests. The proxy can run in two modes: @@ -39,14 +34,9 @@ public class ProxyServer implements Runnable { private final static Logger log = Logger.getLogger(ProxyServer.class.getName()); private final AtomicBoolean signalCaught = new AtomicBoolean(false); - - // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory()); private final Supervisor supervisor = new Supervisor(new Transport(JRT_TRANSPORT_THREADS)); - private ScheduledFuture<?> delayedResponseScheduler; private final ConfigProxyRpcServer rpcServer; - final DelayedResponses delayedResponses; private ConfigSourceSet configSource; private volatile ConfigSourceClient configClient; @@ -57,12 +47,11 @@ public class ProxyServer implements Runnable { private volatile Mode mode = new Mode(DEFAULT); ProxyServer(Spec spec, ConfigSourceSet source, MemoryCache memoryCache, ConfigSourceClient configClient) { - this.delayedResponses = new DelayedResponses(); this.configSource = source; log.log(LogLevel.DEBUG, "Using config source '" + source); this.memoryCache = memoryCache; this.rpcServer = createRpcServer(spec); - this.configClient = (configClient == null) ? createRpcClient(rpcServer, delayedResponses, source, memoryCache) : configClient; + this.configClient = (configClient == null) ? createRpcClient(rpcServer, source, memoryCache) : configClient; this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source); } @@ -73,11 +62,6 @@ public class ProxyServer implements Runnable { t.setName("RpcServer"); t.start(); } - // Wait for 5 seconds initially, then run every second - delayedResponseScheduler = scheduler.scheduleAtFixedRate(new DelayedResponseHandler(delayedResponses, - memoryCache, - rpcServer), - 5, 1, SECONDS); } RawConfig resolveConfig(JRTServerConfigRequest req) { @@ -109,7 +93,7 @@ public class ProxyServer implements Runnable { break; case DEFAULT: flush(); - configClient = createRpcClient(rpcServer, delayedResponses, configSource, memoryCache); + configClient = createRpcClient(rpcServer, configSource, memoryCache); this.mode = new Mode(modeName); break; default: @@ -122,9 +106,8 @@ public class ProxyServer implements Runnable { return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this' } - private RpcConfigSourceClient createRpcClient(RpcServer rpcServer, DelayedResponses delayedResponses, - ConfigSourceSet source, MemoryCache memoryCache) { - return new RpcConfigSourceClient(rpcServer, source, memoryCache, delayedResponses); + private static RpcConfigSourceClient createRpcClient(RpcServer rpcServer, ConfigSourceSet source, MemoryCache memoryCache) { + return new RpcConfigSourceClient(rpcServer, source, memoryCache); } private void setupSignalHandler() { @@ -191,7 +174,7 @@ public class ProxyServer implements Runnable { void stop() { Event.stopping("configproxy", "shutdown"); if (rpcServer != null) rpcServer.shutdown(); - if (delayedResponseScheduler != null) delayedResponseScheduler.cancel(true); + if (configClient != null) configClient.cancel(); flush(); fileDistributionAndUrlDownload.close(); } @@ -211,7 +194,7 @@ public class ProxyServer implements Runnable { void updateSourceConnections(List<String> sources) { configSource = new ConfigSourceSet(sources); flush(); - configClient = createRpcClient(rpcServer, delayedResponses, configSource, memoryCache); + configClient = createRpcClient(rpcServer, configSource, memoryCache); } } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java index a674a43be8d..94d491b964d 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -22,8 +22,12 @@ import java.util.List; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.logging.Logger; +import static java.util.concurrent.TimeUnit.SECONDS; + /** * An Rpc client to a config source * @@ -43,9 +47,11 @@ class RpcConfigSourceClient implements ConfigSourceClient { private final MemoryCache memoryCache; private final DelayedResponses delayedResponses; private final static TimingValues timingValues; - private final ExecutorService exec; private final JRTConfigRequester requester; + // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory()); + private ScheduledFuture<?> delayedResponseScheduler; static { // Proxy should time out before clients upon subscription. @@ -57,17 +63,20 @@ class RpcConfigSourceClient implements ConfigSourceClient { timingValues = tv; } - RpcConfigSourceClient(RpcServer rpcServer, - ConfigSourceSet configSourceSet, - MemoryCache memoryCache, - DelayedResponses delayedResponses) { + RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet, MemoryCache memoryCache) { this.rpcServer = rpcServer; this.configSourceSet = configSourceSet; this.memoryCache = memoryCache; - this.delayedResponses = delayedResponses; + this.delayedResponses = new DelayedResponses(); checkConfigSources(); exec = Executors.newCachedThreadPool(new DaemonThreadFactory("subscriber-")); requester = JRTConfigRequester.create(configSourceSet, timingValues); + // Wait for 5 seconds initially, then run every second + delayedResponseScheduler = scheduler.scheduleAtFixedRate( + new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer), + 5, + 1, + SECONDS); } /** @@ -166,6 +175,8 @@ class RpcConfigSourceClient implements ConfigSourceClient { @Override public void cancel() { shutdownSourceConnections(); + delayedResponseScheduler.cancel(true); + scheduler.shutdown(); } /** |