diff options
Diffstat (limited to 'config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java')
-rw-r--r-- | config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java | 50 |
1 files changed, 24 insertions, 26 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index d063c45a3f7..8756090e420 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -1,27 +1,27 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.jrt.Spec; import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Transport; -import java.util.logging.Level; import com.yahoo.log.LogSetup; import com.yahoo.log.event.Event; -import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionAndUrlDownload; import com.yahoo.yolean.system.CatchSignals; import java.util.List; +import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.vespa.config.proxy.Mode.ModeName.DEFAULT; @@ -40,27 +40,24 @@ public class ProxyServer implements Runnable { private static final int JRT_TRANSPORT_THREADS = 4; static final String DEFAULT_PROXY_CONFIG_SOURCES = "tcp/localhost:19070"; - private final static Logger log = Logger.getLogger(ProxyServer.class.getName()); + private static final Logger log = Logger.getLogger(ProxyServer.class.getName()); + private final AtomicBoolean signalCaught = new AtomicBoolean(false); private final Supervisor supervisor; private final ConfigProxyRpcServer rpcServer; - private ConfigSourceSet configSource; - - private volatile ConfigSourceClient configClient; - - private final MemoryCache memoryCache; private final FileDistributionAndUrlDownload fileDistributionAndUrlDownload; + private ConfigSourceSet configSource; + private volatile ConfigSourceClient configClient; private volatile Mode mode = new Mode(DEFAULT); - ProxyServer(Spec spec, ConfigSourceSet source, MemoryCache memoryCache, ConfigSourceClient configClient) { - this.configSource = source; - supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true); + ProxyServer(Spec spec, ConfigSourceSet source, ConfigSourceClient configClient) { + this.configSource = Objects.requireNonNull(source); log.log(Level.FINE, () -> "Using config source '" + source); - this.memoryCache = memoryCache; + this.supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true); this.rpcServer = createRpcServer(spec); - this.configClient = (configClient == null) ? createRpcClient(rpcServer, source, memoryCache) : configClient; + this.configClient = Objects.requireNonNull(configClient); this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source); } @@ -97,12 +94,12 @@ public class ProxyServer implements Runnable { switch (newMode.getMode()) { case MEMORYCACHE: configClient.shutdownSourceConnections(); - configClient = new MemoryCacheConfigClient(memoryCache); + configClient = new MemoryCacheConfigClient(); this.mode = new Mode(modeName); break; case DEFAULT: flush(); - configClient = createRpcClient(rpcServer, configSource, memoryCache); + configClient = createRpcClient(configSource); this.mode = new Mode(modeName); break; default: @@ -115,8 +112,8 @@ public class ProxyServer implements Runnable { return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this' } - private static RpcConfigSourceClient createRpcClient(RpcServer rpcServer, ConfigSourceSet source, MemoryCache memoryCache) { - return new RpcConfigSourceClient(rpcServer, source, memoryCache); + private static RpcConfigSourceClient createRpcClient(ConfigSourceSet source) { + return new RpcConfigSourceClient(new ResponseHandler(), source); } private void setupSignalHandler() { @@ -159,7 +156,7 @@ public class ProxyServer implements Runnable { Event.started("configproxy"); ConfigSourceSet configSources = new ConfigSourceSet(properties.configSources); - ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, new MemoryCache(), null); + ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, createRpcClient(configSources)); // catch termination and interrupt signal proxyServer.setupSignalHandler(); Thread proxyserverThread = threadFactory.newThread(proxyServer); @@ -169,7 +166,8 @@ public class ProxyServer implements Runnable { } static Properties getSystemProperties() { - final String[] inputConfigSources = System.getProperty("proxyconfigsources", DEFAULT_PROXY_CONFIG_SOURCES).split(","); + String[] inputConfigSources = System.getProperty("proxyconfigsources", + DEFAULT_PROXY_CONFIG_SOURCES).split(","); return new Properties(inputConfigSources); } @@ -184,15 +182,15 @@ public class ProxyServer implements Runnable { // Cancels all config instances and flushes the cache. When this method returns, // the cache will not be updated again before someone calls getConfig(). private synchronized void flush() { - memoryCache.clear(); - configClient.cancel(); + configClient.memoryCache().clear(); + configClient.shutdown(); } void stop() { Event.stopping("configproxy", "shutdown rpcServer"); if (rpcServer != null) rpcServer.shutdown(); Event.stopping("configproxy", "cancel configClient"); - if (configClient != null) configClient.cancel(); + configClient.shutdown(); Event.stopping("configproxy", "flush"); flush(); Event.stopping("configproxy", "close fileDistribution"); @@ -200,8 +198,8 @@ public class ProxyServer implements Runnable { Event.stopping("configproxy", "stop complete"); } - MemoryCache getMemoryCache() { - return memoryCache; + MemoryCache memoryCache() { + return configClient.memoryCache(); } String getActiveSourceConnection() { @@ -215,7 +213,7 @@ public class ProxyServer implements Runnable { void updateSourceConnections(List<String> sources) { configSource = new ConfigSourceSet(sources); flush(); - configClient = createRpcClient(rpcServer, configSource, memoryCache); + configClient = createRpcClient(configSource); } DelayedResponses delayedResponses() { |