From 134121cc3b046de15c5ba4360c84586eb8fe0836 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Fri, 10 Dec 2021 14:25:46 +0100 Subject: Revert "Hmusum/refactor config proxy [run-systemtest]" --- .../vespa/config/proxy/ConfigProxyRpcServer.java | 66 +++++++++++++++++++--- .../vespa/config/proxy/ConfigSourceClient.java | 4 +- .../vespa/config/proxy/DelayedResponseHandler.java | 25 ++++---- .../config/proxy/MemoryCacheConfigClient.java | 15 ++--- .../com/yahoo/vespa/config/proxy/ProxyServer.java | 50 ++++++++-------- .../yahoo/vespa/config/proxy/ResponseHandler.java | 63 --------------------- .../vespa/config/proxy/RpcConfigSourceClient.java | 21 ++++--- .../config/proxy/ConfigProxyRpcServerTest.java | 22 ++++++-- .../config/proxy/DelayedResponseHandlerTest.java | 10 ++-- .../config/proxy/MemoryCacheConfigClientTest.java | 5 +- .../vespa/config/proxy/MockConfigSourceClient.java | 9 +-- .../yahoo/vespa/config/proxy/MockRpcServer.java | 23 ++++++++ .../yahoo/vespa/config/proxy/ProxyServerTest.java | 36 ++++++------ .../config/proxy/RpcConfigSourceClientTest.java | 8 +-- 14 files changed, 184 insertions(+), 173 deletions(-) delete mode 100644 config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java create mode 100644 config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java index 158df654439..8791918dab0 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -30,10 +30,10 @@ import java.util.logging.Logger; * * @author hmusum */ -public class ConfigProxyRpcServer implements Runnable, TargetWatcher { +public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer { private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName()); - static final int TRACELEVEL = 6; + private static final int TRACELEVEL = 6; private final Spec spec; private final Supervisor supervisor; @@ -79,6 +79,10 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher { this::ping) .methodDesc("ping") .returnDesc(0, "ret code", "return code, 0 is OK")); + supervisor.addMethod(new Method("printStatistics", "", "s", + this::printStatistics) + .methodDesc("printStatistics") + .returnDesc(0, "statistics", "Statistics for server")); supervisor.addMethod(new Method("listCachedConfig", "", "S", this::listCachedConfig) .methodDesc("list cached configs)") @@ -141,6 +145,26 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher { }); } + /** + * Returns a String with statistics data for the server. + * + * @param req a Request + */ + private void printStatistics(Request req) { + dispatchRpcRequest(req, () -> { + StringBuilder sb = new StringBuilder(); + sb.append("\nDelayed responses queue size: "); + sb.append(proxyServer.delayedResponses().size()); + sb.append("\nContents: "); + for (DelayedResponse delayed : proxyServer.delayedResponses().responses()) { + sb.append(delayed.getRequest().toString()).append("\n"); + } + + req.returnValues().add(new StringValue(sb.toString())); + req.returnRequest(); + }); + } + private void listCachedConfig(Request req) { dispatchRpcRequest(req, () -> listCachedConfig(req, false)); } @@ -177,7 +201,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher { private void invalidateCache(Request req) { dispatchRpcRequest(req, () -> { - proxyServer.memoryCache().clear(); + proxyServer.getMemoryCache().clear(); String[] s = new String[2]; s[0] = "0"; s[1] = "success"; @@ -213,7 +237,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher { private void dumpCache(Request req) { dispatchRpcRequest(req, () -> { - final MemoryCache memoryCache = proxyServer.memoryCache(); + final MemoryCache memoryCache = proxyServer.getMemoryCache(); req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache))); req.returnRequest(); }); @@ -245,13 +269,12 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher { * @param request a Request */ private void getConfigImpl(JRTServerConfigRequest request) { - ResponseHandler responseHandler = new ResponseHandler(); request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()"); log.log(Level.FINE, () ->"getConfig: " + request.getShortDescription() + ",config checksums=" + request.getRequestConfigChecksums()); if (!request.validateParameters()) { // Error code is set in verifyParameters if parameters are not OK. log.log(Level.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage()); - responseHandler.returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage()); + returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage()); return; } try { @@ -259,13 +282,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher { if (config == null) { log.log(Level.FINEST, () -> "No config received yet for " + request.getShortDescription() + ", not sending response"); } else if (ProxyServer.configOrGenerationHasChanged(config, request)) { - responseHandler.returnOkResponse(request, config); + returnOkResponse(request, config); } else { log.log(Level.FINEST, () -> "No new config for " + request.getShortDescription() + ", not sending response"); } } catch (Exception e) { e.printStackTrace(); - responseHandler.returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage()); + returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage()); } } @@ -279,7 +302,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher { private void listCachedConfig(Request req, boolean full) { String[] ret; - MemoryCache cache = proxyServer.memoryCache(); + MemoryCache cache = proxyServer.getMemoryCache(); ret = new String[cache.size()]; int i = 0; for (RawConfig config : cache.values()) { @@ -325,4 +348,29 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher { // requesting this config? } + public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) { + request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()"); + request.addOkResponse(config.getPayload(), + config.getGeneration(), + config.applyOnRestart(), + config.getPayloadChecksums()); + log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() + + ",generation=" + config.getGeneration()); + log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload()); + + + // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse + // TODO Move logic so that all requests are returned in CheckDelayedResponse + try { + request.getRequest().returnRequest(); + } catch (IllegalStateException e) { + log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage()); + } + } + + public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) { + request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()"); + request.addErrorResponse(errorCode, message); + request.getRequest().returnRequest(); + } } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java index dae732e56ec..6e5fe2d3fd8 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java @@ -16,7 +16,7 @@ interface ConfigSourceClient { RawConfig getConfig(RawConfig input, JRTServerConfigRequest request); - void shutdown(); + void cancel(); void shutdownSourceConnections(); @@ -26,6 +26,4 @@ interface ConfigSourceClient { DelayedResponses delayedResponses(); - MemoryCache memoryCache(); - } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java index 0e8ebe0d9c9..f77bd4b9138 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java @@ -6,12 +6,10 @@ import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.yolean.Exceptions; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; -import static com.yahoo.protect.Process.logAndDie; - /** * The run method of this class is executed periodically to return delayed responses * (requests use long polling, so config proxy needs to return a response when they time out). @@ -24,13 +22,12 @@ public class DelayedResponseHandler implements Runnable { private final DelayedResponses delayedResponses; private final MemoryCache memoryCache; - private final ResponseHandler responseHandler; - private final AtomicLong sentResponses = new AtomicLong(); + private final RpcServer rpcServer; - DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, ResponseHandler responseHandler) { + DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, RpcServer rpcServer) { this.delayedResponses = delayedResponses; this.memoryCache = memoryCache; - this.responseHandler = responseHandler; + this.rpcServer = rpcServer; } @Override @@ -44,27 +41,25 @@ public class DelayedResponseHandler implements Runnable { log.log(Level.FINEST, () -> "Running DelayedResponseHandler. There are " + delayedResponses.size() + " delayed responses. First one is " + delayedResponses.responses().peek()); DelayedResponse response; + AtomicInteger i = new AtomicInteger(0); while ((response = delayedResponses.responses().poll()) != null) { JRTServerConfigRequest request = response.getRequest(); ConfigCacheKey cacheKey = new ConfigCacheKey(request.getConfigKey(), request.getRequestDefMd5()); RawConfig config = memoryCache.get(cacheKey); if (config != null) { - responseHandler.returnOkResponse(request, config); - sentResponses.incrementAndGet(); + rpcServer.returnOkResponse(request, config); + i.incrementAndGet(); } else { log.log(Level.WARNING, "Timed out (timeout " + request.getTimeout() + ") getting config " + request.getConfigKey() + ", will retry"); } } - log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + sentResponses.get() + - " delayed responses sent in " + (System.currentTimeMillis() - start) + " ms"); + log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + i.get() + " delayed responses sent in " + + (System.currentTimeMillis() - start) + " ms"); } catch (Exception e) { // To avoid thread throwing exception and executor never running this again log.log(Level.WARNING, "Got exception in DelayedResponseHandler: " + Exceptions.toMessageString(e)); } catch (Throwable e) { - logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e)); + com.yahoo.protect.Process.logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e)); } } - - public long sentResponses() { return sentResponses.get(); } - } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java index 7ae8501278d..6e90ad16f50 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java @@ -1,14 +1,12 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; -import com.yahoo.vespa.config.ConfigCacheKey; -import com.yahoo.vespa.config.ConfigKey; -import com.yahoo.vespa.config.RawConfig; +import java.util.logging.Level; +import com.yahoo.vespa.config.*; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import java.util.Collections; import java.util.List; -import java.util.logging.Level; import java.util.logging.Logger; /** @@ -22,8 +20,8 @@ class MemoryCacheConfigClient implements ConfigSourceClient { private final MemoryCache cache; private final DelayedResponses delayedResponses = new DelayedResponses(); - MemoryCacheConfigClient() { - this.cache = new MemoryCache(); + MemoryCacheConfigClient(MemoryCache cache) { + this.cache = cache; } /** @@ -46,7 +44,7 @@ class MemoryCacheConfigClient implements ConfigSourceClient { } @Override - public void shutdown() {} + public void cancel() {} @Override public void shutdownSourceConnections() {} @@ -66,7 +64,4 @@ class MemoryCacheConfigClient implements ConfigSourceClient { return delayedResponses; } - @Override - public MemoryCache memoryCache() { return cache; } - } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index 8756090e420..d063c45a3f7 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -1,27 +1,27 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; -import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.jrt.Spec; import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Transport; +import java.util.logging.Level; import com.yahoo.log.LogSetup; import com.yahoo.log.event.Event; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionAndUrlDownload; import com.yahoo.yolean.system.CatchSignals; import java.util.List; -import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.vespa.config.proxy.Mode.ModeName.DEFAULT; @@ -40,24 +40,27 @@ public class ProxyServer implements Runnable { private static final int JRT_TRANSPORT_THREADS = 4; static final String DEFAULT_PROXY_CONFIG_SOURCES = "tcp/localhost:19070"; - private static final Logger log = Logger.getLogger(ProxyServer.class.getName()); - + private final static Logger log = Logger.getLogger(ProxyServer.class.getName()); private final AtomicBoolean signalCaught = new AtomicBoolean(false); private final Supervisor supervisor; private final ConfigProxyRpcServer rpcServer; - private final FileDistributionAndUrlDownload fileDistributionAndUrlDownload; - private ConfigSourceSet configSource; + private volatile ConfigSourceClient configClient; + + private final MemoryCache memoryCache; + private final FileDistributionAndUrlDownload fileDistributionAndUrlDownload; + private volatile Mode mode = new Mode(DEFAULT); - ProxyServer(Spec spec, ConfigSourceSet source, ConfigSourceClient configClient) { - this.configSource = Objects.requireNonNull(source); + ProxyServer(Spec spec, ConfigSourceSet source, MemoryCache memoryCache, ConfigSourceClient configClient) { + this.configSource = source; + supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true); log.log(Level.FINE, () -> "Using config source '" + source); - this.supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true); + this.memoryCache = memoryCache; this.rpcServer = createRpcServer(spec); - this.configClient = Objects.requireNonNull(configClient); + this.configClient = (configClient == null) ? createRpcClient(rpcServer, source, memoryCache) : configClient; this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source); } @@ -94,12 +97,12 @@ public class ProxyServer implements Runnable { switch (newMode.getMode()) { case MEMORYCACHE: configClient.shutdownSourceConnections(); - configClient = new MemoryCacheConfigClient(); + configClient = new MemoryCacheConfigClient(memoryCache); this.mode = new Mode(modeName); break; case DEFAULT: flush(); - configClient = createRpcClient(configSource); + configClient = createRpcClient(rpcServer, configSource, memoryCache); this.mode = new Mode(modeName); break; default: @@ -112,8 +115,8 @@ public class ProxyServer implements Runnable { return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this' } - private static RpcConfigSourceClient createRpcClient(ConfigSourceSet source) { - return new RpcConfigSourceClient(new ResponseHandler(), source); + private static RpcConfigSourceClient createRpcClient(RpcServer rpcServer, ConfigSourceSet source, MemoryCache memoryCache) { + return new RpcConfigSourceClient(rpcServer, source, memoryCache); } private void setupSignalHandler() { @@ -156,7 +159,7 @@ public class ProxyServer implements Runnable { Event.started("configproxy"); ConfigSourceSet configSources = new ConfigSourceSet(properties.configSources); - ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, createRpcClient(configSources)); + ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, new MemoryCache(), null); // catch termination and interrupt signal proxyServer.setupSignalHandler(); Thread proxyserverThread = threadFactory.newThread(proxyServer); @@ -166,8 +169,7 @@ public class ProxyServer implements Runnable { } static Properties getSystemProperties() { - String[] inputConfigSources = System.getProperty("proxyconfigsources", - DEFAULT_PROXY_CONFIG_SOURCES).split(","); + final String[] inputConfigSources = System.getProperty("proxyconfigsources", DEFAULT_PROXY_CONFIG_SOURCES).split(","); return new Properties(inputConfigSources); } @@ -182,15 +184,15 @@ public class ProxyServer implements Runnable { // Cancels all config instances and flushes the cache. When this method returns, // the cache will not be updated again before someone calls getConfig(). private synchronized void flush() { - configClient.memoryCache().clear(); - configClient.shutdown(); + memoryCache.clear(); + configClient.cancel(); } void stop() { Event.stopping("configproxy", "shutdown rpcServer"); if (rpcServer != null) rpcServer.shutdown(); Event.stopping("configproxy", "cancel configClient"); - configClient.shutdown(); + if (configClient != null) configClient.cancel(); Event.stopping("configproxy", "flush"); flush(); Event.stopping("configproxy", "close fileDistribution"); @@ -198,8 +200,8 @@ public class ProxyServer implements Runnable { Event.stopping("configproxy", "stop complete"); } - MemoryCache memoryCache() { - return configClient.memoryCache(); + MemoryCache getMemoryCache() { + return memoryCache; } String getActiveSourceConnection() { @@ -213,7 +215,7 @@ public class ProxyServer implements Runnable { void updateSourceConnections(List sources) { configSource = new ConfigSourceSet(sources); flush(); - configClient = createRpcClient(configSource); + configClient = createRpcClient(rpcServer, configSource, memoryCache); } DelayedResponses delayedResponses() { diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java deleted file mode 100644 index c9cfbdd3e16..00000000000 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.proxy; - -import com.yahoo.vespa.config.RawConfig; -import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; - -import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; - -import static com.yahoo.vespa.config.proxy.ConfigProxyRpcServer.TRACELEVEL; - -/** - * An RPC server that handles config and file distribution requests. - * - * @author hmusum - */ -public class ResponseHandler { - - private final Optional sentResponses; - - public ResponseHandler() { - this(false); - } - - // For testing only - ResponseHandler(boolean trackResponses) { - sentResponses = trackResponses ? Optional.of(new AtomicLong()) : Optional.empty(); - } - - private final static Logger log = Logger.getLogger(ResponseHandler.class.getName()); - - public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) { - request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()"); - request.addOkResponse(config.getPayload(), - config.getGeneration(), - config.applyOnRestart(), - config.getPayloadChecksums()); - log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() + - ",generation=" + config.getGeneration()); - log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload()); - - - // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse - // TODO Move logic so that all requests are returned in CheckDelayedResponse - try { - request.getRequest().returnRequest(); - } catch (IllegalStateException e) { - log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage()); - } - sentResponses.ifPresent(AtomicLong::getAndIncrement); - } - - public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) { - request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()"); - request.addErrorResponse(errorCode, message); - request.getRequest().returnRequest(); - } - - public long sentResponses() { return sentResponses.map(AtomicLong::get).orElse(0L); } - -} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java index 362ca1164b8..5df7b1fc021 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -43,7 +43,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { private final Supervisor supervisor = new Supervisor(new Transport("config-source-client")); - private final ResponseHandler responseHandler; + private final RpcServer rpcServer; private final ConfigSourceSet configSourceSet; private final Map activeSubscribers = new ConcurrentHashMap<>(); private final MemoryCache memoryCache; @@ -57,15 +57,15 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses")); private final ScheduledFuture delayedResponsesFuture; - RpcConfigSourceClient(ResponseHandler responseHandler, ConfigSourceSet configSourceSet) { - this.responseHandler = responseHandler; + RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet, MemoryCache memoryCache) { + this.rpcServer = rpcServer; this.configSourceSet = configSourceSet; - this.memoryCache = new MemoryCache(); + this.memoryCache = memoryCache; this.delayedResponses = new DelayedResponses(); checkConfigSources(); nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS); this.requester = JRTConfigRequester.create(configSourceSet, timingValues); - DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, responseHandler); + DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer); this.delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS); } @@ -163,7 +163,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { } @Override - public void shutdown() { + public void cancel() { log.log(Level.FINE, "shutdownSourceConnections"); shutdownSourceConnections(); log.log(Level.FINE, "delayedResponsesFuture.cancel"); @@ -230,7 +230,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { log.log(Level.FINE, () -> "Call returnOkResponse for " + key + "," + generation); if (config.getPayload().getData().getByteLength() == 0) log.log(Level.WARNING, () -> "Call returnOkResponse for " + key + "," + generation + " with empty config"); - responseHandler.returnOkResponse(request, config); + rpcServer.returnOkResponse(request, config); } else { log.log(Level.INFO, "Could not remove " + key + " from delayedResponses queue, already removed"); } @@ -243,10 +243,9 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { } @Override - public DelayedResponses delayedResponses() { return delayedResponses; } - - @Override - public MemoryCache memoryCache() { return memoryCache; } + public DelayedResponses delayedResponses() { + return delayedResponses; + } private void updateWithNewConfig(RawConfig newConfig) { log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey() + diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java index 691bc6c43a7..1bcf8d5d8be 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java @@ -92,7 +92,7 @@ public class ConfigProxyRpcServerTest { assertThat(ret.length, is(0)); final RawConfig config = ProxyServerTest.fooConfig; - server.proxyServer().memoryCache().update(config); + server.proxyServer().getMemoryCache().update(config); req = new Request("listCachedConfig"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); @@ -119,7 +119,7 @@ public class ConfigProxyRpcServerTest { assertThat(ret.length, is(0)); final RawConfig config = ProxyServerTest.fooConfig; - server.proxyServer().memoryCache().update(config); + server.proxyServer().getMemoryCache().update(config); req = new Request("listCachedConfigFull"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); @@ -133,7 +133,7 @@ public class ConfigProxyRpcServerTest { } /** - * Tests listSourceConnections RPC command + * Tests printStatistics RPC command */ @Test public void testRpcMethodListSourceConnections() throws ListenFailedException { @@ -150,6 +150,20 @@ public class ConfigProxyRpcServerTest { assertThat(ret[1], is("All sources:\n" + configSourceAddress + "\n")); } + /** + * Tests printStatistics RPC command + */ + @Test + public void testRpcMethodPrintStatistics() { + Request req = new Request("printStatistics"); + client.invoke(req); + assertFalse(req.errorMessage(), req.isError()); + assertThat(req.returnValues().size(), is(1)); + assertThat(req.returnValues().get(0).asString(), is("\n" + + "Delayed responses queue size: 0\n" + + "Contents: ")); + } + /** * Tests invalidateCache RPC command */ @@ -261,7 +275,7 @@ public class ConfigProxyRpcServerTest { } private static ProxyServer createTestServer(ConfigSourceSet source) { - return new ProxyServer(null, source, new RpcConfigSourceClient(new ResponseHandler(), source)); + return new ProxyServer(null, source, new MemoryCache(), null); } private static class TestServer implements AutoCloseable { diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java index 8a668b34fd0..c2a0282fd05 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java @@ -6,7 +6,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; /** * @author hmusum @@ -28,15 +29,16 @@ public class DelayedResponseHandlerTest { public void basic() { ConfigTester tester = new ConfigTester(); DelayedResponses delayedResponses = new DelayedResponses(); - MemoryCache memoryCache = new MemoryCache(); + final MockRpcServer mockRpcServer = new MockRpcServer(); + final MemoryCache memoryCache = new MemoryCache(); memoryCache.update(ConfigTester.fooConfig); - DelayedResponseHandler delayedResponseHandler = new DelayedResponseHandler(delayedResponses, memoryCache, new ResponseHandler()); + final DelayedResponseHandler delayedResponseHandler = new DelayedResponseHandler(delayedResponses, memoryCache, mockRpcServer); delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.fooConfig, 0))); delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.fooConfig, 1200000))); // should not be returned yet delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.errorConfig, 0))); // will not give a config when resolving delayedResponseHandler.checkDelayedResponses(); - assertEquals(1, delayedResponseHandler.sentResponses()); + assertThat(mockRpcServer.responses, is(1L)); } } diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java index ae7350b11e0..51d0b983764 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java @@ -16,8 +16,9 @@ public class MemoryCacheConfigClientTest { @Test public void basic() { - MemoryCacheConfigClient client = new MemoryCacheConfigClient(); - client.memoryCache().update(ConfigTester.fooConfig); + MemoryCache cache = new MemoryCache(); + cache.update(ConfigTester.fooConfig); + MemoryCacheConfigClient client = new MemoryCacheConfigClient(cache); assertThat(client.getConfig(ConfigTester.fooConfig, null), is(ConfigTester.fooConfig)); assertNull(client.getConfig(ConfigTester.barConfig, null)); diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java index d0724b9dbd0..c0efc1cb355 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java @@ -18,9 +18,9 @@ public class MockConfigSourceClient implements ConfigSourceClient{ private final MemoryCache memoryCache; private final DelayedResponses delayedResponses = new DelayedResponses(); - MockConfigSourceClient(MockConfigSource configSource) { + MockConfigSourceClient(MockConfigSource configSource, MemoryCache memoryCache) { this.configSource = configSource; - this.memoryCache = new MemoryCache(); + this.memoryCache = memoryCache; } @Override @@ -35,7 +35,7 @@ public class MockConfigSourceClient implements ConfigSourceClient{ } @Override - public void shutdown() { + public void cancel() { configSource.clear(); } @@ -56,7 +56,4 @@ public class MockConfigSourceClient implements ConfigSourceClient{ @Override public DelayedResponses delayedResponses() { return delayedResponses; } - @Override - public MemoryCache memoryCache() { return memoryCache; } - } diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java new file mode 100644 index 00000000000..56fcca191de --- /dev/null +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java @@ -0,0 +1,23 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.vespa.config.RawConfig; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; + +/** + * @author hmusum + */ +public class MockRpcServer implements RpcServer { + + volatile long responses = 0; + volatile long errorResponses = 0; + + public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) { + responses++; + } + + public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) { + responses++; + errorResponses++; + } +} diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java index 15de93b748f..cdda2bf6e77 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java @@ -2,10 +2,7 @@ package com.yahoo.vespa.config.proxy; import com.yahoo.config.subscription.ConfigSourceSet; -import com.yahoo.vespa.config.ConfigCacheKey; -import com.yahoo.vespa.config.ConfigKey; -import com.yahoo.vespa.config.ErrorCode; -import com.yahoo.vespa.config.RawConfig; +import com.yahoo.vespa.config.*; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.protocol.Payload; import org.junit.After; @@ -28,8 +25,9 @@ import static org.junit.Assert.assertTrue; */ public class ProxyServerTest { + private final MemoryCache memoryCache = new MemoryCache(); private final MockConfigSource source = new MockConfigSource(); - private final ConfigSourceClient client = new MockConfigSourceClient(source); + private final MockConfigSourceClient client = new MockConfigSourceClient(source, memoryCache); private ProxyServer proxy; static final RawConfig fooConfig = ConfigTester.fooConfig; @@ -48,7 +46,7 @@ public class ProxyServerTest { source.clear(); source.put(fooConfig.getKey(), createConfigWithNextConfigGeneration(fooConfig, 0)); source.put(errorConfigKey, createConfigWithNextConfigGeneration(fooConfig, ErrorCode.UNKNOWN_DEFINITION)); - proxy = createTestServer(source, client); + proxy = createTestServer(source, client, memoryCache); } @After @@ -59,10 +57,10 @@ public class ProxyServerTest { @Test public void basic() { assertTrue(proxy.getMode().isDefault()); - assertThat(proxy.memoryCache().size(), is(0)); + assertThat(proxy.getMemoryCache().size(), is(0)); ConfigTester tester = new ConfigTester(); - MemoryCache memoryCache = proxy.memoryCache(); + final MemoryCache memoryCache = proxy.getMemoryCache(); assertEquals(0, memoryCache.size()); RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig)); assertNotNull(res); @@ -76,7 +74,7 @@ public class ProxyServerTest { */ @Test public void testModeSwitch() { - ProxyServer proxy = createTestServer(source, client); + ProxyServer proxy = createTestServer(source, client, new MemoryCache()); assertTrue(proxy.getMode().isDefault()); for (String mode : Mode.modes()) { @@ -111,7 +109,7 @@ public class ProxyServerTest { @Test public void testGetConfigAndCaching() { ConfigTester tester = new ConfigTester(); - MemoryCache memoryCache = proxy.memoryCache(); + final MemoryCache memoryCache = proxy.getMemoryCache(); assertEquals(0, memoryCache.size()); RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig)); assertNotNull(res); @@ -136,14 +134,14 @@ public class ProxyServerTest { // Simulate an error response source.put(fooConfig.getKey(), createConfigWithNextConfigGeneration(fooConfig, ErrorCode.INTERNAL_ERROR)); - MemoryCache memoryCache = proxy.memoryCache(); - assertEquals(0, memoryCache.size()); + final MemoryCache cacheManager = proxy.getMemoryCache(); + assertEquals(0, cacheManager.size()); RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig)); assertNotNull(res); assertNotNull(res.getPayload()); assertTrue(res.isError()); - assertEquals(0, memoryCache.size()); + assertEquals(0, cacheManager.size()); // Put a version of the same config into backend without error and see that it now works (i.e. we are // not getting a cached response (of the error in the previous request) @@ -154,12 +152,12 @@ public class ProxyServerTest { assertNotNull(res); assertNotNull(res.getPayload().getData()); assertThat(res.getPayload().toString(), is(ConfigTester.fooPayload.toString())); - assertEquals(1, memoryCache.size()); + assertEquals(1, cacheManager.size()); JRTServerConfigRequest newRequestBasedOnResponse = tester.createRequest(res); RawConfig res2 = proxy.resolveConfig(newRequestBasedOnResponse); assertFalse(ProxyServer.configOrGenerationHasChanged(res2, newRequestBasedOnResponse)); - assertEquals(1, memoryCache.size()); + assertEquals(1, cacheManager.size()); } /** @@ -171,7 +169,7 @@ public class ProxyServerTest { @Test public void testNoCachingOfEmptyConfig() { ConfigTester tester = new ConfigTester(); - MemoryCache cache = proxy.memoryCache(); + MemoryCache cache = proxy.getMemoryCache(); assertEquals(0, cache.size()); RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig)); @@ -224,8 +222,10 @@ public class ProxyServerTest { assertThat(properties.configSources[0], is(ProxyServer.DEFAULT_PROXY_CONFIG_SOURCES)); } - private static ProxyServer createTestServer(ConfigSourceSet source, ConfigSourceClient configSourceClient) { - return new ProxyServer(null, source, configSourceClient); + private static ProxyServer createTestServer(ConfigSourceSet source, + ConfigSourceClient configSourceClient, + MemoryCache memoryCache) { + return new ProxyServer(null, source, memoryCache, configSourceClient); } static RawConfig createConfigWithNextConfigGeneration(RawConfig config, int errorCode) { diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java index ada98f4b30e..372c8c41c99 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java @@ -17,7 +17,7 @@ import static org.junit.Assert.assertEquals; */ public class RpcConfigSourceClientTest { - private ResponseHandler responseHandler; + private MockRpcServer rpcServer; private RpcConfigSourceClient rpcConfigSourceClient; @Rule @@ -26,8 +26,8 @@ public class RpcConfigSourceClientTest { @Before public void setup() { - responseHandler = new ResponseHandler(true); - rpcConfigSourceClient = new RpcConfigSourceClient(responseHandler, new MockConfigSource()); + rpcServer = new MockRpcServer(); + rpcConfigSourceClient = new RpcConfigSourceClient(rpcServer, new MockConfigSource(), new MemoryCache()); } @Test @@ -90,7 +90,7 @@ public class RpcConfigSourceClientTest { } private void assertSentResponses(int expected) { - assertEquals(expected, responseHandler.sentResponses()); + assertEquals(expected, rpcServer.responses); } private void simulateClientRequestingConfig(RawConfig config) { -- cgit v1.2.3