aboutsummaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2021-12-10 10:17:55 +0100
committerHarald Musum <musum@yahooinc.com>2021-12-10 10:17:55 +0100
commitf149bd11e4289b5a994d7d1d9c11d6c7995ecbf3 (patch)
treecb19b746fb874ad936c3262d7028c65d9f2c0424 /config-proxy
parentcfd3c3a096eae48543fce849c9d5fc43dc73823a (diff)
Decouple rpc server and rpc client by using a ResponseHandler
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java36
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java25
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java10
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java63
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java10
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java10
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java23
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java5
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java8
9 files changed, 106 insertions, 84 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
index 207a6385b5a..3f03725fb60 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
@@ -30,10 +30,10 @@ import java.util.logging.Logger;
*
* @author hmusum
*/
-public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer {
+public class ConfigProxyRpcServer implements Runnable, TargetWatcher {
private final static Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName());
- private static final int TRACELEVEL = 6;
+ static final int TRACELEVEL = 6;
private final Spec spec;
private final Supervisor supervisor;
@@ -245,12 +245,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
* @param request a Request
*/
private void getConfigImpl(JRTServerConfigRequest request) {
+ ResponseHandler responseHandler = new ResponseHandler();
request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()");
log.log(Level.FINE, () ->"getConfig: " + request.getShortDescription() + ",config checksums=" + request.getRequestConfigChecksums());
if (!request.validateParameters()) {
// Error code is set in verifyParameters if parameters are not OK.
log.log(Level.WARNING, "Parameters for request " + request + " did not validate: " + request.errorCode() + " : " + request.errorMessage());
- returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
+ responseHandler.returnErrorResponse(request, request.errorCode(), "Parameters for request " + request.getShortDescription() + " did not validate: " + request.errorMessage());
return;
}
try {
@@ -258,13 +259,13 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
if (config == null) {
log.log(Level.FINEST, () -> "No config received yet for " + request.getShortDescription() + ", not sending response");
} else if (ProxyServer.configOrGenerationHasChanged(config, request)) {
- returnOkResponse(request, config);
+ responseHandler.returnOkResponse(request, config);
} else {
log.log(Level.FINEST, () -> "No new config for " + request.getShortDescription() + ", not sending response");
}
} catch (Exception e) {
e.printStackTrace();
- returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
+ responseHandler.returnErrorResponse(request, com.yahoo.vespa.config.ErrorCode.INTERNAL_ERROR, e.getMessage());
}
}
@@ -324,29 +325,4 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
// requesting this config?
}
- public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
- request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()");
- request.addOkResponse(config.getPayload(),
- config.getGeneration(),
- config.applyOnRestart(),
- config.getPayloadChecksums());
- log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() +
- ",generation=" + config.getGeneration());
- log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload());
-
-
- // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse
- // TODO Move logic so that all requests are returned in CheckDelayedResponse
- try {
- request.getRequest().returnRequest();
- } catch (IllegalStateException e) {
- log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage());
- }
- }
-
- public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
- request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()");
- request.addErrorResponse(errorCode, message);
- request.getRequest().returnRequest();
- }
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
index f77bd4b9138..0e8ebe0d9c9 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/DelayedResponseHandler.java
@@ -6,10 +6,12 @@ import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.yolean.Exceptions;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
+import static com.yahoo.protect.Process.logAndDie;
+
/**
* The run method of this class is executed periodically to return delayed responses
* (requests use long polling, so config proxy needs to return a response when they time out).
@@ -22,12 +24,13 @@ public class DelayedResponseHandler implements Runnable {
private final DelayedResponses delayedResponses;
private final MemoryCache memoryCache;
- private final RpcServer rpcServer;
+ private final ResponseHandler responseHandler;
+ private final AtomicLong sentResponses = new AtomicLong();
- DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, RpcServer rpcServer) {
+ DelayedResponseHandler(DelayedResponses delayedResponses, MemoryCache memoryCache, ResponseHandler responseHandler) {
this.delayedResponses = delayedResponses;
this.memoryCache = memoryCache;
- this.rpcServer = rpcServer;
+ this.responseHandler = responseHandler;
}
@Override
@@ -41,25 +44,27 @@ public class DelayedResponseHandler implements Runnable {
log.log(Level.FINEST, () -> "Running DelayedResponseHandler. There are " + delayedResponses.size() +
" delayed responses. First one is " + delayedResponses.responses().peek());
DelayedResponse response;
- AtomicInteger i = new AtomicInteger(0);
while ((response = delayedResponses.responses().poll()) != null) {
JRTServerConfigRequest request = response.getRequest();
ConfigCacheKey cacheKey = new ConfigCacheKey(request.getConfigKey(), request.getRequestDefMd5());
RawConfig config = memoryCache.get(cacheKey);
if (config != null) {
- rpcServer.returnOkResponse(request, config);
- i.incrementAndGet();
+ responseHandler.returnOkResponse(request, config);
+ sentResponses.incrementAndGet();
} else {
log.log(Level.WARNING, "Timed out (timeout " + request.getTimeout() + ") getting config " +
request.getConfigKey() + ", will retry");
}
}
- log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + i.get() + " delayed responses sent in " +
- (System.currentTimeMillis() - start) + " ms");
+ log.log(Level.FINEST, () -> "Finished running DelayedResponseHandler. " + sentResponses.get() +
+ " delayed responses sent in " + (System.currentTimeMillis() - start) + " ms");
} catch (Exception e) { // To avoid thread throwing exception and executor never running this again
log.log(Level.WARNING, "Got exception in DelayedResponseHandler: " + Exceptions.toMessageString(e));
} catch (Throwable e) {
- com.yahoo.protect.Process.logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e));
+ logAndDie("Got error in DelayedResponseHandler, exiting: " + Exceptions.toMessageString(e));
}
}
+
+ public long sentResponses() { return sentResponses.get(); }
+
}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
index 8492381786b..63e5bd69bc3 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java
@@ -56,7 +56,7 @@ public class ProxyServer implements Runnable {
supervisor = new Supervisor(new Transport("proxy-server", JRT_TRANSPORT_THREADS)).setDropEmptyBuffers(true);
log.log(Level.FINE, () -> "Using config source '" + source);
this.rpcServer = createRpcServer(spec);
- this.configClient = (configClient == null) ? createRpcClient(rpcServer, source) : configClient;
+ this.configClient = (configClient == null) ? createRpcClient(source) : configClient;
this.fileDistributionAndUrlDownload = new FileDistributionAndUrlDownload(supervisor, source);
}
@@ -98,7 +98,7 @@ public class ProxyServer implements Runnable {
break;
case DEFAULT:
flush();
- configClient = createRpcClient(rpcServer, configSource);
+ configClient = createRpcClient(configSource);
this.mode = new Mode(modeName);
break;
default:
@@ -111,8 +111,8 @@ public class ProxyServer implements Runnable {
return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this'
}
- private static RpcConfigSourceClient createRpcClient(RpcServer rpcServer, ConfigSourceSet source) {
- return new RpcConfigSourceClient(rpcServer, source);
+ private static RpcConfigSourceClient createRpcClient(ConfigSourceSet source) {
+ return new RpcConfigSourceClient(new ResponseHandler(), source);
}
private void setupSignalHandler() {
@@ -211,7 +211,7 @@ public class ProxyServer implements Runnable {
void updateSourceConnections(List<String> sources) {
configSource = new ConfigSourceSet(sources);
flush();
- configClient = createRpcClient(rpcServer, configSource);
+ configClient = createRpcClient(configSource);
}
DelayedResponses delayedResponses() {
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java
new file mode 100644
index 00000000000..c9cfbdd3e16
--- /dev/null
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ResponseHandler.java
@@ -0,0 +1,63 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.config.proxy;
+
+import com.yahoo.vespa.config.RawConfig;
+import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static com.yahoo.vespa.config.proxy.ConfigProxyRpcServer.TRACELEVEL;
+
+/**
+ * An RPC server that handles config and file distribution requests.
+ *
+ * @author hmusum
+ */
+public class ResponseHandler {
+
+ private final Optional<AtomicLong> sentResponses;
+
+ public ResponseHandler() {
+ this(false);
+ }
+
+ // For testing only
+ ResponseHandler(boolean trackResponses) {
+ sentResponses = trackResponses ? Optional.of(new AtomicLong()) : Optional.empty();
+ }
+
+ private final static Logger log = Logger.getLogger(ResponseHandler.class.getName());
+
+ public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnOkResponse()");
+ request.addOkResponse(config.getPayload(),
+ config.getGeneration(),
+ config.applyOnRestart(),
+ config.getPayloadChecksums());
+ log.log(Level.FINE, () -> "Return response: " + request.getShortDescription() + ",config checksums=" + config.getPayloadChecksums() +
+ ",generation=" + config.getGeneration());
+ log.log(Level.FINEST, () -> "Config payload in response for " + request.getShortDescription() + ":" + config.getPayload());
+
+
+ // TODO Catch exception for now, since the request might have been returned in CheckDelayedResponse
+ // TODO Move logic so that all requests are returned in CheckDelayedResponse
+ try {
+ request.getRequest().returnRequest();
+ } catch (IllegalStateException e) {
+ log.log(Level.FINE, () -> "Something bad happened when sending response for '" + request.getShortDescription() + "':" + e.getMessage());
+ }
+ sentResponses.ifPresent(AtomicLong::getAndIncrement);
+ }
+
+ public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
+ request.getRequestTrace().trace(TRACELEVEL, "Config proxy returnErrorResponse()");
+ request.addErrorResponse(errorCode, message);
+ request.getRequest().returnRequest();
+ }
+
+ public long sentResponses() { return sentResponses.map(AtomicLong::get).orElse(0L); }
+
+}
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
index ce60e2cea67..0b9d0241890 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClient.java
@@ -43,7 +43,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
private final Supervisor supervisor = new Supervisor(new Transport("config-source-client"));
- private final RpcServer rpcServer;
+ private final ResponseHandler responseHandler;
private final ConfigSourceSet configSourceSet;
private final Map<ConfigCacheKey, Subscriber> activeSubscribers = new ConcurrentHashMap<>();
private final MemoryCache memoryCache;
@@ -57,15 +57,15 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("delayed responses"));
private final ScheduledFuture<?> delayedResponsesFuture;
- RpcConfigSourceClient(RpcServer rpcServer, ConfigSourceSet configSourceSet) {
- this.rpcServer = rpcServer;
+ RpcConfigSourceClient(ResponseHandler responseHandler, ConfigSourceSet configSourceSet) {
+ this.responseHandler = responseHandler;
this.configSourceSet = configSourceSet;
this.memoryCache = new MemoryCache();
this.delayedResponses = new DelayedResponses();
checkConfigSources();
nextConfigFuture = nextConfigScheduler.scheduleAtFixedRate(this, 0, 10, MILLISECONDS);
this.requester = JRTConfigRequester.create(configSourceSet, timingValues);
- DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, rpcServer);
+ DelayedResponseHandler command = new DelayedResponseHandler(delayedResponses, memoryCache, responseHandler);
this.delayedResponsesFuture = delayedResponsesScheduler.scheduleAtFixedRate(command, 5, 1, SECONDS);
}
@@ -230,7 +230,7 @@ class RpcConfigSourceClient implements ConfigSourceClient, Runnable {
log.log(Level.FINE, () -> "Call returnOkResponse for " + key + "," + generation);
if (config.getPayload().getData().getByteLength() == 0)
log.log(Level.WARNING, () -> "Call returnOkResponse for " + key + "," + generation + " with empty config");
- rpcServer.returnOkResponse(request, config);
+ responseHandler.returnOkResponse(request, config);
} else {
log.log(Level.INFO, "Could not remove " + key + " from delayedResponses queue, already removed");
}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java
index c2a0282fd05..8a668b34fd0 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/DelayedResponseHandlerTest.java
@@ -6,8 +6,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
/**
* @author hmusum
@@ -29,16 +28,15 @@ public class DelayedResponseHandlerTest {
public void basic() {
ConfigTester tester = new ConfigTester();
DelayedResponses delayedResponses = new DelayedResponses();
- final MockRpcServer mockRpcServer = new MockRpcServer();
- final MemoryCache memoryCache = new MemoryCache();
+ MemoryCache memoryCache = new MemoryCache();
memoryCache.update(ConfigTester.fooConfig);
- final DelayedResponseHandler delayedResponseHandler = new DelayedResponseHandler(delayedResponses, memoryCache, mockRpcServer);
+ DelayedResponseHandler delayedResponseHandler = new DelayedResponseHandler(delayedResponses, memoryCache, new ResponseHandler());
delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.fooConfig, 0)));
delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.fooConfig, 1200000))); // should not be returned yet
delayedResponses.add(new DelayedResponse(tester.createRequest(ProxyServerTest.errorConfig, 0))); // will not give a config when resolving
delayedResponseHandler.checkDelayedResponses();
- assertThat(mockRpcServer.responses, is(1L));
+ assertEquals(1, delayedResponseHandler.sentResponses());
}
}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java
deleted file mode 100644
index 56fcca191de..00000000000
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/MockRpcServer.java
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.config.proxy;
-
-import com.yahoo.vespa.config.RawConfig;
-import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
-
-/**
- * @author hmusum
- */
-public class MockRpcServer implements RpcServer {
-
- volatile long responses = 0;
- volatile long errorResponses = 0;
-
- public void returnOkResponse(JRTServerConfigRequest request, RawConfig config) {
- responses++;
- }
-
- public void returnErrorResponse(JRTServerConfigRequest request, int errorCode, String message) {
- responses++;
- errorResponses++;
- }
-}
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
index 9c36190da34..172abff6237 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ProxyServerTest.java
@@ -2,7 +2,10 @@
package com.yahoo.vespa.config.proxy;
import com.yahoo.config.subscription.ConfigSourceSet;
-import com.yahoo.vespa.config.*;
+import com.yahoo.vespa.config.ConfigCacheKey;
+import com.yahoo.vespa.config.ConfigKey;
+import com.yahoo.vespa.config.ErrorCode;
+import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.protocol.Payload;
import org.junit.After;
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java
index f452289c6d8..ada98f4b30e 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/RpcConfigSourceClientTest.java
@@ -17,7 +17,7 @@ import static org.junit.Assert.assertEquals;
*/
public class RpcConfigSourceClientTest {
- private MockRpcServer rpcServer;
+ private ResponseHandler responseHandler;
private RpcConfigSourceClient rpcConfigSourceClient;
@Rule
@@ -26,8 +26,8 @@ public class RpcConfigSourceClientTest {
@Before
public void setup() {
- rpcServer = new MockRpcServer();
- rpcConfigSourceClient = new RpcConfigSourceClient(rpcServer, new MockConfigSource());
+ responseHandler = new ResponseHandler(true);
+ rpcConfigSourceClient = new RpcConfigSourceClient(responseHandler, new MockConfigSource());
}
@Test
@@ -90,7 +90,7 @@ public class RpcConfigSourceClientTest {
}
private void assertSentResponses(int expected) {
- assertEquals(expected, rpcServer.responses);
+ assertEquals(expected, responseHandler.sentResponses());
}
private void simulateClientRequestingConfig(RawConfig config) {