aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2021-12-10 17:54:52 +0100
committerGitHub <noreply@github.com>2021-12-10 17:54:52 +0100
commit3fca6584773338c6166c1f3016f384f834e97426 (patch)
treeb5972b00b0e292d13100e8b29140935fb61f4ba3 /config-proxy
parent78a583b6db9c8b887528c3138e85188ccbb48c80 (diff)
Revert "Revert "Hmusum/refactor config proxy [run-systemtest]""
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java66
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java4
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java25
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java15
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java50
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java63
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java21
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java22
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java10
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java5
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java9
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java23
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java36
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java8
14 files changed, 173 insertions, 184 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
index 8791918dab0..158df654439 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
@@ -30,10 +30,10 @@ import java.util.logging.Logger;
*
* @author hmusum
*/
-public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer {
+public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName());
- private static final int TRACELEVEL = 6;
+ static final int TRACELEVEL = 6;
private final Spec spec;
private final Supervisor supervisor;
@@ -79,10 +79,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
this::ping)
.methodDesc("ping")
.returnDesc(0, "ret code", "return code, 0 is OK"));
- supervisor.addMethod(new Method("printStatistics", "", "s",
- this::printStatistics)
- .methodDesc("printStatistics")
- .returnDesc(0, "statistics", "Statistics for server"));
supervisor.addMethod(new Method("listCachedConfig", "", "S",
this::listCachedConfig)
.methodDesc("list cached configs)")
@@ -145,26 +141,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
});
}
- /**
- * Returns a String with statistics data for the server.
- *
- * @param req a Request
- */
- private void printStatistics(Request req) {
- dispatchRpcRequest(req, () -> {
- StringBuilder sb = new StringBuilder();
- sb.append("\nDelayed responses queue size: ");
- sb.append(proxyServer.delayedResponses().size());
- sb.append("\nContents: ");
- for (DelayedResponse delayed : proxyServer.delayedResponses().responses()) {
- sb.append(delayed.getRequest().toString()).append("\n");
- }
-
- req.returnValues().add(new StringValue(sb.toString()));
- req.returnRequest();
- });
- }
-
private void listCachedConfig(Request req) {
dispatchRpcRequest(req, () -> listCachedConfig(req, false));
}
@@ -201,7 +177,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
private void invalidateCache(Request req) {
dispatchRpcRequest(req, () -> {
- proxyServer.getMemoryCache().clear();
+ proxyServer.memoryCache().clear();
String[] s = new String[2];
s[0] = "0";
s[1] = "success";
@@ -237,7 +213,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
private void dumpCache(Request req) {
dispatchRpcRequest(req, () -> {
- final MemoryCache memoryCache = proxyServer.getMemoryCache();
+ final MemoryCache memoryCache = proxyServer.memoryCache();
req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache)));
req.returnRequest();
});
@@ -269,12 +245,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
* @param request a Request
*/
private void getConfigImpl(JRTServerConfigRequest request) {
+ ResponseHandler responseHandler = new ResponseHandler();
request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()");
log.log(Level.FINE, () ->"getConfig: " + request.getShortDescription() + ",config checksums=" + request.getRequestConfigChecksums());
if (!request.validateParameters()) {
// Error code is set in verifyParameters if parameters are not OK.
log.log(Level.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage());
- returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
+ responseHandler.returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
return;
}
try {
@@ -282,13 +259,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
if (config == null) {
log.log(Level.FINEST, () -> "No config received yet for " + request.getShortDescription() + ", not sending response");
} else if (ProxyServer.configOrGenerationHasChanged(config, request)) {
- returnOkResponse(request, config);
+ responseHandler.returnOkResponse(request, config);
} else {
log.log(Level.FINEST, () -> "No new config for " + request.getShortDescription() + ", not sending response");
}
} catch (Exception e) {
e.printStackTrace();
- returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
+ responseHandler.returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
}
}
@@ -302,7 +279,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
private void listCachedConfig(Request req, boolean full) {
String[] ret;
- MemoryCache cache = proxyServer.getMemoryCache();
+ MemoryCache cache = proxyServer.memoryCache();
ret = new String[cache.size()];
int i = 0;
for (RawConfig config : cache.values()) {
@@ -348,29 +325,4 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
// requesting this config?
}
- public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
- request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()");
- request.addOkResponse(config.getPayload(),
- config.getGeneration(),
- config.applyOnRestart(),
- config.getPayloadChecksums());
- log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() +
- ",generation=" + config.getGeneration());
- log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload());
-
-
- // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse
- // TODO Move logic so that all requests are returned in CheckDelayedResponse
- try {
- request.getRequest().returnRequest();
- } catch (IllegalStateException e) {
- log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage());
- }
- }
-
- public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
- request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()");
- request.addErrorResponse(errorCode, message);
- request.getRequest().returnRequest();
- }
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
index 6e5fe2d3fd8..dae732e56ec 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
@@ -16,7 +16,7 @@ interface ConfigSourceClient {
RawConfig getConfig(RawConfig input, JRTServerConfigRequest request);
- void cancel();
+ void shutdown();
void shutdownSourceConnections();
@@ -26,4 +26,6 @@ interface ConfigSourceClient {
DelayedResponses delayedResponses();
+ MemoryCache memoryCache();
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
index f77bd4b9138..0e8ebe0d9c9 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
@@ -6,10 +6,12 @@ import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.yolean.Exceptions;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
+import static com.yahoo.protect.Process.logAndDie;
+
/**
* The run method of this class is executed periodically to return delayed responses
* (requests use long polling, so config proxy needs to return a response when they time out).
@@ -22,12 +24,13 @@ public class DelayedResponseHandler implements Runnable {
private final DelayedResponses delayedResponses;
private final MemoryCache memoryCache;
- private final RpcServer rpcServer;
+ private final ResponseHandler responseHandler;
+ private final AtomicLong sentResponses = new AtomicLong();
- DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, RpcServer rpcServer) {
+ DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, ResponseHandler responseHandler) {
this.delayedResponses = delayedResponses;
this.memoryCache = memoryCache;
- this.rpcServer = rpcServer;
+ this.responseHandler = responseHandler;
}
@Override
@@ -41,25 +44,27 @@ public class DelayedResponseHandler implements Runnable {
log.log(Level.FINEST, () -> "Running DelayedResponseHandler. There are " + delayedResponses.size() +
" delayed responses. First one is " + delayedResponses.responses().peek());
DelayedResponse response;
- AtomicInteger i = new AtomicInteger(0);
while ((response = delayedResponses.responses().poll()) != null) {
JRTServerConfigRequest request = response.getRequest();
ConfigCacheKey cacheKey = new ConfigCacheKey(request.getConfigKey(), request.getRequestDefMd5());
RawConfig config = memoryCache.get(cacheKey);
if (config != null) {
- rpcServer.returnOkResponse(request, config);
- i.incrementAndGet();
+ responseHandler.returnOkResponse(request, config);
+ sentResponses.incrementAndGet();
} else {
log.log(Level.WARNING, "Timed out (timeout " + request.getTimeout() + ") getting config " +
request.getConfigKey() + ", will retry");
}
}
- log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + i.get() + " delayed responses sent in " +
- (System.currentTimeMillis() - start) + " ms");
+ log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + sentResponses.get() +
+ " delayed responses sent in " + (System.currentTimeMillis() - start) + " ms");
} catch (Exception e) { // To avoid thread throwing exception and executor never running this again
log.log(Level.WARNING, "Got exception in DelayedResponseHandler: " + Exceptions.toMessageString(e));
} catch (Throwable e) {
- com.yahoo.protect.Process.logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e));
+ logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e));
}
}
+
+ public long sentResponses() { return sentResponses.get(); }
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
index 6e90ad16f50..7ae8501278d 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
@@ -1,12 +1,14 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
-import java.util.logging.Level;
-import com.yahoo.vespa.config.*;
+import com.yahoo.vespa.config.ConfigCacheKey;
+import com.yahoo.vespa.config.ConfigKey;
+import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import java.util.Collections;
import java.util.List;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -20,8 +22,8 @@ class MemoryCacheConfigClient implements ConfigSourceClient {
private final MemoryCache cache;
private final DelayedResponses delayedResponses = new DelayedResponses();
- MemoryCacheConfigClient(MemoryCache cache) {
- this.cache = cache;
+ MemoryCacheConfigClient() {
+ this.cache = new MemoryCache();
}
/**
@@ -44,7 +46,7 @@ class MemoryCacheConfigClient implements ConfigSourceClient {
}
@Override
- public void cancel() {}
+ public void shutdown() {}
@Override
public void shutdownSourceConnections() {}
@@ -64,4 +66,7 @@ class MemoryCacheConfigClient implements ConfigSourceClient {
return delayedResponses;
}
+ @Override
+ public MemoryCache memoryCache() { return cache; }
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
index d063c45a3f7..8756090e420 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
@@ -1,27 +1,27 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
-import java.util.logging.Level;
import com.yahoo.log.LogSetup;
import com.yahoo.log.event.Event;
-import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionAndUrlDownload;
import com.yahoo.yolean.system.CatchSignals;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
import java.util.logging.Logger;
import static com.yahoo.vespa.config.proxy.Mode.ModeName.DEFAULT;
@@ -40,27 +40,24 @@ public class ProxyServer implements Runnable {
private static final int JRT_TRANSPORT_THREADS = 4;
static final String DEFAULT_PROXY_CONFIG_SOURCES = "tcp/localhost:19070";
- private final static Logger log = Logger.getLogger(ProxyServer.class.getName());
+ private static final Logger log = Logger.getLogger(ProxyServer.class.getName());
+
private final AtomicBoolean signalCaught = new AtomicBoolean(false);
private final Supervisor supervisor;
private final ConfigProxyRpcServer rpcServer;
- private ConfigSourceSet configSource;
-
- private volatile ConfigSourceClient configClient;
-
- private final MemoryCache memoryCache;
private final FileDistributionAndUrlDownload fileDistributionAndUrlDownload;
+ private ConfigSourceSet configSource;
+ private volatile ConfigSourceClient configClient;
private volatile Mode mode = new Mode(DEFAULT);
- ProxyServer(Spec spec, ConfigSourceSet source, MemoryCache memoryCache, ConfigSourceClient configClient) {
- this.configSource = source;
- supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true);
+ ProxyServer(Spec spec, ConfigSourceSet source, ConfigSourceClient configClient) {
+ this.configSource = Objects.requireNonNull(source);
log.log(Level.FINE, () -> "Using config source '" + source);
- this.memoryCache = memoryCache;
+ this.supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true);
this.rpcServer = createRpcServer(spec);
- this.configClient = (configClient == null) ? createRpcClient(rpcServer, source, memoryCache) : configClient;
+ this.configClient = Objects.requireNonNull(configClient);
this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source);
}
@@ -97,12 +94,12 @@ public class ProxyServer implements Runnable {
switch (newMode.getMode()) {
case MEMORYCACHE:
configClient.shutdownSourceConnections();
- configClient = new MemoryCacheConfigClient(memoryCache);
+ configClient = new MemoryCacheConfigClient();
this.mode = new Mode(modeName);
break;
case DEFAULT:
flush();
- configClient = createRpcClient(rpcServer, configSource, memoryCache);
+ configClient = createRpcClient(configSource);
this.mode = new Mode(modeName);
break;
default:
@@ -115,8 +112,8 @@ public class ProxyServer implements Runnable {
return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this'
}
- private static RpcConfigSourceClient createRpcClient(RpcServer rpcServer, ConfigSourceSet source, MemoryCache memoryCache) {
- return new RpcConfigSourceClient(rpcServer, source, memoryCache);
+ private static RpcConfigSourceClient createRpcClient(ConfigSourceSet source) {
+ return new RpcConfigSourceClient(new ResponseHandler(), source);
}
private void setupSignalHandler() {
@@ -159,7 +156,7 @@ public class ProxyServer implements Runnable {
Event.started("configproxy");
ConfigSourceSet configSources = new ConfigSourceSet(properties.configSources);
- ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, new MemoryCache(), null);
+ ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, createRpcClient(configSources));
// catch termination and interrupt signal
proxyServer.setupSignalHandler();
Thread proxyserverThread = threadFactory.newThread(proxyServer);
@@ -169,7 +166,8 @@ public class ProxyServer implements Runnable {
}
static Properties getSystemProperties() {
- final String[] inputConfigSources = System.getProperty("proxyconfigsources", DEFAULT_PROXY_CONFIG_SOURCES).split(",");
+ String[] inputConfigSources = System.getProperty("proxyconfigsources",
+ DEFAULT_PROXY_CONFIG_SOURCES).split(",");
return new Properties(inputConfigSources);
}
@@ -184,15 +182,15 @@ public class ProxyServer implements Runnable {
// Cancels all config instances and flushes the cache. When this method returns,
// the cache will not be updated again before someone calls getConfig().
private synchronized void flush() {
- memoryCache.clear();
- configClient.cancel();
+ configClient.memoryCache().clear();
+ configClient.shutdown();
}
void stop() {
Event.stopping("configproxy", "shutdown rpcServer");
if (rpcServer != null) rpcServer.shutdown();
Event.stopping("configproxy", "cancel configClient");
- if (configClient != null) configClient.cancel();
+ configClient.shutdown();
Event.stopping("configproxy", "flush");
flush();
Event.stopping("configproxy", "close fileDistribution");
@@ -200,8 +198,8 @@ public class ProxyServer implements Runnable {
Event.stopping("configproxy", "stop complete");
}
- MemoryCache getMemoryCache() {
- return memoryCache;
+ MemoryCache memoryCache() {
+ return configClient.memoryCache();
}
String getActiveSourceConnection() {
@@ -215,7 +213,7 @@ public class ProxyServer implements Runnable {
void updateSourceConnections(List<String> sources) {
configSource = new ConfigSourceSet(sources);
flush();
- configClient = createRpcClient(rpcServer, configSource, memoryCache);
+ configClient = createRpcClient(configSource);
}
DelayedResponses delayedResponses() {
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java
new file mode 100644
index 00000000000..c9cfbdd3e16
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java
@@ -0,0 +1,63 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static com.yahoo.vespa.config.proxy.ConfigProxyRpcServer.TRACELEVEL;
+
+/**
+ * An RPC server that handles config and file distribution requests.
+ *
+ * @author hmusum
+ */
+public class ResponseHandler {
+
+ private final Optional<AtomicLong> sentResponses;
+
+ public ResponseHandler() {
+ this(false);
+ }
+
+ // For testing only
+ ResponseHandler(boolean trackResponses) {
+ sentResponses = trackResponses ? Optional.of(new AtomicLong()) : Optional.empty();
+ }
+
+ private final static Logger log = Logger.getLogger(ResponseHandler.class.getName());
+
+ public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()");
+ request.addOkResponse(config.getPayload(),
+ config.getGeneration(),
+ config.applyOnRestart(),
+ config.getPayloadChecksums());
+ log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() +
+ ",generation=" + config.getGeneration());
+ log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload());
+
+
+ // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse
+ // TODO Move logic so that all requests are returned in CheckDelayedResponse
+ try {
+ request.getRequest().returnRequest();
+ } catch (IllegalStateException e) {
+ log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage());
+ }
+ sentResponses.ifPresent(AtomicLong::getAndIncrement);
+ }
+
+ public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()");
+ request.addErrorResponse(errorCode, message);
+ request.getRequest().returnRequest();
+ }
+
+ public long sentResponses() { return sentResponses.map(AtomicLong::get).orElse(0L); }
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
index 5df7b1fc021..362ca1164b8 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
@@ -43,7 +43,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
private final Supervisor supervisor = new Supervisor(new Transport("config-source-client"));
- private final RpcServer rpcServer;
+ private final ResponseHandler responseHandler;
private final ConfigSourceSet configSourceSet;
private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>();
private final MemoryCache memoryCache;
@@ -57,15 +57,15 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses"));
private final ScheduledFuture<?> delayedResponsesFuture;
- RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet, MemoryCache memoryCache) {
- this.rpcServer = rpcServer;
+ RpcConfigSourceClient(ResponseHandler responseHandler, ConfigSourceSet configSourceSet) {
+ this.responseHandler = responseHandler;
this.configSourceSet = configSourceSet;
- this.memoryCache = memoryCache;
+ this.memoryCache = new MemoryCache();
this.delayedResponses = new DelayedResponses();
checkConfigSources();
nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS);
this.requester = JRTConfigRequester.create(configSourceSet, timingValues);
- DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer);
+ DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, responseHandler);
this.delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS);
}
@@ -163,7 +163,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
}
@Override
- public void cancel() {
+ public void shutdown() {
log.log(Level.FINE, "shutdownSourceConnections");
shutdownSourceConnections();
log.log(Level.FINE, "delayedResponsesFuture.cancel");
@@ -230,7 +230,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
log.log(Level.FINE, () -> "Call returnOkResponse for " + key + "," + generation);
if (config.getPayload().getData().getByteLength() == 0)
log.log(Level.WARNING, () -> "Call returnOkResponse for " + key + "," + generation + " with empty config");
- rpcServer.returnOkResponse(request, config);
+ responseHandler.returnOkResponse(request, config);
} else {
log.log(Level.INFO, "Could not remove " + key + " from delayedResponses queue, already removed");
}
@@ -243,9 +243,10 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
}
@Override
- public DelayedResponses delayedResponses() {
- return delayedResponses;
- }
+ public DelayedResponses delayedResponses() { return delayedResponses; }
+
+ @Override
+ public MemoryCache memoryCache() { return memoryCache; }
private void updateWithNewConfig(RawConfig newConfig) {
log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey() +
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
index 1bcf8d5d8be..691bc6c43a7 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
@@ -92,7 +92,7 @@ public class ConfigProxyRpcServerTest {
assertThat(ret.length, is(0));
final RawConfig config = ProxyServerTest.fooConfig;
- server.proxyServer().getMemoryCache().update(config);
+ server.proxyServer().memoryCache().update(config);
req = new Request("listCachedConfig");
client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
@@ -119,7 +119,7 @@ public class ConfigProxyRpcServerTest {
assertThat(ret.length, is(0));
final RawConfig config = ProxyServerTest.fooConfig;
- server.proxyServer().getMemoryCache().update(config);
+ server.proxyServer().memoryCache().update(config);
req = new Request("listCachedConfigFull");
client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
@@ -133,7 +133,7 @@ public class ConfigProxyRpcServerTest {
}
/**
- * Tests printStatistics RPC command
+ * Tests listSourceConnections RPC command
*/
@Test
public void testRpcMethodListSourceConnections() throws ListenFailedException {
@@ -151,20 +151,6 @@ public class ConfigProxyRpcServerTest {
}
/**
- * Tests printStatistics RPC command
- */
- @Test
- public void testRpcMethodPrintStatistics() {
- Request req = new Request("printStatistics");
- client.invoke(req);
- assertFalse(req.errorMessage(), req.isError());
- assertThat(req.returnValues().size(), is(1));
- assertThat(req.returnValues().get(0).asString(), is("\n" +
- "Delayed responses queue size: 0\n" +
- "Contents: "));
- }
-
- /**
* Tests invalidateCache RPC command
*/
@Test
@@ -275,7 +261,7 @@ public class ConfigProxyRpcServerTest {
}
private static ProxyServer createTestServer(ConfigSourceSet source) {
- return new ProxyServer(null, source, new MemoryCache(), null);
+ return new ProxyServer(null, source, new RpcConfigSourceClient(new ResponseHandler(), source));
}
private static class TestServer implements AutoCloseable {
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java
index c2a0282fd05..8a668b34fd0 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java
@@ -6,8 +6,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
/**
* @author hmusum
@@ -29,16 +28,15 @@ public class DelayedResponseHandlerTest {
public void basic() {
ConfigTester tester = new ConfigTester();
DelayedResponses delayedResponses = new DelayedResponses();
- final MockRpcServer mockRpcServer = new MockRpcServer();
- final MemoryCache memoryCache = new MemoryCache();
+ MemoryCache memoryCache = new MemoryCache();
memoryCache.update(ConfigTester.fooConfig);
- final DelayedResponseHandler delayedResponseHandler = new DelayedResponseHandler(delayedResponses, memoryCache, mockRpcServer);
+ DelayedResponseHandler delayedResponseHandler = new DelayedResponseHandler(delayedResponses, memoryCache, new ResponseHandler());
delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.fooConfig, 0)));
delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.fooConfig, 1200000))); // should not be returned yet
delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.errorConfig, 0))); // will not give a config when resolving
delayedResponseHandler.checkDelayedResponses();
- assertThat(mockRpcServer.responses, is(1L));
+ assertEquals(1, delayedResponseHandler.sentResponses());
}
}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java
index 51d0b983764..ae7350b11e0 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java
@@ -16,9 +16,8 @@ public class MemoryCacheConfigClientTest {
@Test
public void basic() {
- MemoryCache cache = new MemoryCache();
- cache.update(ConfigTester.fooConfig);
- MemoryCacheConfigClient client = new MemoryCacheConfigClient(cache);
+ MemoryCacheConfigClient client = new MemoryCacheConfigClient();
+ client.memoryCache().update(ConfigTester.fooConfig);
assertThat(client.getConfig(ConfigTester.fooConfig, null), is(ConfigTester.fooConfig));
assertNull(client.getConfig(ConfigTester.barConfig, null));
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java
index c0efc1cb355..d0724b9dbd0 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java
@@ -18,9 +18,9 @@ public class MockConfigSourceClient implements ConfigSourceClient{
private final MemoryCache memoryCache;
private final DelayedResponses delayedResponses = new DelayedResponses();
- MockConfigSourceClient(MockConfigSource configSource, MemoryCache memoryCache) {
+ MockConfigSourceClient(MockConfigSource configSource) {
this.configSource = configSource;
- this.memoryCache = memoryCache;
+ this.memoryCache = new MemoryCache();
}
@Override
@@ -35,7 +35,7 @@ public class MockConfigSourceClient implements ConfigSourceClient{
}
@Override
- public void cancel() {
+ public void shutdown() {
configSource.clear();
}
@@ -56,4 +56,7 @@ public class MockConfigSourceClient implements ConfigSourceClient{
@Override
public DelayedResponses delayedResponses() { return delayedResponses; }
+ @Override
+ public MemoryCache memoryCache() { return memoryCache; }
+
}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java
deleted file mode 100644
index 56fcca191de..00000000000
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.config.proxy;
-
-import com.yahoo.vespa.config.RawConfig;
-import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
-
-/**
- * @author hmusum
- */
-public class MockRpcServer implements RpcServer {
-
- volatile long responses = 0;
- volatile long errorResponses = 0;
-
- public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
- responses++;
- }
-
- public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
- responses++;
- errorResponses++;
- }
-}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
index cdda2bf6e77..15de93b748f 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
@@ -2,7 +2,10 @@
package com.yahoo.vespa.config.proxy;
import com.yahoo.config.subscription.ConfigSourceSet;
-import com.yahoo.vespa.config.*;
+import com.yahoo.vespa.config.ConfigCacheKey;
+import com.yahoo.vespa.config.ConfigKey;
+import com.yahoo.vespa.config.ErrorCode;
+import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.protocol.Payload;
import org.junit.After;
@@ -25,9 +28,8 @@ import static org.junit.Assert.assertTrue;
*/
public class ProxyServerTest {
- private final MemoryCache memoryCache = new MemoryCache();
private final MockConfigSource source = new MockConfigSource();
- private final MockConfigSourceClient client = new MockConfigSourceClient(source, memoryCache);
+ private final ConfigSourceClient client = new MockConfigSourceClient(source);
private ProxyServer proxy;
static final RawConfig fooConfig = ConfigTester.fooConfig;
@@ -46,7 +48,7 @@ public class ProxyServerTest {
source.clear();
source.put(fooConfig.getKey(), createConfigWithNextConfigGeneration(fooConfig, 0));
source.put(errorConfigKey, createConfigWithNextConfigGeneration(fooConfig, ErrorCode.UNKNOWN_DEFINITION));
- proxy = createTestServer(source, client, memoryCache);
+ proxy = createTestServer(source, client);
}
@After
@@ -57,10 +59,10 @@ public class ProxyServerTest {
@Test
public void basic() {
assertTrue(proxy.getMode().isDefault());
- assertThat(proxy.getMemoryCache().size(), is(0));
+ assertThat(proxy.memoryCache().size(), is(0));
ConfigTester tester = new ConfigTester();
- final MemoryCache memoryCache = proxy.getMemoryCache();
+ MemoryCache memoryCache = proxy.memoryCache();
assertEquals(0, memoryCache.size());
RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig));
assertNotNull(res);
@@ -74,7 +76,7 @@ public class ProxyServerTest {
*/
@Test
public void testModeSwitch() {
- ProxyServer proxy = createTestServer(source, client, new MemoryCache());
+ ProxyServer proxy = createTestServer(source, client);
assertTrue(proxy.getMode().isDefault());
for (String mode : Mode.modes()) {
@@ -109,7 +111,7 @@ public class ProxyServerTest {
@Test
public void testGetConfigAndCaching() {
ConfigTester tester = new ConfigTester();
- final MemoryCache memoryCache = proxy.getMemoryCache();
+ MemoryCache memoryCache = proxy.memoryCache();
assertEquals(0, memoryCache.size());
RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig));
assertNotNull(res);
@@ -134,14 +136,14 @@ public class ProxyServerTest {
// Simulate an error response
source.put(fooConfig.getKey(), createConfigWithNextConfigGeneration(fooConfig, ErrorCode.INTERNAL_ERROR));
- final MemoryCache cacheManager = proxy.getMemoryCache();
- assertEquals(0, cacheManager.size());
+ MemoryCache memoryCache = proxy.memoryCache();
+ assertEquals(0, memoryCache.size());
RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig));
assertNotNull(res);
assertNotNull(res.getPayload());
assertTrue(res.isError());
- assertEquals(0, cacheManager.size());
+ assertEquals(0, memoryCache.size());
// Put a version of the same config into backend without error and see that it now works (i.e. we are
// not getting a cached response (of the error in the previous request)
@@ -152,12 +154,12 @@ public class ProxyServerTest {
assertNotNull(res);
assertNotNull(res.getPayload().getData());
assertThat(res.getPayload().toString(), is(ConfigTester.fooPayload.toString()));
- assertEquals(1, cacheManager.size());
+ assertEquals(1, memoryCache.size());
JRTServerConfigRequest newRequestBasedOnResponse = tester.createRequest(res);
RawConfig res2 = proxy.resolveConfig(newRequestBasedOnResponse);
assertFalse(ProxyServer.configOrGenerationHasChanged(res2, newRequestBasedOnResponse));
- assertEquals(1, cacheManager.size());
+ assertEquals(1, memoryCache.size());
}
/**
@@ -169,7 +171,7 @@ public class ProxyServerTest {
@Test
public void testNoCachingOfEmptyConfig() {
ConfigTester tester = new ConfigTester();
- MemoryCache cache = proxy.getMemoryCache();
+ MemoryCache cache = proxy.memoryCache();
assertEquals(0, cache.size());
RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig));
@@ -222,10 +224,8 @@ public class ProxyServerTest {
assertThat(properties.configSources[0], is(ProxyServer.DEFAULT_PROXY_CONFIG_SOURCES));
}
- private static ProxyServer createTestServer(ConfigSourceSet source,
- ConfigSourceClient configSourceClient,
- MemoryCache memoryCache) {
- return new ProxyServer(null, source, memoryCache, configSourceClient);
+ private static ProxyServer createTestServer(ConfigSourceSet source, ConfigSourceClient configSourceClient) {
+ return new ProxyServer(null, source, configSourceClient);
}
static RawConfig createConfigWithNextConfigGeneration(RawConfig config, int errorCode) {
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java
index 372c8c41c99..ada98f4b30e 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java
@@ -17,7 +17,7 @@ import static org.junit.Assert.assertEquals;
*/
public class RpcConfigSourceClientTest {
- private MockRpcServer rpcServer;
+ private ResponseHandler responseHandler;
private RpcConfigSourceClient rpcConfigSourceClient;
@Rule
@@ -26,8 +26,8 @@ public class RpcConfigSourceClientTest {
@Before
public void setup() {
- rpcServer = new MockRpcServer();
- rpcConfigSourceClient = new RpcConfigSourceClient(rpcServer, new MockConfigSource(), new MemoryCache());
+ responseHandler = new ResponseHandler(true);
+ rpcConfigSourceClient = new RpcConfigSourceClient(responseHandler, new MockConfigSource());
}
@Test
@@ -90,7 +90,7 @@ public class RpcConfigSourceClientTest {
}
private void assertSentResponses(int expected) {
- assertEquals(expected, rpcServer.responses);
+ assertEquals(expected, responseHandler.sentResponses());
}
private void simulateClientRequestingConfig(RawConfig config) {