// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.jrt.Acceptor; import com.yahoo.jrt.ListenFailedException; import com.yahoo.jrt.Request; import com.yahoo.jrt.Spec; import com.yahoo.jrt.StringValue; import com.yahoo.jrt.Supervisor; import com.yahoo.jrt.Target; import com.yahoo.jrt.Transport; import com.yahoo.vespa.config.RawConfig; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.IOException; import java.time.Duration; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertEquals; /** * @author hmusum * @author bjorncs */ public class ConfigProxyRpcServerTest { private static final String hostname = "localhost"; private static final int port = 12345; private static final String configSourceAddress = "tcp/" + hostname + ":" + port; private static TestServer server; private static TestClient client; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @BeforeClass public static void setup() throws ListenFailedException { server = new TestServer(); client = new TestClient(server.listenPort()); } @AfterClass public static void teardown() { client.close(); server.close(); } private static void reset() throws ListenFailedException { teardown(); setup(); } @Test public void basic() { ProxyServer proxy = createTestServer(new MockConfigSource()); Spec spec = new Spec("localhost", 12345); ConfigProxyRpcServer server = new ConfigProxyRpcServer(proxy, new Supervisor(new Transport()), spec); assertEquals(spec, server.getSpec()); } /** * Tests ping RPC command */ @Test public void testRpcMethodPing() { Request req = new Request("ping"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertEquals(1, req.returnValues().size()); assertEquals(0, req.returnValues().get(0).asInt32()); } /** * Tests listCachedConfig RPC command */ @Test public void testRpcMethodListCachedConfig() throws ListenFailedException { reset(); Request req = new Request("listCachedConfig"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); String[] ret = req.returnValues().get(0).asStringArray(); assertEquals(1, req.returnValues().size()); assertEquals(0, ret.length); final RawConfig config = ProxyServerTest.fooConfig; server.proxyServer().memoryCache().update(config); req = new Request("listCachedConfig"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertEquals(1, req.returnValues().size()); ret = req.returnValues().get(0).asStringArray(); assertEquals(1, ret.length); assertEquals(config.getNamespace() + "." + config.getName() + "," + config.getConfigId() + "," + config.getGeneration() + "," + config.getPayloadChecksums(), ret[0]); } /** * Tests listCachedConfig RPC command */ @Test public void testRpcMethodListCachedConfigFull() { Request req = new Request("listCachedConfigFull"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertEquals(1, req.returnValues().size()); String[] ret = req.returnValues().get(0).asStringArray(); assertEquals(0, ret.length); final RawConfig config = ProxyServerTest.fooConfig; server.proxyServer().memoryCache().update(config); req = new Request("listCachedConfigFull"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); ret = req.returnValues().get(0).asStringArray(); assertEquals(1, ret.length); assertEquals(config.getNamespace() + "." + config.getName() + "," + config.getConfigId() + "," + config.getGeneration() + "," + config.getPayloadChecksums() + "," + config.getPayload().getData(), ret[0]); } /** * Tests listSourceConnections RPC command */ @Test public void testRpcMethodListSourceConnections() throws ListenFailedException { reset(); Request req = new Request("listSourceConnections"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertEquals(1, req.returnValues().size()); final String[] ret = req.returnValues().get(0).asStringArray(); assertEquals(2, ret.length); assertEquals("Current source: " + configSourceAddress, ret[0]); assertEquals("All sources:\n" + configSourceAddress + "\n", ret[1]); } /** * Tests invalidateCache RPC command */ @Test public void testRpcMethodInvalidateCache() { Request req = new Request("invalidateCache"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertEquals(1, req.returnValues().size()); final String[] ret = req.returnValues().get(0).asStringArray(); assertEquals(2, ret.length); assertEquals("0", ret[0]); assertEquals("success", ret[1]); } /** * Tests getMode and setMode RPC commands */ @Test public void testRpcMethodGetModeAndSetMode() { Request req = new Request("getMode"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertEquals(1, req.returnValues().size()); assertEquals("default", req.returnValues().get(0).asString()); req = new Request("setMode"); String mode = "memorycache"; req.parameters().add(new StringValue(mode)); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertEquals(1, req.returnValues().size()); String[] ret = req.returnValues().get(0).asStringArray(); assertEquals(2, ret.length); assertEquals("0", ret[0]); assertEquals("success", ret[1]); assertEquals(mode, server.proxyServer().getMode().name()); req = new Request("getMode"); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertEquals(1, req.returnValues().size()); assertEquals(mode, req.returnValues().get(0).asString()); req = new Request("setMode"); String oldMode = mode; mode = "invalid"; req.parameters().add(new StringValue(mode)); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); ret = req.returnValues().get(0).asStringArray(); assertEquals(2, ret.length); assertEquals("1", ret[0]); assertEquals("Unrecognized mode '" + mode + "' supplied. Legal modes are '" + Mode.modes() + "'", ret[1]); assertEquals(oldMode, server.proxyServer().getMode().name()); } /** * Tests updateSources RPC command */ @Test public void testRpcMethodUpdateSources() throws ListenFailedException { reset(); Request req = new Request("updateSources"); String spec1 = "tcp/a:19070"; String spec2 = "tcp/b:19070"; req.parameters().add(new StringValue(spec1 + "," + spec2)); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertEquals(1, req.returnValues().size()); assertEquals("Updated config sources to: " + spec1 + "," + spec2, req.returnValues().get(0).asString()); server.proxyServer().setMode(Mode.ModeName.MEMORYCACHE.name()); req = new Request("updateSources"); req.parameters().add(new StringValue(spec1 + "," + spec2)); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertEquals(1, req.returnValues().size()); assertEquals("Cannot update sources when in '" + Mode.ModeName.MEMORYCACHE.name().toLowerCase() + "' mode", req.returnValues().get(0).asString()); // TODO source connections needs to have deterministic order to work /*req = new Request("listSourceConnections"); rpcServer.listSourceConnections(req); assertFalse(req.errorMessage(), req.isError()); final String[] ret = req.returnValues().get(0).asStringArray(); assertEquals(ret.length, is(2)); assertEquals(ret[0], is("Current source: " + spec1)); assertEquals(ret[1], is("All sources:\n" + spec2 + "\n" + spec1 + "\n")); */ } /** * Tests dumpCache RPC command */ @Test public void testRpcMethodDumpCache() throws IOException { Request req = new Request("dumpCache"); String path = temporaryFolder.newFolder().getAbsolutePath(); req.parameters().add(new StringValue(path)); client.invoke(req); assertFalse(req.errorMessage(), req.isError()); assertEquals(1, req.returnValues().size()); assertEquals("success", req.returnValues().get(0).asString()); } private static ProxyServer createTestServer(ConfigSourceSet source) { return new ProxyServer(null, source, new RpcConfigSourceClient(new ResponseHandler(), source)); } private static class TestServer implements AutoCloseable { private static final Spec SPEC = new Spec(0); private final ProxyServer proxyServer = createTestServer(new ConfigSourceSet(configSourceAddress)); private final Supervisor supervisor = new Supervisor(new Transport()); private final ConfigProxyRpcServer rpcServer = new ConfigProxyRpcServer(proxyServer, supervisor, SPEC); private final Acceptor acceptor; TestServer() throws ListenFailedException { acceptor = supervisor.listen(SPEC); } ProxyServer proxyServer() { return proxyServer; } int listenPort() { return acceptor.port(); } @Override public void close() { acceptor.shutdown().join(); supervisor.transport().shutdown().join(); rpcServer.shutdown(); } } private static class TestClient implements AutoCloseable { private final Supervisor supervisor; private final Target target; TestClient(int rpcPort) { this.supervisor = new Supervisor(new Transport()); this.target = supervisor.connect(new Spec(rpcPort)); } void invoke(Request request) { target.invokeSync(request, Duration.ofMinutes(10).getSeconds()); } @Override public void close() { target.close(); supervisor.transport().shutdown().join(); } } }