summaryrefslogtreecommitdiffstats
path: root/config-proxy
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2019-07-03 17:40:40 +0200
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2019-07-03 17:40:40 +0200
commit163f84450c5744755d8d59b7dff855b50725c2ba (patch)
treef3ca64ca897d4847ece7bf798e1cd8a36d5a505d /config-proxy
parent9681c501d73a45936a663b94116b7099191a74e0 (diff)
Process all RPC requests in dedicated thread pool
- Rewrite unit tests to use a RPC client
Diffstat (limited to 'config-proxy')
-rw-r--r--config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java202
-rw-r--r--config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java69
2 files changed, 183 insertions, 88 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
index db3b787f9f9..5ffc7293742 100644
--- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
+++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java
@@ -1,10 +1,21 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.proxy;
-import com.yahoo.jrt.*;
+import com.yahoo.jrt.Acceptor;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.StringArray;
+import com.yahoo.jrt.StringValue;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Target;
+import com.yahoo.jrt.TargetWatcher;
import com.yahoo.log.LogLevel;
-import com.yahoo.vespa.config.*;
import com.yahoo.vespa.config.ErrorCode;
+import com.yahoo.vespa.config.JRTMethods;
+import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3;
@@ -12,6 +23,9 @@ import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@@ -28,6 +42,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
private final Spec spec;
private final Supervisor supervisor;
private final ProxyServer proxyServer;
+ private final ExecutorService rpcExecutor = Executors.newFixedThreadPool(8);
ConfigProxyRpcServer(ProxyServer proxyServer, Supervisor supervisor, Spec spec) {
this.proxyServer = proxyServer;
@@ -50,6 +65,12 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
void shutdown() {
supervisor.transport().shutdown();
+ try {
+ rpcExecutor.shutdownNow();
+ rpcExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
Spec getSpec() {
@@ -109,12 +130,16 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
* @param req a Request
*/
private void getConfigV3(Request req) {
- log.log(LogLevel.SPAM, () -> "getConfigV3");
- JRTServerConfigRequest request = JRTServerConfigRequestV3.createFromRequest(req);
- if (isProtocolVersionSupported(request)) {
- preHandle(req);
- getConfigImpl(request);
- }
+ dispatchRpcRequest(req, () -> {
+ JRTServerConfigRequest request = JRTServerConfigRequestV3.createFromRequest(req);
+ if (isProtocolVersionSupported(request)) {
+ proxyServer.getStatistics().incRpcRequests();
+ req.target().addWatcher(this);
+ getConfigImpl(request);
+ return;
+ }
+ req.returnRequest();
+ });
}
/**
@@ -122,8 +147,11 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
*
* @param req a Request
*/
- void ping(Request req) {
- req.returnValues().add(new Int32Value(0));
+ private void ping(Request req) {
+ dispatchRpcRequest(req, () -> {
+ req.returnValues().add(new Int32Value(0));
+ req.returnRequest();
+ });
}
/**
@@ -131,81 +159,120 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
*
* @param req a Request
*/
- void printStatistics(Request req) {
- StringBuilder sb = new StringBuilder();
- sb.append("\nDelayed responses queue size: ");
- sb.append(proxyServer.delayedResponses.size());
- sb.append("\nContents: ");
- for (DelayedResponse delayed : proxyServer.delayedResponses.responses()) {
- sb.append(delayed.getRequest().toString()).append("\n");
- }
-
- req.returnValues().add(new StringValue(sb.toString()));
- }
+ private void printStatistics(Request req) {
+ dispatchRpcRequest(req, () -> {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\nDelayed responses queue size: ");
+ sb.append(proxyServer.delayedResponses.size());
+ sb.append("\nContents: ");
+ for (DelayedResponse delayed : proxyServer.delayedResponses.responses()) {
+ sb.append(delayed.getRequest().toString()).append("\n");
+ }
- void listCachedConfig(Request req) {
- listCachedConfig(req, false);
+ req.returnValues().add(new StringValue(sb.toString()));
+ req.returnRequest();
+ });
}
- void listCachedConfigFull(Request req) {
- listCachedConfig(req, true);
+ private void listCachedConfig(Request req) {
+ dispatchRpcRequest(req, () -> listCachedConfig(req, false));
}
- void listSourceConnections(Request req) {
- String[] ret = new String[2];
- ret[0] = "Current source: " + proxyServer.getActiveSourceConnection();
- ret[1] = "All sources:\n" + printSourceConnections();
- req.returnValues().add(new StringArray(ret));
+ private void listCachedConfigFull(Request req) {
+ dispatchRpcRequest(req, () -> listCachedConfig(req, true));
}
- void updateSources(Request req) {
- String sources = req.parameters().get(0).asString();
- String ret;
- System.out.println(proxyServer.getMode());
- if (proxyServer.getMode().requiresConfigSource()) {
- proxyServer.updateSourceConnections(Arrays.asList(sources.split(",")));
- ret = "Updated config sources to: " + sources;
- } else {
- ret = "Cannot update sources when in '" + proxyServer.getMode().name() + "' mode";
- }
- req.returnValues().add(new StringValue(ret));
+ private void listSourceConnections(Request req) {
+ dispatchRpcRequest(req, () -> {
+ String[] ret = new String[2];
+ ret[0] = "Current source: " + proxyServer.getActiveSourceConnection();
+ ret[1] = "All sources:\n" + printSourceConnections();
+ req.returnValues().add(new StringArray(ret));
+ req.returnRequest();
+ });
}
- void invalidateCache(Request req) {
- proxyServer.getMemoryCache().clear();
- String[] s = new String[2];
- s[0] = "0";
- s[1] = "success";
- req.returnValues().add(new StringArray(s));
+ private void updateSources(Request req) {
+ dispatchRpcRequest(req, () -> {
+ String sources = req.parameters().get(0).asString();
+ String ret;
+ System.out.println(proxyServer.getMode());
+ if (proxyServer.getMode().requiresConfigSource()) {
+ proxyServer.updateSourceConnections(Arrays.asList(sources.split(",")));
+ ret = "Updated config sources to: " + sources;
+ } else {
+ ret = "Cannot update sources when in '" + proxyServer.getMode().name() + "' mode";
+ }
+ req.returnValues().add(new StringValue(ret));
+ req.returnRequest();
+ });
}
- void setMode(Request req) {
- String suppliedMode = req.parameters().get(0).asString();
- log.log(LogLevel.DEBUG, () -> "Supplied mode=" + suppliedMode);
- String[] s = new String[2];
- if (Mode.validModeName(suppliedMode.toLowerCase())) {
- proxyServer.setMode(suppliedMode);
+ private void invalidateCache(Request req) {
+ dispatchRpcRequest(req, () -> {
+ proxyServer.getMemoryCache().clear();
+ String[] s = new String[2];
s[0] = "0";
s[1] = "success";
- } else {
- s[0] = "1";
- s[1] = "Could not set mode to '" + suppliedMode + "'. Legal modes are '" + Mode.modes() + "'";
- }
+ req.returnValues().add(new StringArray(s));
+ req.returnRequest();
+ });
+ }
+
+ private void setMode(Request req) {
+ dispatchRpcRequest(req, () -> {
+ String suppliedMode = req.parameters().get(0).asString();
+ log.log(LogLevel.DEBUG, () -> "Supplied mode=" + suppliedMode);
+ String[] s = new String[2];
+ if (Mode.validModeName(suppliedMode.toLowerCase())) {
+ proxyServer.setMode(suppliedMode);
+ s[0] = "0";
+ s[1] = "success";
+ } else {
+ s[0] = "1";
+ s[1] = "Could not set mode to '" + suppliedMode + "'. Legal modes are '" + Mode.modes() + "'";
+ }
- req.returnValues().add(new StringArray(s));
+ req.returnValues().add(new StringArray(s));
+ req.returnRequest();
+ });
}
- void getMode(Request req) {
- req.returnValues().add(new StringValue(proxyServer.getMode().name()));
+ private void getMode(Request req) {
+ dispatchRpcRequest(req, () -> {
+ req.returnValues().add(new StringValue(proxyServer.getMode().name()));
+ req.returnRequest();
+ });
}
- void dumpCache(Request req) {
- final MemoryCache memoryCache = proxyServer.getMemoryCache();
- req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache)));
+ private void dumpCache(Request req) {
+ dispatchRpcRequest(req, () -> {
+ final MemoryCache memoryCache = proxyServer.getMemoryCache();
+ req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache)));
+ req.returnRequest();
+ });
}
//----------------------------------------------------
+ private void dispatchRpcRequest(Request request, Runnable handler) {
+ request.detach();
+ log.log(LogLevel.SPAM, () -> String.format("Dispatching RPC request %s", requestLogId(request)));
+ rpcExecutor.execute(() -> {
+ try {
+ log.log(LogLevel.SPAM, () -> String.format("Executing RPC request %s.", requestLogId(request)));
+ handler.run();
+ } catch (Exception e) {
+ log.log(LogLevel.WARNING,
+ String.format("Exception thrown during execution of RPC request %s: %s", requestLogId(request), e.getMessage()), e);
+ }
+ });
+ }
+
+ private String requestLogId(Request request) {
+ return String.format("%s/%08X", request.methodName(), request.hashCode());
+ }
+
private boolean isProtocolVersionSupported(JRTServerConfigRequest request) {
Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions();
if (supportedProtocolVersions.contains(request.getProtocolVersion())) {
@@ -219,12 +286,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
return false;
}
- private void preHandle(Request req) {
- proxyServer.getStatistics().incRpcRequests();
- req.detach();
- req.target().addWatcher(this);
- }
-
/**
* Handles all versions of "getConfig" requests.
*
@@ -262,7 +323,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
return sb.toString();
}
- final void listCachedConfig(Request req, boolean full) {
+ private void listCachedConfig(Request req, boolean full) {
String[] ret;
MemoryCache cache = proxyServer.getMemoryCache();
ret = new String[cache.size()];
@@ -287,6 +348,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer
}
Arrays.sort(ret);
req.returnValues().add(new StringArray(ret));
+ req.returnRequest();
}
/**
diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
index f32bb2ac024..5b0534f524a 100644
--- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
+++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java
@@ -2,10 +2,13 @@
package com.yahoo.vespa.config.proxy;
import com.yahoo.config.subscription.ConfigSourceSet;
+import com.yahoo.jrt.Acceptor;
+import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
import com.yahoo.vespa.config.RawConfig;
import org.junit.After;
@@ -18,7 +21,7 @@ import static org.junit.Assert.assertThat;
/**
* @author hmusum
- * @since 5.1.9
+ * @author bjorncs
*/
public class ConfigProxyRpcServerTest {
private static final String hostname = "localhost";
@@ -26,15 +29,25 @@ public class ConfigProxyRpcServerTest {
private static final String address = "tcp/" + hostname + ":" + port;
private ProxyServer proxyServer;
private ConfigProxyRpcServer rpcServer;
+ private Acceptor acceptor;
+ private Supervisor supervisor;
+ private TestClient client;
@Before
- public void setup() {
+ public void setup() throws ListenFailedException {
proxyServer = ProxyServer.createTestServer(new ConfigSourceSet(address));
- rpcServer = new ConfigProxyRpcServer(proxyServer, new Supervisor(new Transport()), null);
+ supervisor = new Supervisor(new Transport());
+ Spec spec = new Spec(0);
+ rpcServer = new ConfigProxyRpcServer(proxyServer, supervisor, spec);
+ acceptor = supervisor.listen(spec);
+ client = new TestClient(acceptor.port());
}
@After
public void teardown() {
+ client.close();
+ acceptor.shutdown().join();
+ supervisor.transport().shutdown().join();
rpcServer.shutdown();
}
@@ -52,7 +65,7 @@ public class ConfigProxyRpcServerTest {
@Test
public void testRpcMethodPing() {
Request req = new Request("ping");
- rpcServer.ping(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
@@ -65,7 +78,7 @@ public class ConfigProxyRpcServerTest {
@Test
public void testRpcMethodListCachedConfig() {
Request req = new Request("listCachedConfig");
- rpcServer.listCachedConfig(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
String[] ret = req.returnValues().get(0).asStringArray();
@@ -75,7 +88,7 @@ public class ConfigProxyRpcServerTest {
final RawConfig config = ProxyServerTest.fooConfig;
proxyServer.getMemoryCache().update(config);
req = new Request("listCachedConfig");
- rpcServer.listCachedConfig(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
ret = req.returnValues().get(0).asStringArray();
@@ -92,7 +105,7 @@ public class ConfigProxyRpcServerTest {
@Test
public void testRpcMethodListCachedConfigFull() {
Request req = new Request("listCachedConfigFull");
- rpcServer.listCachedConfigFull(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
@@ -102,7 +115,7 @@ public class ConfigProxyRpcServerTest {
final RawConfig config = ProxyServerTest.fooConfig;
proxyServer.getMemoryCache().update(config);
req = new Request("listCachedConfigFull");
- rpcServer.listCachedConfigFull(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
ret = req.returnValues().get(0).asStringArray();
assertThat(ret.length, is(1));
@@ -119,7 +132,7 @@ public class ConfigProxyRpcServerTest {
@Test
public void testRpcMethodListSourceConnections() {
Request req = new Request("listSourceConnections");
- rpcServer.listSourceConnections(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
@@ -135,7 +148,7 @@ public class ConfigProxyRpcServerTest {
@Test
public void testRpcMethodPrintStatistics() {
Request req = new Request("printStatistics");
- rpcServer.printStatistics(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
assertThat(req.returnValues().get(0).asString(), is("\n" +
@@ -149,7 +162,7 @@ public class ConfigProxyRpcServerTest {
@Test
public void testRpcMethodInvalidateCache() {
Request req = new Request("invalidateCache");
- rpcServer.invalidateCache(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
@@ -165,7 +178,7 @@ public class ConfigProxyRpcServerTest {
@Test
public void testRpcMethodGetModeAndSetMode() {
Request req = new Request("getMode");
- rpcServer.getMode(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
assertThat(req.returnValues().get(0).asString(), is("default"));
@@ -173,7 +186,7 @@ public class ConfigProxyRpcServerTest {
req = new Request("setMode");
String mode = "memorycache";
req.parameters().add(new StringValue(mode));
- rpcServer.setMode(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
String[] ret = req.returnValues().get(0).asStringArray();
@@ -183,7 +196,7 @@ public class ConfigProxyRpcServerTest {
assertThat(proxyServer.getMode().name(), is(mode));
req = new Request("getMode");
- rpcServer.getMode(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
assertThat(req.returnValues().get(0).asString(), is(mode));
@@ -192,7 +205,7 @@ public class ConfigProxyRpcServerTest {
String oldMode = mode;
mode = "invalid";
req.parameters().add(new StringValue(mode));
- rpcServer.setMode(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
ret = req.returnValues().get(0).asStringArray();
@@ -211,7 +224,7 @@ public class ConfigProxyRpcServerTest {
String spec1 = "tcp/a:19070";
String spec2 = "tcp/b:19070";
req.parameters().add(new StringValue(spec1 + "," + spec2));
- rpcServer.updateSources(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
assertThat(req.returnValues().get(0).asString(), is("Updated config sources to: " + spec1 + "," + spec2));
@@ -221,7 +234,7 @@ public class ConfigProxyRpcServerTest {
req = new Request("updateSources");
req.parameters().add(new StringValue(spec1 + "," + spec2));
- rpcServer.updateSources(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
assertThat(req.returnValues().get(0).asString(), is("Cannot update sources when in '" + Mode.ModeName.MEMORYCACHE.name().toLowerCase() + "' mode"));
@@ -245,10 +258,30 @@ public class ConfigProxyRpcServerTest {
Request req = new Request("dumpCache");
String path = "/tmp";
req.parameters().add(new StringValue(path));
- rpcServer.dumpCache(req);
+ client.invoke(req);
assertFalse(req.errorMessage(), req.isError());
assertThat(req.returnValues().size(), is(1));
assertThat(req.returnValues().get(0).asString(), is("success"));
}
+ private static class TestClient implements AutoCloseable {
+
+ private final Supervisor supervisor;
+ private final Target target;
+
+ TestClient(int rpcPort) {
+ this.supervisor = new Supervisor(new Transport());
+ this.target = supervisor.connect(new Spec(rpcPort));
+ }
+
+ void invoke(Request request) {
+ target.invokeSync(request, 0/*no timeout*/);
+ }
+
+ @Override
+ public void close() {
+ target.close();
+ supervisor.transport().shutdown().join();
+ }
+ }
}