diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2019-07-05 10:28:32 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-07-05 10:28:32 +0200 |
commit | 99c794f11d90b9d928be05196d244c500298a495 (patch) | |
tree | a6dfbec82e5a5bec683ee80dc7559378a7837f13 /config-proxy | |
parent | 532c8ded1cdaeb4f7c6e996b9c0cba0ea855c771 (diff) | |
parent | ba63090c33ecd2eb1ad92e9a810f0a3fcf9254b5 (diff) |
Merge pull request #9952 from vespa-engine/bjorncs/config-proxy
Bjorncs/config proxy
Diffstat (limited to 'config-proxy')
3 files changed, 214 insertions, 98 deletions
diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java index db3b787f9f9..5ffc7293742 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -1,10 +1,21 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; -import com.yahoo.jrt.*; +import com.yahoo.jrt.Acceptor; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.StringArray; +import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Target; +import com.yahoo.jrt.TargetWatcher; import com.yahoo.log.LogLevel; -import com.yahoo.vespa.config.*; import com.yahoo.vespa.config.ErrorCode; +import com.yahoo.vespa.config.JRTMethods; +import com.yahoo.vespa.config.RawConfig; import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; @@ -12,6 +23,9 @@ import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; import java.util.Arrays; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -28,6 +42,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer private final Spec spec; private final Supervisor supervisor; private final ProxyServer proxyServer; + private final ExecutorService rpcExecutor = Executors.newFixedThreadPool(8); ConfigProxyRpcServer(ProxyServer proxyServer, Supervisor supervisor, Spec spec) { this.proxyServer = proxyServer; @@ -50,6 +65,12 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer void shutdown() { supervisor.transport().shutdown(); + try { + rpcExecutor.shutdownNow(); + rpcExecutor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } Spec getSpec() { @@ -109,12 +130,16 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer * @param req a Request */ private void getConfigV3(Request req) { - log.log(LogLevel.SPAM, () -> "getConfigV3"); - JRTServerConfigRequest request = JRTServerConfigRequestV3.createFromRequest(req); - if (isProtocolVersionSupported(request)) { - preHandle(req); - getConfigImpl(request); - } + dispatchRpcRequest(req, () -> { + JRTServerConfigRequest request = JRTServerConfigRequestV3.createFromRequest(req); + if (isProtocolVersionSupported(request)) { + proxyServer.getStatistics().incRpcRequests(); + req.target().addWatcher(this); + getConfigImpl(request); + return; + } + req.returnRequest(); + }); } /** @@ -122,8 +147,11 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer * * @param req a Request */ - void ping(Request req) { - req.returnValues().add(new Int32Value(0)); + private void ping(Request req) { + dispatchRpcRequest(req, () -> { + req.returnValues().add(new Int32Value(0)); + req.returnRequest(); + }); } /** @@ -131,81 +159,120 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer * * @param req a Request */ - void printStatistics(Request req) { - StringBuilder sb = new StringBuilder(); - sb.append("\nDelayed responses queue size: "); - sb.append(proxyServer.delayedResponses.size()); - sb.append("\nContents: "); - for (DelayedResponse delayed : proxyServer.delayedResponses.responses()) { - sb.append(delayed.getRequest().toString()).append("\n"); - } - - req.returnValues().add(new StringValue(sb.toString())); - } + private void printStatistics(Request req) { + dispatchRpcRequest(req, () -> { + StringBuilder sb = new StringBuilder(); + sb.append("\nDelayed responses queue size: "); + sb.append(proxyServer.delayedResponses.size()); + sb.append("\nContents: "); + for (DelayedResponse delayed : proxyServer.delayedResponses.responses()) { + sb.append(delayed.getRequest().toString()).append("\n"); + } - void listCachedConfig(Request req) { - listCachedConfig(req, false); + req.returnValues().add(new StringValue(sb.toString())); + req.returnRequest(); + }); } - void listCachedConfigFull(Request req) { - listCachedConfig(req, true); + private void listCachedConfig(Request req) { + dispatchRpcRequest(req, () -> listCachedConfig(req, false)); } - void listSourceConnections(Request req) { - String[] ret = new String[2]; - ret[0] = "Current source: " + proxyServer.getActiveSourceConnection(); - ret[1] = "All sources:\n" + printSourceConnections(); - req.returnValues().add(new StringArray(ret)); + private void listCachedConfigFull(Request req) { + dispatchRpcRequest(req, () -> listCachedConfig(req, true)); } - void updateSources(Request req) { - String sources = req.parameters().get(0).asString(); - String ret; - System.out.println(proxyServer.getMode()); - if (proxyServer.getMode().requiresConfigSource()) { - proxyServer.updateSourceConnections(Arrays.asList(sources.split(","))); - ret = "Updated config sources to: " + sources; - } else { - ret = "Cannot update sources when in '" + proxyServer.getMode().name() + "' mode"; - } - req.returnValues().add(new StringValue(ret)); + private void listSourceConnections(Request req) { + dispatchRpcRequest(req, () -> { + String[] ret = new String[2]; + ret[0] = "Current source: " + proxyServer.getActiveSourceConnection(); + ret[1] = "All sources:\n" + printSourceConnections(); + req.returnValues().add(new StringArray(ret)); + req.returnRequest(); + }); } - void invalidateCache(Request req) { - proxyServer.getMemoryCache().clear(); - String[] s = new String[2]; - s[0] = "0"; - s[1] = "success"; - req.returnValues().add(new StringArray(s)); + private void updateSources(Request req) { + dispatchRpcRequest(req, () -> { + String sources = req.parameters().get(0).asString(); + String ret; + System.out.println(proxyServer.getMode()); + if (proxyServer.getMode().requiresConfigSource()) { + proxyServer.updateSourceConnections(Arrays.asList(sources.split(","))); + ret = "Updated config sources to: " + sources; + } else { + ret = "Cannot update sources when in '" + proxyServer.getMode().name() + "' mode"; + } + req.returnValues().add(new StringValue(ret)); + req.returnRequest(); + }); } - void setMode(Request req) { - String suppliedMode = req.parameters().get(0).asString(); - log.log(LogLevel.DEBUG, () -> "Supplied mode=" + suppliedMode); - String[] s = new String[2]; - if (Mode.validModeName(suppliedMode.toLowerCase())) { - proxyServer.setMode(suppliedMode); + private void invalidateCache(Request req) { + dispatchRpcRequest(req, () -> { + proxyServer.getMemoryCache().clear(); + String[] s = new String[2]; s[0] = "0"; s[1] = "success"; - } else { - s[0] = "1"; - s[1] = "Could not set mode to '" + suppliedMode + "'. Legal modes are '" + Mode.modes() + "'"; - } + req.returnValues().add(new StringArray(s)); + req.returnRequest(); + }); + } + + private void setMode(Request req) { + dispatchRpcRequest(req, () -> { + String suppliedMode = req.parameters().get(0).asString(); + log.log(LogLevel.DEBUG, () -> "Supplied mode=" + suppliedMode); + String[] s = new String[2]; + if (Mode.validModeName(suppliedMode.toLowerCase())) { + proxyServer.setMode(suppliedMode); + s[0] = "0"; + s[1] = "success"; + } else { + s[0] = "1"; + s[1] = "Could not set mode to '" + suppliedMode + "'. Legal modes are '" + Mode.modes() + "'"; + } - req.returnValues().add(new StringArray(s)); + req.returnValues().add(new StringArray(s)); + req.returnRequest(); + }); } - void getMode(Request req) { - req.returnValues().add(new StringValue(proxyServer.getMode().name())); + private void getMode(Request req) { + dispatchRpcRequest(req, () -> { + req.returnValues().add(new StringValue(proxyServer.getMode().name())); + req.returnRequest(); + }); } - void dumpCache(Request req) { - final MemoryCache memoryCache = proxyServer.getMemoryCache(); - req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache))); + private void dumpCache(Request req) { + dispatchRpcRequest(req, () -> { + final MemoryCache memoryCache = proxyServer.getMemoryCache(); + req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache))); + req.returnRequest(); + }); } //---------------------------------------------------- + private void dispatchRpcRequest(Request request, Runnable handler) { + request.detach(); + log.log(LogLevel.SPAM, () -> String.format("Dispatching RPC request %s", requestLogId(request))); + rpcExecutor.execute(() -> { + try { + log.log(LogLevel.SPAM, () -> String.format("Executing RPC request %s.", requestLogId(request))); + handler.run(); + } catch (Exception e) { + log.log(LogLevel.WARNING, + String.format("Exception thrown during execution of RPC request %s: %s", requestLogId(request), e.getMessage()), e); + } + }); + } + + private String requestLogId(Request request) { + return String.format("%s/%08X", request.methodName(), request.hashCode()); + } + private boolean isProtocolVersionSupported(JRTServerConfigRequest request) { Set<Long> supportedProtocolVersions = JRTConfigRequestFactory.supportedProtocolVersions(); if (supportedProtocolVersions.contains(request.getProtocolVersion())) { @@ -219,12 +286,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer return false; } - private void preHandle(Request req) { - proxyServer.getStatistics().incRpcRequests(); - req.detach(); - req.target().addWatcher(this); - } - /** * Handles all versions of "getConfig" requests. * @@ -262,7 +323,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer return sb.toString(); } - final void listCachedConfig(Request req, boolean full) { + private void listCachedConfig(Request req, boolean full) { String[] ret; MemoryCache cache = proxyServer.getMemoryCache(); ret = new String[cache.size()]; @@ -287,6 +348,7 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer } Arrays.sort(ret); req.returnValues().add(new StringArray(ret)); + req.returnRequest(); } /** diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index 40526641855..d93eb819463 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -37,6 +37,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; public class ProxyServer implements Runnable { private static final int DEFAULT_RPC_PORT = 19090; + private static final int JRT_TRANSPORT_THREADS = 4; static final String DEFAULT_PROXY_CONFIG_SOURCES = "tcp/localhost:19070"; final static Logger log = Logger.getLogger(ProxyServer.class.getName()); @@ -44,7 +45,7 @@ public class ProxyServer implements Runnable { // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory()); - private final Supervisor supervisor = new Supervisor(new Transport()); + private final Supervisor supervisor = new Supervisor(new Transport(JRT_TRANSPORT_THREADS)); private final ClientUpdater clientUpdater; private ScheduledFuture<?> delayedResponseScheduler; diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java index f32bb2ac024..ffaf5bafc59 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java @@ -2,10 +2,13 @@ package com.yahoo.vespa.config.proxy; import com.yahoo.config.subscription.ConfigSourceSet; +import com.yahoo.jrt.Acceptor; +import com.yahoo.jrt.ListenFailedException; import com.yahoo.jrt.Request; import com.yahoo.jrt.Spec; import com.yahoo.jrt.StringValue; import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Target; import com.yahoo.jrt.Transport; import com.yahoo.vespa.config.RawConfig; import org.junit.After; @@ -18,24 +21,25 @@ import static org.junit.Assert.assertThat; /** * @author hmusum - * @since 5.1.9 + * @author bjorncs */ public class ConfigProxyRpcServerTest { private static final String hostname = "localhost"; private static final int port = 12345; private static final String address = "tcp/" + hostname + ":" + port; - private ProxyServer proxyServer; - private ConfigProxyRpcServer rpcServer; + private TestServer server; + private TestClient client; @Before - public void setup() { - proxyServer = ProxyServer.createTestServer(new ConfigSourceSet(address)); - rpcServer = new ConfigProxyRpcServer(proxyServer, new Supervisor(new Transport()), null); + public void setup() throws ListenFailedException { + server = new TestServer(); + client = new TestClient(server.listenPort()); } @After public void teardown() { - rpcServer.shutdown(); + client.close(); + server.close(); } @Test @@ -52,7 +56,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodPing() { Request req = new Request("ping"); - rpcServer.ping(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); @@ -65,7 +69,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodListCachedConfig() { Request req = new Request("listCachedConfig"); - rpcServer.listCachedConfig(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); String[] ret = req.returnValues().get(0).asStringArray(); @@ -73,9 +77,9 @@ public class ConfigProxyRpcServerTest { assertThat(ret.length, is(0)); final RawConfig config = ProxyServerTest.fooConfig; - proxyServer.getMemoryCache().update(config); + server.proxyServer().getMemoryCache().update(config); req = new Request("listCachedConfig"); - rpcServer.listCachedConfig(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); ret = req.returnValues().get(0).asStringArray(); @@ -92,7 +96,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodListCachedConfigFull() { Request req = new Request("listCachedConfigFull"); - rpcServer.listCachedConfigFull(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); @@ -100,9 +104,9 @@ public class ConfigProxyRpcServerTest { assertThat(ret.length, is(0)); final RawConfig config = ProxyServerTest.fooConfig; - proxyServer.getMemoryCache().update(config); + server.proxyServer().getMemoryCache().update(config); req = new Request("listCachedConfigFull"); - rpcServer.listCachedConfigFull(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); ret = req.returnValues().get(0).asStringArray(); assertThat(ret.length, is(1)); @@ -119,7 +123,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodListSourceConnections() { Request req = new Request("listSourceConnections"); - rpcServer.listSourceConnections(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); @@ -135,7 +139,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodPrintStatistics() { Request req = new Request("printStatistics"); - rpcServer.printStatistics(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is("\n" + @@ -149,7 +153,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodInvalidateCache() { Request req = new Request("invalidateCache"); - rpcServer.invalidateCache(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); @@ -165,7 +169,7 @@ public class ConfigProxyRpcServerTest { @Test public void testRpcMethodGetModeAndSetMode() { Request req = new Request("getMode"); - rpcServer.getMode(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is("default")); @@ -173,17 +177,17 @@ public class ConfigProxyRpcServerTest { req = new Request("setMode"); String mode = "memorycache"; req.parameters().add(new StringValue(mode)); - rpcServer.setMode(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); String[] ret = req.returnValues().get(0).asStringArray(); assertThat(ret.length, is(2)); assertThat(ret[0], is("0")); assertThat(ret[1], is("success")); - assertThat(proxyServer.getMode().name(), is(mode)); + assertThat(server.proxyServer().getMode().name(), is(mode)); req = new Request("getMode"); - rpcServer.getMode(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is(mode)); @@ -192,14 +196,14 @@ public class ConfigProxyRpcServerTest { String oldMode = mode; mode = "invalid"; req.parameters().add(new StringValue(mode)); - rpcServer.setMode(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); ret = req.returnValues().get(0).asStringArray(); assertThat(ret.length, is(2)); assertThat(ret[0], is("1")); assertThat(ret[1], is("Could not set mode to '" + mode + "'. Legal modes are '" + Mode.modes() + "'")); - assertThat(proxyServer.getMode().name(), is(oldMode)); + assertThat(server.proxyServer().getMode().name(), is(oldMode)); } /** @@ -211,17 +215,17 @@ public class ConfigProxyRpcServerTest { String spec1 = "tcp/a:19070"; String spec2 = "tcp/b:19070"; req.parameters().add(new StringValue(spec1 + "," + spec2)); - rpcServer.updateSources(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is("Updated config sources to: " + spec1 + "," + spec2)); - proxyServer.setMode(Mode.ModeName.MEMORYCACHE.name()); + server.proxyServer().setMode(Mode.ModeName.MEMORYCACHE.name()); req = new Request("updateSources"); req.parameters().add(new StringValue(spec1 + "," + spec2)); - rpcServer.updateSources(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is("Cannot update sources when in '" + Mode.ModeName.MEMORYCACHE.name().toLowerCase() + "' mode")); @@ -245,10 +249,59 @@ public class ConfigProxyRpcServerTest { Request req = new Request("dumpCache"); String path = "/tmp"; req.parameters().add(new StringValue(path)); - rpcServer.dumpCache(req); + client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertThat(req.returnValues().size(), is(1)); assertThat(req.returnValues().get(0).asString(), is("success")); } + private static class TestServer implements AutoCloseable { + + private static final Spec SPEC = new Spec(0); + + private final ProxyServer proxyServer = ProxyServer.createTestServer(new ConfigSourceSet(address)); + private final Supervisor supervisor = new Supervisor(new Transport()); + private final ConfigProxyRpcServer rpcServer = new ConfigProxyRpcServer(proxyServer, supervisor, SPEC); + private final Acceptor acceptor; + + TestServer() throws ListenFailedException { + acceptor = supervisor.listen(SPEC); + } + + ProxyServer proxyServer() { + return proxyServer; + } + + int listenPort() { + return acceptor.port(); + } + + @Override + public void close() { + acceptor.shutdown().join(); + supervisor.transport().shutdown().join(); + rpcServer.shutdown(); + } + } + + private static class TestClient implements AutoCloseable { + + private final Supervisor supervisor; + private final Target target; + + TestClient(int rpcPort) { + this.supervisor = new Supervisor(new Transport()); + this.target = supervisor.connect(new Spec(rpcPort)); + } + + void invoke(Request request) { + target.invokeSync(request, 0/*no timeout*/); + } + + @Override + public void close() { + target.close(); + supervisor.transport().shutdown().join(); + } + } } |