diff options
Diffstat (limited to 'config-proxy/src/main/java')
5 files changed, 94 insertions, 50 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java index 207a6385b5a..3f03725fb60 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -30,10 +30,10 @@ import java.util.logging.Logger; * * @author hmusum */ -public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer { +public class ConfigProxyRpcServer implements Runnable, TargetWatcher { private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName()); - private static final int TRACELEVEL = 6; + static final int TRACELEVEL = 6; private final Spec spec; private final Supervisor supervisor; @@ -245,12 +245,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer * @param request a Request */ private void getConfigImpl(JRTServerConfigRequest request) { + ResponseHandler responseHandler = new ResponseHandler(); request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()"); log.log(Level.FINE, () ->"getConfig: " + request.getShortDescription() + ",config checksums=" + request.getRequestConfigChecksums()); if (!request.validateParameters()) { // Error code is set in verifyParameters if parameters are not OK. log.log(Level.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage()); - returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage()); + responseHandler.returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage()); return; } try { @@ -258,13 +259,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer if (config == null) { log.log(Level.FINEST, () -> "No config received yet for " + request.getShortDescription() + ", not sending response"); } else if (ProxyServer.configOrGenerationHasChanged(config, request)) { - returnOkResponse(request, config); + responseHandler.returnOkResponse(request, config); } else { log.log(Level.FINEST, () -> "No new config for " + request.getShortDescription() + ", not sending response"); } } catch (Exception e) { e.printStackTrace(); - returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage()); + responseHandler.returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage()); } } @@ -324,29 +325,4 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer // requesting this config? } - public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) { - request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()"); - request.addOkResponse(config.getPayload(), - config.getGeneration(), - config.applyOnRestart(), - config.getPayloadChecksums()); - log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() + - ",generation=" + config.getGeneration()); - log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload()); - - - // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse - // TODO Move logic so that all requests are returned in CheckDelayedResponse - try { - request.getRequest().returnRequest(); - } catch (IllegalStateException e) { - log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage()); - } - } - - public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) { - request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()"); - request.addErrorResponse(errorCode, message); - request.getRequest().returnRequest(); - } } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java index f77bd4b9138..0e8ebe0d9c9 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java @@ -6,10 +6,12 @@ import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.yolean.Exceptions; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import static com.yahoo.protect.Process.logAndDie; + /** * The run method of this class is executed periodically to return delayed responses * (requests use long polling, so config proxy needs to return a response when they time out). @@ -22,12 +24,13 @@ public class DelayedResponseHandler implements Runnable { private final DelayedResponses delayedResponses; private final MemoryCache memoryCache; - private final RpcServer rpcServer; + private final ResponseHandler responseHandler; + private final AtomicLong sentResponses = new AtomicLong(); - DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, RpcServer rpcServer) { + DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, ResponseHandler responseHandler) { this.delayedResponses = delayedResponses; this.memoryCache = memoryCache; - this.rpcServer = rpcServer; + this.responseHandler = responseHandler; } @Override @@ -41,25 +44,27 @@ public class DelayedResponseHandler implements Runnable { log.log(Level.FINEST, () -> "Running DelayedResponseHandler. There are " + delayedResponses.size() + " delayed responses. First one is " + delayedResponses.responses().peek()); DelayedResponse response; - AtomicInteger i = new AtomicInteger(0); while ((response = delayedResponses.responses().poll()) != null) { JRTServerConfigRequest request = response.getRequest(); ConfigCacheKey cacheKey = new ConfigCacheKey(request.getConfigKey(), request.getRequestDefMd5()); RawConfig config = memoryCache.get(cacheKey); if (config != null) { - rpcServer.returnOkResponse(request, config); - i.incrementAndGet(); + responseHandler.returnOkResponse(request, config); + sentResponses.incrementAndGet(); } else { log.log(Level.WARNING, "Timed out (timeout " + request.getTimeout() + ") getting config " + request.getConfigKey() + ", will retry"); } } - log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + i.get() + " delayed responses sent in " + - (System.currentTimeMillis() - start) + " ms"); + log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + sentResponses.get() + + " delayed responses sent in " + (System.currentTimeMillis() - start) + " ms"); } catch (Exception e) { // To avoid thread throwing exception and executor never running this again log.log(Level.WARNING, "Got exception in DelayedResponseHandler: " + Exceptions.toMessageString(e)); } catch (Throwable e) { - com.yahoo.protect.Process.logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e)); + logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e)); } } + + public long sentResponses() { return sentResponses.get(); } + } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index 8492381786b..63e5bd69bc3 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -56,7 +56,7 @@ public class ProxyServer implements Runnable { supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true); log.log(Level.FINE, () -> "Using config source '" + source); this.rpcServer = createRpcServer(spec); - this.configClient = (configClient == null) ? createRpcClient(rpcServer, source) : configClient; + this.configClient = (configClient == null) ? createRpcClient(source) : configClient; this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source); } @@ -98,7 +98,7 @@ public class ProxyServer implements Runnable { break; case DEFAULT: flush(); - configClient = createRpcClient(rpcServer, configSource); + configClient = createRpcClient(configSource); this.mode = new Mode(modeName); break; default: @@ -111,8 +111,8 @@ public class ProxyServer implements Runnable { return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this' } - private static RpcConfigSourceClient createRpcClient(RpcServer rpcServer, ConfigSourceSet source) { - return new RpcConfigSourceClient(rpcServer, source); + private static RpcConfigSourceClient createRpcClient(ConfigSourceSet source) { + return new RpcConfigSourceClient(new ResponseHandler(), source); } private void setupSignalHandler() { @@ -211,7 +211,7 @@ public class ProxyServer implements Runnable { void updateSourceConnections(List<String> sources) { configSource = new ConfigSourceSet(sources); flush(); - configClient = createRpcClient(rpcServer, configSource); + configClient = createRpcClient(configSource); } DelayedResponses delayedResponses() { diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java new file mode 100644 index 00000000000..c9cfbdd3e16 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java @@ -0,0 +1,63 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.vespa.config.RawConfig; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static com.yahoo.vespa.config.proxy.ConfigProxyRpcServer.TRACELEVEL; + +/** + * An RPC server that handles config and file distribution requests. + * + * @author hmusum + */ +public class ResponseHandler { + + private final Optional<AtomicLong> sentResponses; + + public ResponseHandler() { + this(false); + } + + // For testing only + ResponseHandler(boolean trackResponses) { + sentResponses = trackResponses ? Optional.of(new AtomicLong()) : Optional.empty(); + } + + private final static Logger log = Logger.getLogger(ResponseHandler.class.getName()); + + public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) { + request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()"); + request.addOkResponse(config.getPayload(), + config.getGeneration(), + config.applyOnRestart(), + config.getPayloadChecksums()); + log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() + + ",generation=" + config.getGeneration()); + log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload()); + + + // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse + // TODO Move logic so that all requests are returned in CheckDelayedResponse + try { + request.getRequest().returnRequest(); + } catch (IllegalStateException e) { + log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage()); + } + sentResponses.ifPresent(AtomicLong::getAndIncrement); + } + + public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) { + request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()"); + request.addErrorResponse(errorCode, message); + request.getRequest().returnRequest(); + } + + public long sentResponses() { return sentResponses.map(AtomicLong::get).orElse(0L); } + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java index ce60e2cea67..0b9d0241890 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -43,7 +43,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { private final Supervisor supervisor = new Supervisor(new Transport("config-source-client")); - private final RpcServer rpcServer; + private final ResponseHandler responseHandler; private final ConfigSourceSet configSourceSet; private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>(); private final MemoryCache memoryCache; @@ -57,15 +57,15 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses")); private final ScheduledFuture<?> delayedResponsesFuture; - RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet) { - this.rpcServer = rpcServer; + RpcConfigSourceClient(ResponseHandler responseHandler, ConfigSourceSet configSourceSet) { + this.responseHandler = responseHandler; this.configSourceSet = configSourceSet; this.memoryCache = new MemoryCache(); this.delayedResponses = new DelayedResponses(); checkConfigSources(); nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS); this.requester = JRTConfigRequester.create(configSourceSet, timingValues); - DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer); + DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, responseHandler); this.delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS); } @@ -230,7 +230,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable { log.log(Level.FINE, () -> "Call returnOkResponse for " + key + "," + generation); if (config.getPayload().getData().getByteLength() == 0) log.log(Level.WARNING, () -> "Call returnOkResponse for " + key + "," + generation + " with empty config"); - rpcServer.returnOkResponse(request, config); + responseHandler.returnOkResponse(request, config); } else { log.log(Level.INFO, "Could not remove " + key + " from delayedResponses queue, already removed"); } |