diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /config-proxy/src/main/java |
Publish
Diffstat (limited to 'config-proxy/src/main/java')
18 files changed, 1826 insertions, 0 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CacheManager.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CacheManager.java new file mode 100644 index 00000000000..993edd2962f --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CacheManager.java @@ -0,0 +1,46 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.vespa.config.RawConfig; + +/** + * Manages memory and disk caches. + * + * @author musum + */ +public class CacheManager { + + private final MemoryCache memoryCache; + + public CacheManager(MemoryCache memoryCache) { + this.memoryCache = memoryCache; + } + + static CacheManager createTestCacheManager() { + return CacheManager.createTestCacheManager(new MemoryCache()); + } + + static CacheManager createTestCacheManager(MemoryCache memoryCache) { + return new CacheManager(memoryCache); + } + + void putInCache(RawConfig config) { + putInMemoryCache(config); + } + + private void putInMemoryCache(RawConfig config) { + if (!config.isError()) { + memoryCache.put(config); + } + } + + public MemoryCache getMemoryCache() { + return memoryCache; + } + + // Only for testing. + int getCacheSize() { + return getMemoryCache().size(); + } + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CheckDelayedResponses.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CheckDelayedResponses.java new file mode 100644 index 00000000000..e8a2f5455a1 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CheckDelayedResponses.java @@ -0,0 +1,71 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; +import com.yahoo.yolean.Exceptions; +import com.yahoo.vespa.config.ConfigCacheKey; +import com.yahoo.vespa.config.RawConfig; + +import java.util.Date; +import java.util.logging.Logger; + +/** + * The run method of this class is executed periodically to + * return requests that are about to time out. + * + * @author musum + */ +public class CheckDelayedResponses implements Runnable { + + final static Logger log = Logger.getLogger(CheckDelayedResponses.class.getName()); + + private final DelayedResponses delayedResponses; + private final MemoryCache memoryCache; + private final RpcServer rpcServer; + + public CheckDelayedResponses(DelayedResponses delayedResponses, MemoryCache memoryCache, RpcServer rpcServer) { + this.delayedResponses = delayedResponses; + this.memoryCache = memoryCache; + this.rpcServer = rpcServer; + } + + @Override + public void run() { + checkDelayedResponses(); + } + + void checkDelayedResponses() { + try { + long start = System.currentTimeMillis(); + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Running CheckDelayedResponses. There are " + delayedResponses.size() + " delayed responses. First one is " + delayedResponses.responses().peek()); + } + DelayedResponse response; + int i = 0; + + while ((response = delayedResponses.responses().poll()) != null) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Returning with response that has return time " + new Date(response.getReturnTime())); + } + JRTServerConfigRequest request = response.getRequest(); + ConfigCacheKey cacheKey = new ConfigCacheKey(request.getConfigKey(), request.getConfigKey().getMd5()); + RawConfig config = memoryCache.get(cacheKey); + if (config != null) { + rpcServer.returnOkResponse(request, config); + i++; + } else { + log.log(LogLevel.WARNING, "No config found for " + request.getConfigKey() + " within timeout, will retry"); + } + } + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Finished running CheckDelayedResponses. " + i + " delayed responses sent in " + + (System.currentTimeMillis() - start) + " ms"); + } + } catch (Exception e) { // To avoid thread throwing exception and executor never running this again + log.log(LogLevel.WARNING, "Got exception in CheckDelayedResponses: " + Exceptions.toMessageString(e)); + } catch (Throwable e) { + com.yahoo.protect.Process.logAndDie("Got error in CheckDelayedResponses, exiting: " + Exceptions.toMessageString(e)); + } + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java new file mode 100644 index 00000000000..133ecf4c94f --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java @@ -0,0 +1,107 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.RawConfig; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; + +import java.util.concurrent.DelayQueue; +import java.util.logging.Logger; + +/** + * Updates clients subscribing to config when config changes or the + * timeout they have specified has elapsed. + * + * @author musum + */ +public class ClientUpdater { + final static Logger log = Logger.getLogger(ClientUpdater.class.getName()); + + private final CacheManager cacheManager; + private final ConfigProxyStatistics statistics; + private final RpcServer rpcServer; + private final DelayedResponses delayedResponses; + private final Mode mode; + + public ClientUpdater(CacheManager cacheManager, + RpcServer rpcServer, + ConfigProxyStatistics statistics, + DelayedResponses delayedResponses, + Mode mode) { + this.cacheManager = cacheManager; + this.rpcServer = rpcServer; + this.statistics = statistics; + this.delayedResponses = delayedResponses; + this.mode = mode; + } + + /** + * This method will be called when a response with changed config is received from upstream + * (content or generation has changed) or the server timeout has elapsed. + * Updates the cache with the returned config. + * + * @param config new config + */ + void updateSubscribers(RawConfig config) { + // ignore updates if we are in one of these modes (we will then only serve from cache). + if (!mode.requiresConfigSource()) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Not updating " + config.getKey() + "," + config.getGeneration() + + ", since we are in '" + mode + "' mode"); + } + return; + } + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Config updated for " + config.getKey() + "," + config.getGeneration()); + } + cacheManager.putInCache(config); + sendResponse(config); + } + + void sendResponse(RawConfig config) { + if (config.isError()) { statistics.incErrorCount(); } + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Sending response for " + config.getKey() + "," + config.getGeneration()); + } + DelayQueue<DelayedResponse> responseDelayQueue = delayedResponses.responses(); + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Delayed response queue: " + responseDelayQueue); + } + if (responseDelayQueue.size() == 0) { + log.log(LogLevel.DEBUG, "There exists no matching element on delayed response queue for " + config.getKey()); + return; + } else { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Delayed response queue has " + responseDelayQueue.size() + " elements"); + } + } + DelayedResponse[] responses = new DelayedResponse[1]; + responses = responseDelayQueue.toArray(responses); + boolean found = false; + if (responses.length > 0) { + for (DelayedResponse response : responses) { + JRTServerConfigRequest request = response.getRequest(); + if (request.getConfigKey().equals(config.getKey())) { + if (!delayedResponses.remove(response)) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Could not remove " + config.getKey() + " from delayed delayedResponses queue, already removed"); + } + continue; + } + found = true; + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Call returnOkResponse for " + config.getKey() + "," + config.getGeneration()); + } + rpcServer.returnOkResponse(request, config); + } + } + + } + if (!found) { + log.log(LogLevel.DEBUG, "Found no recipient for " + config.getKey() + " in delayed response queue"); + } + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Finished updating config for " + config.getKey() + "," + config.getGeneration()); + } + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java new file mode 100644 index 00000000000..b1cf0c01165 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -0,0 +1,340 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.jrt.*; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.*; +import com.yahoo.vespa.config.ErrorCode; +import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; + +import java.lang.*; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Set; +import java.util.logging.Logger; + + +/** + * A proxy server that handles RPC config requests. + * + * @author <a href="musum@yahoo-inc.com">Harald Musum</a> + * @since 5.1 + */ +public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer { + + private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName()); + static final int TRACELEVEL = 6; + + private final Spec spec; + private final Supervisor supervisor = new Supervisor(new Transport()); + private final ProxyServer proxyServer; + + public ConfigProxyRpcServer(ProxyServer proxyServer, Spec spec) { + this.proxyServer = proxyServer; + this.spec = spec; + setUp(); + } + + public void run() { + try { + Acceptor acceptor = supervisor.listen(spec); + log.log(LogLevel.DEBUG, "Ready for requests on " + spec); + supervisor.transport().join(); + acceptor.shutdown().join(); + } catch (ListenFailedException e) { + proxyServer.stop(); + throw new RuntimeException("Could not listen on " + spec, e); + } + } + + void shutdown() { + supervisor.transport().shutdown(); + } + + public Spec getSpec() { + return spec; + } + + void setUp() { + // The getConfig method in this class will handle RPC calls for getting config + supervisor.addMethod(JRTMethods.createConfigV3GetConfigMethod(this, "getConfigV3")); + supervisor.addMethod(new Method("ping", "", "i", + this, "ping") + .methodDesc("ping") + .returnDesc(0, "ret code", "return code, 0 is OK")); + supervisor.addMethod(new Method("printStatistics", "", "s", + this, "printStatistics") + .methodDesc("printStatistics") + .returnDesc(0, "statistics", "Statistics for server")); + supervisor.addMethod(new Method("listCachedConfig", "", "S", + this, "listCachedConfig") + .methodDesc("list cached configs)") + .returnDesc(0, "data", "string array of configs")); + supervisor.addMethod(new Method("listCachedConfigFull", "", "S", + this, "listCachedConfigFull") + .methodDesc("list cached configs with cache content)") + .returnDesc(0, "data", "string array of configs")); + supervisor.addMethod(new Method("listSourceConnections", "", "S", + this, "listSourceConnections") + .methodDesc("list config source connections)") + .returnDesc(0, "data", "string array of source connections")); + supervisor.addMethod(new Method("invalidateCache", "", "S", + this, "invalidateCache") + .methodDesc("list config source connections)") + .returnDesc(0, "data", "0 if success, 1 otherwise")); + supervisor.addMethod(new Method("updateSources", "s", "s", + this, "updateSources") + .methodDesc("update list of config sources") + .returnDesc(0, "ret", "list of updated config sources")); + supervisor.addMethod(new Method("setMode", "s", "S", + this, "setMode") + .methodDesc("Set config proxy mode { default | memorycache }") + .returnDesc(0, "ret", "0 if success, 1 otherwise as first element, description as second element")); + supervisor.addMethod(new Method("getMode", "", "s", + this, "getMode") + .methodDesc("What serving mode the config proxy is in (default, memorycache)") + .returnDesc(0, "ret", "mode as a string")); + supervisor.addMethod(new Method("dumpCache", "s", "s", + this, "dumpCache") + .methodDesc("Dump cache to disk") + .paramDesc(0, "path", "path to write cache contents to") + .returnDesc(0, "ret", "Empty string or error message")); + } + + /** + * Handles RPC method "config.v3.getConfig" requests. + * + * @param req a Request + */ + @SuppressWarnings({"UnusedDeclaration"}) + public final void getConfigV3(Request req) { + log.log(LogLevel.DEBUG, "getConfigV2"); + JRTServerConfigRequest request = JRTServerConfigRequestV3.createFromRequest(req); + if (isProtocolVersionSupported(request)) { + preHandle(req); + getConfigImpl(request); + } + } + + private boolean isProtocolVersionSupported(JRTServerConfigRequest request) { + Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions(); + if (supportedProtocolVersions.contains(request.getProtocolVersion())) { + return true; + } else { + String message = "Illegal protocol version " + request.getProtocolVersion() + + " in request " + request.getShortDescription() + ", only protocol versions " + supportedProtocolVersions + " are supported"; + log.log(LogLevel.ERROR, message); + request.addErrorResponse(ErrorCode.ILLEGAL_PROTOCOL_VERSION, message); + } + return false; + } + + private void preHandle(Request req) { + proxyServer.getStatistics().incRpcRequests(); + req.detach(); + req.target().addWatcher(this); + } + + /** + * Handles all versions of "getConfig" requests. + * + * @param request a Request + */ + private void getConfigImpl(JRTServerConfigRequest request) { + request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()"); + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "getConfig: " + request.getShortDescription() + ",configmd5=" + request.getRequestConfigMd5()); + } + if (!request.validateParameters()) { + // Error code is set in verifyParameters if parameters are not OK. + log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage()); + returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage()); + return; + } + try { + RawConfig config = proxyServer.resolveConfig(request); + if (ProxyServer.configOrGenerationHasChanged(config, request)) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Should send response for " + request.getShortDescription() + ",config=" + config); + } + returnOkResponse(request, config); + } else { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Should not send response for " + request.getShortDescription() + ", config=" + config); + } + } + } catch (Exception e) { + e.printStackTrace(); + returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage()); + } + } + + /** + * Returns 0 if server is alive. + * + * @param req a Request + */ + public final void ping(Request req) { + req.returnValues().add(new Int32Value(0)); + } + + /** + * Returns a String with statistics data for the server. + * + * @param req a Request + */ + public final void printStatistics(Request req) { + StringBuilder sb = new StringBuilder(); + sb.append("\nDelayed responses queue size: "); + sb.append(proxyServer.delayedResponses.size()); + sb.append("\nContents: "); + for (DelayedResponse delayed : proxyServer.delayedResponses.responses()) { + sb.append(delayed.getRequest().toString()).append("\n"); + } + + req.returnValues().add(new StringValue(sb.toString())); + } + + public final void listCachedConfig(Request req) { + listCachedConfig(req, false); + } + + public final void listCachedConfigFull(Request req) { + listCachedConfig(req, true); + } + + public final void listSourceConnections(Request req) { + String[] ret = new String[2]; + ret[0] = "Current source: " + proxyServer.getActiveSourceConnection(); + ret[1] = "All sources:\n" + printSourceConnections(); + req.returnValues().add(new StringArray(ret)); + } + + private String printSourceConnections() { + StringBuilder sb = new StringBuilder(); + for (String s : proxyServer.getSourceConnections()) { + sb.append(s).append("\n"); + } + return sb.toString(); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void updateSources(Request req) { + String sources = req.parameters().get(0).asString(); + String ret; + System.out.println(proxyServer.getMode()); + if (proxyServer.getMode().requiresConfigSource()) { + proxyServer.updateSourceConnections(Arrays.asList(sources.split(","))); + ret = "Updated config sources to: " + sources; + } else { + ret = "Cannot update sources when in '" + proxyServer.getMode().name() + "' mode"; + } + req.returnValues().add(new StringValue(ret)); + } + + public final void invalidateCache(Request req) { + proxyServer.getCacheManager().getMemoryCache().clear(); + String[] s = new String[2]; + s[0] = "0"; + s[1] = "success"; + req.returnValues().add(new StringArray(s)); + } + + public final void setMode(Request req) { + String suppliedMode = req.parameters().get(0).asString(); + log.log(LogLevel.DEBUG, "Supplied mode=" + suppliedMode); + String[] s = new String[2]; + if (Mode.validModeName(suppliedMode.toLowerCase())) { + proxyServer.setMode(suppliedMode); + s[0] = "0"; + s[1] = "success"; + } else { + s[0] = "1"; + s[1] = "Could not set mode to '" + suppliedMode + "'. Legal modes are '" + Mode.modes() + "'"; + } + + req.returnValues().add(new StringArray(s)); + } + + public final void getMode(Request req) { + req.returnValues().add(new StringValue(proxyServer.getMode().name())); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void dumpCache(Request req) { + final CacheManager cacheManager = proxyServer.getCacheManager(); + req.returnValues().add(new StringValue(cacheManager.getMemoryCache().dumpCacheToDisk(req.parameters().get(0).asString(), cacheManager.getMemoryCache()))); + } + + final void listCachedConfig(Request req, boolean full) { + String[] ret; + MemoryCache cache = proxyServer.getCacheManager().getMemoryCache(); + ret = new String[cache.size()]; + int i = 0; + for (RawConfig config : cache.values()) { + StringBuilder sb = new StringBuilder(); + sb.append(config.getNamespace()); + sb.append("."); + sb.append(config.getName()); + sb.append(","); + sb.append(config.getConfigId()); + sb.append(","); + sb.append(config.getGeneration()); + sb.append(","); + sb.append(config.getConfigMd5()); + if (full) { + sb.append(","); + sb.append(config.getPayload()); + } + ret[i] = sb.toString(); + i++; + } + Arrays.sort(ret); + req.returnValues().add(new StringArray(ret)); + } + + /** + * Removes invalid targets (closed client connections) from delayedResponsesQueue. + * + * @param target a Target that has become invalid (i.e, client has closed connection) + */ + @Override + public void notifyTargetInvalid(Target target) { + log.log(LogLevel.DEBUG, "Target invalid " + target); + for (Iterator<DelayedResponse> it = proxyServer.delayedResponses.responses().iterator(); it.hasNext(); ) { + DelayedResponse delayed = it.next(); + JRTServerConfigRequest request = delayed.getRequest(); + if (request.getRequest().target().equals(target)) { + log.log(LogLevel.DEBUG, "Removing " + request.getShortDescription()); + it.remove(); + } + } + // TODO: Could we also cancel active getConfig requests upstream if the client was the only one + // requesting this config? + } + + public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) { + request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()"); + request.addOkResponse(config.getPayload(), config.getGeneration(), config.getConfigMd5()); + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Return response: " + request.getShortDescription() + ",configMd5=" + config.getConfigMd5() + + ",config=" + config.getPayload()); + } + + // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse + // TODO Move logic so that all requests are returned in CheckDelayedResponse + try { + request.getRequest().returnRequest(); + } catch (IllegalStateException e) { + log.log(LogLevel.DEBUG, "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage()); + } + } + + public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) { + request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()"); + request.addErrorResponse(errorCode, message); + request.getRequest().returnRequest(); + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyStatistics.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyStatistics.java new file mode 100644 index 00000000000..8e0543d7914 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyStatistics.java @@ -0,0 +1,105 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.log.LogLevel; +import com.yahoo.log.event.Event; + +/** + * Statistics/metrics for config proxy. + * //TODO Use metrics framework + * + * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a> + * @since 5.1.7 + */ +class ConfigProxyStatistics implements Runnable { + static final long defaultEventInterval = 5 * 60; // in seconds + + private final long eventInterval; // in seconds + private boolean stopped; + private long lastRun = System.currentTimeMillis(); + + /* Number of RPC getConfig requests */ + private long rpcRequests = 0; + private long processedRequests = 0; + private long errors = 0; + private long delayedResponses = 0; + + ConfigProxyStatistics() { + this(defaultEventInterval); + } + + ConfigProxyStatistics(long eventInterval) { + this.eventInterval = eventInterval; + } + + // Send events every eventInterval seconds + public void run() { + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + ProxyServer.log.log(LogLevel.WARNING, e.getMessage()); + } + if (stopped) { + return; + } + ProxyServer.log.log(LogLevel.SPAM, "Running ConfigProxyStatistics"); + // Only send events every eventInterval seconds + if ((System.currentTimeMillis() - lastRun) > eventInterval * 1000) { + lastRun = System.currentTimeMillis(); + sendEvents(); + } + } + } + + private void sendEvents() { + Event.count("rpc_requests", rpcRequests()); + Event.count("processed_messages", processedRequests()); + Event.count("errors", errors()); + Event.value("delayed_responses", delayedResponses()); + } + + public void stop() { + stopped = true; + } + + public Long getEventInterval() { + return eventInterval; + } + + void incRpcRequests() { + rpcRequests++; + } + + void incProcessedRequests() { + processedRequests++; + } + + void incErrorCount() { + errors++; + } + + long processedRequests() { + return processedRequests; + } + + long rpcRequests() { + return rpcRequests; + } + + long errors() { + return errors; + } + + long delayedResponses() { + return delayedResponses; + } + + void delayedResponses(long count) { + delayedResponses = count; + } + + void decDelayedResponses() { + delayedResponses--; + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java new file mode 100644 index 00000000000..8ef63cff5c3 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java @@ -0,0 +1,45 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.config.subscription.ConfigSource; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.vespa.config.RawConfig; +import com.yahoo.vespa.config.TimingValues; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; + +import java.util.List; + +/** + * A client to a config source, which could be an RPC config server or some other backing for + * getting config. + * + * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a> + * @since 5.1.9 + */ +public abstract class ConfigSourceClient { + static ConfigSourceClient createClient(ConfigSource source, + ClientUpdater clientUpdater, + CacheManager cacheManager, + TimingValues timingValues, + ConfigProxyStatistics statistics, + DelayedResponses delayedResponses) { + if (source instanceof ConfigSourceSet) { + return new RpcConfigSourceClient((ConfigSourceSet) source, clientUpdater, cacheManager, timingValues, statistics, delayedResponses); + } else if (source instanceof MapBackedConfigSource) { + return (ConfigSourceClient) source; + } else { + throw new IllegalArgumentException("config source of type " + source.getClass().getName() + " is not allowed"); + } + } + + abstract RawConfig getConfig(RawConfig input, JRTServerConfigRequest request); + + abstract void cancel(); + + // TODO Should only be in rpc config source client + abstract void shutdownSourceConnections(); + + abstract String getActiveSourceConnection(); + + abstract List<String> getSourceConnections(); +}
\ No newline at end of file diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponse.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponse.java new file mode 100644 index 00000000000..257b94038b6 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponse.java @@ -0,0 +1,65 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +/** + * Class that handles responses that are put on delayedResponses queue. Implements the <code>Delayed</code> + * interface to keep track of the order of responses (used when checking + * delayed responses and returning requests that are about to time out, in cases where no new + * config has been returned from upstream) + * + * @see com.yahoo.vespa.config.proxy.CheckDelayedResponses + */ +public class DelayedResponse implements Delayed { + private final JRTServerConfigRequest request; + private final long returnTime; + + public DelayedResponse(JRTServerConfigRequest request) { + this(request, System.currentTimeMillis() + request.getTimeout()); + } + + DelayedResponse(JRTServerConfigRequest request, long returnTime) { + this.request = request; + this.returnTime = returnTime; + } + + public Long getReturnTime() { + return returnTime; + } + + public JRTServerConfigRequest getRequest() { + return request; + } + + @Override + public long getDelay(TimeUnit unit) { + return returnTime - System.currentTimeMillis(); + } + + @Override + public int compareTo(Delayed delayed) { + if (this == delayed) { + return 0; + } + if (delayed instanceof com.yahoo.vespa.config.proxy.DelayedResponse) { + if (this.returnTime < ((com.yahoo.vespa.config.proxy.DelayedResponse) delayed).getReturnTime()) { + return -1; + } else { + return 1; + } + } else { + return 0; + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(request.getShortDescription()).append(", delayLeft=").append(getDelay(TimeUnit.MILLISECONDS)).append(" ms"); + return sb.toString(); + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponses.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponses.java new file mode 100644 index 00000000000..f3d303c840c --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponses.java @@ -0,0 +1,39 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import java.util.concurrent.DelayQueue; + +/** + * Queue for requests that have no corresponding config in cache and which we are awaiting response from server for + * + * @author <a href="musum@yahoo-inc.com">Harald Musum</a> + * @since 5.1.7 + */ +class DelayedResponses { + + private final DelayQueue<DelayedResponse> delayedResponses = new DelayQueue<>(); + private final ConfigProxyStatistics statistics; + + DelayedResponses(ConfigProxyStatistics statistics) { + this.statistics = statistics; + } + + void add(DelayedResponse response) { + delayedResponses.add(response); + statistics.delayedResponses(delayedResponses.size()); + } + + boolean remove(DelayedResponse response) { + statistics.decDelayedResponses(); + return delayedResponses.remove(response); + } + + DelayQueue<DelayedResponse> responses() { + return delayedResponses; + } + + int size() { + return responses().size(); + } + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MapBackedConfigSource.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MapBackedConfigSource.java new file mode 100644 index 00000000000..aa214ec0410 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MapBackedConfigSource.java @@ -0,0 +1,65 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.config.subscription.ConfigSource; +import com.yahoo.vespa.config.ConfigKey; +import com.yahoo.vespa.config.RawConfig; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +/** + * A simple class to be able to test config proxy without having an RPC config + * source. + * + * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a> + * @since 5.1.10 + */ +public class MapBackedConfigSource extends ConfigSourceClient implements ConfigSource { + private final HashMap<ConfigKey<?>, RawConfig> backing = new HashMap<>(); + private final ClientUpdater clientUpdater; + + MapBackedConfigSource(ClientUpdater clientUpdater) { + this.clientUpdater = clientUpdater; + } + + MapBackedConfigSource put(ConfigKey<?> key, RawConfig config) { + backing.put(key, config); + clientUpdater.updateSubscribers(config); + return this; + } + + @Override + RawConfig getConfig(RawConfig input, JRTServerConfigRequest request) { + return getConfig(input.getKey()); + } + + RawConfig getConfig(ConfigKey<?> configKey) { + return backing.get(configKey); + } + + @Override + void cancel() { + clear(); + } + + @Override + void shutdownSourceConnections() { + } + + public void clear() { + backing.clear(); + } + + @Override + String getActiveSourceConnection() { + return "N/A"; + } + + @Override + List<String> getSourceConnections() { + return Collections.singletonList("N/A"); + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCache.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCache.java new file mode 100644 index 00000000000..d5531aa0ee4 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCache.java @@ -0,0 +1,136 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.io.IOUtils; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.ConfigCacheKey; +import com.yahoo.vespa.config.ConfigKey; +import com.yahoo.vespa.config.RawConfig; +import com.yahoo.vespa.config.protocol.CompressionType; +import com.yahoo.vespa.config.protocol.Payload; +import com.yahoo.vespa.defaults.Defaults; + +import java.io.File; +import java.io.IOException; +import java.io.Writer; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a> + * @since 5.1.9 + */ +public class MemoryCache { + private static final Logger log = Logger.getLogger(MemoryCache.class.getName()); + + // Separator in file names between different fields of config key + private final static String separator = ":"; + private static final String DEFAULT_DUMP_DIR = Defaults.getDefaults().vespaHome() + "var/vespa/cache/config"; + private final String dumpDir; + + private final ConcurrentHashMap<ConfigCacheKey, RawConfig> cache = new ConcurrentHashMap<>(500, 0.75f); + + public MemoryCache() { + dumpDir = DEFAULT_DUMP_DIR; + } + + public RawConfig get(ConfigCacheKey key) { + return cache.get(key); + } + + public RawConfig put(RawConfig config) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Putting '" + config + "' into memory cache"); + } + return cache.put(new ConfigCacheKey(config.getKey(), config.getDefMd5()), config); + } + + boolean containsKey(ConfigCacheKey key) { + return cache.containsKey(key); + } + + public Collection<RawConfig> values() { + return cache.values(); + } + + public int size() { + return cache.size(); + } + + public void clear() { + cache.clear(); + } + + @Override + public String toString() { + return cache.toString(); + } + + String dumpCacheToDisk(String path, MemoryCache cache) { + if (path == null || path.isEmpty()) { + path = dumpDir; + log.log(LogLevel.DEBUG, "dumpCache. No path or empty path. Using dir '" + path + "'"); + } + if (path.endsWith("/")) { + path = path.substring(0, path.length() - 1); + } + log.log(LogLevel.DEBUG, "Dumping cache to path '" + path + "'"); + IOUtils.createDirectory(path); + File dir = new File(path); + + if (!dir.isDirectory() || !dir.canWrite()) { + return "Not a dir or not able to write to '" + dir + "'"; + } + for (RawConfig config : cache.values()) { + put(config, path); + } + return "success"; + } + + void put(RawConfig config, String path) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Putting '" + config.getKey() + "' into disk cache"); + } + String filename = null; + Writer writer = null; + try { + filename = path + File.separator + createCacheFileName(config); + final Payload payload = config.getPayload(); + long protocolVersion = 3; + log.log(LogLevel.DEBUG, "Writing config '" + config + "' to file '" + filename + "' with protocol version " + protocolVersion); + writer = IOUtils.createWriter(filename, "UTF-8", false); + + // First three lines are meta-data about config as comment lines, fourth line is empty + writer.write("# defMd5:" + config.getDefMd5() + "\n"); + writer.write("# configMd5:" + config.getConfigMd5() + "\n"); + writer.write("# generation:" + Long.toString(config.getGeneration()) + "\n"); + writer.write("# protocolVersion:" + Long.toString(protocolVersion) + "\n"); + writer.write("\n"); + writer.write(payload.withCompression(CompressionType.UNCOMPRESSED).toString()); + writer.write("\n"); + writer.close(); + } catch (IOException e) { + log.log(LogLevel.WARNING, "Could not write to file '" + filename + "'"); + } finally { + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + private static String createCacheFileName(RawConfig config) { + return createCacheFileName(new ConfigCacheKey(config.getKey(), config.getDefMd5())); + } + + static String createCacheFileName(ConfigCacheKey key) { + final ConfigKey<?> configKey = key.getKey(); + return configKey.getNamespace() + "." + configKey.getName() + separator + configKey.getConfigId().replaceAll("/", "_") + + separator + key.getDefMd5(); + } + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java new file mode 100644 index 00000000000..af3182dd919 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java @@ -0,0 +1,62 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.*; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; + +import java.util.Collections; +import java.util.List; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a> + * @since 5.1.10 + */ +public class MemoryCacheConfigClient extends ConfigSourceClient { + + private final static Logger log = Logger.getLogger(MemoryCacheConfigClient.class.getName()); + private final MemoryCache cache; + + MemoryCacheConfigClient(MemoryCache cache) { + this.cache = cache; + } + + @Override + /** + * Retrieves the requested config from the cache. Used when in 'memorycache' mode</p> + * + * @param request The config to retrieve - can be empty (no payload), or have a valid payload. + * @return A Config with a payload. + */ + RawConfig getConfig(RawConfig input, JRTServerConfigRequest request) { + log.log(LogLevel.DEBUG, "Getting config from cache"); + ConfigKey<?> key = input.getKey(); + RawConfig cached = cache.get(new ConfigCacheKey(key, input.getDefMd5())); + if (cached != null) { + log.log(LogLevel.DEBUG, "Found config " + key + " in cache"); + return cached; + } else { + return null; + } + } + + @Override + void cancel() { + } + + @Override + void shutdownSourceConnections() { + } + + @Override + String getActiveSourceConnection() { + return "N/A"; + } + + @Override + List<String> getSourceConnections() { + return Collections.singletonList("N/A"); + } + +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Mode.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Mode.java new file mode 100644 index 00000000000..ccb92c491f0 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Mode.java @@ -0,0 +1,80 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import java.util.HashSet; +import java.util.Set; + +/** + * + * Different modes the config proxy can be running in. + * + * 'default' mode is requesting config from server, serving from cache only when known config + * and no new config having been sent from server. When in 'memorycache' mode, there is no connection + * to another config source, the proxy only serves from (memory) cache. + * + * @author musum + */ +class Mode { + private final ModeName mode; + + enum ModeName { + DEFAULT, MEMORYCACHE + } + + public Mode() { + this(ModeName.DEFAULT.name()); + } + + public Mode(String modeString) { + switch (modeString.toLowerCase()) { + case "" : + mode = ModeName.DEFAULT; + break; + case "default" : + mode = ModeName.DEFAULT; + break; + case "memorycache" : + mode = ModeName.MEMORYCACHE; + break; + default: + throw new IllegalArgumentException("Unrecognized mode'" + modeString + "' supplied"); + } + } + + public ModeName getMode() { + return mode; + } + + public boolean isDefault() { + return mode.equals(ModeName.DEFAULT); + } + + public boolean isMemoryCache() { + return mode.equals(ModeName.MEMORYCACHE); + } + + public boolean requiresConfigSource() { + return mode.equals(ModeName.DEFAULT); + } + + public static boolean validModeName(String modeString) { + return (modeString != null) && modes().contains(modeString); + } + + static Set<String> modes() { + Set<String> modes = new HashSet<>(); + for (ModeName mode : ModeName.values()) { + modes.add(mode.name().toLowerCase()); + } + return modes; + } + + public String name() { + return mode.name().toLowerCase(); + } + + @Override + public String toString() { + return mode.name().toLowerCase(); + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java new file mode 100644 index 00000000000..ab053f31ac9 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -0,0 +1,285 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.config.subscription.ConfigSource; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.jrt.Spec; + +import com.yahoo.log.LogLevel; +import com.yahoo.log.LogSetup; +import com.yahoo.log.event.Event; +import com.yahoo.system.CatchSigTerm; +import com.yahoo.vespa.config.*; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * A proxy server that handles RPC config requests. The proxy can run in three modes: + * 'default' and 'memorycache', where the last one will not get config from an upstream + * config source, but will serve config only from memory cache. + * + * @author <a href="musum@yahoo-inc.com">Harald Musum</a> + */ +public class ProxyServer implements Runnable { + + private static final int DEFAULT_RPC_PORT = 19090; + static final String DEFAULT_PROXY_CONFIG_SOURCES = "tcp/localhost:19070"; + + final static Logger log = Logger.getLogger(ProxyServer.class.getName()); + private final AtomicBoolean signalCaught = new AtomicBoolean(false); + + // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory()); + private final ClientUpdater clientUpdater; + private ScheduledFuture<?> checkerHandle; + + private final ConfigProxyRpcServer rpcServer; + final DelayedResponses delayedResponses; + private ConfigSource configSource; + + private volatile ConfigSourceClient configSourceClient; + + private final ConfigProxyStatistics statistics; + private final TimingValues timingValues; + private final CacheManager cacheManager; + private static final double timingvaluesRatio = 0.8; + private final static TimingValues defaultTimingValues; + private final boolean delayedResponseHandling; + + private volatile Mode mode; + + + static { + // Proxy should time out before clients upon subscription. + TimingValues tv = new TimingValues(); + tv.setUnconfiguredDelay((long)(tv.getUnconfiguredDelay()* timingvaluesRatio)). + setConfiguredErrorDelay((long)(tv.getConfiguredErrorDelay()* timingvaluesRatio)). + setSubscribeTimeout((long)(tv.getSubscribeTimeout()* timingvaluesRatio)). + setConfiguredErrorTimeout(-1); // Never cache errors + defaultTimingValues = tv; + } + + private ProxyServer(Spec spec, DelayedResponses delayedResponses, ConfigSource source, + ConfigProxyStatistics statistics, TimingValues timingValues, Mode mode, boolean delayedResponseHandling, + CacheManager cacheManager) { + this.delayedResponses = delayedResponses; + this.configSource = source; + log.log(LogLevel.DEBUG, "Using config sources: " + source); + this.statistics = statistics; + this.timingValues = timingValues; + this.mode = mode; + this.delayedResponseHandling = delayedResponseHandling; + this.cacheManager = cacheManager; + if (spec == null) { + rpcServer = null; + } else { + rpcServer = new ConfigProxyRpcServer(this, spec); + } + clientUpdater = new ClientUpdater(cacheManager, rpcServer, statistics, delayedResponses, mode); + this.configSourceClient = ConfigSourceClient.createClient(source, clientUpdater, cacheManager, timingValues, statistics, delayedResponses); + } + + static ProxyServer create(int port, DelayedResponses delayedResponses, ConfigSource source, + ConfigProxyStatistics statistics, Mode mode) { + return new ProxyServer(new Spec(null, port), delayedResponses, source, statistics, defaultTimingValues(), mode, true, new CacheManager(new MemoryCache())); + } + + static ProxyServer createTestServer(ConfigSource source) { + final Mode mode = new Mode(Mode.ModeName.DEFAULT.name()); + return ProxyServer.createTestServer(source, false, mode, CacheManager.createTestCacheManager()); + } + + static ProxyServer createTestServer(ConfigSource source, boolean delayedResponseHandling, Mode mode, CacheManager cacheManager) { + final ConfigProxyStatistics statistics = new ConfigProxyStatistics(); + return new ProxyServer(null, new DelayedResponses(statistics), source, statistics, defaultTimingValues(), mode, delayedResponseHandling, cacheManager); + } + + public void run() { + if (rpcServer != null) { + Thread t = new Thread(rpcServer); + t.setName("RpcServer"); + t.start(); + } + if (delayedResponseHandling) { + // Wait for 5 seconds initially, then run every second + checkerHandle = scheduler.scheduleAtFixedRate(new CheckDelayedResponses(delayedResponses, cacheManager.getMemoryCache(), rpcServer), 5, 1, SECONDS); + } else { + log.log(LogLevel.INFO, "Running without delayed response handling"); + } + } + + public RawConfig resolveConfig(JRTServerConfigRequest req) { + statistics.incProcessedRequests(); + // Calling getConfig() will either return with an answer immediately or + // create a background thread that retrieves config from the server and + // calls updateSubscribers when new config is returned from the config source. + // In the last case the method below will return null. + RawConfig config = configSourceClient.getConfig(RawConfig.createFromServerRequest(req), req); + if (configOrGenerationHasChanged(config, req)) { + cacheManager.putInCache(config); + } + return config; + } + + static boolean configOrGenerationHasChanged(RawConfig config, JRTServerConfigRequest request) { + return (config != null && (!config.hasEqualConfig(request) || config.hasNewerGeneration(request))); + } + + Mode getMode() { + return mode; + } + + void setMode(String modeName) { + if (modeName.equals(this.mode.name())) return; + + log.log(LogLevel.INFO, "Switching from " + this.mode + " mode to " + modeName.toLowerCase() + " mode"); + this.mode = new Mode(modeName); + if (mode.isMemoryCache()) { + configSourceClient.shutdownSourceConnections(); + configSourceClient = new MemoryCacheConfigClient(cacheManager.getMemoryCache()); + } else if (mode.isDefault()) { + flush(); + configSourceClient = createRpcClient(); + } else { + throw new IllegalArgumentException("Not able to handle mode '" + modeName + "'"); + } + } + + private RpcConfigSourceClient createRpcClient() { + return new RpcConfigSourceClient((ConfigSourceSet) configSource, clientUpdater, cacheManager, timingValues, statistics, delayedResponses); + } + + private void setupSigTermHandler() { + CatchSigTerm.setup(signalCaught); // catch termination signal + } + + private void waitForShutdown() { + synchronized (signalCaught) { + while (!signalCaught.get()) { + try { + signalCaught.wait(); + } catch (InterruptedException e) { + // empty + } + } + } + stop(); + System.exit(0); + } + + public static void main(String[] args) { + /* Initialize the log handler */ + LogSetup.clearHandlers(); + LogSetup.initVespaLogging("configproxy"); + + Properties properties = getSystemProperties(); + + int port = DEFAULT_RPC_PORT; + if (args.length > 0) { + port = Integer.parseInt(args[0]); + } + Event.started("configproxy"); + ConfigProxyStatistics statistics = new ConfigProxyStatistics(properties.eventInterval); + Thread t = new Thread(statistics); + t.setName("Metrics generator"); + t.setDaemon(true); + t.start(); + + Mode startupMode = new Mode(properties.mode); + if (!startupMode.isDefault()) log.log(LogLevel.INFO, "Starting config proxy in '" + startupMode + "' mode"); + + if (startupMode.isMemoryCache()) { + log.log(LogLevel.ERROR, "Starting config proxy in '" + startupMode + "' mode is not allowed"); + System.exit(1); + } + + ConfigSourceSet configSources = new ConfigSourceSet(); + if (startupMode.requiresConfigSource()) { + configSources = new ConfigSourceSet(properties.configSources); + } + DelayedResponses delayedResponses = new DelayedResponses(statistics); + ProxyServer proxyServer = ProxyServer.create(port, delayedResponses, configSources, statistics, startupMode); + // catch termination signal + proxyServer.setupSigTermHandler(); + Thread proxyserverThread = new Thread(proxyServer); + proxyserverThread.setName("configproxy"); + proxyserverThread.start(); + proxyServer.waitForShutdown(); + } + + static Properties getSystemProperties() { + // Read system properties + long eventInterval = Long.getLong("eventinterval", ConfigProxyStatistics.defaultEventInterval); + String mode = System.getProperty("mode", Mode.ModeName.DEFAULT.name()); + final String[] inputConfigSources = System.getProperty("proxyconfigsources", DEFAULT_PROXY_CONFIG_SOURCES).split(","); + return new Properties(eventInterval, mode, inputConfigSources); + } + + static class Properties { + final long eventInterval; + final String mode; + final String[] configSources; + + Properties(long eventInterval, String mode, String[] configSources) { + this.eventInterval = eventInterval; + this.mode = mode; + this.configSources = configSources; + } + } + + static TimingValues defaultTimingValues() { + return defaultTimingValues; + } + + TimingValues getTimingValues() { + return timingValues; + } + + ConfigProxyStatistics getStatistics() { + return statistics; + } + + // Cancels all config instances and flushes the cache. When this method returns, + // the cache will not be updated again before someone calls getConfig(). + synchronized void flush() { + cacheManager.getMemoryCache().clear(); + configSourceClient.cancel(); + } + + public void stop() { + Event.stopping("configproxy", "shutdown"); + if (rpcServer != null) rpcServer.shutdown(); + if (checkerHandle != null) checkerHandle.cancel(true); + flush(); + if (statistics != null) { + statistics.stop(); + } + } + + public CacheManager getCacheManager() { + return cacheManager; + } + + String getActiveSourceConnection() { + return configSourceClient.getActiveSourceConnection(); + } + + List<String> getSourceConnections() { + return configSourceClient.getSourceConnections(); + } + + public void updateSourceConnections(List<String> sources) { + configSource = new ConfigSourceSet(sources); + flush(); + configSourceClient = createRpcClient(); + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java new file mode 100644 index 00000000000..100de46901e --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java @@ -0,0 +1,197 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.config.subscription.impl.JRTConfigRequester; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Target; +import com.yahoo.jrt.Transport; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.config.*; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; + +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +/** + * An Rpc client to a config source + * + * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a> + * @since 5.1.9 + */ +public class RpcConfigSourceClient extends ConfigSourceClient { + + private final static Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName()); + private final Supervisor supervisor = new Supervisor(new Transport()); + + private final ConfigSourceSet configSourceSet; + private final HashMap<ConfigCacheKey, Subscriber> activeSubscribers = new HashMap<>(); + private final Object activeSubscribersLock = new Object(); + private final MemoryCache memoryCache; + private final ClientUpdater clientUpdater; + private final DelayedResponses delayedResponses; + private final TimingValues timingValues; + + private ExecutorService exec; + private Map<ConfigSourceSet, JRTConfigRequester> requesterPool; + + + RpcConfigSourceClient(ConfigSourceSet configSourceSet, ClientUpdater clientUpdater, + CacheManager cacheManager, + TimingValues timingValues, + ConfigProxyStatistics statistics, + DelayedResponses delayedResponses) { + this.configSourceSet = configSourceSet; + this.clientUpdater = clientUpdater; + this.memoryCache = cacheManager.getMemoryCache(); + this.delayedResponses = delayedResponses; + this.timingValues = timingValues; + checkConfigSources(); + exec = Executors.newCachedThreadPool(new DaemonThreadFactory()); + requesterPool = createRequesterPool(configSourceSet, timingValues); + } + + /** + * Creates a requester (connection) pool of one entry, to be used each time this {@link RpcConfigSourceClient} is used + * @param ccs a {@link ConfigSourceSet} + * @param timingValues a {@link TimingValues} + * @return requester map + */ + private Map<ConfigSourceSet, JRTConfigRequester> createRequesterPool(ConfigSourceSet ccs, TimingValues timingValues) { + Map<ConfigSourceSet, JRTConfigRequester> ret = new HashMap<>(); + if (ccs.getSources().isEmpty()) return ret; // unit test, just skip creating any requester + ret.put(ccs, JRTConfigRequester.get(new JRTConnectionPool(ccs), timingValues)); + return ret; + } + + /** + * Checks if config sources are available + */ + private boolean checkConfigSources() { + if (configSourceSet == null || configSourceSet.getSources() == null || configSourceSet.getSources().size() == 0) { + log.log(LogLevel.WARNING, "No config sources defined, could not check connection"); + return false; + } else { + Request req = new Request("ping"); + for (String configSource : configSourceSet.getSources()) { + Spec spec = new Spec(configSource); + Target target = supervisor.connect(spec); + target.invokeSync(req, 10.0); + if (target.isValid()) { + log.log(LogLevel.DEBUG, "Created connection to config source at " + spec.toString()); + return true; + } else { + log.log(LogLevel.WARNING, "Could not connect to config source at " + spec.toString()); + } + target.close(); + } + String extra = ""; + log.log(LogLevel.ERROR, "Could not connect to any config source in set " + configSourceSet.toString() + + ", please make sure config server(s) are running. " + extra); + } + return false; + } + + /** + * Retrieves the requested config from the cache or the remote server. + * <p> + * If the requested config is different from the one in cache, the cached request is returned immediately. + * If they are equal, this method returns null. + * <p> + * If the config was not in cache, this method starts a <em>Subscriber</em> in a separate thread + * that gets the config and calls updateSubscribers(). + * + * @param input The config to retrieve - can be empty (no payload), or have a valid payload. + * @return A Config with a payload. + */ + @Override + RawConfig getConfig(RawConfig input, JRTServerConfigRequest request) { + long start = System.currentTimeMillis(); + RawConfig ret = null; + final ConfigCacheKey configCacheKey = new ConfigCacheKey(input.getKey(), input.getDefMd5()); + RawConfig cachedConfig = memoryCache.get(configCacheKey); + boolean needToGetConfig = true; + + if (cachedConfig != null) { + log.log(LogLevel.DEBUG, "Found config " + configCacheKey + " in cache, generation=" + cachedConfig.getGeneration() + + ",configmd5=" + cachedConfig.getConfigMd5()); + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "input config=" + input + ",cached config=" + cachedConfig); + } + // equals() also takes generation into account + if (!cachedConfig.equals(input)) { + log.log(LogLevel.SPAM, "Cached config is not equal to requested, will return it"); + ret = cachedConfig; + } + if (!cachedConfig.isError()) { + needToGetConfig = false; + } + } + if (!ProxyServer.configOrGenerationHasChanged(ret, request)) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Delaying response " + request.getShortDescription() + " (" + + (System.currentTimeMillis() - start) + " ms)"); + } + delayedResponses.add(new DelayedResponse(request)); + } + if (needToGetConfig) { + synchronized (activeSubscribersLock) { + if (activeSubscribers.containsKey(configCacheKey)) { + log.log(LogLevel.DEBUG, "Already a subscriber running for: " + configCacheKey); + } else { + log.log(LogLevel.DEBUG, "Could not find good config in cache, creating subscriber for: " + configCacheKey); + Subscriber subscriber = new UpstreamConfigSubscriber(input, clientUpdater, configSourceSet, timingValues, requesterPool, activeSubscribers); + activeSubscribers.put(configCacheKey, subscriber); + exec.execute(subscriber); + } + } + } + return ret; + } + + @Override + void cancel() { + shutdownSourceConnections(); + } + + /** + * Takes down connection(s) to config sources and running tasks + */ + @Override + void shutdownSourceConnections() { + synchronized (activeSubscribersLock) { + for (Subscriber subscriber : activeSubscribers.values()) { + subscriber.cancel(); + } + activeSubscribers.clear(); + } + exec.shutdown(); + for (JRTConfigRequester requester : requesterPool.values()) { + requester.close(); + } + } + + @Override + String getActiveSourceConnection() { + if (requesterPool.get(configSourceSet) != null) { + return requesterPool.get(configSourceSet).getConnectionPool().getCurrent().getAddress(); + } else { + return ""; + } + } + + @Override + List<String> getSourceConnections() { + ArrayList<String> ret = new ArrayList<>(); + final JRTConfigRequester jrtConfigRequester = requesterPool.get(configSourceSet); + if (jrtConfigRequester != null) { + ret.addAll(configSourceSet.getSources()); + } + return ret; + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcServer.java new file mode 100644 index 00000000000..c4c31b315dc --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcServer.java @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.vespa.config.RawConfig; +import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; + +/** + * @author musum + */ +interface RpcServer { + + void returnOkResponse(JRTServerConfigRequest request, RawConfig config); + + void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message); +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java new file mode 100644 index 00000000000..49e2bb86a15 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java @@ -0,0 +1,13 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +/** + * Interface for subscribing to config from upstream config sources. + * + * @author musum + * @since 5.5 + */ +public interface Subscriber extends Runnable { + + void cancel(); +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java new file mode 100644 index 00000000000..849eb3f7910 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java @@ -0,0 +1,108 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy; + +import com.yahoo.config.ConfigurationRuntimeException; +import com.yahoo.config.subscription.ConfigSource; +import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.config.subscription.impl.GenericConfigHandle; +import com.yahoo.config.subscription.impl.GenericConfigSubscriber; +import com.yahoo.config.subscription.impl.JRTConfigRequester; +import com.yahoo.log.LogLevel; +import com.yahoo.yolean.Exceptions; +import com.yahoo.vespa.config.ConfigCacheKey; +import com.yahoo.vespa.config.RawConfig; +import com.yahoo.vespa.config.TimingValues; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Logger; + +/** + * @author musum + * @since 5.5 + */ +public class UpstreamConfigSubscriber implements Subscriber { + private final static Logger log = Logger.getLogger(UpstreamConfigSubscriber.class.getName()); + + private final RawConfig config; + private final ClientUpdater clientUpdater; + private final ConfigSource configSourceSet; + private final TimingValues timingValues; + private Map<ConfigSourceSet, JRTConfigRequester> requesterPool; + private final Map<ConfigCacheKey, Subscriber> activeSubscribers; + private GenericConfigSubscriber subscriber; + + public UpstreamConfigSubscriber(RawConfig config, + ClientUpdater clientUpdater, + ConfigSource configSourceSet, + TimingValues timingValues, + Map<ConfigSourceSet, JRTConfigRequester> requesterPool, + Map<ConfigCacheKey, Subscriber> activeSubscribers) { + this.config = config; + this.clientUpdater = clientUpdater; + this.configSourceSet = configSourceSet; + this.timingValues = timingValues; + this.requesterPool = requesterPool; + this.activeSubscribers = activeSubscribers; + } + + UpstreamConfigSubscriber(RawConfig config, + ClientUpdater clientUpdater, + ConfigSource configSourceSet, + TimingValues timingValues, + Map<ConfigSourceSet, JRTConfigRequester> requesterPool) { + this(config, clientUpdater, configSourceSet, timingValues, requesterPool, new HashMap<>()); + } + + @Override + public void run() { + GenericConfigHandle handle; + subscriber = new GenericConfigSubscriber(requesterPool); + try { + handle = subscriber.subscribe(config.getKey(), config.getDefContent(), configSourceSet, timingValues); + } catch (ConfigurationRuntimeException e) { + log.log(LogLevel.INFO, "Subscribe for '" + config + "' failed, closing subscriber"); + final ConfigCacheKey key = new ConfigCacheKey(config.getKey(), config.getDefMd5()); + synchronized (activeSubscribers) { + final Subscriber activeSubscriber = activeSubscribers.get(key); + if (activeSubscriber != null) { + activeSubscriber.cancel(); + activeSubscribers.remove(key); + } + } + return; + } + + do { + try { + if (subscriber.nextGeneration()) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "nextGeneration returned for " + config.getKey() + ", subscriber generation=" + subscriber.getGeneration()); + } + updateWithNewConfig(handle); + } + } catch (Exception e) { // To avoid thread throwing exception and loop never running this again + log.log(LogLevel.WARNING, "Got exception: " + Exceptions.toMessageString(e)); + } catch (Throwable e) { + com.yahoo.protect.Process.logAndDie("Got error, exiting: " + Exceptions.toMessageString(e)); + } + } while (!subscriber.isClosed()); + } + + private void updateWithNewConfig(GenericConfigHandle handle) { + final RawConfig newConfig = handle.getRawConfig(); + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "config to be returned for '" + newConfig.getKey() + + "', generation=" + newConfig.getGeneration() + + ", payload=" + newConfig.getPayload()); + } + clientUpdater.updateSubscribers(newConfig); + } + + @Override + public void cancel() { + if (subscriber != null) { + subscriber.close(); + } + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/package-info.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/package-info.java new file mode 100644 index 00000000000..9ccbf4f7749 --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/package-info.java @@ -0,0 +1,47 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + Provides the classes for the Vespa config proxy. + + <p>The {@link com.yahoo.vespa.config.proxy.ProxyServer ProxyServer} + contains the main method that instantiates a new ProxyServer that + listens for incoming config requests from clients at a given + port. The ProxyServer handles communication with the remote server, + and keeps a cache of {@link com.yahoo.vespa.config.RawConfig RawConfig} + objects. + </p> + + <p>This package is very slim, due to extensive reuse of low-level code from + the config client library. + </p> + + <h3>Why Vespa needs a config proxy</h3> + + <p>It is possible for a client to subscribe + to config from the config server directly. However, if all Vespa + services applied this philosophy, it would cause a tremendous load + on the server that would need to handle a very large number of + requests from individual clients. Even more importantly, it would + inflict a huge load on the network if all config requests were + sent to the remote server. + </p> + + <p> + Typically, one config proxy is running on each host in a Vespa + installation, so each subscriber on that host sends requests to + the proxy and never to the remote server. The proxy is responsible + for keeping that config up-to-date on behalf of all clients that + subscribe to it. This means that multiple subscribers to the same + config (and using the same config id) maps to only one request + from the proxy to the external server. + </p> + + <p>Another advantage with a local config proxy on each node is + that subscribers become less vulnerable to network + failures. Should the network or the remote server go down for a + short period, only the proxy will notice, hence removing overhead from + the subscribers. + </p> +*/ + +@com.yahoo.api.annotations.PackageMarker +package com.yahoo.vespa.config.proxy; |