diff options
author | Harald Musum <musum@verizonmedia.com> | 2021-12-10 12:00:56 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-10 12:00:56 +0100 |
commit | f2885492b5b7637b69c5284a65070c0b5202b42c (patch) | |
tree | d3aafaff7782d37393d5836e8ee28724a633a31a /config-proxy | |
parent | 16eab63752e78f962345e474112a23327a136c65 (diff) | |
parent | b92a7cba38cf8ae3c4682abaccab89026f6a5d4e (diff) |
Merge pull request #20448 from vespa-engine/hmusum/refactor-config-proxy
Hmusum/refactor config proxy [run-systemtest]
Diffstat (limited to 'config-proxy')
14 files changed, 173 insertions, 184 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java index c7f6530f81c..3f03725fb60 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -30,10 +30,10 @@ import java.util.logging.Logger; * * @author hmusum */ -public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer { +public class ConfigProxyRpcServer implements Runnable, TargetWatcher { private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName()); - private static final int TRACELEVEL = 6; + static final int TRACELEVEL = 6; private final Spec spec; private final Supervisor supervisor; @@ -79,10 +79,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer this::ping) .methodDesc("ping") .returnDesc(0, "ret code", "return code, 0 is OK")); - supervisor.addMethod(new Method("printStatistics", "", "s", - this::printStatistics) - .methodDesc("printStatistics") - .returnDesc(0, "statistics", "Statistics for server")); supervisor.addMethod(new Method("listCachedConfig", "", "S", this::listCachedConfig) .methodDesc("list cached configs)") @@ -145,26 +141,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer }); } - /** - * Returns a String with statistics data for the server. - * - * @param req a Request - */ - private void printStatistics(Request req) { - dispatchRpcRequest(req, () -> { - StringBuilder sb = new StringBuilder(); - sb.append("\nDelayed responses queue size: "); - sb.append(proxyServer.delayedResponses().size()); - sb.append("\nContents: "); - for (DelayedResponse delayed : proxyServer.delayedResponses().responses()) { - sb.append(delayed.getRequest().toString()).append("\n"); - } - - req.returnValues().add(new StringValue(sb.toString())); - req.returnRequest(); - }); - } - private void listCachedConfig(Request req) { dispatchRpcRequest(req, () -> listCachedConfig(req, false)); } @@ -201,7 +177,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer private void invalidateCache(Request req) { dispatchRpcRequest(req, () -> { - proxyServer.getMemoryCache().clear(); + proxyServer.memoryCache().clear(); String[] s = new String[2]; s[0] = "0"; s[1] = "success"; @@ -237,7 +213,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer private void dumpCache(Request req) { dispatchRpcRequest(req, () -> { - final MemoryCache memoryCache = proxyServer.getMemoryCache(); + final MemoryCache memoryCache = proxyServer.memoryCache(); req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache))); req.returnRequest(); }); @@ -269,12 +245,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer * @param request a Request */ private void getConfigImpl(JRTServerConfigRequest request) { + ResponseHandler responseHandler = new ResponseHandler(); request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()"); log.log(Level.FINE, () ->"getConfig: " + request.getShortDescription() + ",config checksums=" + request.getRequestConfigChecksums()); if (!request.validateParameters()) { // Error code is set in verifyParameters if parameters are not OK. log.log(Level.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage()); - returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage()); + responseHandler.returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage()); return; } try { @@ -282,13 +259,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer if (config == null) { log.log(Level.FINEST, () -> "No config received yet for " + request.getShortDescription() + ", not sending response"); } else if (ProxyServer.configOrGenerationHasChanged(config, request)) { - returnOkResponse(request, config); + responseHandler.returnOkResponse(request, config); } else { log.log(Level.FINEST, () -> "No new config for " + request.getShortDescription() + ", not sending response"); } } catch (Exception e) { e.printStackTrace(); - returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage()); + responseHandler.returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage()); } } @@ -302,7 +279,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer private void listCachedConfig(Request req, boolean full) { String[] ret; - MemoryCache cache = proxyServer.getMemoryCache(); + MemoryCache cache = proxyServer.memoryCache(); ret = new String[cache.size()]; int i = 0; for (RawConfig config : cache.values()) { @@ -348,29 +325,4 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer // requesting this config? } - public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) { - request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()"); - request.addOkResponse(config.getPayload(), - config.getGeneration(), - config.applyOnRestart(), - config.getPayloadChecksums()); - log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() + - ",generation=" + config.getGeneration()); - log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload()); - - - // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse - // TODO Move logic so that all requests are returned in CheckDelayedResponse - try { - request.getRequest().returnRequest(); - } catch (IllegalStateException e) { - log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage()); - } - } - - public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) { - request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()"); - request.addErrorResponse(errorCode, message); - request.getRequest().returnRequest(); - } } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java index 6e5fe2d3fd8..dae732e56ec 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java @@ -16,7 +16,7 @@ interface ConfigSourceClient { RawConfig getConfig(RawConfig input, JRTServerConfigRequest request); - void cancel(); + void shutdown(); void shutdownSourceConnections(); @@ -26,4 +26,6 @@ interface ConfigSourceClient { DelayedResponses delayedResponses(); + MemoryCache memoryCache(); + } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java index f77bd4b9138..0e8ebe0d9c9 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java @@ -6,10 +6,12 @@ import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.yolean.Exceptions; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import static com.yahoo.protect.Process.logAndDie; + /** * The run method of this class is executed periodically to return delayed responses * (requests use long polling, so config proxy needs to return a response when they time out). @@ -22,12 +24,13 @@ public class DelayedResponseHandler implements Runnable { private final DelayedResponses delayedResponses; private final MemoryCache memoryCache; - private final RpcServer rpcServer; + private final ResponseHandler responseHandler; + private final AtomicLong sentResponses = new AtomicLong(); - DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, RpcServer rpcServer) { + DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, ResponseHandler responseHandler) { this.delayedResponses = delayedResponses; this.memoryCache = memoryCache; - this.rpcServer = rpcServer; + this.responseHandler = responseHandler; } @Override @@ -41,25 +44,27 @@ public class DelayedResponseHandler implements Runnable { log.log(Level.FINEST, () -> "Running DelayedResponseHandler. There are " + delayedResponses.size() + " delayed responses. First one is " + delayedResponses.responses().peek()); DelayedResponse response; - AtomicInteger i = new AtomicInteger(0); while ((response = delayedResponses.responses().poll()) != null) { JRTServerConfigRequest request = response.getRequest(); ConfigCacheKey cacheKey = new ConfigCacheKey(request.getConfigKey(), request.getRequestDefMd5()); RawConfig config = memoryCache.get(cacheKey); if (config != null) { - rpcServer.returnOkResponse(request, config); - i.incrementAndGet(); + responseHandler.returnOkResponse(request, config); + sentResponses.incrementAndGet(); } else { log.log(Level.WARNING, "Timed out (timeout " + request.getTimeout() + ") getting config " + request.getConfigKey() + ", will retry"); } } - log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + i.get() + " delayed responses sent in " + - (System.currentTimeMillis() - start) + " ms"); + log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + sentResponses.get() + + " delayed responses sent in " + (System.currentTimeMillis() - start) + " ms"); } catch (Exception e) { // To avoid thread throwing exception and executor never running this again log.log(Level.WARNING, "Got exception in DelayedResponseHandler: " + Exceptions.toMessageString(e)); } catch (Throwable e) { - com.yahoo.protect.Process.logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e)); + logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e)); } } + + public long sentResponses() { return sentResponses.get(); } + } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java index 6e90ad16f50..7ae8501278d 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java @@ -1,12 +1,14 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; -import java.util.logging.Level; -import com.yahoo.vespa.config.*; +import com.yahoo.vespa.config.ConfigCacheKey; +import com.yahoo.vespa.config.ConfigKey; +import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import java.util.Collections; import java.util.List; +import java.util.logging.Level; import java.util.logging.Logger; /** @@ -20,8 +22,8 @@ class MemoryCacheConfigClient implements ConfigSourceClient { private final MemoryCache cache; private final DelayedResponses delayedResponses = new DelayedResponses(); - MemoryCacheConfigClient(MemoryCache cache) { - this.cache = cache; + MemoryCacheConfigClient() { + this.cache = new MemoryCache(); } /** @@ -44,7 +46,7 @@ class MemoryCacheConfigClient implements ConfigSourceClient { } @Override - public void cancel() {} + public void shutdown() {} @Override public void shutdownSourceConnections() {} @@ -64,4 +66,7 @@ class MemoryCacheConfigClient implements ConfigSourceClient { return delayedResponses; } + @Override + public MemoryCache memoryCache() { return cache; } + } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index d063c45a3f7..8756090e420 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -1,27 +1,27 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; +import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.jrt.Spec; import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Transport; -import java.util.logging.Level; import com.yahoo.log.LogSetup; import com.yahoo.log.event.Event; -import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionAndUrlDownload; import com.yahoo.yolean.system.CatchSignals; import java.util.List; +import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.vespa.config.proxy.Mode.ModeName.DEFAULT; @@ -40,27 +40,24 @@ public class ProxyServer implements Runnable { private static final int JRT_TRANSPORT_THREADS = 4; static final String DEFAULT_PROXY_CONFIG_SOURCES = "tcp/localhost:19070"; - private final static Logger log = Logger.getLogger(ProxyServer.class.getName()); + private static final Logger log = Logger.getLogger(ProxyServer.class.getName()); + private final AtomicBoolean signalCaught = new AtomicBoolean(false); private final Supervisor supervisor; private final ConfigProxyRpcServer rpcServer; - private ConfigSourceSet configSource; - - private volatile ConfigSourceClient configClient; - - private final MemoryCache memoryCache; private final FileDistributionAndUrlDownload fileDistributionAndUrlDownload; + private ConfigSourceSet configSource; + private volatile ConfigSourceClient configClient; private volatile Mode mode = new Mode(DEFAULT); - ProxyServer(Spec spec, ConfigSourceSet source, MemoryCache memoryCache, ConfigSourceClient configClient) { - this.configSource = source; - supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true); + ProxyServer(Spec spec, ConfigSourceSet source, ConfigSourceClient configClient) { + this.configSource = Objects.requireNonNull(source); log.log(Level.FINE, () -> "Using config source '" + source); - this.memoryCache = memoryCache; + this.supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true); this.rpcServer = createRpcServer(spec); - this.configClient = (configClient == null) ? createRpcClient(rpcServer, source, memoryCache) : configClient; + this.configClient = Objects.requireNonNull(configClient); this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source); } @@ -97,12 +94,12 @@ public class ProxyServer implements Runnable { switch (newMode.getMode()) { case MEMORYCACHE: configClient.shutdownSourceConnections(); - configClient = new MemoryCacheConfigClient(memoryCache); + configClient = new MemoryCacheConfigClient(); this.mode = new Mode(modeName); break; case DEFAULT: flush(); - configClient = createRpcClient(rpcServer, configSource, memoryCache); + configClient = createRpcClient(configSource); this.mode = new Mode(modeName); break; default: @@ -115,8 +112,8 @@ public class ProxyServer implements Runnable { return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this' } - private static RpcConfigSourceClient createRpcClient(RpcServer rpcServer, ConfigSourceSet source, MemoryCache memoryCache) { - return new RpcConfigSourceClient(rpcServer, source, memoryCache); + private static RpcConfigSourceClient createRpcClient(ConfigSourceSet source) { + return new RpcConfigSourceClient(new ResponseHandler(), source); } private void setupSignalHandler() { @@ -159,7 +156,7 @@ public class ProxyServer implements Runnable { Event.started("configproxy"); ConfigSourceSet configSources = new ConfigSourceSet(properties.configSources); - ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, new MemoryCache(), null); + ProxyServer proxyServer = new ProxyServer(new Spec(null, port), configSources, createRpcClient(configSources)); // catch termination and interrupt signal proxyServer.setupSignalHandler(); Thread proxyserverThread = threadFactory.newThread(proxyServer); @@ -169,7 +166,8 @@ public class ProxyServer implements Runnable { } static Properties getSystemProperties() { - final String[] inputConfigSources = System.getProperty("proxyconfigsources", DEFAULT_PROXY_CONFIG_SOURCES).split(","); + String[] inputConfigSources = System.getProperty("proxyconfigsources", + DEFAULT_PROXY_CONFIG_SOURCES).split(","); return new Properties(inputConfigSources); } @@ -184,15 +182,15 @@ public class ProxyServer implements Runnable { // Cancels all config instances and flushes the cache. When this method returns, // the cache will not be updated again before someone calls getConfig(). private synchronized void flush() { - memoryCache.clear(); - configClient.cancel(); + configClient.memoryCache().clear(); + configClient.shutdown(); } void stop() { Event.stopping("configproxy", "shutdown rpcServer"); if (rpcServer != null) rpcServer.shutdown(); Event.stopping("configproxy", "cancel configClient"); - if (configClient != null) configClient.cancel(); + configClient.shutdown(); Event.stopping("configproxy", "flush"); flush(); Event.stopping("configproxy", "close fileDistribution"); @@ -200,8 +198,8 @@ public class ProxyServer implements Runnable { Event.stopping("configproxy", "stop complete"); } - MemoryCache getMemoryCache() { - return memoryCache; + MemoryCache memoryCache() { + return configClient.memoryCache(); } String getActiveSourceConnection() { @@ -215,7 +213,7 @@ public class ProxyServer implements Runnable { void updateSourceConnections(List<String> sources) { configSource = new ConfigSourceSet(sources); flush(); - configClient = createRpcClient(rpcServer, configSource, memoryCache); + configClient = createRpcClient(configSource); } DelayedResponses delayedResponses() { diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java new file mode 100644 index 00000000000..c9cfbdd3e16 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java @@ -0,0 +1,63 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.vespa.config.RawConfig; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static com.yahoo.vespa.config.proxy.ConfigProxyRpcServer.TRACELEVEL; + +/** + * An RPC server that handles config and file distribution requests. + * + * @author hmusum + */ +public class ResponseHandler { + + private final Optional<AtomicLong> sentResponses; + + public ResponseHandler() { + this(false); + } + + // For testing only + ResponseHandler(boolean trackResponses) { + sentResponses = trackResponses ? Optional.of(new AtomicLong()) : Optional.empty(); + } + + private final static Logger log = Logger.getLogger(ResponseHandler.class.getName()); + + public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) { + request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()"); + request.addOkResponse(config.getPayload(), + config.getGeneration(), + config.applyOnRestart(), + config.getPayloadChecksums()); + log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() + + ",generation=" + config.getGeneration()); + log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload()); + + + // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse + // TODO Move logic so that all requests are returned in CheckDelayedResponse + try { + request.getRequest().returnRequest(); + } catch (IllegalStateException e) { + log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage()); + } + sentResponses.ifPresent(AtomicLong::getAndIncrement); + } + + public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) { + request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()"); + request.addErrorResponse(errorCode, message); + request.getRequest().returnRequest(); + } + + public long sentResponses() { return sentResponses.map(AtomicLong::get).orElse(0L); } + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java index 5df7b1fc021..362ca1164b8 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -43,7 +43,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { private final Supervisor supervisor = new Supervisor(new Transport("config-source-client")); - private final RpcServer rpcServer; + private final ResponseHandler responseHandler; private final ConfigSourceSet configSourceSet; private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>(); private final MemoryCache memoryCache; @@ -57,15 +57,15 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses")); private final ScheduledFuture<?> delayedResponsesFuture; - RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet, MemoryCache memoryCache) { - this.rpcServer = rpcServer; + RpcConfigSourceClient(ResponseHandler responseHandler, ConfigSourceSet configSourceSet) { + this.responseHandler = responseHandler; this.configSourceSet = configSourceSet; - this.memoryCache = memoryCache; + this.memoryCache = new MemoryCache(); this.delayedResponses = new DelayedResponses(); checkConfigSources(); nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS); this.requester = JRTConfigRequester.create(configSourceSet, timingValues); - DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer); + DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, responseHandler); this.delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS); } @@ -163,7 +163,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { } @Override - public void cancel() { + public void shutdown() { log.log(Level.FINE, "shutdownSourceConnections"); shutdownSourceConnections(); log.log(Level.FINE, "delayedResponsesFuture.cancel"); @@ -230,7 +230,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { log.log(Level.FINE, () -> "Call returnOkResponse for " + key + "," + generation); if (config.getPayload().getData().getByteLength() == 0) log.log(Level.WARNING, () -> "Call returnOkResponse for " + key + "," + generation + " with empty config"); - rpcServer.returnOkResponse(request, config); + responseHandler.returnOkResponse(request, config); } else { log.log(Level.INFO, "Could not remove " + key + " from delayedResponses queue, already removed"); } @@ -243,9 +243,10 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { } @Override - public DelayedResponses delayedResponses() { - return delayedResponses; - } + public DelayedResponses delayedResponses() { return delayedResponses; } + + @Override + public MemoryCache memoryCache() { return memoryCache; } private void updateWithNewConfig(RawConfig newConfig) { log.log(Level.FINE, () -> "config to be returned for '" + newConfig.getKey() + diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java index 1bcf8d5d8be..691bc6c43a7 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java @@ -92,7 +92,7 @@ public class ConfigProxyRpcServerTest { assertThat(ret.length, is(0)); final RawConfig config = ProxyServerTest.fooConfig; - server.proxyServer().getMemoryCache().update(config); + server.proxyServer().memoryCache().update(config); req = new Request("listCachedConfig"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); @@ -119,7 +119,7 @@ public class ConfigProxyRpcServerTest { assertThat(ret.length, is(0)); final RawConfig config = ProxyServerTest.fooConfig; - server.proxyServer().getMemoryCache().update(config); + server.proxyServer().memoryCache().update(config); req = new Request("listCachedConfigFull"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); @@ -133,7 +133,7 @@ public class ConfigProxyRpcServerTest { } /** - * Tests printStatistics RPC command + * Tests listSourceConnections RPC command */ @Test public void testRpcMethodListSourceConnections() throws ListenFailedException { @@ -151,20 +151,6 @@ public class ConfigProxyRpcServerTest { } /** - * Tests printStatistics RPC command - */ - @Test - public void testRpcMethodPrintStatistics() { - Request req = new Request("printStatistics"); - client.invoke(req); - assertFalse(req.errorMessage(), req.isError()); - assertThat(req.returnValues().size(), is(1)); - assertThat(req.returnValues().get(0).asString(), is("\n" + - "Delayed responses queue size: 0\n" + - "Contents: ")); - } - - /** * Tests invalidateCache RPC command */ @Test @@ -275,7 +261,7 @@ public class ConfigProxyRpcServerTest { } private static ProxyServer createTestServer(ConfigSourceSet source) { - return new ProxyServer(null, source, new MemoryCache(), null); + return new ProxyServer(null, source, new RpcConfigSourceClient(new ResponseHandler(), source)); } private static class TestServer implements AutoCloseable { diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java index c2a0282fd05..8a668b34fd0 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java @@ -6,8 +6,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; /** * @author hmusum @@ -29,16 +28,15 @@ public class DelayedResponseHandlerTest { public void basic() { ConfigTester tester = new ConfigTester(); DelayedResponses delayedResponses = new DelayedResponses(); - final MockRpcServer mockRpcServer = new MockRpcServer(); - final MemoryCache memoryCache = new MemoryCache(); + MemoryCache memoryCache = new MemoryCache(); memoryCache.update(ConfigTester.fooConfig); - final DelayedResponseHandler delayedResponseHandler = new DelayedResponseHandler(delayedResponses, memoryCache, mockRpcServer); + DelayedResponseHandler delayedResponseHandler = new DelayedResponseHandler(delayedResponses, memoryCache, new ResponseHandler()); delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.fooConfig, 0))); delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.fooConfig, 1200000))); // should not be returned yet delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.errorConfig, 0))); // will not give a config when resolving delayedResponseHandler.checkDelayedResponses(); - assertThat(mockRpcServer.responses, is(1L)); + assertEquals(1, delayedResponseHandler.sentResponses()); } } diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java index 51d0b983764..ae7350b11e0 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClientTest.java @@ -16,9 +16,8 @@ public class MemoryCacheConfigClientTest { @Test public void basic() { - MemoryCache cache = new MemoryCache(); - cache.update(ConfigTester.fooConfig); - MemoryCacheConfigClient client = new MemoryCacheConfigClient(cache); + MemoryCacheConfigClient client = new MemoryCacheConfigClient(); + client.memoryCache().update(ConfigTester.fooConfig); assertThat(client.getConfig(ConfigTester.fooConfig, null), is(ConfigTester.fooConfig)); assertNull(client.getConfig(ConfigTester.barConfig, null)); diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java index c0efc1cb355..d0724b9dbd0 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockConfigSourceClient.java @@ -18,9 +18,9 @@ public class MockConfigSourceClient implements ConfigSourceClient{ private final MemoryCache memoryCache; private final DelayedResponses delayedResponses = new DelayedResponses(); - MockConfigSourceClient(MockConfigSource configSource, MemoryCache memoryCache) { + MockConfigSourceClient(MockConfigSource configSource) { this.configSource = configSource; - this.memoryCache = memoryCache; + this.memoryCache = new MemoryCache(); } @Override @@ -35,7 +35,7 @@ public class MockConfigSourceClient implements ConfigSourceClient{ } @Override - public void cancel() { + public void shutdown() { configSource.clear(); } @@ -56,4 +56,7 @@ public class MockConfigSourceClient implements ConfigSourceClient{ @Override public DelayedResponses delayedResponses() { return delayedResponses; } + @Override + public MemoryCache memoryCache() { return memoryCache; } + } diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java deleted file mode 100644 index 56fcca191de..00000000000 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.proxy; - -import com.yahoo.vespa.config.RawConfig; -import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; - -/** - * @author hmusum - */ -public class MockRpcServer implements RpcServer { - - volatile long responses = 0; - volatile long errorResponses = 0; - - public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) { - responses++; - } - - public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) { - responses++; - errorResponses++; - } -} diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java index cdda2bf6e77..15de93b748f 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java @@ -2,7 +2,10 @@ package com.yahoo.vespa.config.proxy; import com.yahoo.config.subscription.ConfigSourceSet; -import com.yahoo.vespa.config.*; +import com.yahoo.vespa.config.ConfigCacheKey; +import com.yahoo.vespa.config.ConfigKey; +import com.yahoo.vespa.config.ErrorCode; +import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.protocol.Payload; import org.junit.After; @@ -25,9 +28,8 @@ import static org.junit.Assert.assertTrue; */ public class ProxyServerTest { - private final MemoryCache memoryCache = new MemoryCache(); private final MockConfigSource source = new MockConfigSource(); - private final MockConfigSourceClient client = new MockConfigSourceClient(source, memoryCache); + private final ConfigSourceClient client = new MockConfigSourceClient(source); private ProxyServer proxy; static final RawConfig fooConfig = ConfigTester.fooConfig; @@ -46,7 +48,7 @@ public class ProxyServerTest { source.clear(); source.put(fooConfig.getKey(), createConfigWithNextConfigGeneration(fooConfig, 0)); source.put(errorConfigKey, createConfigWithNextConfigGeneration(fooConfig, ErrorCode.UNKNOWN_DEFINITION)); - proxy = createTestServer(source, client, memoryCache); + proxy = createTestServer(source, client); } @After @@ -57,10 +59,10 @@ public class ProxyServerTest { @Test public void basic() { assertTrue(proxy.getMode().isDefault()); - assertThat(proxy.getMemoryCache().size(), is(0)); + assertThat(proxy.memoryCache().size(), is(0)); ConfigTester tester = new ConfigTester(); - final MemoryCache memoryCache = proxy.getMemoryCache(); + MemoryCache memoryCache = proxy.memoryCache(); assertEquals(0, memoryCache.size()); RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig)); assertNotNull(res); @@ -74,7 +76,7 @@ public class ProxyServerTest { */ @Test public void testModeSwitch() { - ProxyServer proxy = createTestServer(source, client, new MemoryCache()); + ProxyServer proxy = createTestServer(source, client); assertTrue(proxy.getMode().isDefault()); for (String mode : Mode.modes()) { @@ -109,7 +111,7 @@ public class ProxyServerTest { @Test public void testGetConfigAndCaching() { ConfigTester tester = new ConfigTester(); - final MemoryCache memoryCache = proxy.getMemoryCache(); + MemoryCache memoryCache = proxy.memoryCache(); assertEquals(0, memoryCache.size()); RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig)); assertNotNull(res); @@ -134,14 +136,14 @@ public class ProxyServerTest { // Simulate an error response source.put(fooConfig.getKey(), createConfigWithNextConfigGeneration(fooConfig, ErrorCode.INTERNAL_ERROR)); - final MemoryCache cacheManager = proxy.getMemoryCache(); - assertEquals(0, cacheManager.size()); + MemoryCache memoryCache = proxy.memoryCache(); + assertEquals(0, memoryCache.size()); RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig)); assertNotNull(res); assertNotNull(res.getPayload()); assertTrue(res.isError()); - assertEquals(0, cacheManager.size()); + assertEquals(0, memoryCache.size()); // Put a version of the same config into backend without error and see that it now works (i.e. we are // not getting a cached response (of the error in the previous request) @@ -152,12 +154,12 @@ public class ProxyServerTest { assertNotNull(res); assertNotNull(res.getPayload().getData()); assertThat(res.getPayload().toString(), is(ConfigTester.fooPayload.toString())); - assertEquals(1, cacheManager.size()); + assertEquals(1, memoryCache.size()); JRTServerConfigRequest newRequestBasedOnResponse = tester.createRequest(res); RawConfig res2 = proxy.resolveConfig(newRequestBasedOnResponse); assertFalse(ProxyServer.configOrGenerationHasChanged(res2, newRequestBasedOnResponse)); - assertEquals(1, cacheManager.size()); + assertEquals(1, memoryCache.size()); } /** @@ -169,7 +171,7 @@ public class ProxyServerTest { @Test public void testNoCachingOfEmptyConfig() { ConfigTester tester = new ConfigTester(); - MemoryCache cache = proxy.getMemoryCache(); + MemoryCache cache = proxy.memoryCache(); assertEquals(0, cache.size()); RawConfig res = proxy.resolveConfig(tester.createRequest(fooConfig)); @@ -222,10 +224,8 @@ public class ProxyServerTest { assertThat(properties.configSources[0], is(ProxyServer.DEFAULT_PROXY_CONFIG_SOURCES)); } - private static ProxyServer createTestServer(ConfigSourceSet source, - ConfigSourceClient configSourceClient, - MemoryCache memoryCache) { - return new ProxyServer(null, source, memoryCache, configSourceClient); + private static ProxyServer createTestServer(ConfigSourceSet source, ConfigSourceClient configSourceClient) { + return new ProxyServer(null, source, configSourceClient); } static RawConfig createConfigWithNextConfigGeneration(RawConfig config, int errorCode) { diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java index 372c8c41c99..ada98f4b30e 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java @@ -17,7 +17,7 @@ import static org.junit.Assert.assertEquals; */ public class RpcConfigSourceClientTest { - private MockRpcServer rpcServer; + private ResponseHandler responseHandler; private RpcConfigSourceClient rpcConfigSourceClient; @Rule @@ -26,8 +26,8 @@ public class RpcConfigSourceClientTest { @Before public void setup() { - rpcServer = new MockRpcServer(); - rpcConfigSourceClient = new RpcConfigSourceClient(rpcServer, new MockConfigSource(), new MemoryCache()); + responseHandler = new ResponseHandler(true); + rpcConfigSourceClient = new RpcConfigSourceClient(responseHandler, new MockConfigSource()); } @Test @@ -90,7 +90,7 @@ public class RpcConfigSourceClientTest { } private void assertSentResponses(int expected) { - assertEquals(expected, rpcServer.responses); + assertEquals(expected, responseHandler.sentResponses()); } private void simulateClientRequestingConfig(RawConfig config) { |