diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2019-03-27 14:26:39 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2019-03-27 14:37:33 +0100 |
commit | bbc7aecaa48a73827a64cb3e355e85481c0bc63c (patch) | |
tree | 64c19eca15506b56d7c28d43892e8649348aacdd /logserver/src | |
parent | 634983cc739b8f34cc85867fa200f889cbfda9c9 (diff) |
Add rpc interface to logserver
Diffstat (limited to 'logserver/src')
6 files changed, 362 insertions, 6 deletions
diff --git a/logserver/src/main/java/ai/vespa/logserver/protocol/ArchiveLogMessagesMethod.java b/logserver/src/main/java/ai/vespa/logserver/protocol/ArchiveLogMessagesMethod.java new file mode 100644 index 00000000000..64d81610cdb --- /dev/null +++ b/logserver/src/main/java/ai/vespa/logserver/protocol/ArchiveLogMessagesMethod.java @@ -0,0 +1,88 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.logserver.protocol; + +import com.yahoo.jrt.DataValue; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Int8Value; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.Request; +import com.yahoo.logserver.LogDispatcher; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * RPC method that archives incoming log messages + * + * @author bjorncs + */ +class ArchiveLogMessagesMethod { + + static final String METHOD_NAME = "vespa.logserver.archiveLogMessages"; + + private static final Logger log = Logger.getLogger(ArchiveLogMessagesMethod.class.getName()); + + private final Executor executor = Executors.newSingleThreadExecutor(); + private final LogDispatcher logDispatcher; + private final Method method; + + ArchiveLogMessagesMethod(LogDispatcher logDispatcher) { + this.logDispatcher = logDispatcher; + this.method = new Method(METHOD_NAME, "bix", "bix", this::log) + .methodDesc("Archive log messages") + .paramDesc(0, "compressionType", "Compression type (0=raw)") + .paramDesc(1, "uncompressedSize", "Uncompressed size (0 if compression type 'raw')") + .paramDesc(2, "logRequest", "Log request encoded with protobuf") + .returnDesc(0, "compressionType", "Compression type (0=raw)") + .returnDesc(1, "uncompressedSize", "Uncompressed size (0 if compression type 'raw')") + .returnDesc(2, "logResponse", "Log response encoded with protobuf"); + } + + Method methodDefinition() { + return method; + } + + private void log(Request rpcRequest) { + rpcRequest.detach(); + executor.execute(new ArchiveLogMessagesTask(rpcRequest, logDispatcher)); + } + + private static class ArchiveLogMessagesTask implements Runnable { + final Request rpcRequest; + final LogDispatcher logDispatcher; + + ArchiveLogMessagesTask(Request rpcRequest, LogDispatcher logDispatcher) { + this.rpcRequest = rpcRequest; + this.logDispatcher = logDispatcher; + } + + @Override + public void run() { + try { + byte compressionType = rpcRequest.parameters().get(0).asInt8(); + if (compressionType != 0) { + rpcRequest.setError(0, "Invalid compression type: " + compressionType); + rpcRequest.returnRequest(); + return; + } + int uncompressedSize = rpcRequest.parameters().get(1).asInt32(); + byte[] logRequestPayload = rpcRequest.parameters().get(2).asData(); + if (uncompressedSize != logRequestPayload.length) { + rpcRequest.setError(1, String.format("Invalid uncompressed size: got %d while data is of size %d ", uncompressedSize, logRequestPayload.length)); + rpcRequest.returnRequest(); + return; + } + logDispatcher.handle(ProtobufSerialization.fromLogRequest(logRequestPayload)); + rpcRequest.returnValues().add(new Int8Value((byte)0)); + byte[] responsePayload = ProtobufSerialization.toLogResponse(); + rpcRequest.returnValues().add(new Int32Value(responsePayload.length)); + rpcRequest.returnValues().add(new DataValue(responsePayload)); + rpcRequest.returnRequest(); + } catch (Exception e) { + log.log(Level.WARNING, e, () -> "Failed to handle log request: " + e.getMessage()); + } + } + } +} diff --git a/logserver/src/main/java/ai/vespa/logserver/protocol/ProtobufSerialization.java b/logserver/src/main/java/ai/vespa/logserver/protocol/ProtobufSerialization.java new file mode 100644 index 00000000000..9065b47fa52 --- /dev/null +++ b/logserver/src/main/java/ai/vespa/logserver/protocol/ProtobufSerialization.java @@ -0,0 +1,130 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.logserver.protocol; + +import ai.vespa.logserver.protocol.protobuf.LogProtocol; +import com.google.protobuf.InvalidProtocolBufferException; +import com.yahoo.log.LogLevel; +import com.yahoo.log.LogMessage; + +import java.time.Instant; +import java.util.List; +import java.util.logging.Level; + +import static java.util.stream.Collectors.toList; + +/** + * Utility class for serialization of log requests and responses. + * + * @author bjorncs + */ +class ProtobufSerialization { + + private ProtobufSerialization() {} + + static List<LogMessage> fromLogRequest(byte[] logRequestPayload) { + try { + LogProtocol.LogRequest logRequest = LogProtocol.LogRequest.parseFrom(logRequestPayload); + return logRequest.getLogMessagesList().stream() + .map(ProtobufSerialization::fromLogRequest) + .collect(toList()); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException("Unable to parse log request: " + e.getMessage(), e); + } + } + + static byte[] toLogRequest(List<LogMessage> logMessages) { + LogProtocol.LogRequest.Builder builder = LogProtocol.LogRequest.newBuilder(); + for (LogMessage logMessage : logMessages) { + builder.addLogMessages(toLogRequestMessage(logMessage)); + } + return builder.build().toByteArray(); + } + + static byte[] toLogResponse() { + return LogProtocol.LogResponse.newBuilder().build().toByteArray(); + } + + static void fromLogResponse(byte[] logResponsePayload) { + try { + LogProtocol.LogResponse.parseFrom(logResponsePayload); // log response is empty + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException("Unable to parse log response: " + e.getMessage(), e); + } + } + + private static LogMessage fromLogRequest(LogProtocol.LogMessage message) { + return LogMessage.of( + Instant.ofEpochSecond(0, message.getTimeNanos()), + message.getHostname(), + message.getProcessId(), + message.getThreadId(), + message.getService(), + message.getComponent(), + fromLogMessageLevel(message.getLevel()), + message.getPayload()); + } + + private static LogProtocol.LogMessage toLogRequestMessage(LogMessage logMessage) { + Instant timestamp = logMessage.getTimestamp(); + long timestampNanos = timestamp.getEpochSecond() * 1_000_000_000L + timestamp.getNano(); + return LogProtocol.LogMessage.newBuilder() + .setTimeNanos(timestampNanos) + .setHostname(logMessage.getHost()) + .setProcessId((int) logMessage.getProcessId()) + .setThreadId((int) logMessage.getThreadId().orElse(0)) + .setService(logMessage.getService()) + .setComponent(logMessage.getComponent()) + .setLevel(toLogMessageLevel(logMessage.getLevel())) + .setPayload(logMessage.getPayload()) + .build(); + } + + private static Level fromLogMessageLevel(LogProtocol.LogMessage.Level level) { + switch (level) { + case FATAL: + return LogLevel.FATAL; + case ERROR: + return LogLevel.ERROR; + case WARNING: + return LogLevel.WARNING; + case CONFIG: + return LogLevel.CONFIG; + case INFO: + return LogLevel.INFO; + case EVENT: + return LogLevel.EVENT; + case DEBUG: + return LogLevel.DEBUG; + case SPAM: + return LogLevel.SPAM; + case UNKNOWN: + case UNRECOGNIZED: + default: + return LogLevel.UNKNOWN; + } + } + + private static LogProtocol.LogMessage.Level toLogMessageLevel(Level level) { + Level vespaLevel = LogLevel.getVespaLogLevel(level); + if (vespaLevel.equals(LogLevel.FATAL)) { + return LogProtocol.LogMessage.Level.FATAL; + } else if (vespaLevel.equals(LogLevel.ERROR)) { + return LogProtocol.LogMessage.Level.ERROR; + } else if (vespaLevel.equals(LogLevel.WARNING)) { + return LogProtocol.LogMessage.Level.WARNING; + } else if (vespaLevel.equals(LogLevel.CONFIG)) { + return LogProtocol.LogMessage.Level.CONFIG; + } else if (vespaLevel.equals(LogLevel.INFO)) { + return LogProtocol.LogMessage.Level.INFO; + } else if (vespaLevel.equals(LogLevel.EVENT)) { + return LogProtocol.LogMessage.Level.EVENT; + } else if (vespaLevel.equals(LogLevel.DEBUG)) { + return LogProtocol.LogMessage.Level.DEBUG; + } else if (vespaLevel.equals(LogLevel.SPAM)) { + return LogProtocol.LogMessage.Level.SPAM; + } else { + return LogProtocol.LogMessage.Level.UNKNOWN; + } + } + +} diff --git a/logserver/src/main/java/ai/vespa/logserver/protocol/RpcServer.java b/logserver/src/main/java/ai/vespa/logserver/protocol/RpcServer.java new file mode 100644 index 00000000000..10ea00bbbac --- /dev/null +++ b/logserver/src/main/java/ai/vespa/logserver/protocol/RpcServer.java @@ -0,0 +1,44 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.logserver.protocol; + +import com.yahoo.jrt.Acceptor; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; +import com.yahoo.logserver.LogDispatcher; + +/** + * A JRT based RPC server for handling log requests + * + * @author bjorncs + */ +public class RpcServer implements AutoCloseable { + + private final Supervisor supervisor = new Supervisor(new Transport()); + private final int listenPort; + private Acceptor acceptor; + + public RpcServer(int listenPort, LogDispatcher logDispatcher) { + this.listenPort = listenPort; + supervisor.addMethod(new ArchiveLogMessagesMethod(logDispatcher).methodDefinition()); + } + + public void start() { + try { + acceptor = supervisor.listen(new Spec(listenPort)); + } catch (ListenFailedException e) { + throw new RuntimeException(e); + } + } + + int listenPort() { + return acceptor.port(); + } + + @Override + public void close() { + acceptor.shutdown().join(); + supervisor.transport().shutdown().join(); + } +} diff --git a/logserver/src/main/java/com/yahoo/logserver/Server.java b/logserver/src/main/java/com/yahoo/logserver/Server.java index db01444ca67..efccf1d8f83 100644 --- a/logserver/src/main/java/com/yahoo/logserver/Server.java +++ b/logserver/src/main/java/com/yahoo/logserver/Server.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.logserver; +import ai.vespa.logserver.protocol.RpcServer; import com.yahoo.io.FatalErrorHandler; import com.yahoo.io.Listener; import com.yahoo.log.LogLevel; @@ -39,6 +40,7 @@ public class Server implements Runnable { LogSetup.initVespaLogging("ADM"); } + private static final int DEFAULT_RPC_LISTEN_PORT = 19080; // the port is a String because we want to use it as the default // value of a System.getProperty(). private static final String LISTEN_PORT = "19081"; @@ -46,6 +48,7 @@ public class Server implements Runnable { private int listenPort; private Listener listener; private final LogDispatcher dispatch; + private RpcServer rpcServer; private final boolean isInitialized; @@ -108,8 +111,9 @@ public class Server implements Runnable { * * @param listenPort The port on which the logserver accepts log * messages. + * @param rpcListenPort */ - public void initialize(int listenPort) { + public void initialize(int listenPort, int rpcListenPort) { if (isInitialized) { throw new IllegalStateException(APPNAME + " already initialized"); } @@ -123,6 +127,7 @@ public class Server implements Runnable { listener = new Listener(APPNAME); listener.addSelectLoopPostHook(dispatch); listener.setFatalErrorHandler(fatalErrorHandler); + rpcServer = new RpcServer(rpcListenPort, dispatch); } /** @@ -140,6 +145,8 @@ public class Server implements Runnable { log.fine("Starting listener..."); listener.start(); + log.fine("Starting rpc server..."); + rpcServer.start(); Event.started(APPNAME); try { listener.join(); @@ -164,6 +171,7 @@ public class Server implements Runnable { } } Event.stopping(APPNAME, "shutdown"); + rpcServer.close(); dispatch.close(); Event.stopped(APPNAME, 0, 0); System.exit(0); @@ -176,6 +184,7 @@ public class Server implements Runnable { static void help() { System.out.println(); System.out.println("System properties:"); + System.out.println(" - " + APPNAME + ".rpcListenPort (" + DEFAULT_RPC_LISTEN_PORT + ")"); System.out.println(" - " + APPNAME + ".listenport (" + LISTEN_PORT + ")"); System.out.println(" - " + APPNAME + ".queue.size (" + HandlerThread.DEFAULT_QUEUESIZE + ")"); System.out.println(); @@ -188,9 +197,10 @@ public class Server implements Runnable { } String portString = System.getProperty(APPNAME + ".listenport", LISTEN_PORT); + int rpcPort = Integer.parseInt(System.getProperty(APPNAME + ".rpcListenPort", Integer.toString(DEFAULT_RPC_LISTEN_PORT))); Server server = Server.getInstance(); server.setupSignalHandler(); - server.initialize(Integer.parseInt(portString)); + server.initialize(Integer.parseInt(portString), rpcPort); Thread t = new Thread(server, "logserver main"); t.start(); diff --git a/logserver/src/test/java/ai/vespa/logserver/protocol/RpcServerTest.java b/logserver/src/test/java/ai/vespa/logserver/protocol/RpcServerTest.java new file mode 100644 index 00000000000..321c21915fc --- /dev/null +++ b/logserver/src/test/java/ai/vespa/logserver/protocol/RpcServerTest.java @@ -0,0 +1,83 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.logserver.protocol; + +import com.yahoo.jrt.DataValue; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Int8Value; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Target; +import com.yahoo.jrt.Transport; +import com.yahoo.jrt.Values; +import com.yahoo.log.LogLevel; +import com.yahoo.log.LogMessage; +import com.yahoo.logserver.LogDispatcher; +import org.junit.Test; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; + +import static ai.vespa.logserver.protocol.ProtobufSerialization.fromLogResponse; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * @author bjorncs + */ +public class RpcServerTest { + + private static final LogMessage MESSAGE_1 = + LogMessage.of(Instant.EPOCH.plus(1000, ChronoUnit.DAYS), "localhost", 12, 3456, "my-service", "my-component", LogLevel.ERROR, "My error message"); + private static final LogMessage MESSAGE_2 = + LogMessage.of(Instant.EPOCH.plus(5005, ChronoUnit.DAYS), "localhost", 12, 6543, "my-service", "my-component", LogLevel.INFO, "My info message"); + + @Test + public void server_dispatches_log_messages_from_log_request() { + List<LogMessage> messages = List.of(MESSAGE_1, MESSAGE_2); + LogDispatcher logDispatcher = mock(LogDispatcher.class); + try (RpcServer server = new RpcServer(0, logDispatcher)) { + server.start(); + try (TestClient client = new TestClient(server.listenPort())) { + client.logMessages(messages); + } + } + verify(logDispatcher).handle(new ArrayList<>(messages)); + } + + private static class TestClient implements AutoCloseable { + + private final Supervisor supervisor; + private final Target target; + + TestClient(int logserverPort) { + this.supervisor = new Supervisor(new Transport()); + this.target = supervisor.connectSync(new Spec(logserverPort)); + } + + void logMessages(List<LogMessage> messages) { + byte[] requestPayload = ProtobufSerialization.toLogRequest(messages); + Request request = new Request(ArchiveLogMessagesMethod.METHOD_NAME); + request.parameters().add(new Int8Value((byte)0)); + request.parameters().add(new Int32Value(requestPayload.length)); + request.parameters().add(new DataValue(requestPayload)); + target.invokeSync(request, 10/*seconds*/); + Values returnValues = request.returnValues(); + assertEquals(3, returnValues.size()); + assertEquals(0, returnValues.get(0).asInt8()); + byte[] responsePayload = returnValues.get(2).asData(); + assertEquals(responsePayload.length, returnValues.get(1).asInt32()); + fromLogResponse(responsePayload); // 'void' return type as current response message contains no data + } + + @Override + public void close() { + target.close(); + supervisor.transport().shutdown().join(); + } + } + +}
\ No newline at end of file diff --git a/logserver/src/test/java/com/yahoo/logserver/ServerTestCase.java b/logserver/src/test/java/com/yahoo/logserver/ServerTestCase.java index a97eaf69f84..e3793ee3057 100644 --- a/logserver/src/test/java/com/yahoo/logserver/ServerTestCase.java +++ b/logserver/src/test/java/com/yahoo/logserver/ServerTestCase.java @@ -5,12 +5,13 @@ import com.yahoo.log.LogSetup; import com.yahoo.logserver.handlers.LogHandler; import com.yahoo.logserver.handlers.logmetrics.LogMetricsPlugin; import com.yahoo.logserver.test.LogDispatcherTestCase; +import org.junit.Test; import java.io.IOException; -import org.junit.*; - -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Unit tests for the Server class. @@ -23,7 +24,7 @@ public class ServerTestCase { public void testStartupAndRegHandlers() throws IOException, InterruptedException { Server.help(); Server server = Server.getInstance(); - server.initialize(18322); + server.initialize(18322, 18323); // TODO Stop using hardcoded ports LogSetup.clearHandlers(); Thread serverThread = new Thread(server); serverThread.start(); |