aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'config-proxy/src/main/java/com')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java66
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java4
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java25
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java15
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java50
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java63
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java21
7 files changed, 135 insertions, 109 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
index c7f6530f81c..3f03725fb60 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
@@ -30,10 +30,10 @@ import java.util.logging.Logger;
*
* @author hmusum
*/
-public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer {
+public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName());
- private static final int TRACELEVEL = 6;
+ static final int TRACELEVEL = 6;
private final Spec spec;
private final Supervisor supervisor;
@@ -79,10 +79,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
this::ping)
.methodDesc("ping")
.returnDesc(0, "ret code", "return code, 0 is OK"));
- supervisor.addMethod(new Method("printStatistics", "", "s",
- this::printStatistics)
- .methodDesc("printStatistics")
- .returnDesc(0, "statistics", "Statistics for server"));
supervisor.addMethod(new Method("listCachedConfig", "", "S",
this::listCachedConfig)
.methodDesc("list cached configs)")
@@ -145,26 +141,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
});
}
- /**
- * Returns a String with statistics data for the server.
- *
- * @param req a Request
- */
- private void printStatistics(Request req) {
- dispatchRpcRequest(req, () -> {
- StringBuilder sb = new StringBuilder();
- sb.append("\nDelayed responses queue size: ");
- sb.append(proxyServer.delayedResponses().size());
- sb.append("\nContents: ");
- for (DelayedResponse delayed : proxyServer.delayedResponses().responses()) {
- sb.append(delayed.getRequest().toString()).append("\n");
- }
-
- req.returnValues().add(new StringValue(sb.toString()));
- req.returnRequest();
- });
- }
-
private void listCachedConfig(Request req) {
dispatchRpcRequest(req, () -> listCachedConfig(req, false));
}
@@ -201,7 +177,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
private void invalidateCache(Request req) {
dispatchRpcRequest(req, () -> {
- proxyServer.getMemoryCache().clear();
+ proxyServer.memoryCache().clear();
String[] s = new String[2];
s[0] = "0";
s[1] = "success";
@@ -237,7 +213,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
private void dumpCache(Request req) {
dispatchRpcRequest(req, () -> {
- final MemoryCache memoryCache = proxyServer.getMemoryCache();
+ final MemoryCache memoryCache = proxyServer.memoryCache();
req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache)));
req.returnRequest();
});
@@ -269,12 +245,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
* @param request a Request
*/
private void getConfigImpl(JRTServerConfigRequest request) {
+ ResponseHandler responseHandler = new ResponseHandler();
request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()");
log.log(Level.FINE, () ->"getConfig: " + request.getShortDescription() + ",config checksums=" + request.getRequestConfigChecksums());
if (!request.validateParameters()) {
// Error code is set in verifyParameters if parameters are not OK.
log.log(Level.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage());
- returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
+ responseHandler.returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
return;
}
try {
@@ -282,13 +259,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
if (config == null) {
log.log(Level.FINEST, () -> "No config received yet for " + request.getShortDescription() + ", not sending response");
} else if (ProxyServer.configOrGenerationHasChanged(config, request)) {
- returnOkResponse(request, config);
+ responseHandler.returnOkResponse(request, config);
} else {
log.log(Level.FINEST, () -> "No new config for " + request.getShortDescription() + ", not sending response");
}
} catch (Exception e) {
e.printStackTrace();
- returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
+ responseHandler.returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
}
}
@@ -302,7 +279,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
private void listCachedConfig(Request req, boolean full) {
String[] ret;
- MemoryCache cache = proxyServer.getMemoryCache();
+ MemoryCache cache = proxyServer.memoryCache();
ret = new String[cache.size()];
int i = 0;
for (RawConfig config : cache.values()) {
@@ -348,29 +325,4 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
// requesting this config?
}
- public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
- request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()");
- request.addOkResponse(config.getPayload(),
- config.getGeneration(),
- config.applyOnRestart(),
- config.getPayloadChecksums());
- log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() +
- ",generation=" + config.getGeneration());
- log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload());
-
-
- // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse
- // TODO Move logic so that all requests are returned in CheckDelayedResponse
- try {
- request.getRequest().returnRequest();
- } catch (IllegalStateException e) {
- log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage());
- }
- }
-
- public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
- request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()");
- request.addErrorResponse(errorCode, message);
- request.getRequest().returnRequest();
- }
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
index 6e5fe2d3fd8..dae732e56ec 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
@@ -16,7 +16,7 @@ interface ConfigSourceClient {
RawConfig getConfig(RawConfig input, JRTServerConfigRequest request);
- void cancel();
+ void shutdown();
void shutdownSourceConnections();
@@ -26,4 +26,6 @@ interface ConfigSourceClient {
DelayedResponses delayedResponses();
+ MemoryCache memoryCache();
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
index f77bd4b9138..0e8ebe0d9c9 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
@@ -6,10 +6,12 @@ import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.yolean.Exceptions;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
+import static com.yahoo.protect.Process.logAndDie;
+
/**
* The run method of this class is executed periodically to return delayed responses
* (requests use long polling, so config proxy needs to return a response when they time out).
@@ -22,12 +24,13 @@ public class DelayedResponseHandler implements Runnable {
private final DelayedResponses delayedResponses;
private final MemoryCache memoryCache;
- private final RpcServer rpcServer;
+ private final ResponseHandler responseHandler;
+ private final AtomicLong sentResponses = new AtomicLong();
- DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, RpcServer rpcServer) {
+ DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, ResponseHandler responseHandler) {
this.delayedResponses = delayedResponses;
this.memoryCache = memoryCache;
- this.rpcServer = rpcServer;
+ this.responseHandler = responseHandler;
}
@Override
@@ -41,25 +44,27 @@ public class DelayedResponseHandler implements Runnable {
log.log(Level.FINEST, () -> "Running DelayedResponseHandler. There are " + delayedResponses.size() +
" delayed responses. First one is " + delayedResponses.responses().peek());
DelayedResponse response;
- AtomicInteger i = new AtomicInteger(0);
while ((response = delayedResponses.responses().poll()) != null) {
JRTServerConfigRequest request = response.getRequest();
ConfigCacheKey cacheKey = new ConfigCacheKey(request.getConfigKey(), request.getRequestDefMd5());
RawConfig config = memoryCache.get(cacheKey);
if (config != null) {
- rpcServer.returnOkResponse(request, config);
- i.incrementAndGet();
+ responseHandler.returnOkResponse(request, config);
+ sentResponses.incrementAndGet();
} else {
log.log(Level.WARNING, "Timed out (timeout " + request.getTimeout() + ") getting config " +
request.getConfigKey() + ", will retry");
}
}
- log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + i.get() + " delayed responses sent in " +
- (System.currentTimeMillis() - start) + " ms");
+ log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + sentResponses.get() +
+ " delayed responses sent in " + (System.currentTimeMillis() - start) + " ms");
} catch (Exception e) { // To avoid thread throwing exception and executor never running this again
log.log(Level.WARNING, "Got exception in DelayedResponseHandler: " + Exceptions.toMessageString(e));
} catch (Throwable e) {
- com.yahoo.protect.Process.logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e));
+ logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e));
}
}
+
+ public long sentResponses() { return sentResponses.get(); }
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
index 6e90ad16f50..7ae8501278d 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
@@ -1,12 +1,14 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
-import java.util.logging.Level;
-import com.yahoo.vespa.config.*;
+import com.yahoo.vespa.config.ConfigCacheKey;
+import com.yahoo.vespa.config.ConfigKey;
+import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import java.util.Collections;
import java.util.List;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -20,8 +22,8 @@ class MemoryCacheConfigClient implements ConfigSourceClient {
private final MemoryCache cache;
private final DelayedResponses delayedResponses = new DelayedResponses();
- MemoryCacheConfigClient(MemoryCache cache) {
- this.cache = cache;
+ MemoryCacheConfigClient() {
+ this.cache = new MemoryCache();
}
/**
@@ -44,7 +46,7 @@ class MemoryCacheConfigClient implements ConfigSourceClient {
}
@Override
- public void cancel() {}
+ public void shutdown() {}
@Override
public void shutdownSourceConnections() {}
@@ -64,4 +66,7 @@ class MemoryCacheConfigClient implements ConfigSourceClient {
return delayedResponses;
}
+ @Override
+ public MemoryCache memoryCache() { return cache; }
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
index d063c45a3f7..8756090e420 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
@@ -1,27 +1,27 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
-import java.util.logging.Level;
import com.yahoo.log.LogSetup;
import com.yahoo.log.event.Event;
-import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionAndUrlDownload;
import com.yahoo.yolean.system.CatchSignals;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
import java.util.logging.Logger;
import static com.yahoo.vespa.config.proxy.Mode.ModeName.DEFAULT;
@@ -40,27 +40,24 @@ public class ProxyServer implements Runnable {
private static final int JRT_TRANSPORT_THREADS = 4;
static final String DEFAULT_PROXY_CONFIG_SOURCES = "tcp/localhost:19070";
- private final static Logger log = Logger.getLogger(ProxyServer.class.getName());
+ private static final Logger log = Logger.getLogger(ProxyServer.class.getName());
+
private final AtomicBoolean signalCaught = new AtomicBoolean(false);
private final Supervisor supervisor;
private final ConfigProxyRpcServer rpcServer;
- private ConfigSourceSet configSource;
-
- private volatile ConfigSourceClient configClient;
-
- private final MemoryCache memoryCache;
private final FileDistributionAndUrlDownload fileDistributionAndUrlDownload;
+ private ConfigSourceSet configSource;
+ private volatile ConfigSourceClient configClient;
private volatile Mode mode = new Mode(DEFAULT);
- ProxyServer(Spec spec, ConfigSourceSet source, MemoryCache memoryCache, ConfigSourceClient configClient) {
- this.configSource = source;
- supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true);
+ ProxyServer(Spec spec, ConfigSourceSet source, ConfigSourceClient configClient) {
+ this.configSource = Objects.requireNonNull(source);
log.log(Level.FINE, () -> "Using config source '" + source);
- this.memoryCache = memoryCache;
+ this.supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true);
this.rpcServer = createRpcServer(spec);
- this.configClient = (configClient == null) ? createRpcClient(rpcServer, source, memoryCache) : configClient;
+ this.configClient = Objects.requireNonNull(configClient);
this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source);
}
@@ -97,12 +94,12 @@ public class ProxyServer implements Runnable {
switch (newMode.getMode()) {
case MEMORYCACHE:
configClient.shutdownSourceConnections();
- configClient = new MemoryCacheConfigClient(memoryCache);
+ configClient = new MemoryCacheConfigClient();
this.mode = new Mode(modeName);
break;
case DEFAULT:
flush();
- configClient = createRpcClient(rpcServer, configSource, memoryCache);
+ configClient = createRpcClient(configSource);
this.mode = new Mode(modeName);
break;
default:
@@ -115,8 +112,8 @@ public class ProxyServer implements Runnable {
return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this'
}
- private static RpcConfigSourceClient createRpcClient(RpcServer rpcServer, ConfigSourceSet source, MemoryCache memoryCache) {
- return new RpcConfigSourceClient(rpcServer, source, memoryCache);
+ private static RpcConfigSourceClient createRpcClient(ConfigSourceSet source) {
+ return new RpcConfigSourceClient(new ResponseHandler(), source);
}
private void setupSignalHandler() {
@@ -159,7 +156,7 @@ public class ProxyServer implements Runnable {
Event.started("configproxy");
ConfigSourceSet configSources = new ConfigSourceSet(properties.configSources);
- ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, new MemoryCache(), null);
+ ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, createRpcClient(configSources));
// catch termination and interrupt signal
proxyServer.setupSignalHandler();
Thread proxyserverThread = threadFactory.newThread(proxyServer);
@@ -169,7 +166,8 @@ public class ProxyServer implements Runnable {
}
static Properties getSystemProperties() {
- final String[] inputConfigSources = System.getProperty("proxyconfigsources", DEFAULT_PROXY_CONFIG_SOURCES).split(",");
+ String[] inputConfigSources = System.getProperty("proxyconfigsources",
+ DEFAULT_PROXY_CONFIG_SOURCES).split(",");
return new Properties(inputConfigSources);
}
@@ -184,15 +182,15 @@ public class ProxyServer implements Runnable {
// Cancels all config instances and flushes the cache. When this method returns,
// the cache will not be updated again before someone calls getConfig().
private synchronized void flush() {
- memoryCache.clear();
- configClient.cancel();
+ configClient.memoryCache().clear();
+ configClient.shutdown();
}
void stop() {
Event.stopping("configproxy", "shutdown rpcServer");
if (rpcServer != null) rpcServer.shutdown();
Event.stopping("configproxy", "cancel configClient");
- if (configClient != null) configClient.cancel();
+ configClient.shutdown();
Event.stopping("configproxy", "flush");
flush();
Event.stopping("configproxy", "close fileDistribution");
@@ -200,8 +198,8 @@ public class ProxyServer implements Runnable {
Event.stopping("configproxy", "stop complete");
}
- MemoryCache getMemoryCache() {
- return memoryCache;
+ MemoryCache memoryCache() {
+ return configClient.memoryCache();
}
String getActiveSourceConnection() {
@@ -215,7 +213,7 @@ public class ProxyServer implements Runnable {
void updateSourceConnections(List<String> sources) {
configSource = new ConfigSourceSet(sources);
flush();
- configClient = createRpcClient(rpcServer, configSource, memoryCache);
+ configClient = createRpcClient(configSource);
}
DelayedResponses delayedResponses() {
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java
new file mode 100644
index 00000000000..c9cfbdd3e16
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java
@@ -0,0 +1,63 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static com.yahoo.vespa.config.proxy.ConfigProxyRpcServer.TRACELEVEL;
+
+/**
+ * An RPC server that handles config and file distribution requests.
+ *
+ * @author hmusum
+ */
+public class ResponseHandler {
+
+ private final Optional<AtomicLong> sentResponses;
+
+ public ResponseHandler() {
+ this(false);
+ }
+
+ // For testing only
+ ResponseHandler(boolean trackResponses) {
+ sentResponses = trackResponses ? Optional.of(new AtomicLong()) : Optional.empty();
+ }
+
+ private final static Logger log = Logger.getLogger(ResponseHandler.class.getName());
+
+ public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()");
+ request.addOkResponse(config.getPayload(),
+ config.getGeneration(),
+ config.applyOnRestart(),
+ config.getPayloadChecksums());
+ log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() +
+ ",generation=" + config.getGeneration());
+ log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload());
+
+
+ // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse
+ // TODO Move logic so that all requests are returned in CheckDelayedResponse
+ try {
+ request.getRequest().returnRequest();
+ } catch (IllegalStateException e) {
+ log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage());
+ }
+ sentResponses.ifPresent(AtomicLong::getAndIncrement);
+ }
+
+ public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()");
+ request.addErrorResponse(errorCode, message);
+ request.getRequest().returnRequest();
+ }
+
+ public long sentResponses() { return sentResponses.map(AtomicLong::get).orElse(0L); }
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
index 5df7b1fc021..362ca1164b8 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
@@ -43,7 +43,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
private final Supervisor supervisor = new Supervisor(new Transport("config-source-client"));
- private final RpcServer rpcServer;
+ private final ResponseHandler responseHandler;
private final ConfigSourceSet configSourceSet;
private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>();
private final MemoryCache memoryCache;
@@ -57,15 +57,15 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses"));
private final ScheduledFuture<?> delayedResponsesFuture;
- RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet, MemoryCache memoryCache) {
- this.rpcServer = rpcServer;
+ RpcConfigSourceClient(ResponseHandler responseHandler, ConfigSourceSet configSourceSet) {
+ this.responseHandler = responseHandler;
this.configSourceSet = configSourceSet;
- this.memoryCache = memoryCache;
+ this.memoryCache = new MemoryCache();
this.delayedResponses = new DelayedResponses();
checkConfigSources();
nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS);
this.requester = JRTConfigRequester.create(configSourceSet, timingValues);
- DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer);
+ DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, responseHandler);
this.delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS);
}
@@ -163,7 +163,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
}
@Override
- public void cancel() {
+ public void shutdown() {
log.log(Level.FINE, "shutdownSourceConnections");
shutdownSourceConnections();
log.log(Level.FINE, "delayedResponsesFuture.cancel");
@@ -230,7 +230,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
log.log(Level.FINE, () -> "Call returnOkResponse for " + key + "," + generation);
if (config.getPayload().getData().getByteLength() == 0)
log.log(Level.WARNING, () -> "Call returnOkResponse for " + key + "," + generation + " with empty config");
- rpcServer.returnOkResponse(request, config);
+ responseHandler.returnOkResponse(request, config);
} else {
log.log(Level.INFO, "Could not remove " + key + " from delayedResponses queue, already removed");
}
@@ -243,9 +243,10 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
}
@Override
- public DelayedResponses delayedResponses() {
- return delayedResponses;
- }
+ public DelayedResponses delayedResponses() { return delayedResponses; }
+
+ @Override
+ public MemoryCache memoryCache() { return memoryCache; }
private void updateWithNewConfig(RawConfig newConfig) {
log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey() +