aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2021-12-10 14:25:46 +0100
committerGitHub <noreply@github.com>2021-12-10 14:25:46 +0100
commit134121cc3b046de15c5ba4360c84586eb8fe0836 (patch)
tree42bc56808476261b9e96243792aff84b241dda71 /config-proxy
parent13fb8a9a0f83d1d880e6cabcaf883c1af0dae1dd (diff)
Revert "Hmusum/refactor config proxy [run-systemtest]"
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java66
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java4
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java25
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java15
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java50
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java63
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java21
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java22
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java10
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java5
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java9
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java23
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java36
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java8
14 files changed, 184 insertions, 173 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
index 158df654439..8791918dab0 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
@@ -30,10 +30,10 @@ import java.util.logging.Logger;
*
* @author hmusum
*/
-public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
+public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer {
private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName());
- static final int TRACELEVEL = 6;
+ private static final int TRACELEVEL = 6;
private final Spec spec;
private final Supervisor supervisor;
@@ -79,6 +79,10 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
this::ping)
.methodDesc("ping")
.returnDesc(0, "ret code", "return code, 0 is OK"));
+ supervisor.addMethod(new Method("printStatistics", "", "s",
+ this::printStatistics)
+ .methodDesc("printStatistics")
+ .returnDesc(0, "statistics", "Statistics for server"));
supervisor.addMethod(new Method("listCachedConfig", "", "S",
this::listCachedConfig)
.methodDesc("list cached configs)")
@@ -141,6 +145,26 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
});
}
+ /**
+ * Returns a String with statistics data for the server.
+ *
+ * @param req a Request
+ */
+ private void printStatistics(Request req) {
+ dispatchRpcRequest(req, () -> {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\nDelayed responses queue size: ");
+ sb.append(proxyServer.delayedResponses().size());
+ sb.append("\nContents: ");
+ for (DelayedResponse delayed : proxyServer.delayedResponses().responses()) {
+ sb.append(delayed.getRequest().toString()).append("\n");
+ }
+
+ req.returnValues().add(new StringValue(sb.toString()));
+ req.returnRequest();
+ });
+ }
+
private void listCachedConfig(Request req) {
dispatchRpcRequest(req, () -> listCachedConfig(req, false));
}
@@ -177,7 +201,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
private void invalidateCache(Request req) {
dispatchRpcRequest(req, () -> {
- proxyServer.memoryCache().clear();
+ proxyServer.getMemoryCache().clear();
String[] s = new String[2];
s[0] = "0";
s[1] = "success";
@@ -213,7 +237,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
private void dumpCache(Request req) {
dispatchRpcRequest(req, () -> {
- final MemoryCache memoryCache = proxyServer.memoryCache();
+ final MemoryCache memoryCache = proxyServer.getMemoryCache();
req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache)));
req.returnRequest();
});
@@ -245,13 +269,12 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
* @param request a Request
*/
private void getConfigImpl(JRTServerConfigRequest request) {
- ResponseHandler responseHandler = new ResponseHandler();
request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()");
log.log(Level.FINE, () ->"getConfig: " + request.getShortDescription() + ",config checksums=" + request.getRequestConfigChecksums());
if (!request.validateParameters()) {
// Error code is set in verifyParameters if parameters are not OK.
log.log(Level.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage());
- responseHandler.returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
+ returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
return;
}
try {
@@ -259,13 +282,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
if (config == null) {
log.log(Level.FINEST, () -> "No config received yet for " + request.getShortDescription() + ", not sending response");
} else if (ProxyServer.configOrGenerationHasChanged(config, request)) {
- responseHandler.returnOkResponse(request, config);
+ returnOkResponse(request, config);
} else {
log.log(Level.FINEST, () -> "No new config for " + request.getShortDescription() + ", not sending response");
}
} catch (Exception e) {
e.printStackTrace();
- responseHandler.returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
+ returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
}
}
@@ -279,7 +302,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
private void listCachedConfig(Request req, boolean full) {
String[] ret;
- MemoryCache cache = proxyServer.memoryCache();
+ MemoryCache cache = proxyServer.getMemoryCache();
ret = new String[cache.size()];
int i = 0;
for (RawConfig config : cache.values()) {
@@ -325,4 +348,29 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
// requesting this config?
}
+ public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()");
+ request.addOkResponse(config.getPayload(),
+ config.getGeneration(),
+ config.applyOnRestart(),
+ config.getPayloadChecksums());
+ log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() +
+ ",generation=" + config.getGeneration());
+ log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload());
+
+
+ // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse
+ // TODO Move logic so that all requests are returned in CheckDelayedResponse
+ try {
+ request.getRequest().returnRequest();
+ } catch (IllegalStateException e) {
+ log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage());
+ }
+ }
+
+ public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()");
+ request.addErrorResponse(errorCode, message);
+ request.getRequest().returnRequest();
+ }
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
index dae732e56ec..6e5fe2d3fd8 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
@@ -16,7 +16,7 @@ interface ConfigSourceClient {
RawConfig getConfig(RawConfig input, JRTServerConfigRequest request);
- void shutdown();
+ void cancel();
void shutdownSourceConnections();
@@ -26,6 +26,4 @@ interface ConfigSourceClient {
DelayedResponses delayedResponses();
- MemoryCache memoryCache();
-
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
index 0e8ebe0d9c9..f77bd4b9138 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
@@ -6,12 +6,10 @@ import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.yolean.Exceptions;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
-import static com.yahoo.protect.Process.logAndDie;
-
/**
* The run method of this class is executed periodically to return delayed responses
* (requests use long polling, so config proxy needs to return a response when they time out).
@@ -24,13 +22,12 @@ public class DelayedResponseHandler implements Runnable {
private final DelayedResponses delayedResponses;
private final MemoryCache memoryCache;
- private final ResponseHandler responseHandler;
- private final AtomicLong sentResponses = new AtomicLong();
+ private final RpcServer rpcServer;
- DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, ResponseHandler responseHandler) {
+ DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, RpcServer rpcServer) {
this.delayedResponses = delayedResponses;
this.memoryCache = memoryCache;
- this.responseHandler = responseHandler;
+ this.rpcServer = rpcServer;
}
@Override
@@ -44,27 +41,25 @@ public class DelayedResponseHandler implements Runnable {
log.log(Level.FINEST, () -> "Running DelayedResponseHandler. There are " + delayedResponses.size() +
" delayed responses. First one is " + delayedResponses.responses().peek());
DelayedResponse response;
+ AtomicInteger i = new AtomicInteger(0);
while ((response = delayedResponses.responses().poll()) != null) {
JRTServerConfigRequest request = response.getRequest();
ConfigCacheKey cacheKey = new ConfigCacheKey(request.getConfigKey(), request.getRequestDefMd5());
RawConfig config = memoryCache.get(cacheKey);
if (config != null) {
- responseHandler.returnOkResponse(request, config);
- sentResponses.incrementAndGet();
+ rpcServer.returnOkResponse(request, config);
+ i.incrementAndGet();
} else {
log.log(Level.WARNING, "Timed out (timeout " + request.getTimeout() + ") getting config " +
request.getConfigKey() + ", will retry");
}
}
- log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + sentResponses.get() +
- " delayed responses sent in " + (System.currentTimeMillis() - start) + " ms");
+ log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + i.get() + " delayed responses sent in " +
+ (System.currentTimeMillis() - start) + " ms");
} catch (Exception e) { // To avoid thread throwing exception and executor never running this again
log.log(Level.WARNING, "Got exception in DelayedResponseHandler: " + Exceptions.toMessageString(e));
} catch (Throwable e) {
- logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e));
+ com.yahoo.protect.Process.logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e));
}
}
-
- public long sentResponses() { return sentResponses.get(); }
-
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
index 7ae8501278d..6e90ad16f50 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
@@ -1,14 +1,12 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
-import com.yahoo.vespa.config.ConfigCacheKey;
-import com.yahoo.vespa.config.ConfigKey;
-import com.yahoo.vespa.config.RawConfig;
+import java.util.logging.Level;
+import com.yahoo.vespa.config.*;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import java.util.Collections;
import java.util.List;
-import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -22,8 +20,8 @@ class MemoryCacheConfigClient implements ConfigSourceClient {
private final MemoryCache cache;
private final DelayedResponses delayedResponses = new DelayedResponses();
- MemoryCacheConfigClient() {
- this.cache = new MemoryCache();
+ MemoryCacheConfigClient(MemoryCache cache) {
+ this.cache = cache;
}
/**
@@ -46,7 +44,7 @@ class MemoryCacheConfigClient implements ConfigSourceClient {
}
@Override
- public void shutdown() {}
+ public void cancel() {}
@Override
public void shutdownSourceConnections() {}
@@ -66,7 +64,4 @@ class MemoryCacheConfigClient implements ConfigSourceClient {
return delayedResponses;
}
- @Override
- public MemoryCache memoryCache() { return cache; }
-
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
index 8756090e420..d063c45a3f7 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
@@ -1,27 +1,27 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
-import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
+import java.util.logging.Level;
import com.yahoo.log.LogSetup;
import com.yahoo.log.event.Event;
+import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionAndUrlDownload;
import com.yahoo.yolean.system.CatchSignals;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
import java.util.logging.Logger;
import static com.yahoo.vespa.config.proxy.Mode.ModeName.DEFAULT;
@@ -40,24 +40,27 @@ public class ProxyServer implements Runnable {
private static final int JRT_TRANSPORT_THREADS = 4;
static final String DEFAULT_PROXY_CONFIG_SOURCES = "tcp/localhost:19070";
- private static final Logger log = Logger.getLogger(ProxyServer.class.getName());
-
+ private final static Logger log = Logger.getLogger(ProxyServer.class.getName());
private final AtomicBoolean signalCaught = new AtomicBoolean(false);
private final Supervisor supervisor;
private final ConfigProxyRpcServer rpcServer;
- private final FileDistributionAndUrlDownload fileDistributionAndUrlDownload;
-
private ConfigSourceSet configSource;
+
private volatile ConfigSourceClient configClient;
+
+ private final MemoryCache memoryCache;
+ private final FileDistributionAndUrlDownload fileDistributionAndUrlDownload;
+
private volatile Mode mode = new Mode(DEFAULT);
- ProxyServer(Spec spec, ConfigSourceSet source, ConfigSourceClient configClient) {
- this.configSource = Objects.requireNonNull(source);
+ ProxyServer(Spec spec, ConfigSourceSet source, MemoryCache memoryCache, ConfigSourceClient configClient) {
+ this.configSource = source;
+ supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true);
log.log(Level.FINE, () -> "Using config source '" + source);
- this.supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true);
+ this.memoryCache = memoryCache;
this.rpcServer = createRpcServer(spec);
- this.configClient = Objects.requireNonNull(configClient);
+ this.configClient = (configClient == null) ? createRpcClient(rpcServer, source, memoryCache) : configClient;
this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source);
}
@@ -94,12 +97,12 @@ public class ProxyServer implements Runnable {
switch (newMode.getMode()) {
case MEMORYCACHE:
configClient.shutdownSourceConnections();
- configClient = new MemoryCacheConfigClient();
+ configClient = new MemoryCacheConfigClient(memoryCache);
this.mode = new Mode(modeName);
break;
case DEFAULT:
flush();
- configClient = createRpcClient(configSource);
+ configClient = createRpcClient(rpcServer, configSource, memoryCache);
this.mode = new Mode(modeName);
break;
default:
@@ -112,8 +115,8 @@ public class ProxyServer implements Runnable {
return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this'
}
- private static RpcConfigSourceClient createRpcClient(ConfigSourceSet source) {
- return new RpcConfigSourceClient(new ResponseHandler(), source);
+ private static RpcConfigSourceClient createRpcClient(RpcServer rpcServer, ConfigSourceSet source, MemoryCache memoryCache) {
+ return new RpcConfigSourceClient(rpcServer, source, memoryCache);
}
private void setupSignalHandler() {
@@ -156,7 +159,7 @@ public class ProxyServer implements Runnable {
Event.started("configproxy");
ConfigSourceSet configSources = new ConfigSourceSet(properties.configSources);
- ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, createRpcClient(configSources));
+ ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, new MemoryCache(), null);
// catch termination and interrupt signal
proxyServer.setupSignalHandler();
Thread proxyserverThread = threadFactory.newThread(proxyServer);
@@ -166,8 +169,7 @@ public class ProxyServer implements Runnable {
}
static Properties getSystemProperties() {
- String[] inputConfigSources = System.getProperty("proxyconfigsources",
- DEFAULT_PROXY_CONFIG_SOURCES).split(",");
+ final String[] inputConfigSources = System.getProperty("proxyconfigsources", DEFAULT_PROXY_CONFIG_SOURCES).split(",");
return new Properties(inputConfigSources);
}
@@ -182,15 +184,15 @@ public class ProxyServer implements Runnable {
// Cancels all config instances and flushes the cache. When this method returns,
// the cache will not be updated again before someone calls getConfig().
private synchronized void flush() {
- configClient.memoryCache().clear();
- configClient.shutdown();
+ memoryCache.clear();
+ configClient.cancel();
}
void stop() {
Event.stopping("configproxy", "shutdown rpcServer");
if (rpcServer != null) rpcServer.shutdown();
Event.stopping("configproxy", "cancel configClient");
- configClient.shutdown();
+ if (configClient != null) configClient.cancel();
Event.stopping("configproxy", "flush");
flush();
Event.stopping("configproxy", "close fileDistribution");
@@ -198,8 +200,8 @@ public class ProxyServer implements Runnable {
Event.stopping("configproxy", "stop complete");
}
- MemoryCache memoryCache() {
- return configClient.memoryCache();
+ MemoryCache getMemoryCache() {
+ return memoryCache;
}
String getActiveSourceConnection() {
@@ -213,7 +215,7 @@ public class ProxyServer implements Runnable {
void updateSourceConnections(List<String> sources) {
configSource = new ConfigSourceSet(sources);
flush();
- configClient = createRpcClient(configSource);
+ configClient = createRpcClient(rpcServer, configSource, memoryCache);
}
DelayedResponses delayedResponses() {
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java
deleted file mode 100644
index c9cfbdd3e16..00000000000
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.config.proxy;
-
-import com.yahoo.vespa.config.RawConfig;
-import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
-
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import static com.yahoo.vespa.config.proxy.ConfigProxyRpcServer.TRACELEVEL;
-
-/**
- * An RPC server that handles config and file distribution requests.
- *
- * @author hmusum
- */
-public class ResponseHandler {
-
- private final Optional<AtomicLong> sentResponses;
-
- public ResponseHandler() {
- this(false);
- }
-
- // For testing only
- ResponseHandler(boolean trackResponses) {
- sentResponses = trackResponses ? Optional.of(new AtomicLong()) : Optional.empty();
- }
-
- private final static Logger log = Logger.getLogger(ResponseHandler.class.getName());
-
- public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
- request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()");
- request.addOkResponse(config.getPayload(),
- config.getGeneration(),
- config.applyOnRestart(),
- config.getPayloadChecksums());
- log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() +
- ",generation=" + config.getGeneration());
- log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload());
-
-
- // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse
- // TODO Move logic so that all requests are returned in CheckDelayedResponse
- try {
- request.getRequest().returnRequest();
- } catch (IllegalStateException e) {
- log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage());
- }
- sentResponses.ifPresent(AtomicLong::getAndIncrement);
- }
-
- public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
- request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()");
- request.addErrorResponse(errorCode, message);
- request.getRequest().returnRequest();
- }
-
- public long sentResponses() { return sentResponses.map(AtomicLong::get).orElse(0L); }
-
-}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
index 362ca1164b8..5df7b1fc021 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
@@ -43,7 +43,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
private final Supervisor supervisor = new Supervisor(new Transport("config-source-client"));
- private final ResponseHandler responseHandler;
+ private final RpcServer rpcServer;
private final ConfigSourceSet configSourceSet;
private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>();
private final MemoryCache memoryCache;
@@ -57,15 +57,15 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses"));
private final ScheduledFuture<?> delayedResponsesFuture;
- RpcConfigSourceClient(ResponseHandler responseHandler, ConfigSourceSet configSourceSet) {
- this.responseHandler = responseHandler;
+ RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet, MemoryCache memoryCache) {
+ this.rpcServer = rpcServer;
this.configSourceSet = configSourceSet;
- this.memoryCache = new MemoryCache();
+ this.memoryCache = memoryCache;
this.delayedResponses = new DelayedResponses();
checkConfigSources();
nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS);
this.requester = JRTConfigRequester.create(configSourceSet, timingValues);
- DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, responseHandler);
+ DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer);
this.delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS);
}
@@ -163,7 +163,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
}
@Override
- public void shutdown() {
+ public void cancel() {
log.log(Level.FINE, "shutdownSourceConnections");
shutdownSourceConnections();
log.log(Level.FINE, "delayedResponsesFuture.cancel");
@@ -230,7 +230,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
log.log(Level.FINE, () -> "Call returnOkResponse for " + key + "," + generation);
if (config.getPayload().getData().getByteLength() == 0)
log.log(Level.WARNING, () -> "Call returnOkResponse for " + key + "," + generation + " with empty config");
- responseHandler.returnOkResponse(request, config);
+ rpcServer.returnOkResponse(request, config);
} else {
log.log(Level.INFO, "Could not remove " + key + " from delayedResponses queue, already removed");
}
@@ -243,10 +243,9 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
}
@Override
- public DelayedResponses delayedResponses() { return delayedResponses; }
-
- @Override
- public MemoryCache memoryCache() { return memoryCache; }
+ public DelayedResponses delayedResponses() {
+ return delayedResponses;
+ }
private void updateWithNewConfig(RawConfig newConfig) {
log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey() +
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
index 691bc6c43a7..1bcf8d5d8be 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
@@ -92,7 +92,7 @@ public class ConfigProxyRpcServerTest {
assertThat(ret.length, is(0));
final RawConfig config = ProxyServerTest.fooConfig;
- server.proxyServer().memoryCache().update(config);
+ server.proxyServer().getMemoryCache().update(config);
req = new Request("listCachedConfig");
client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
@@ -119,7 +119,7 @@ public class ConfigProxyRpcServerTest {
assertThat(ret.length, is(0));
final RawConfig config = ProxyServerTest.fooConfig;
- server.proxyServer().memoryCache().update(config);
+ server.proxyServer().getMemoryCache().update(config);
req = new Request("listCachedConfigFull");
client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
@@ -133,7 +133,7 @@ public class ConfigProxyRpcServerTest {
}
/**
- * Tests listSourceConnections RPC command
+ * Tests printStatistics RPC command
*/
@Test
public void testRpcMethodListSourceConnections() throws ListenFailedException {
@@ -151,6 +151,20 @@ public class ConfigProxyRpcServerTest {
}
/**
+ * Tests printStatistics RPC command
+ */
+ @Test
+ public void testRpcMethodPrintStatistics() {
+ Request req = new Request("printStatistics");
+ client.invoke(req);
+ assertFalse(req.errorMessage(), req.isError());
+ assertThat(req.returnValues().size(), is(1));
+ assertThat(req.returnValues().get(0).asString(), is("\n" +
+ "Delayed responses queue size: 0\n" +
+ "Contents: "));
+ }
+
+ /**
* Tests invalidateCache RPC command
*/
@Test
@@ -261,7 +275,7 @@ public class ConfigProxyRpcServerTest {
}
private static ProxyServer createTestServer(ConfigSourceSet source) {
- return new ProxyServer(null, source, new RpcConfigSourceClient(new ResponseHandler(), source));
+ return new ProxyServer(null, source, new MemoryCache(), null);
}
private static class TestServer implements AutoCloseable {
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java
index 8a668b34fd0..c2a0282fd05 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java
@@ -6,7 +6,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
/**
* @author hmusum
@@ -28,15 +29,16 @@ public class DelayedResponseHandlerTest {
public void basic() {
ConfigTester tester = new ConfigTester();
DelayedResponses delayedResponses = new DelayedResponses();
- MemoryCache memoryCache = new MemoryCache();
+ final MockRpcServer mockRpcServer = new MockRpcServer();
+ final MemoryCache memoryCache = new MemoryCache();
memoryCache.update(ConfigTester.fooConfig);
- DelayedResponseHandler delayedResponseHandler = new DelayedResponseHandler(delayedResponses, memoryCache, new ResponseHandler());
+ final DelayedResponseHandler delayedResponseHandler = new DelayedResponseHandler(delayedResponses, memoryCache, mockRpcServer);
delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.fooConfig, 0)));
delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.fooConfig, 1200000))); // should not be returned yet
delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.errorConfig, 0))); // will not give a config when resolving
delayedResponseHandler.checkDelayedResponses();
- assertEquals(1, delayedResponseHandler.sentResponses());
+ assertThat(mockRpcServer.responses, is(1L));
}
}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java
index ae7350b11e0..51d0b983764 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java
@@ -16,8 +16,9 @@ public class MemoryCacheConfigClientTest {
@Test
public void basic() {
- MemoryCacheConfigClient client = new MemoryCacheConfigClient();
- client.memoryCache().update(ConfigTester.fooConfig);
+ MemoryCache cache = new MemoryCache();
+ cache.update(ConfigTester.fooConfig);
+ MemoryCacheConfigClient client = new MemoryCacheConfigClient(cache);
assertThat(client.getConfig(ConfigTester.fooConfig, null), is(ConfigTester.fooConfig));
assertNull(client.getConfig(ConfigTester.barConfig, null));
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java
index d0724b9dbd0..c0efc1cb355 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java
@@ -18,9 +18,9 @@ public class MockConfigSourceClient implements ConfigSourceClient{
private final MemoryCache memoryCache;
private final DelayedResponses delayedResponses = new DelayedResponses();
- MockConfigSourceClient(MockConfigSource configSource) {
+ MockConfigSourceClient(MockConfigSource configSource, MemoryCache memoryCache) {
this.configSource = configSource;
- this.memoryCache = new MemoryCache();
+ this.memoryCache = memoryCache;
}
@Override
@@ -35,7 +35,7 @@ public class MockConfigSourceClient implements ConfigSourceClient{
}
@Override
- public void shutdown() {
+ public void cancel() {
configSource.clear();
}
@@ -56,7 +56,4 @@ public class MockConfigSourceClient implements ConfigSourceClient{
@Override
public DelayedResponses delayedResponses() { return delayedResponses; }
- @Override
- public MemoryCache memoryCache() { return memoryCache; }
-
}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java
new file mode 100644
index 00000000000..56fcca191de
--- /dev/null
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java
@@ -0,0 +1,23 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+/**
+ * @author hmusum
+ */
+public class MockRpcServer implements RpcServer {
+
+ volatile long responses = 0;
+ volatile long errorResponses = 0;
+
+ public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
+ responses++;
+ }
+
+ public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
+ responses++;
+ errorResponses++;
+ }
+}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
index 15de93b748f..cdda2bf6e77 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
@@ -2,10 +2,7 @@
package com.yahoo.vespa.config.proxy;
import com.yahoo.config.subscription.ConfigSourceSet;
-import com.yahoo.vespa.config.ConfigCacheKey;
-import com.yahoo.vespa.config.ConfigKey;
-import com.yahoo.vespa.config.ErrorCode;
-import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.*;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.protocol.Payload;
import org.junit.After;
@@ -28,8 +25,9 @@ import static org.junit.Assert.assertTrue;
*/
public class ProxyServerTest {
+ private final MemoryCache memoryCache = new MemoryCache();
private final MockConfigSource source = new MockConfigSource();
- private final ConfigSourceClient client = new MockConfigSourceClient(source);
+ private final MockConfigSourceClient client = new MockConfigSourceClient(source, memoryCache);
private ProxyServer proxy;
static final RawConfig fooConfig = ConfigTester.fooConfig;
@@ -48,7 +46,7 @@ public class ProxyServerTest {
source.clear();
source.put(fooConfig.getKey(), createConfigWithNextConfigGeneration(fooConfig, 0));
source.put(errorConfigKey, createConfigWithNextConfigGeneration(fooConfig, ErrorCode.UNKNOWN_DEFINITION));
- proxy = createTestServer(source, client);
+ proxy = createTestServer(source, client, memoryCache);
}
@After
@@ -59,10 +57,10 @@ public class ProxyServerTest {
@Test
public void basic() {
assertTrue(proxy.getMode().isDefault());
- assertThat(proxy.memoryCache().size(), is(0));
+ assertThat(proxy.getMemoryCache().size(), is(0));
ConfigTester tester = new ConfigTester();
- MemoryCache memoryCache = proxy.memoryCache();
+ final MemoryCache memoryCache = proxy.getMemoryCache();
assertEquals(0, memoryCache.size());
RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig));
assertNotNull(res);
@@ -76,7 +74,7 @@ public class ProxyServerTest {
*/
@Test
public void testModeSwitch() {
- ProxyServer proxy = createTestServer(source, client);
+ ProxyServer proxy = createTestServer(source, client, new MemoryCache());
assertTrue(proxy.getMode().isDefault());
for (String mode : Mode.modes()) {
@@ -111,7 +109,7 @@ public class ProxyServerTest {
@Test
public void testGetConfigAndCaching() {
ConfigTester tester = new ConfigTester();
- MemoryCache memoryCache = proxy.memoryCache();
+ final MemoryCache memoryCache = proxy.getMemoryCache();
assertEquals(0, memoryCache.size());
RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig));
assertNotNull(res);
@@ -136,14 +134,14 @@ public class ProxyServerTest {
// Simulate an error response
source.put(fooConfig.getKey(), createConfigWithNextConfigGeneration(fooConfig, ErrorCode.INTERNAL_ERROR));
- MemoryCache memoryCache = proxy.memoryCache();
- assertEquals(0, memoryCache.size());
+ final MemoryCache cacheManager = proxy.getMemoryCache();
+ assertEquals(0, cacheManager.size());
RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig));
assertNotNull(res);
assertNotNull(res.getPayload());
assertTrue(res.isError());
- assertEquals(0, memoryCache.size());
+ assertEquals(0, cacheManager.size());
// Put a version of the same config into backend without error and see that it now works (i.e. we are
// not getting a cached response (of the error in the previous request)
@@ -154,12 +152,12 @@ public class ProxyServerTest {
assertNotNull(res);
assertNotNull(res.getPayload().getData());
assertThat(res.getPayload().toString(), is(ConfigTester.fooPayload.toString()));
- assertEquals(1, memoryCache.size());
+ assertEquals(1, cacheManager.size());
JRTServerConfigRequest newRequestBasedOnResponse = tester.createRequest(res);
RawConfig res2 = proxy.resolveConfig(newRequestBasedOnResponse);
assertFalse(ProxyServer.configOrGenerationHasChanged(res2, newRequestBasedOnResponse));
- assertEquals(1, memoryCache.size());
+ assertEquals(1, cacheManager.size());
}
/**
@@ -171,7 +169,7 @@ public class ProxyServerTest {
@Test
public void testNoCachingOfEmptyConfig() {
ConfigTester tester = new ConfigTester();
- MemoryCache cache = proxy.memoryCache();
+ MemoryCache cache = proxy.getMemoryCache();
assertEquals(0, cache.size());
RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig));
@@ -224,8 +222,10 @@ public class ProxyServerTest {
assertThat(properties.configSources[0], is(ProxyServer.DEFAULT_PROXY_CONFIG_SOURCES));
}
- private static ProxyServer createTestServer(ConfigSourceSet source, ConfigSourceClient configSourceClient) {
- return new ProxyServer(null, source, configSourceClient);
+ private static ProxyServer createTestServer(ConfigSourceSet source,
+ ConfigSourceClient configSourceClient,
+ MemoryCache memoryCache) {
+ return new ProxyServer(null, source, memoryCache, configSourceClient);
}
static RawConfig createConfigWithNextConfigGeneration(RawConfig config, int errorCode) {
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java
index ada98f4b30e..372c8c41c99 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java
@@ -17,7 +17,7 @@ import static org.junit.Assert.assertEquals;
*/
public class RpcConfigSourceClientTest {
- private ResponseHandler responseHandler;
+ private MockRpcServer rpcServer;
private RpcConfigSourceClient rpcConfigSourceClient;
@Rule
@@ -26,8 +26,8 @@ public class RpcConfigSourceClientTest {
@Before
public void setup() {
- responseHandler = new ResponseHandler(true);
- rpcConfigSourceClient = new RpcConfigSourceClient(responseHandler, new MockConfigSource());
+ rpcServer = new MockRpcServer();
+ rpcConfigSourceClient = new RpcConfigSourceClient(rpcServer, new MockConfigSource(), new MemoryCache());
}
@Test
@@ -90,7 +90,7 @@ public class RpcConfigSourceClientTest {
}
private void assertSentResponses(int expected) {
- assertEquals(expected, responseHandler.sentResponses());
+ assertEquals(expected, rpcServer.responses);
}
private void simulateClientRequestingConfig(RawConfig config) {