diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2019-07-03 17:40:40 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2019-07-03 17:40:40 +0200 |
commit | 163f84450c5744755d8d59b7dff855b50725c2ba (patch) | |
tree | f3ca64ca897d4847ece7bf798e1cd8a36d5a505d | |
parent | 9681c501d73a45936a663b94116b7099191a74e0 (diff) |
Process all RPC requests in dedicated thread pool
- Rewrite unit tests to use a RPC client
-rw-r--r-- | config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java | 202 | ||||
-rw-r--r-- | config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java | 69 |
2 files changed, 183 insertions, 88 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java index db3b787f9f9..5ffc7293742 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -1,10 +1,21 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; -import com.yahoo.jrt.*; +import com.yahoo.jrt.Acceptor; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.StringArray; +import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Target; +import com.yahoo.jrt.TargetWatcher; import com.yahoo.log.LogLevel; -import com.yahoo.vespa.config.*; import com.yahoo.vespa.config.ErrorCode; +import com.yahoo.vespa.config.JRTMethods; +import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; @@ -12,6 +23,9 @@ import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; import java.util.Arrays; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -28,6 +42,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer private final Spec spec; private final Supervisor supervisor; private final ProxyServer proxyServer; + private final ExecutorService rpcExecutor = Executors.newFixedThreadPool(8); ConfigProxyRpcServer(ProxyServer proxyServer, Supervisor supervisor, Spec spec) { this.proxyServer = proxyServer; @@ -50,6 +65,12 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer void shutdown() { supervisor.transport().shutdown(); + try { + rpcExecutor.shutdownNow(); + rpcExecutor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } Spec getSpec() { @@ -109,12 +130,16 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer * @param req a Request */ private void getConfigV3(Request req) { - log.log(LogLevel.SPAM, () -> "getConfigV3"); - JRTServerConfigRequest request = JRTServerConfigRequestV3.createFromRequest(req); - if (isProtocolVersionSupported(request)) { - preHandle(req); - getConfigImpl(request); - } + dispatchRpcRequest(req, () -> { + JRTServerConfigRequest request = JRTServerConfigRequestV3.createFromRequest(req); + if (isProtocolVersionSupported(request)) { + proxyServer.getStatistics().incRpcRequests(); + req.target().addWatcher(this); + getConfigImpl(request); + return; + } + req.returnRequest(); + }); } /** @@ -122,8 +147,11 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer * * @param req a Request */ - void ping(Request req) { - req.returnValues().add(new Int32Value(0)); + private void ping(Request req) { + dispatchRpcRequest(req, () -> { + req.returnValues().add(new Int32Value(0)); + req.returnRequest(); + }); } /** @@ -131,81 +159,120 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer * * @param req a Request */ - void printStatistics(Request req) { - StringBuilder sb = new StringBuilder(); - sb.append("\nDelayed responses queue size: "); - sb.append(proxyServer.delayedResponses.size()); - sb.append("\nContents: "); - for (DelayedResponse delayed : proxyServer.delayedResponses.responses()) { - sb.append(delayed.getRequest().toString()).append("\n"); - } - - req.returnValues().add(new StringValue(sb.toString())); - } + private void printStatistics(Request req) { + dispatchRpcRequest(req, () -> { + StringBuilder sb = new StringBuilder(); + sb.append("\nDelayed responses queue size: "); + sb.append(proxyServer.delayedResponses.size()); + sb.append("\nContents: "); + for (DelayedResponse delayed : proxyServer.delayedResponses.responses()) { + sb.append(delayed.getRequest().toString()).append("\n"); + } - void listCachedConfig(Request req) { - listCachedConfig(req, false); + req.returnValues().add(new StringValue(sb.toString())); + req.returnRequest(); + }); } - void listCachedConfigFull(Request req) { - listCachedConfig(req, true); + private void listCachedConfig(Request req) { + dispatchRpcRequest(req, () -> listCachedConfig(req, false)); } - void listSourceConnections(Request req) { - String[] ret = new String[2]; - ret[0] = "Current source: " + proxyServer.getActiveSourceConnection(); - ret[1] = "All sources:\n" + printSourceConnections(); - req.returnValues().add(new StringArray(ret)); + private void listCachedConfigFull(Request req) { + dispatchRpcRequest(req, () -> listCachedConfig(req, true)); } - void updateSources(Request req) { - String sources = req.parameters().get(0).asString(); - String ret; - System.out.println(proxyServer.getMode()); - if (proxyServer.getMode().requiresConfigSource()) { - proxyServer.updateSourceConnections(Arrays.asList(sources.split(","))); - ret = "Updated config sources to: " + sources; - } else { - ret = "Cannot update sources when in '" + proxyServer.getMode().name() + "' mode"; - } - req.returnValues().add(new StringValue(ret)); + private void listSourceConnections(Request req) { + dispatchRpcRequest(req, () -> { + String[] ret = new String[2]; + ret[0] = "Current source: " + proxyServer.getActiveSourceConnection(); + ret[1] = "All sources:\n" + printSourceConnections(); + req.returnValues().add(new StringArray(ret)); + req.returnRequest(); + }); } - void invalidateCache(Request req) { - proxyServer.getMemoryCache().clear(); - String[] s = new String[2]; - s[0] = "0"; - s[1] = "success"; - req.returnValues().add(new StringArray(s)); + private void updateSources(Request req) { + dispatchRpcRequest(req, () -> { + String sources = req.parameters().get(0).asString(); + String ret; + System.out.println(proxyServer.getMode()); + if (proxyServer.getMode().requiresConfigSource()) { + proxyServer.updateSourceConnections(Arrays.asList(sources.split(","))); + ret = "Updated config sources to: " + sources; + } else { + ret = "Cannot update sources when in '" + proxyServer.getMode().name() + "' mode"; + } + req.returnValues().add(new StringValue(ret)); + req.returnRequest(); + }); } - void setMode(Request req) { - String suppliedMode = req.parameters().get(0).asString(); - log.log(LogLevel.DEBUG, () -> "Supplied mode=" + suppliedMode); - String[] s = new String[2]; - if (Mode.validModeName(suppliedMode.toLowerCase())) { - proxyServer.setMode(suppliedMode); + private void invalidateCache(Request req) { + dispatchRpcRequest(req, () -> { + proxyServer.getMemoryCache().clear(); + String[] s = new String[2]; s[0] = "0"; s[1] = "success"; - } else { - s[0] = "1"; - s[1] = "Could not set mode to '" + suppliedMode + "'. Legal modes are '" + Mode.modes() + "'"; - } + req.returnValues().add(new StringArray(s)); + req.returnRequest(); + }); + } + + private void setMode(Request req) { + dispatchRpcRequest(req, () -> { + String suppliedMode = req.parameters().get(0).asString(); + log.log(LogLevel.DEBUG, () -> "Supplied mode=" + suppliedMode); + String[] s = new String[2]; + if (Mode.validModeName(suppliedMode.toLowerCase())) { + proxyServer.setMode(suppliedMode); + s[0] = "0"; + s[1] = "success"; + } else { + s[0] = "1"; + s[1] = "Could not set mode to '" + suppliedMode + "'. Legal modes are '" + Mode.modes() + "'"; + } - req.returnValues().add(new StringArray(s)); + req.returnValues().add(new StringArray(s)); + req.returnRequest(); + }); } - void getMode(Request req) { - req.returnValues().add(new StringValue(proxyServer.getMode().name())); + private void getMode(Request req) { + dispatchRpcRequest(req, () -> { + req.returnValues().add(new StringValue(proxyServer.getMode().name())); + req.returnRequest(); + }); } - void dumpCache(Request req) { - final MemoryCache memoryCache = proxyServer.getMemoryCache(); - req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache))); + private void dumpCache(Request req) { + dispatchRpcRequest(req, () -> { + final MemoryCache memoryCache = proxyServer.getMemoryCache(); + req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache))); + req.returnRequest(); + }); } //---------------------------------------------------- + private void dispatchRpcRequest(Request request, Runnable handler) { + request.detach(); + log.log(LogLevel.SPAM, () -> String.format("Dispatching RPC request %s", requestLogId(request))); + rpcExecutor.execute(() -> { + try { + log.log(LogLevel.SPAM, () -> String.format("Executing RPC request %s.", requestLogId(request))); + handler.run(); + } catch (Exception e) { + log.log(LogLevel.WARNING, + String.format("Exception thrown during execution of RPC request %s: %s", requestLogId(request), e.getMessage()), e); + } + }); + } + + private String requestLogId(Request request) { + return String.format("%s/%08X", request.methodName(), request.hashCode()); + } + private boolean isProtocolVersionSupported(JRTServerConfigRequest request) { Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions(); if (supportedProtocolVersions.contains(request.getProtocolVersion())) { @@ -219,12 +286,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer return false; } - private void preHandle(Request req) { - proxyServer.getStatistics().incRpcRequests(); - req.detach(); - req.target().addWatcher(this); - } - /** * Handles all versions of "getConfig" requests. * @@ -262,7 +323,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer return sb.toString(); } - final void listCachedConfig(Request req, boolean full) { + private void listCachedConfig(Request req, boolean full) { String[] ret; MemoryCache cache = proxyServer.getMemoryCache(); ret = new String[cache.size()]; @@ -287,6 +348,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer } Arrays.sort(ret); req.returnValues().add(new StringArray(ret)); + req.returnRequest(); } /** diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java index f32bb2ac024..5b0534f524a 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java @@ -2,10 +2,13 @@ package com.yahoo.vespa.config.proxy; import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.jrt.Acceptor; +import com.yahoo.jrt.ListenFailedException; import com.yahoo.jrt.Request; import com.yahoo.jrt.Spec; import com.yahoo.jrt.StringValue; import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Target; import com.yahoo.jrt.Transport; import com.yahoo.vespa.config.RawConfig; import org.junit.After; @@ -18,7 +21,7 @@ import static org.junit.Assert.assertThat; /** * @author hmusum - * @since 5.1.9 + * @author bjorncs */ public class ConfigProxyRpcServerTest { private static final String hostname = "localhost"; @@ -26,15 +29,25 @@ public class ConfigProxyRpcServerTest { private static final String address = "tcp/" + hostname + ":" + port; private ProxyServer proxyServer; private ConfigProxyRpcServer rpcServer; + private Acceptor acceptor; + private Supervisor supervisor; + private TestClient client; @Before - public void setup() { + public void setup() throws ListenFailedException { proxyServer = ProxyServer.createTestServer(new ConfigSourceSet(address)); - rpcServer = new ConfigProxyRpcServer(proxyServer, new Supervisor(new Transport()), null); + supervisor = new Supervisor(new Transport()); + Spec spec = new Spec(0); + rpcServer = new ConfigProxyRpcServer(proxyServer, supervisor, spec); + acceptor = supervisor.listen(spec); + client = new TestClient(acceptor.port()); } @After public void teardown() { + client.close(); + acceptor.shutdown().join(); + supervisor.transport().shutdown().join(); rpcServer.shutdown(); } @@ -52,7 +65,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodPing() { Request req = new Request("ping"); - rpcServer.ping(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); @@ -65,7 +78,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodListCachedConfig() { Request req = new Request("listCachedConfig"); - rpcServer.listCachedConfig(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); String[] ret = req.returnValues().get(0).asStringArray(); @@ -75,7 +88,7 @@ public class ConfigProxyRpcServerTest { final RawConfig config = ProxyServerTest.fooConfig; proxyServer.getMemoryCache().update(config); req = new Request("listCachedConfig"); - rpcServer.listCachedConfig(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); ret = req.returnValues().get(0).asStringArray(); @@ -92,7 +105,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodListCachedConfigFull() { Request req = new Request("listCachedConfigFull"); - rpcServer.listCachedConfigFull(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); @@ -102,7 +115,7 @@ public class ConfigProxyRpcServerTest { final RawConfig config = ProxyServerTest.fooConfig; proxyServer.getMemoryCache().update(config); req = new Request("listCachedConfigFull"); - rpcServer.listCachedConfigFull(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); ret = req.returnValues().get(0).asStringArray(); assertThat(ret.length, is(1)); @@ -119,7 +132,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodListSourceConnections() { Request req = new Request("listSourceConnections"); - rpcServer.listSourceConnections(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); @@ -135,7 +148,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodPrintStatistics() { Request req = new Request("printStatistics"); - rpcServer.printStatistics(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is("\n" + @@ -149,7 +162,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodInvalidateCache() { Request req = new Request("invalidateCache"); - rpcServer.invalidateCache(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); @@ -165,7 +178,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodGetModeAndSetMode() { Request req = new Request("getMode"); - rpcServer.getMode(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is("default")); @@ -173,7 +186,7 @@ public class ConfigProxyRpcServerTest { req = new Request("setMode"); String mode = "memorycache"; req.parameters().add(new StringValue(mode)); - rpcServer.setMode(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); String[] ret = req.returnValues().get(0).asStringArray(); @@ -183,7 +196,7 @@ public class ConfigProxyRpcServerTest { assertThat(proxyServer.getMode().name(), is(mode)); req = new Request("getMode"); - rpcServer.getMode(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is(mode)); @@ -192,7 +205,7 @@ public class ConfigProxyRpcServerTest { String oldMode = mode; mode = "invalid"; req.parameters().add(new StringValue(mode)); - rpcServer.setMode(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); ret = req.returnValues().get(0).asStringArray(); @@ -211,7 +224,7 @@ public class ConfigProxyRpcServerTest { String spec1 = "tcp/a:19070"; String spec2 = "tcp/b:19070"; req.parameters().add(new StringValue(spec1 + "," + spec2)); - rpcServer.updateSources(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is("Updated config sources to: " + spec1 + "," + spec2)); @@ -221,7 +234,7 @@ public class ConfigProxyRpcServerTest { req = new Request("updateSources"); req.parameters().add(new StringValue(spec1 + "," + spec2)); - rpcServer.updateSources(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is("Cannot update sources when in '" + Mode.ModeName.MEMORYCACHE.name().toLowerCase() + "' mode")); @@ -245,10 +258,30 @@ public class ConfigProxyRpcServerTest { Request req = new Request("dumpCache"); String path = "/tmp"; req.parameters().add(new StringValue(path)); - rpcServer.dumpCache(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is("success")); } + private static class TestClient implements AutoCloseable { + + private final Supervisor supervisor; + private final Target target; + + TestClient(int rpcPort) { + this.supervisor = new Supervisor(new Transport()); + this.target = supervisor.connect(new Spec(rpcPort)); + } + + void invoke(Request request) { + target.invokeSync(request, 0/*no timeout*/); + } + + @Override + public void close() { + target.close(); + supervisor.transport().shutdown().join(); + } + } } |