aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java50
1 files changed, 24 insertions, 26 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
index d063c45a3f7..8756090e420 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
@@ -1,27 +1,27 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
-import java.util.logging.Level;
import com.yahoo.log.LogSetup;
import com.yahoo.log.event.Event;
-import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionAndUrlDownload;
import com.yahoo.yolean.system.CatchSignals;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
import java.util.logging.Logger;
import static com.yahoo.vespa.config.proxy.Mode.ModeName.DEFAULT;
@@ -40,27 +40,24 @@ public class ProxyServer implements Runnable {
private static final int JRT_TRANSPORT_THREADS = 4;
static final String DEFAULT_PROXY_CONFIG_SOURCES = "tcp/localhost:19070";
- private final static Logger log = Logger.getLogger(ProxyServer.class.getName());
+ private static final Logger log = Logger.getLogger(ProxyServer.class.getName());
+
private final AtomicBoolean signalCaught = new AtomicBoolean(false);
private final Supervisor supervisor;
private final ConfigProxyRpcServer rpcServer;
- private ConfigSourceSet configSource;
-
- private volatile ConfigSourceClient configClient;
-
- private final MemoryCache memoryCache;
private final FileDistributionAndUrlDownload fileDistributionAndUrlDownload;
+ private ConfigSourceSet configSource;
+ private volatile ConfigSourceClient configClient;
private volatile Mode mode = new Mode(DEFAULT);
- ProxyServer(Spec spec, ConfigSourceSet source, MemoryCache memoryCache, ConfigSourceClient configClient) {
- this.configSource = source;
- supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true);
+ ProxyServer(Spec spec, ConfigSourceSet source, ConfigSourceClient configClient) {
+ this.configSource = Objects.requireNonNull(source);
log.log(Level.FINE, () -> "Using config source '" + source);
- this.memoryCache = memoryCache;
+ this.supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true);
this.rpcServer = createRpcServer(spec);
- this.configClient = (configClient == null) ? createRpcClient(rpcServer, source, memoryCache) : configClient;
+ this.configClient = Objects.requireNonNull(configClient);
this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source);
}
@@ -97,12 +94,12 @@ public class ProxyServer implements Runnable {
switch (newMode.getMode()) {
case MEMORYCACHE:
configClient.shutdownSourceConnections();
- configClient = new MemoryCacheConfigClient(memoryCache);
+ configClient = new MemoryCacheConfigClient();
this.mode = new Mode(modeName);
break;
case DEFAULT:
flush();
- configClient = createRpcClient(rpcServer, configSource, memoryCache);
+ configClient = createRpcClient(configSource);
this.mode = new Mode(modeName);
break;
default:
@@ -115,8 +112,8 @@ public class ProxyServer implements Runnable {
return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this'
}
- private static RpcConfigSourceClient createRpcClient(RpcServer rpcServer, ConfigSourceSet source, MemoryCache memoryCache) {
- return new RpcConfigSourceClient(rpcServer, source, memoryCache);
+ private static RpcConfigSourceClient createRpcClient(ConfigSourceSet source) {
+ return new RpcConfigSourceClient(new ResponseHandler(), source);
}
private void setupSignalHandler() {
@@ -159,7 +156,7 @@ public class ProxyServer implements Runnable {
Event.started("configproxy");
ConfigSourceSet configSources = new ConfigSourceSet(properties.configSources);
- ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, new MemoryCache(), null);
+ ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, createRpcClient(configSources));
// catch termination and interrupt signal
proxyServer.setupSignalHandler();
Thread proxyserverThread = threadFactory.newThread(proxyServer);
@@ -169,7 +166,8 @@ public class ProxyServer implements Runnable {
}
static Properties getSystemProperties() {
- final String[] inputConfigSources = System.getProperty("proxyconfigsources", DEFAULT_PROXY_CONFIG_SOURCES).split(",");
+ String[] inputConfigSources = System.getProperty("proxyconfigsources",
+ DEFAULT_PROXY_CONFIG_SOURCES).split(",");
return new Properties(inputConfigSources);
}
@@ -184,15 +182,15 @@ public class ProxyServer implements Runnable {
// Cancels all config instances and flushes the cache. When this method returns,
// the cache will not be updated again before someone calls getConfig().
private synchronized void flush() {
- memoryCache.clear();
- configClient.cancel();
+ configClient.memoryCache().clear();
+ configClient.shutdown();
}
void stop() {
Event.stopping("configproxy", "shutdown rpcServer");
if (rpcServer != null) rpcServer.shutdown();
Event.stopping("configproxy", "cancel configClient");
- if (configClient != null) configClient.cancel();
+ configClient.shutdown();
Event.stopping("configproxy", "flush");
flush();
Event.stopping("configproxy", "close fileDistribution");
@@ -200,8 +198,8 @@ public class ProxyServer implements Runnable {
Event.stopping("configproxy", "stop complete");
}
- MemoryCache getMemoryCache() {
- return memoryCache;
+ MemoryCache memoryCache() {
+ return configClient.memoryCache();
}
String getActiveSourceConnection() {
@@ -215,7 +213,7 @@ public class ProxyServer implements Runnable {
void updateSourceConnections(List<String> sources) {
configSource = new ConfigSourceSet(sources);
flush();
- configClient = createRpcClient(rpcServer, configSource, memoryCache);
+ configClient = createRpcClient(configSource);
}
DelayedResponses delayedResponses() {