summaryrefslogtreecommitdiffstats
path: root/config-proxy/src/main/java
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /config-proxy/src/main/java
Publish
Diffstat (limited to 'config-proxy/src/main/java')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CacheManager.java46
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CheckDelayedResponses.java71
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java107
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java340
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyStatistics.java105
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java45
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponse.java65
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponses.java39
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MapBackedConfigSource.java65
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCache.java136
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java62
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Mode.java80
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java285
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java197
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcServer.java15
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java13
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java108
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/package-info.java47
18 files changed, 1826 insertions, 0 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CacheManager.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CacheManager.java
new file mode 100644
index 00000000000..993edd2962f
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CacheManager.java
@@ -0,0 +1,46 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.vespa.config.RawConfig;
+
+/**
+ * Manages memory and disk caches.
+ *
+ * @author musum
+ */
+public class CacheManager {
+
+ private final MemoryCache memoryCache;
+
+ public CacheManager(MemoryCache memoryCache) {
+ this.memoryCache = memoryCache;
+ }
+
+ static CacheManager createTestCacheManager() {
+ return CacheManager.createTestCacheManager(new MemoryCache());
+ }
+
+ static CacheManager createTestCacheManager(MemoryCache memoryCache) {
+ return new CacheManager(memoryCache);
+ }
+
+ void putInCache(RawConfig config) {
+ putInMemoryCache(config);
+ }
+
+ private void putInMemoryCache(RawConfig config) {
+ if (!config.isError()) {
+ memoryCache.put(config);
+ }
+ }
+
+ public MemoryCache getMemoryCache() {
+ return memoryCache;
+ }
+
+ // Only for testing.
+ int getCacheSize() {
+ return getMemoryCache().size();
+ }
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CheckDelayedResponses.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CheckDelayedResponses.java
new file mode 100644
index 00000000000..e8a2f5455a1
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/CheckDelayedResponses.java
@@ -0,0 +1,71 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+import com.yahoo.yolean.Exceptions;
+import com.yahoo.vespa.config.ConfigCacheKey;
+import com.yahoo.vespa.config.RawConfig;
+
+import java.util.Date;
+import java.util.logging.Logger;
+
+/**
+ * The run method of this class is executed periodically to
+ * return requests that are about to time out.
+ *
+ * @author musum
+ */
+public class CheckDelayedResponses implements Runnable {
+
+ final static Logger log = Logger.getLogger(CheckDelayedResponses.class.getName());
+
+ private final DelayedResponses delayedResponses;
+ private final MemoryCache memoryCache;
+ private final RpcServer rpcServer;
+
+ public CheckDelayedResponses(DelayedResponses delayedResponses, MemoryCache memoryCache, RpcServer rpcServer) {
+ this.delayedResponses = delayedResponses;
+ this.memoryCache = memoryCache;
+ this.rpcServer = rpcServer;
+ }
+
+ @Override
+ public void run() {
+ checkDelayedResponses();
+ }
+
+ void checkDelayedResponses() {
+ try {
+ long start = System.currentTimeMillis();
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Running CheckDelayedResponses. There are " + delayedResponses.size() + " delayed responses. First one is " + delayedResponses.responses().peek());
+ }
+ DelayedResponse response;
+ int i = 0;
+
+ while ((response = delayedResponses.responses().poll()) != null) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Returning with response that has return time " + new Date(response.getReturnTime()));
+ }
+ JRTServerConfigRequest request = response.getRequest();
+ ConfigCacheKey cacheKey = new ConfigCacheKey(request.getConfigKey(), request.getConfigKey().getMd5());
+ RawConfig config = memoryCache.get(cacheKey);
+ if (config != null) {
+ rpcServer.returnOkResponse(request, config);
+ i++;
+ } else {
+ log.log(LogLevel.WARNING, "No config found for " + request.getConfigKey() + " within timeout, will retry");
+ }
+ }
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Finished running CheckDelayedResponses. " + i + " delayed responses sent in " +
+ (System.currentTimeMillis() - start) + " ms");
+ }
+ } catch (Exception e) { // To avoid thread throwing exception and executor never running this again
+ log.log(LogLevel.WARNING, "Got exception in CheckDelayedResponses: " + Exceptions.toMessageString(e));
+ } catch (Throwable e) {
+ com.yahoo.protect.Process.logAndDie("Got error in CheckDelayedResponses, exiting: " + Exceptions.toMessageString(e));
+ }
+ }
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java
new file mode 100644
index 00000000000..133ecf4c94f
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ClientUpdater.java
@@ -0,0 +1,107 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+import java.util.concurrent.DelayQueue;
+import java.util.logging.Logger;
+
+/**
+ * Updates clients subscribing to config when config changes or the
+ * timeout they have specified has elapsed.
+ *
+ * @author musum
+ */
+public class ClientUpdater {
+ final static Logger log = Logger.getLogger(ClientUpdater.class.getName());
+
+ private final CacheManager cacheManager;
+ private final ConfigProxyStatistics statistics;
+ private final RpcServer rpcServer;
+ private final DelayedResponses delayedResponses;
+ private final Mode mode;
+
+ public ClientUpdater(CacheManager cacheManager,
+ RpcServer rpcServer,
+ ConfigProxyStatistics statistics,
+ DelayedResponses delayedResponses,
+ Mode mode) {
+ this.cacheManager = cacheManager;
+ this.rpcServer = rpcServer;
+ this.statistics = statistics;
+ this.delayedResponses = delayedResponses;
+ this.mode = mode;
+ }
+
+ /**
+ * This method will be called when a response with changed config is received from upstream
+ * (content or generation has changed) or the server timeout has elapsed.
+ * Updates the cache with the returned config.
+ *
+ * @param config new config
+ */
+ void updateSubscribers(RawConfig config) {
+ // ignore updates if we are in one of these modes (we will then only serve from cache).
+ if (!mode.requiresConfigSource()) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Not updating " + config.getKey() + "," + config.getGeneration() +
+ ", since we are in '" + mode + "' mode");
+ }
+ return;
+ }
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Config updated for " + config.getKey() + "," + config.getGeneration());
+ }
+ cacheManager.putInCache(config);
+ sendResponse(config);
+ }
+
+ void sendResponse(RawConfig config) {
+ if (config.isError()) { statistics.incErrorCount(); }
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Sending response for " + config.getKey() + "," + config.getGeneration());
+ }
+ DelayQueue<DelayedResponse> responseDelayQueue = delayedResponses.responses();
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Delayed response queue: " + responseDelayQueue);
+ }
+ if (responseDelayQueue.size() == 0) {
+ log.log(LogLevel.DEBUG, "There exists no matching element on delayed response queue for " + config.getKey());
+ return;
+ } else {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Delayed response queue has " + responseDelayQueue.size() + " elements");
+ }
+ }
+ DelayedResponse[] responses = new DelayedResponse[1];
+ responses = responseDelayQueue.toArray(responses);
+ boolean found = false;
+ if (responses.length > 0) {
+ for (DelayedResponse response : responses) {
+ JRTServerConfigRequest request = response.getRequest();
+ if (request.getConfigKey().equals(config.getKey())) {
+ if (!delayedResponses.remove(response)) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Could not remove " + config.getKey() + " from delayed delayedResponses queue, already removed");
+ }
+ continue;
+ }
+ found = true;
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Call returnOkResponse for " + config.getKey() + "," + config.getGeneration());
+ }
+ rpcServer.returnOkResponse(request, config);
+ }
+ }
+
+ }
+ if (!found) {
+ log.log(LogLevel.DEBUG, "Found no recipient for " + config.getKey() + " in delayed response queue");
+ }
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Finished updating config for " + config.getKey() + "," + config.getGeneration());
+ }
+ }
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
new file mode 100644
index 00000000000..b1cf0c01165
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
@@ -0,0 +1,340 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.jrt.*;
+import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.config.*;
+import com.yahoo.vespa.config.ErrorCode;
+import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3;
+
+import java.lang.*;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.logging.Logger;
+
+
+/**
+ * A proxy server that handles RPC config requests.
+ *
+ * @author <a href="musum@yahoo-inc.com">Harald Musum</a>
+ * @since 5.1
+ */
+public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer {
+
+ private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName());
+ static final int TRACELEVEL = 6;
+
+ private final Spec spec;
+ private final Supervisor supervisor = new Supervisor(new Transport());
+ private final ProxyServer proxyServer;
+
+ public ConfigProxyRpcServer(ProxyServer proxyServer, Spec spec) {
+ this.proxyServer = proxyServer;
+ this.spec = spec;
+ setUp();
+ }
+
+ public void run() {
+ try {
+ Acceptor acceptor = supervisor.listen(spec);
+ log.log(LogLevel.DEBUG, "Ready for requests on " + spec);
+ supervisor.transport().join();
+ acceptor.shutdown().join();
+ } catch (ListenFailedException e) {
+ proxyServer.stop();
+ throw new RuntimeException("Could not listen on " + spec, e);
+ }
+ }
+
+ void shutdown() {
+ supervisor.transport().shutdown();
+ }
+
+ public Spec getSpec() {
+ return spec;
+ }
+
+ void setUp() {
+ // The getConfig method in this class will handle RPC calls for getting config
+ supervisor.addMethod(JRTMethods.createConfigV3GetConfigMethod(this, "getConfigV3"));
+ supervisor.addMethod(new Method("ping", "", "i",
+ this, "ping")
+ .methodDesc("ping")
+ .returnDesc(0, "ret code", "return code, 0 is OK"));
+ supervisor.addMethod(new Method("printStatistics", "", "s",
+ this, "printStatistics")
+ .methodDesc("printStatistics")
+ .returnDesc(0, "statistics", "Statistics for server"));
+ supervisor.addMethod(new Method("listCachedConfig", "", "S",
+ this, "listCachedConfig")
+ .methodDesc("list cached configs)")
+ .returnDesc(0, "data", "string array of configs"));
+ supervisor.addMethod(new Method("listCachedConfigFull", "", "S",
+ this, "listCachedConfigFull")
+ .methodDesc("list cached configs with cache content)")
+ .returnDesc(0, "data", "string array of configs"));
+ supervisor.addMethod(new Method("listSourceConnections", "", "S",
+ this, "listSourceConnections")
+ .methodDesc("list config source connections)")
+ .returnDesc(0, "data", "string array of source connections"));
+ supervisor.addMethod(new Method("invalidateCache", "", "S",
+ this, "invalidateCache")
+ .methodDesc("list config source connections)")
+ .returnDesc(0, "data", "0 if success, 1 otherwise"));
+ supervisor.addMethod(new Method("updateSources", "s", "s",
+ this, "updateSources")
+ .methodDesc("update list of config sources")
+ .returnDesc(0, "ret", "list of updated config sources"));
+ supervisor.addMethod(new Method("setMode", "s", "S",
+ this, "setMode")
+ .methodDesc("Set config proxy mode { default | memorycache }")
+ .returnDesc(0, "ret", "0 if success, 1 otherwise as first element, description as second element"));
+ supervisor.addMethod(new Method("getMode", "", "s",
+ this, "getMode")
+ .methodDesc("What serving mode the config proxy is in (default, memorycache)")
+ .returnDesc(0, "ret", "mode as a string"));
+ supervisor.addMethod(new Method("dumpCache", "s", "s",
+ this, "dumpCache")
+ .methodDesc("Dump cache to disk")
+ .paramDesc(0, "path", "path to write cache contents to")
+ .returnDesc(0, "ret", "Empty string or error message"));
+ }
+
+ /**
+ * Handles RPC method "config.v3.getConfig" requests.
+ *
+ * @param req a Request
+ */
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void getConfigV3(Request req) {
+ log.log(LogLevel.DEBUG, "getConfigV2");
+ JRTServerConfigRequest request = JRTServerConfigRequestV3.createFromRequest(req);
+ if (isProtocolVersionSupported(request)) {
+ preHandle(req);
+ getConfigImpl(request);
+ }
+ }
+
+ private boolean isProtocolVersionSupported(JRTServerConfigRequest request) {
+ Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions();
+ if (supportedProtocolVersions.contains(request.getProtocolVersion())) {
+ return true;
+ } else {
+ String message = "Illegal protocol version " + request.getProtocolVersion() +
+ " in request " + request.getShortDescription() + ", only protocol versions " + supportedProtocolVersions + " are supported";
+ log.log(LogLevel.ERROR, message);
+ request.addErrorResponse(ErrorCode.ILLEGAL_PROTOCOL_VERSION, message);
+ }
+ return false;
+ }
+
+ private void preHandle(Request req) {
+ proxyServer.getStatistics().incRpcRequests();
+ req.detach();
+ req.target().addWatcher(this);
+ }
+
+ /**
+ * Handles all versions of "getConfig" requests.
+ *
+ * @param request a Request
+ */
+ private void getConfigImpl(JRTServerConfigRequest request) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()");
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "getConfig: " + request.getShortDescription() + ",configmd5=" + request.getRequestConfigMd5());
+ }
+ if (!request.validateParameters()) {
+ // Error code is set in verifyParameters if parameters are not OK.
+ log.log(LogLevel.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage());
+ returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
+ return;
+ }
+ try {
+ RawConfig config = proxyServer.resolveConfig(request);
+ if (ProxyServer.configOrGenerationHasChanged(config, request)) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Should send response for " + request.getShortDescription() + ",config=" + config);
+ }
+ returnOkResponse(request, config);
+ } else {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Should not send response for " + request.getShortDescription() + ", config=" + config);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
+ }
+ }
+
+ /**
+ * Returns 0 if server is alive.
+ *
+ * @param req a Request
+ */
+ public final void ping(Request req) {
+ req.returnValues().add(new Int32Value(0));
+ }
+
+ /**
+ * Returns a String with statistics data for the server.
+ *
+ * @param req a Request
+ */
+ public final void printStatistics(Request req) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\nDelayed responses queue size: ");
+ sb.append(proxyServer.delayedResponses.size());
+ sb.append("\nContents: ");
+ for (DelayedResponse delayed : proxyServer.delayedResponses.responses()) {
+ sb.append(delayed.getRequest().toString()).append("\n");
+ }
+
+ req.returnValues().add(new StringValue(sb.toString()));
+ }
+
+ public final void listCachedConfig(Request req) {
+ listCachedConfig(req, false);
+ }
+
+ public final void listCachedConfigFull(Request req) {
+ listCachedConfig(req, true);
+ }
+
+ public final void listSourceConnections(Request req) {
+ String[] ret = new String[2];
+ ret[0] = "Current source: " + proxyServer.getActiveSourceConnection();
+ ret[1] = "All sources:\n" + printSourceConnections();
+ req.returnValues().add(new StringArray(ret));
+ }
+
+ private String printSourceConnections() {
+ StringBuilder sb = new StringBuilder();
+ for (String s : proxyServer.getSourceConnections()) {
+ sb.append(s).append("\n");
+ }
+ return sb.toString();
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void updateSources(Request req) {
+ String sources = req.parameters().get(0).asString();
+ String ret;
+ System.out.println(proxyServer.getMode());
+ if (proxyServer.getMode().requiresConfigSource()) {
+ proxyServer.updateSourceConnections(Arrays.asList(sources.split(",")));
+ ret = "Updated config sources to: " + sources;
+ } else {
+ ret = "Cannot update sources when in '" + proxyServer.getMode().name() + "' mode";
+ }
+ req.returnValues().add(new StringValue(ret));
+ }
+
+ public final void invalidateCache(Request req) {
+ proxyServer.getCacheManager().getMemoryCache().clear();
+ String[] s = new String[2];
+ s[0] = "0";
+ s[1] = "success";
+ req.returnValues().add(new StringArray(s));
+ }
+
+ public final void setMode(Request req) {
+ String suppliedMode = req.parameters().get(0).asString();
+ log.log(LogLevel.DEBUG, "Supplied mode=" + suppliedMode);
+ String[] s = new String[2];
+ if (Mode.validModeName(suppliedMode.toLowerCase())) {
+ proxyServer.setMode(suppliedMode);
+ s[0] = "0";
+ s[1] = "success";
+ } else {
+ s[0] = "1";
+ s[1] = "Could not set mode to '" + suppliedMode + "'. Legal modes are '" + Mode.modes() + "'";
+ }
+
+ req.returnValues().add(new StringArray(s));
+ }
+
+ public final void getMode(Request req) {
+ req.returnValues().add(new StringValue(proxyServer.getMode().name()));
+ }
+
+ @SuppressWarnings({"UnusedDeclaration"})
+ public final void dumpCache(Request req) {
+ final CacheManager cacheManager = proxyServer.getCacheManager();
+ req.returnValues().add(new StringValue(cacheManager.getMemoryCache().dumpCacheToDisk(req.parameters().get(0).asString(), cacheManager.getMemoryCache())));
+ }
+
+ final void listCachedConfig(Request req, boolean full) {
+ String[] ret;
+ MemoryCache cache = proxyServer.getCacheManager().getMemoryCache();
+ ret = new String[cache.size()];
+ int i = 0;
+ for (RawConfig config : cache.values()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(config.getNamespace());
+ sb.append(".");
+ sb.append(config.getName());
+ sb.append(",");
+ sb.append(config.getConfigId());
+ sb.append(",");
+ sb.append(config.getGeneration());
+ sb.append(",");
+ sb.append(config.getConfigMd5());
+ if (full) {
+ sb.append(",");
+ sb.append(config.getPayload());
+ }
+ ret[i] = sb.toString();
+ i++;
+ }
+ Arrays.sort(ret);
+ req.returnValues().add(new StringArray(ret));
+ }
+
+ /**
+ * Removes invalid targets (closed client connections) from delayedResponsesQueue.
+ *
+ * @param target a Target that has become invalid (i.e, client has closed connection)
+ */
+ @Override
+ public void notifyTargetInvalid(Target target) {
+ log.log(LogLevel.DEBUG, "Target invalid " + target);
+ for (Iterator<DelayedResponse> it = proxyServer.delayedResponses.responses().iterator(); it.hasNext(); ) {
+ DelayedResponse delayed = it.next();
+ JRTServerConfigRequest request = delayed.getRequest();
+ if (request.getRequest().target().equals(target)) {
+ log.log(LogLevel.DEBUG, "Removing " + request.getShortDescription());
+ it.remove();
+ }
+ }
+ // TODO: Could we also cancel active getConfig requests upstream if the client was the only one
+ // requesting this config?
+ }
+
+ public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()");
+ request.addOkResponse(config.getPayload(), config.getGeneration(), config.getConfigMd5());
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Return response: " + request.getShortDescription() + ",configMd5=" + config.getConfigMd5() +
+ ",config=" + config.getPayload());
+ }
+
+ // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse
+ // TODO Move logic so that all requests are returned in CheckDelayedResponse
+ try {
+ request.getRequest().returnRequest();
+ } catch (IllegalStateException e) {
+ log.log(LogLevel.DEBUG, "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage());
+ }
+ }
+
+ public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()");
+ request.addErrorResponse(errorCode, message);
+ request.getRequest().returnRequest();
+ }
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyStatistics.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyStatistics.java
new file mode 100644
index 00000000000..8e0543d7914
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyStatistics.java
@@ -0,0 +1,105 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.event.Event;
+
+/**
+ * Statistics/metrics for config proxy.
+ * //TODO Use metrics framework
+ *
+ * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a>
+ * @since 5.1.7
+ */
+class ConfigProxyStatistics implements Runnable {
+ static final long defaultEventInterval = 5 * 60; // in seconds
+
+ private final long eventInterval; // in seconds
+ private boolean stopped;
+ private long lastRun = System.currentTimeMillis();
+
+ /* Number of RPC getConfig requests */
+ private long rpcRequests = 0;
+ private long processedRequests = 0;
+ private long errors = 0;
+ private long delayedResponses = 0;
+
+ ConfigProxyStatistics() {
+ this(defaultEventInterval);
+ }
+
+ ConfigProxyStatistics(long eventInterval) {
+ this.eventInterval = eventInterval;
+ }
+
+ // Send events every eventInterval seconds
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ ProxyServer.log.log(LogLevel.WARNING, e.getMessage());
+ }
+ if (stopped) {
+ return;
+ }
+ ProxyServer.log.log(LogLevel.SPAM, "Running ConfigProxyStatistics");
+ // Only send events every eventInterval seconds
+ if ((System.currentTimeMillis() - lastRun) > eventInterval * 1000) {
+ lastRun = System.currentTimeMillis();
+ sendEvents();
+ }
+ }
+ }
+
+ private void sendEvents() {
+ Event.count("rpc_requests", rpcRequests());
+ Event.count("processed_messages", processedRequests());
+ Event.count("errors", errors());
+ Event.value("delayed_responses", delayedResponses());
+ }
+
+ public void stop() {
+ stopped = true;
+ }
+
+ public Long getEventInterval() {
+ return eventInterval;
+ }
+
+ void incRpcRequests() {
+ rpcRequests++;
+ }
+
+ void incProcessedRequests() {
+ processedRequests++;
+ }
+
+ void incErrorCount() {
+ errors++;
+ }
+
+ long processedRequests() {
+ return processedRequests;
+ }
+
+ long rpcRequests() {
+ return rpcRequests;
+ }
+
+ long errors() {
+ return errors;
+ }
+
+ long delayedResponses() {
+ return delayedResponses;
+ }
+
+ void delayedResponses(long count) {
+ delayedResponses = count;
+ }
+
+ void decDelayedResponses() {
+ delayedResponses--;
+ }
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
new file mode 100644
index 00000000000..8ef63cff5c3
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigSourceClient.java
@@ -0,0 +1,45 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.config.subscription.ConfigSource;
+import com.yahoo.config.subscription.ConfigSourceSet;
+import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.TimingValues;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+import java.util.List;
+
+/**
+ * A client to a config source, which could be an RPC config server or some other backing for
+ * getting config.
+ *
+ * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a>
+ * @since 5.1.9
+ */
+public abstract class ConfigSourceClient {
+ static ConfigSourceClient createClient(ConfigSource source,
+ ClientUpdater clientUpdater,
+ CacheManager cacheManager,
+ TimingValues timingValues,
+ ConfigProxyStatistics statistics,
+ DelayedResponses delayedResponses) {
+ if (source instanceof ConfigSourceSet) {
+ return new RpcConfigSourceClient((ConfigSourceSet) source, clientUpdater, cacheManager, timingValues, statistics, delayedResponses);
+ } else if (source instanceof MapBackedConfigSource) {
+ return (ConfigSourceClient) source;
+ } else {
+ throw new IllegalArgumentException("config source of type " + source.getClass().getName() + " is not allowed");
+ }
+ }
+
+ abstract RawConfig getConfig(RawConfig input, JRTServerConfigRequest request);
+
+ abstract void cancel();
+
+ // TODO Should only be in rpc config source client
+ abstract void shutdownSourceConnections();
+
+ abstract String getActiveSourceConnection();
+
+ abstract List<String> getSourceConnections();
+} \ No newline at end of file
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponse.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponse.java
new file mode 100644
index 00000000000..257b94038b6
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponse.java
@@ -0,0 +1,65 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class that handles responses that are put on delayedResponses queue. Implements the <code>Delayed</code>
+ * interface to keep track of the order of responses (used when checking
+ * delayed responses and returning requests that are about to time out, in cases where no new
+ * config has been returned from upstream)
+ *
+ * @see com.yahoo.vespa.config.proxy.CheckDelayedResponses
+ */
+public class DelayedResponse implements Delayed {
+ private final JRTServerConfigRequest request;
+ private final long returnTime;
+
+ public DelayedResponse(JRTServerConfigRequest request) {
+ this(request, System.currentTimeMillis() + request.getTimeout());
+ }
+
+ DelayedResponse(JRTServerConfigRequest request, long returnTime) {
+ this.request = request;
+ this.returnTime = returnTime;
+ }
+
+ public Long getReturnTime() {
+ return returnTime;
+ }
+
+ public JRTServerConfigRequest getRequest() {
+ return request;
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return returnTime - System.currentTimeMillis();
+ }
+
+ @Override
+ public int compareTo(Delayed delayed) {
+ if (this == delayed) {
+ return 0;
+ }
+ if (delayed instanceof com.yahoo.vespa.config.proxy.DelayedResponse) {
+ if (this.returnTime < ((com.yahoo.vespa.config.proxy.DelayedResponse) delayed).getReturnTime()) {
+ return -1;
+ } else {
+ return 1;
+ }
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(request.getShortDescription()).append(", delayLeft=").append(getDelay(TimeUnit.MILLISECONDS)).append(" ms");
+ return sb.toString();
+ }
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponses.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponses.java
new file mode 100644
index 00000000000..f3d303c840c
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponses.java
@@ -0,0 +1,39 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import java.util.concurrent.DelayQueue;
+
+/**
+ * Queue for requests that have no corresponding config in cache and which we are awaiting response from server for
+ *
+ * @author <a href="musum@yahoo-inc.com">Harald Musum</a>
+ * @since 5.1.7
+ */
+class DelayedResponses {
+
+ private final DelayQueue<DelayedResponse> delayedResponses = new DelayQueue<>();
+ private final ConfigProxyStatistics statistics;
+
+ DelayedResponses(ConfigProxyStatistics statistics) {
+ this.statistics = statistics;
+ }
+
+ void add(DelayedResponse response) {
+ delayedResponses.add(response);
+ statistics.delayedResponses(delayedResponses.size());
+ }
+
+ boolean remove(DelayedResponse response) {
+ statistics.decDelayedResponses();
+ return delayedResponses.remove(response);
+ }
+
+ DelayQueue<DelayedResponse> responses() {
+ return delayedResponses;
+ }
+
+ int size() {
+ return responses().size();
+ }
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MapBackedConfigSource.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MapBackedConfigSource.java
new file mode 100644
index 00000000000..aa214ec0410
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MapBackedConfigSource.java
@@ -0,0 +1,65 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.config.subscription.ConfigSource;
+import com.yahoo.vespa.config.ConfigKey;
+import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A simple class to be able to test config proxy without having an RPC config
+ * source.
+ *
+ * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a>
+ * @since 5.1.10
+ */
+public class MapBackedConfigSource extends ConfigSourceClient implements ConfigSource {
+ private final HashMap<ConfigKey<?>, RawConfig> backing = new HashMap<>();
+ private final ClientUpdater clientUpdater;
+
+ MapBackedConfigSource(ClientUpdater clientUpdater) {
+ this.clientUpdater = clientUpdater;
+ }
+
+ MapBackedConfigSource put(ConfigKey<?> key, RawConfig config) {
+ backing.put(key, config);
+ clientUpdater.updateSubscribers(config);
+ return this;
+ }
+
+ @Override
+ RawConfig getConfig(RawConfig input, JRTServerConfigRequest request) {
+ return getConfig(input.getKey());
+ }
+
+ RawConfig getConfig(ConfigKey<?> configKey) {
+ return backing.get(configKey);
+ }
+
+ @Override
+ void cancel() {
+ clear();
+ }
+
+ @Override
+ void shutdownSourceConnections() {
+ }
+
+ public void clear() {
+ backing.clear();
+ }
+
+ @Override
+ String getActiveSourceConnection() {
+ return "N/A";
+ }
+
+ @Override
+ List<String> getSourceConnections() {
+ return Collections.singletonList("N/A");
+ }
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCache.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCache.java
new file mode 100644
index 00000000000..d5531aa0ee4
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCache.java
@@ -0,0 +1,136 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.io.IOUtils;
+import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.config.ConfigCacheKey;
+import com.yahoo.vespa.config.ConfigKey;
+import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.protocol.CompressionType;
+import com.yahoo.vespa.config.protocol.Payload;
+import com.yahoo.vespa.defaults.Defaults;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a>
+ * @since 5.1.9
+ */
+public class MemoryCache {
+ private static final Logger log = Logger.getLogger(MemoryCache.class.getName());
+
+ // Separator in file names between different fields of config key
+ private final static String separator = ":";
+ private static final String DEFAULT_DUMP_DIR = Defaults.getDefaults().vespaHome() + "var/vespa/cache/config";
+ private final String dumpDir;
+
+ private final ConcurrentHashMap<ConfigCacheKey, RawConfig> cache = new ConcurrentHashMap<>(500, 0.75f);
+
+ public MemoryCache() {
+ dumpDir = DEFAULT_DUMP_DIR;
+ }
+
+ public RawConfig get(ConfigCacheKey key) {
+ return cache.get(key);
+ }
+
+ public RawConfig put(RawConfig config) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Putting '" + config + "' into memory cache");
+ }
+ return cache.put(new ConfigCacheKey(config.getKey(), config.getDefMd5()), config);
+ }
+
+ boolean containsKey(ConfigCacheKey key) {
+ return cache.containsKey(key);
+ }
+
+ public Collection<RawConfig> values() {
+ return cache.values();
+ }
+
+ public int size() {
+ return cache.size();
+ }
+
+ public void clear() {
+ cache.clear();
+ }
+
+ @Override
+ public String toString() {
+ return cache.toString();
+ }
+
+ String dumpCacheToDisk(String path, MemoryCache cache) {
+ if (path == null || path.isEmpty()) {
+ path = dumpDir;
+ log.log(LogLevel.DEBUG, "dumpCache. No path or empty path. Using dir '" + path + "'");
+ }
+ if (path.endsWith("/")) {
+ path = path.substring(0, path.length() - 1);
+ }
+ log.log(LogLevel.DEBUG, "Dumping cache to path '" + path + "'");
+ IOUtils.createDirectory(path);
+ File dir = new File(path);
+
+ if (!dir.isDirectory() || !dir.canWrite()) {
+ return "Not a dir or not able to write to '" + dir + "'";
+ }
+ for (RawConfig config : cache.values()) {
+ put(config, path);
+ }
+ return "success";
+ }
+
+ void put(RawConfig config, String path) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Putting '" + config.getKey() + "' into disk cache");
+ }
+ String filename = null;
+ Writer writer = null;
+ try {
+ filename = path + File.separator + createCacheFileName(config);
+ final Payload payload = config.getPayload();
+ long protocolVersion = 3;
+ log.log(LogLevel.DEBUG, "Writing config '" + config + "' to file '" + filename + "' with protocol version " + protocolVersion);
+ writer = IOUtils.createWriter(filename, "UTF-8", false);
+
+ // First three lines are meta-data about config as comment lines, fourth line is empty
+ writer.write("# defMd5:" + config.getDefMd5() + "\n");
+ writer.write("# configMd5:" + config.getConfigMd5() + "\n");
+ writer.write("# generation:" + Long.toString(config.getGeneration()) + "\n");
+ writer.write("# protocolVersion:" + Long.toString(protocolVersion) + "\n");
+ writer.write("\n");
+ writer.write(payload.withCompression(CompressionType.UNCOMPRESSED).toString());
+ writer.write("\n");
+ writer.close();
+ } catch (IOException e) {
+ log.log(LogLevel.WARNING, "Could not write to file '" + filename + "'");
+ } finally {
+ if (writer != null) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private static String createCacheFileName(RawConfig config) {
+ return createCacheFileName(new ConfigCacheKey(config.getKey(), config.getDefMd5()));
+ }
+
+ static String createCacheFileName(ConfigCacheKey key) {
+ final ConfigKey<?> configKey = key.getKey();
+ return configKey.getNamespace() + "." + configKey.getName() + separator + configKey.getConfigId().replaceAll("/", "_") +
+ separator + key.getDefMd5();
+ }
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
new file mode 100644
index 00000000000..af3182dd919
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/MemoryCacheConfigClient.java
@@ -0,0 +1,62 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.config.*;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a>
+ * @since 5.1.10
+ */
+public class MemoryCacheConfigClient extends ConfigSourceClient {
+
+ private final static Logger log = Logger.getLogger(MemoryCacheConfigClient.class.getName());
+ private final MemoryCache cache;
+
+ MemoryCacheConfigClient(MemoryCache cache) {
+ this.cache = cache;
+ }
+
+ @Override
+ /**
+ * Retrieves the requested config from the cache. Used when in 'memorycache' mode</p>
+ *
+ * @param request The config to retrieve - can be empty (no payload), or have a valid payload.
+ * @return A Config with a payload.
+ */
+ RawConfig getConfig(RawConfig input, JRTServerConfigRequest request) {
+ log.log(LogLevel.DEBUG, "Getting config from cache");
+ ConfigKey<?> key = input.getKey();
+ RawConfig cached = cache.get(new ConfigCacheKey(key, input.getDefMd5()));
+ if (cached != null) {
+ log.log(LogLevel.DEBUG, "Found config " + key + " in cache");
+ return cached;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ void cancel() {
+ }
+
+ @Override
+ void shutdownSourceConnections() {
+ }
+
+ @Override
+ String getActiveSourceConnection() {
+ return "N/A";
+ }
+
+ @Override
+ List<String> getSourceConnections() {
+ return Collections.singletonList("N/A");
+ }
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Mode.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Mode.java
new file mode 100644
index 00000000000..ccb92c491f0
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Mode.java
@@ -0,0 +1,80 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ *
+ * Different modes the config proxy can be running in.
+ *
+ * 'default' mode is requesting config from server, serving from cache only when known config
+ * and no new config having been sent from server. When in 'memorycache' mode, there is no connection
+ * to another config source, the proxy only serves from (memory) cache.
+ *
+ * @author musum
+ */
+class Mode {
+ private final ModeName mode;
+
+ enum ModeName {
+ DEFAULT, MEMORYCACHE
+ }
+
+ public Mode() {
+ this(ModeName.DEFAULT.name());
+ }
+
+ public Mode(String modeString) {
+ switch (modeString.toLowerCase()) {
+ case "" :
+ mode = ModeName.DEFAULT;
+ break;
+ case "default" :
+ mode = ModeName.DEFAULT;
+ break;
+ case "memorycache" :
+ mode = ModeName.MEMORYCACHE;
+ break;
+ default:
+ throw new IllegalArgumentException("Unrecognized mode'" + modeString + "' supplied");
+ }
+ }
+
+ public ModeName getMode() {
+ return mode;
+ }
+
+ public boolean isDefault() {
+ return mode.equals(ModeName.DEFAULT);
+ }
+
+ public boolean isMemoryCache() {
+ return mode.equals(ModeName.MEMORYCACHE);
+ }
+
+ public boolean requiresConfigSource() {
+ return mode.equals(ModeName.DEFAULT);
+ }
+
+ public static boolean validModeName(String modeString) {
+ return (modeString != null) && modes().contains(modeString);
+ }
+
+ static Set<String> modes() {
+ Set<String> modes = new HashSet<>();
+ for (ModeName mode : ModeName.values()) {
+ modes.add(mode.name().toLowerCase());
+ }
+ return modes;
+ }
+
+ public String name() {
+ return mode.name().toLowerCase();
+ }
+
+ @Override
+ public String toString() {
+ return mode.name().toLowerCase();
+ }
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
new file mode 100644
index 00000000000..ab053f31ac9
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
@@ -0,0 +1,285 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.config.subscription.ConfigSource;
+import com.yahoo.config.subscription.ConfigSourceSet;
+import com.yahoo.jrt.Spec;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.LogSetup;
+import com.yahoo.log.event.Event;
+import com.yahoo.system.CatchSigTerm;
+import com.yahoo.vespa.config.*;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * A proxy server that handles RPC config requests. The proxy can run in three modes:
+ * 'default' and 'memorycache', where the last one will not get config from an upstream
+ * config source, but will serve config only from memory cache.
+ *
+ * @author <a href="musum@yahoo-inc.com">Harald Musum</a>
+ */
+public class ProxyServer implements Runnable {
+
+ private static final int DEFAULT_RPC_PORT = 19090;
+ static final String DEFAULT_PROXY_CONFIG_SOURCES = "tcp/localhost:19070";
+
+ final static Logger log = Logger.getLogger(ProxyServer.class.getName());
+ private final AtomicBoolean signalCaught = new AtomicBoolean(false);
+
+ // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
+ private final ClientUpdater clientUpdater;
+ private ScheduledFuture<?> checkerHandle;
+
+ private final ConfigProxyRpcServer rpcServer;
+ final DelayedResponses delayedResponses;
+ private ConfigSource configSource;
+
+ private volatile ConfigSourceClient configSourceClient;
+
+ private final ConfigProxyStatistics statistics;
+ private final TimingValues timingValues;
+ private final CacheManager cacheManager;
+ private static final double timingvaluesRatio = 0.8;
+ private final static TimingValues defaultTimingValues;
+ private final boolean delayedResponseHandling;
+
+ private volatile Mode mode;
+
+
+ static {
+ // Proxy should time out before clients upon subscription.
+ TimingValues tv = new TimingValues();
+ tv.setUnconfiguredDelay((long)(tv.getUnconfiguredDelay()* timingvaluesRatio)).
+ setConfiguredErrorDelay((long)(tv.getConfiguredErrorDelay()* timingvaluesRatio)).
+ setSubscribeTimeout((long)(tv.getSubscribeTimeout()* timingvaluesRatio)).
+ setConfiguredErrorTimeout(-1); // Never cache errors
+ defaultTimingValues = tv;
+ }
+
+ private ProxyServer(Spec spec, DelayedResponses delayedResponses, ConfigSource source,
+ ConfigProxyStatistics statistics, TimingValues timingValues, Mode mode, boolean delayedResponseHandling,
+ CacheManager cacheManager) {
+ this.delayedResponses = delayedResponses;
+ this.configSource = source;
+ log.log(LogLevel.DEBUG, "Using config sources: " + source);
+ this.statistics = statistics;
+ this.timingValues = timingValues;
+ this.mode = mode;
+ this.delayedResponseHandling = delayedResponseHandling;
+ this.cacheManager = cacheManager;
+ if (spec == null) {
+ rpcServer = null;
+ } else {
+ rpcServer = new ConfigProxyRpcServer(this, spec);
+ }
+ clientUpdater = new ClientUpdater(cacheManager, rpcServer, statistics, delayedResponses, mode);
+ this.configSourceClient = ConfigSourceClient.createClient(source, clientUpdater, cacheManager, timingValues, statistics, delayedResponses);
+ }
+
+ static ProxyServer create(int port, DelayedResponses delayedResponses, ConfigSource source,
+ ConfigProxyStatistics statistics, Mode mode) {
+ return new ProxyServer(new Spec(null, port), delayedResponses, source, statistics, defaultTimingValues(), mode, true, new CacheManager(new MemoryCache()));
+ }
+
+ static ProxyServer createTestServer(ConfigSource source) {
+ final Mode mode = new Mode(Mode.ModeName.DEFAULT.name());
+ return ProxyServer.createTestServer(source, false, mode, CacheManager.createTestCacheManager());
+ }
+
+ static ProxyServer createTestServer(ConfigSource source, boolean delayedResponseHandling, Mode mode, CacheManager cacheManager) {
+ final ConfigProxyStatistics statistics = new ConfigProxyStatistics();
+ return new ProxyServer(null, new DelayedResponses(statistics), source, statistics, defaultTimingValues(), mode, delayedResponseHandling, cacheManager);
+ }
+
+ public void run() {
+ if (rpcServer != null) {
+ Thread t = new Thread(rpcServer);
+ t.setName("RpcServer");
+ t.start();
+ }
+ if (delayedResponseHandling) {
+ // Wait for 5 seconds initially, then run every second
+ checkerHandle = scheduler.scheduleAtFixedRate(new CheckDelayedResponses(delayedResponses, cacheManager.getMemoryCache(), rpcServer), 5, 1, SECONDS);
+ } else {
+ log.log(LogLevel.INFO, "Running without delayed response handling");
+ }
+ }
+
+ public RawConfig resolveConfig(JRTServerConfigRequest req) {
+ statistics.incProcessedRequests();
+ // Calling getConfig() will either return with an answer immediately or
+ // create a background thread that retrieves config from the server and
+ // calls updateSubscribers when new config is returned from the config source.
+ // In the last case the method below will return null.
+ RawConfig config = configSourceClient.getConfig(RawConfig.createFromServerRequest(req), req);
+ if (configOrGenerationHasChanged(config, req)) {
+ cacheManager.putInCache(config);
+ }
+ return config;
+ }
+
+ static boolean configOrGenerationHasChanged(RawConfig config, JRTServerConfigRequest request) {
+ return (config != null && (!config.hasEqualConfig(request) || config.hasNewerGeneration(request)));
+ }
+
+ Mode getMode() {
+ return mode;
+ }
+
+ void setMode(String modeName) {
+ if (modeName.equals(this.mode.name())) return;
+
+ log.log(LogLevel.INFO, "Switching from " + this.mode + " mode to " + modeName.toLowerCase() + " mode");
+ this.mode = new Mode(modeName);
+ if (mode.isMemoryCache()) {
+ configSourceClient.shutdownSourceConnections();
+ configSourceClient = new MemoryCacheConfigClient(cacheManager.getMemoryCache());
+ } else if (mode.isDefault()) {
+ flush();
+ configSourceClient = createRpcClient();
+ } else {
+ throw new IllegalArgumentException("Not able to handle mode '" + modeName + "'");
+ }
+ }
+
+ private RpcConfigSourceClient createRpcClient() {
+ return new RpcConfigSourceClient((ConfigSourceSet) configSource, clientUpdater, cacheManager, timingValues, statistics, delayedResponses);
+ }
+
+ private void setupSigTermHandler() {
+ CatchSigTerm.setup(signalCaught); // catch termination signal
+ }
+
+ private void waitForShutdown() {
+ synchronized (signalCaught) {
+ while (!signalCaught.get()) {
+ try {
+ signalCaught.wait();
+ } catch (InterruptedException e) {
+ // empty
+ }
+ }
+ }
+ stop();
+ System.exit(0);
+ }
+
+ public static void main(String[] args) {
+ /* Initialize the log handler */
+ LogSetup.clearHandlers();
+ LogSetup.initVespaLogging("configproxy");
+
+ Properties properties = getSystemProperties();
+
+ int port = DEFAULT_RPC_PORT;
+ if (args.length > 0) {
+ port = Integer.parseInt(args[0]);
+ }
+ Event.started("configproxy");
+ ConfigProxyStatistics statistics = new ConfigProxyStatistics(properties.eventInterval);
+ Thread t = new Thread(statistics);
+ t.setName("Metrics generator");
+ t.setDaemon(true);
+ t.start();
+
+ Mode startupMode = new Mode(properties.mode);
+ if (!startupMode.isDefault()) log.log(LogLevel.INFO, "Starting config proxy in '" + startupMode + "' mode");
+
+ if (startupMode.isMemoryCache()) {
+ log.log(LogLevel.ERROR, "Starting config proxy in '" + startupMode + "' mode is not allowed");
+ System.exit(1);
+ }
+
+ ConfigSourceSet configSources = new ConfigSourceSet();
+ if (startupMode.requiresConfigSource()) {
+ configSources = new ConfigSourceSet(properties.configSources);
+ }
+ DelayedResponses delayedResponses = new DelayedResponses(statistics);
+ ProxyServer proxyServer = ProxyServer.create(port, delayedResponses, configSources, statistics, startupMode);
+ // catch termination signal
+ proxyServer.setupSigTermHandler();
+ Thread proxyserverThread = new Thread(proxyServer);
+ proxyserverThread.setName("configproxy");
+ proxyserverThread.start();
+ proxyServer.waitForShutdown();
+ }
+
+ static Properties getSystemProperties() {
+ // Read system properties
+ long eventInterval = Long.getLong("eventinterval", ConfigProxyStatistics.defaultEventInterval);
+ String mode = System.getProperty("mode", Mode.ModeName.DEFAULT.name());
+ final String[] inputConfigSources = System.getProperty("proxyconfigsources", DEFAULT_PROXY_CONFIG_SOURCES).split(",");
+ return new Properties(eventInterval, mode, inputConfigSources);
+ }
+
+ static class Properties {
+ final long eventInterval;
+ final String mode;
+ final String[] configSources;
+
+ Properties(long eventInterval, String mode, String[] configSources) {
+ this.eventInterval = eventInterval;
+ this.mode = mode;
+ this.configSources = configSources;
+ }
+ }
+
+ static TimingValues defaultTimingValues() {
+ return defaultTimingValues;
+ }
+
+ TimingValues getTimingValues() {
+ return timingValues;
+ }
+
+ ConfigProxyStatistics getStatistics() {
+ return statistics;
+ }
+
+ // Cancels all config instances and flushes the cache. When this method returns,
+ // the cache will not be updated again before someone calls getConfig().
+ synchronized void flush() {
+ cacheManager.getMemoryCache().clear();
+ configSourceClient.cancel();
+ }
+
+ public void stop() {
+ Event.stopping("configproxy", "shutdown");
+ if (rpcServer != null) rpcServer.shutdown();
+ if (checkerHandle != null) checkerHandle.cancel(true);
+ flush();
+ if (statistics != null) {
+ statistics.stop();
+ }
+ }
+
+ public CacheManager getCacheManager() {
+ return cacheManager;
+ }
+
+ String getActiveSourceConnection() {
+ return configSourceClient.getActiveSourceConnection();
+ }
+
+ List<String> getSourceConnections() {
+ return configSourceClient.getSourceConnections();
+ }
+
+ public void updateSourceConnections(List<String> sources) {
+ configSource = new ConfigSourceSet(sources);
+ flush();
+ configSourceClient = createRpcClient();
+ }
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
new file mode 100644
index 00000000000..100de46901e
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
@@ -0,0 +1,197 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.config.subscription.ConfigSourceSet;
+import com.yahoo.config.subscription.impl.JRTConfigRequester;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Target;
+import com.yahoo.jrt.Transport;
+import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.config.*;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+/**
+ * An Rpc client to a config source
+ *
+ * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a>
+ * @since 5.1.9
+ */
+public class RpcConfigSourceClient extends ConfigSourceClient {
+
+ private final static Logger log = Logger.getLogger(RpcConfigSourceClient.class.getName());
+ private final Supervisor supervisor = new Supervisor(new Transport());
+
+ private final ConfigSourceSet configSourceSet;
+ private final HashMap<ConfigCacheKey, Subscriber> activeSubscribers = new HashMap<>();
+ private final Object activeSubscribersLock = new Object();
+ private final MemoryCache memoryCache;
+ private final ClientUpdater clientUpdater;
+ private final DelayedResponses delayedResponses;
+ private final TimingValues timingValues;
+
+ private ExecutorService exec;
+ private Map<ConfigSourceSet, JRTConfigRequester> requesterPool;
+
+
+ RpcConfigSourceClient(ConfigSourceSet configSourceSet, ClientUpdater clientUpdater,
+ CacheManager cacheManager,
+ TimingValues timingValues,
+ ConfigProxyStatistics statistics,
+ DelayedResponses delayedResponses) {
+ this.configSourceSet = configSourceSet;
+ this.clientUpdater = clientUpdater;
+ this.memoryCache = cacheManager.getMemoryCache();
+ this.delayedResponses = delayedResponses;
+ this.timingValues = timingValues;
+ checkConfigSources();
+ exec = Executors.newCachedThreadPool(new DaemonThreadFactory());
+ requesterPool = createRequesterPool(configSourceSet, timingValues);
+ }
+
+ /**
+ * Creates a requester (connection) pool of one entry, to be used each time this {@link RpcConfigSourceClient} is used
+ * @param ccs a {@link ConfigSourceSet}
+ * @param timingValues a {@link TimingValues}
+ * @return requester map
+ */
+ private Map<ConfigSourceSet, JRTConfigRequester> createRequesterPool(ConfigSourceSet ccs, TimingValues timingValues) {
+ Map<ConfigSourceSet, JRTConfigRequester> ret = new HashMap<>();
+ if (ccs.getSources().isEmpty()) return ret; // unit test, just skip creating any requester
+ ret.put(ccs, JRTConfigRequester.get(new JRTConnectionPool(ccs), timingValues));
+ return ret;
+ }
+
+ /**
+ * Checks if config sources are available
+ */
+ private boolean checkConfigSources() {
+ if (configSourceSet == null || configSourceSet.getSources() == null || configSourceSet.getSources().size() == 0) {
+ log.log(LogLevel.WARNING, "No config sources defined, could not check connection");
+ return false;
+ } else {
+ Request req = new Request("ping");
+ for (String configSource : configSourceSet.getSources()) {
+ Spec spec = new Spec(configSource);
+ Target target = supervisor.connect(spec);
+ target.invokeSync(req, 10.0);
+ if (target.isValid()) {
+ log.log(LogLevel.DEBUG, "Created connection to config source at " + spec.toString());
+ return true;
+ } else {
+ log.log(LogLevel.WARNING, "Could not connect to config source at " + spec.toString());
+ }
+ target.close();
+ }
+ String extra = "";
+ log.log(LogLevel.ERROR, "Could not connect to any config source in set " + configSourceSet.toString() +
+ ", please make sure config server(s) are running. " + extra);
+ }
+ return false;
+ }
+
+ /**
+ * Retrieves the requested config from the cache or the remote server.
+ * <p>
+ * If the requested config is different from the one in cache, the cached request is returned immediately.
+ * If they are equal, this method returns null.
+ * <p>
+ * If the config was not in cache, this method starts a <em>Subscriber</em> in a separate thread
+ * that gets the config and calls updateSubscribers().
+ *
+ * @param input The config to retrieve - can be empty (no payload), or have a valid payload.
+ * @return A Config with a payload.
+ */
+ @Override
+ RawConfig getConfig(RawConfig input, JRTServerConfigRequest request) {
+ long start = System.currentTimeMillis();
+ RawConfig ret = null;
+ final ConfigCacheKey configCacheKey = new ConfigCacheKey(input.getKey(), input.getDefMd5());
+ RawConfig cachedConfig = memoryCache.get(configCacheKey);
+ boolean needToGetConfig = true;
+
+ if (cachedConfig != null) {
+ log.log(LogLevel.DEBUG, "Found config " + configCacheKey + " in cache, generation=" + cachedConfig.getGeneration() +
+ ",configmd5=" + cachedConfig.getConfigMd5());
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "input config=" + input + ",cached config=" + cachedConfig);
+ }
+ // equals() also takes generation into account
+ if (!cachedConfig.equals(input)) {
+ log.log(LogLevel.SPAM, "Cached config is not equal to requested, will return it");
+ ret = cachedConfig;
+ }
+ if (!cachedConfig.isError()) {
+ needToGetConfig = false;
+ }
+ }
+ if (!ProxyServer.configOrGenerationHasChanged(ret, request)) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Delaying response " + request.getShortDescription() + " (" +
+ (System.currentTimeMillis() - start) + " ms)");
+ }
+ delayedResponses.add(new DelayedResponse(request));
+ }
+ if (needToGetConfig) {
+ synchronized (activeSubscribersLock) {
+ if (activeSubscribers.containsKey(configCacheKey)) {
+ log.log(LogLevel.DEBUG, "Already a subscriber running for: " + configCacheKey);
+ } else {
+ log.log(LogLevel.DEBUG, "Could not find good config in cache, creating subscriber for: " + configCacheKey);
+ Subscriber subscriber = new UpstreamConfigSubscriber(input, clientUpdater, configSourceSet, timingValues, requesterPool, activeSubscribers);
+ activeSubscribers.put(configCacheKey, subscriber);
+ exec.execute(subscriber);
+ }
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ void cancel() {
+ shutdownSourceConnections();
+ }
+
+ /**
+ * Takes down connection(s) to config sources and running tasks
+ */
+ @Override
+ void shutdownSourceConnections() {
+ synchronized (activeSubscribersLock) {
+ for (Subscriber subscriber : activeSubscribers.values()) {
+ subscriber.cancel();
+ }
+ activeSubscribers.clear();
+ }
+ exec.shutdown();
+ for (JRTConfigRequester requester : requesterPool.values()) {
+ requester.close();
+ }
+ }
+
+ @Override
+ String getActiveSourceConnection() {
+ if (requesterPool.get(configSourceSet) != null) {
+ return requesterPool.get(configSourceSet).getConnectionPool().getCurrent().getAddress();
+ } else {
+ return "";
+ }
+ }
+
+ @Override
+ List<String> getSourceConnections() {
+ ArrayList<String> ret = new ArrayList<>();
+ final JRTConfigRequester jrtConfigRequester = requesterPool.get(configSourceSet);
+ if (jrtConfigRequester != null) {
+ ret.addAll(configSourceSet.getSources());
+ }
+ return ret;
+ }
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcServer.java
new file mode 100644
index 00000000000..c4c31b315dc
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcServer.java
@@ -0,0 +1,15 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+/**
+ * @author musum
+ */
+interface RpcServer {
+
+ void returnOkResponse(JRTServerConfigRequest request, RawConfig config);
+
+ void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message);
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java
new file mode 100644
index 00000000000..49e2bb86a15
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/Subscriber.java
@@ -0,0 +1,13 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+/**
+ * Interface for subscribing to config from upstream config sources.
+ *
+ * @author musum
+ * @since 5.5
+ */
+public interface Subscriber extends Runnable {
+
+ void cancel();
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java
new file mode 100644
index 00000000000..849eb3f7910
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/UpstreamConfigSubscriber.java
@@ -0,0 +1,108 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.config.ConfigurationRuntimeException;
+import com.yahoo.config.subscription.ConfigSource;
+import com.yahoo.config.subscription.ConfigSourceSet;
+import com.yahoo.config.subscription.impl.GenericConfigHandle;
+import com.yahoo.config.subscription.impl.GenericConfigSubscriber;
+import com.yahoo.config.subscription.impl.JRTConfigRequester;
+import com.yahoo.log.LogLevel;
+import com.yahoo.yolean.Exceptions;
+import com.yahoo.vespa.config.ConfigCacheKey;
+import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.TimingValues;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+/**
+ * @author musum
+ * @since 5.5
+ */
+public class UpstreamConfigSubscriber implements Subscriber {
+ private final static Logger log = Logger.getLogger(UpstreamConfigSubscriber.class.getName());
+
+ private final RawConfig config;
+ private final ClientUpdater clientUpdater;
+ private final ConfigSource configSourceSet;
+ private final TimingValues timingValues;
+ private Map<ConfigSourceSet, JRTConfigRequester> requesterPool;
+ private final Map<ConfigCacheKey, Subscriber> activeSubscribers;
+ private GenericConfigSubscriber subscriber;
+
+ public UpstreamConfigSubscriber(RawConfig config,
+ ClientUpdater clientUpdater,
+ ConfigSource configSourceSet,
+ TimingValues timingValues,
+ Map<ConfigSourceSet, JRTConfigRequester> requesterPool,
+ Map<ConfigCacheKey, Subscriber> activeSubscribers) {
+ this.config = config;
+ this.clientUpdater = clientUpdater;
+ this.configSourceSet = configSourceSet;
+ this.timingValues = timingValues;
+ this.requesterPool = requesterPool;
+ this.activeSubscribers = activeSubscribers;
+ }
+
+ UpstreamConfigSubscriber(RawConfig config,
+ ClientUpdater clientUpdater,
+ ConfigSource configSourceSet,
+ TimingValues timingValues,
+ Map<ConfigSourceSet, JRTConfigRequester> requesterPool) {
+ this(config, clientUpdater, configSourceSet, timingValues, requesterPool, new HashMap<>());
+ }
+
+ @Override
+ public void run() {
+ GenericConfigHandle handle;
+ subscriber = new GenericConfigSubscriber(requesterPool);
+ try {
+ handle = subscriber.subscribe(config.getKey(), config.getDefContent(), configSourceSet, timingValues);
+ } catch (ConfigurationRuntimeException e) {
+ log.log(LogLevel.INFO, "Subscribe for '" + config + "' failed, closing subscriber");
+ final ConfigCacheKey key = new ConfigCacheKey(config.getKey(), config.getDefMd5());
+ synchronized (activeSubscribers) {
+ final Subscriber activeSubscriber = activeSubscribers.get(key);
+ if (activeSubscriber != null) {
+ activeSubscriber.cancel();
+ activeSubscribers.remove(key);
+ }
+ }
+ return;
+ }
+
+ do {
+ try {
+ if (subscriber.nextGeneration()) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "nextGeneration returned for " + config.getKey() + ", subscriber generation=" + subscriber.getGeneration());
+ }
+ updateWithNewConfig(handle);
+ }
+ } catch (Exception e) { // To avoid thread throwing exception and loop never running this again
+ log.log(LogLevel.WARNING, "Got exception: " + Exceptions.toMessageString(e));
+ } catch (Throwable e) {
+ com.yahoo.protect.Process.logAndDie("Got error, exiting: " + Exceptions.toMessageString(e));
+ }
+ } while (!subscriber.isClosed());
+ }
+
+ private void updateWithNewConfig(GenericConfigHandle handle) {
+ final RawConfig newConfig = handle.getRawConfig();
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "config to be returned for '" + newConfig.getKey() +
+ "', generation=" + newConfig.getGeneration() +
+ ", payload=" + newConfig.getPayload());
+ }
+ clientUpdater.updateSubscribers(newConfig);
+ }
+
+ @Override
+ public void cancel() {
+ if (subscriber != null) {
+ subscriber.close();
+ }
+ }
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/package-info.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/package-info.java
new file mode 100644
index 00000000000..9ccbf4f7749
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/package-info.java
@@ -0,0 +1,47 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ Provides the classes for the Vespa config proxy.
+
+ <p>The {@link com.yahoo.vespa.config.proxy.ProxyServer ProxyServer}
+ contains the main method that instantiates a new ProxyServer that
+ listens for incoming config requests from clients at a given
+ port. The ProxyServer handles communication with the remote server,
+ and keeps a cache of {@link com.yahoo.vespa.config.RawConfig RawConfig}
+ objects.
+ </p>
+
+ <p>This package is very slim, due to extensive reuse of low-level code from
+ the config client library.
+ </p>
+
+ <h3>Why Vespa needs a config proxy</h3>
+
+ <p>It is possible for a client to subscribe
+ to config from the config server directly. However, if all Vespa
+ services applied this philosophy, it would cause a tremendous load
+ on the server that would need to handle a very large number of
+ requests from individual clients. Even more importantly, it would
+ inflict a huge load on the network if all config requests were
+ sent to the remote server.
+ </p>
+
+ <p>
+ Typically, one config proxy is running on each host in a Vespa
+ installation, so each subscriber on that host sends requests to
+ the proxy and never to the remote server. The proxy is responsible
+ for keeping that config up-to-date on behalf of all clients that
+ subscribe to it. This means that multiple subscribers to the same
+ config (and using the same config id) maps to only one request
+ from the proxy to the external server.
+ </p>
+
+ <p>Another advantage with a local config proxy on each node is
+ that subscribers become less vulnerable to network
+ failures. Should the network or the remote server go down for a
+ short period, only the proxy will notice, hence removing overhead from
+ the subscribers.
+ </p>
+*/
+
+@com.yahoo.api.annotations.PackageMarker
+package com.yahoo.vespa.config.proxy;