summaryrefslogtreecommitdiffstats
path: root/logserver
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2019-03-27 14:26:39 +0100
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2019-03-27 14:37:33 +0100
commitbbc7aecaa48a73827a64cb3e355e85481c0bc63c (patch)
tree64c19eca15506b56d7c28d43892e8649348aacdd /logserver
parent634983cc739b8f34cc85867fa200f889cbfda9c9 (diff)
Add rpc interface to logserver
Diffstat (limited to 'logserver')
-rw-r--r--logserver/pom.xml5
-rw-r--r--logserver/src/main/java/ai/vespa/logserver/protocol/ArchiveLogMessagesMethod.java88
-rw-r--r--logserver/src/main/java/ai/vespa/logserver/protocol/ProtobufSerialization.java130
-rw-r--r--logserver/src/main/java/ai/vespa/logserver/protocol/RpcServer.java44
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/Server.java14
-rw-r--r--logserver/src/test/java/ai/vespa/logserver/protocol/RpcServerTest.java83
-rw-r--r--logserver/src/test/java/com/yahoo/logserver/ServerTestCase.java9
7 files changed, 367 insertions, 6 deletions
diff --git a/logserver/pom.xml b/logserver/pom.xml
index a813558580c..dca3c321453 100644
--- a/logserver/pom.xml
+++ b/logserver/pom.xml
@@ -46,6 +46,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/logserver/src/main/java/ai/vespa/logserver/protocol/ArchiveLogMessagesMethod.java b/logserver/src/main/java/ai/vespa/logserver/protocol/ArchiveLogMessagesMethod.java
new file mode 100644
index 00000000000..64d81610cdb
--- /dev/null
+++ b/logserver/src/main/java/ai/vespa/logserver/protocol/ArchiveLogMessagesMethod.java
@@ -0,0 +1,88 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.logserver.protocol;
+
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int8Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.logserver.LogDispatcher;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * RPC method that archives incoming log messages
+ *
+ * @author bjorncs
+ */
+class ArchiveLogMessagesMethod {
+
+ static final String METHOD_NAME = "vespa.logserver.archiveLogMessages";
+
+ private static final Logger log = Logger.getLogger(ArchiveLogMessagesMethod.class.getName());
+
+ private final Executor executor = Executors.newSingleThreadExecutor();
+ private final LogDispatcher logDispatcher;
+ private final Method method;
+
+ ArchiveLogMessagesMethod(LogDispatcher logDispatcher) {
+ this.logDispatcher = logDispatcher;
+ this.method = new Method(METHOD_NAME, "bix", "bix", this::log)
+ .methodDesc("Archive log messages")
+ .paramDesc(0, "compressionType", "Compression type (0=raw)")
+ .paramDesc(1, "uncompressedSize", "Uncompressed size (0 if compression type 'raw')")
+ .paramDesc(2, "logRequest", "Log request encoded with protobuf")
+ .returnDesc(0, "compressionType", "Compression type (0=raw)")
+ .returnDesc(1, "uncompressedSize", "Uncompressed size (0 if compression type 'raw')")
+ .returnDesc(2, "logResponse", "Log response encoded with protobuf");
+ }
+
+ Method methodDefinition() {
+ return method;
+ }
+
+ private void log(Request rpcRequest) {
+ rpcRequest.detach();
+ executor.execute(new ArchiveLogMessagesTask(rpcRequest, logDispatcher));
+ }
+
+ private static class ArchiveLogMessagesTask implements Runnable {
+ final Request rpcRequest;
+ final LogDispatcher logDispatcher;
+
+ ArchiveLogMessagesTask(Request rpcRequest, LogDispatcher logDispatcher) {
+ this.rpcRequest = rpcRequest;
+ this.logDispatcher = logDispatcher;
+ }
+
+ @Override
+ public void run() {
+ try {
+ byte compressionType = rpcRequest.parameters().get(0).asInt8();
+ if (compressionType != 0) {
+ rpcRequest.setError(0, "Invalid compression type: " + compressionType);
+ rpcRequest.returnRequest();
+ return;
+ }
+ int uncompressedSize = rpcRequest.parameters().get(1).asInt32();
+ byte[] logRequestPayload = rpcRequest.parameters().get(2).asData();
+ if (uncompressedSize != logRequestPayload.length) {
+ rpcRequest.setError(1, String.format("Invalid uncompressed size: got %d while data is of size %d ", uncompressedSize, logRequestPayload.length));
+ rpcRequest.returnRequest();
+ return;
+ }
+ logDispatcher.handle(ProtobufSerialization.fromLogRequest(logRequestPayload));
+ rpcRequest.returnValues().add(new Int8Value((byte)0));
+ byte[] responsePayload = ProtobufSerialization.toLogResponse();
+ rpcRequest.returnValues().add(new Int32Value(responsePayload.length));
+ rpcRequest.returnValues().add(new DataValue(responsePayload));
+ rpcRequest.returnRequest();
+ } catch (Exception e) {
+ log.log(Level.WARNING, e, () -> "Failed to handle log request: " + e.getMessage());
+ }
+ }
+ }
+}
diff --git a/logserver/src/main/java/ai/vespa/logserver/protocol/ProtobufSerialization.java b/logserver/src/main/java/ai/vespa/logserver/protocol/ProtobufSerialization.java
new file mode 100644
index 00000000000..9065b47fa52
--- /dev/null
+++ b/logserver/src/main/java/ai/vespa/logserver/protocol/ProtobufSerialization.java
@@ -0,0 +1,130 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.logserver.protocol;
+
+import ai.vespa.logserver.protocol.protobuf.LogProtocol;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.LogMessage;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.logging.Level;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Utility class for serialization of log requests and responses.
+ *
+ * @author bjorncs
+ */
+class ProtobufSerialization {
+
+ private ProtobufSerialization() {}
+
+ static List<LogMessage> fromLogRequest(byte[] logRequestPayload) {
+ try {
+ LogProtocol.LogRequest logRequest = LogProtocol.LogRequest.parseFrom(logRequestPayload);
+ return logRequest.getLogMessagesList().stream()
+ .map(ProtobufSerialization::fromLogRequest)
+ .collect(toList());
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalArgumentException("Unable to parse log request: " + e.getMessage(), e);
+ }
+ }
+
+ static byte[] toLogRequest(List<LogMessage> logMessages) {
+ LogProtocol.LogRequest.Builder builder = LogProtocol.LogRequest.newBuilder();
+ for (LogMessage logMessage : logMessages) {
+ builder.addLogMessages(toLogRequestMessage(logMessage));
+ }
+ return builder.build().toByteArray();
+ }
+
+ static byte[] toLogResponse() {
+ return LogProtocol.LogResponse.newBuilder().build().toByteArray();
+ }
+
+ static void fromLogResponse(byte[] logResponsePayload) {
+ try {
+ LogProtocol.LogResponse.parseFrom(logResponsePayload); // log response is empty
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalArgumentException("Unable to parse log response: " + e.getMessage(), e);
+ }
+ }
+
+ private static LogMessage fromLogRequest(LogProtocol.LogMessage message) {
+ return LogMessage.of(
+ Instant.ofEpochSecond(0, message.getTimeNanos()),
+ message.getHostname(),
+ message.getProcessId(),
+ message.getThreadId(),
+ message.getService(),
+ message.getComponent(),
+ fromLogMessageLevel(message.getLevel()),
+ message.getPayload());
+ }
+
+ private static LogProtocol.LogMessage toLogRequestMessage(LogMessage logMessage) {
+ Instant timestamp = logMessage.getTimestamp();
+ long timestampNanos = timestamp.getEpochSecond() * 1_000_000_000L + timestamp.getNano();
+ return LogProtocol.LogMessage.newBuilder()
+ .setTimeNanos(timestampNanos)
+ .setHostname(logMessage.getHost())
+ .setProcessId((int) logMessage.getProcessId())
+ .setThreadId((int) logMessage.getThreadId().orElse(0))
+ .setService(logMessage.getService())
+ .setComponent(logMessage.getComponent())
+ .setLevel(toLogMessageLevel(logMessage.getLevel()))
+ .setPayload(logMessage.getPayload())
+ .build();
+ }
+
+ private static Level fromLogMessageLevel(LogProtocol.LogMessage.Level level) {
+ switch (level) {
+ case FATAL:
+ return LogLevel.FATAL;
+ case ERROR:
+ return LogLevel.ERROR;
+ case WARNING:
+ return LogLevel.WARNING;
+ case CONFIG:
+ return LogLevel.CONFIG;
+ case INFO:
+ return LogLevel.INFO;
+ case EVENT:
+ return LogLevel.EVENT;
+ case DEBUG:
+ return LogLevel.DEBUG;
+ case SPAM:
+ return LogLevel.SPAM;
+ case UNKNOWN:
+ case UNRECOGNIZED:
+ default:
+ return LogLevel.UNKNOWN;
+ }
+ }
+
+ private static LogProtocol.LogMessage.Level toLogMessageLevel(Level level) {
+ Level vespaLevel = LogLevel.getVespaLogLevel(level);
+ if (vespaLevel.equals(LogLevel.FATAL)) {
+ return LogProtocol.LogMessage.Level.FATAL;
+ } else if (vespaLevel.equals(LogLevel.ERROR)) {
+ return LogProtocol.LogMessage.Level.ERROR;
+ } else if (vespaLevel.equals(LogLevel.WARNING)) {
+ return LogProtocol.LogMessage.Level.WARNING;
+ } else if (vespaLevel.equals(LogLevel.CONFIG)) {
+ return LogProtocol.LogMessage.Level.CONFIG;
+ } else if (vespaLevel.equals(LogLevel.INFO)) {
+ return LogProtocol.LogMessage.Level.INFO;
+ } else if (vespaLevel.equals(LogLevel.EVENT)) {
+ return LogProtocol.LogMessage.Level.EVENT;
+ } else if (vespaLevel.equals(LogLevel.DEBUG)) {
+ return LogProtocol.LogMessage.Level.DEBUG;
+ } else if (vespaLevel.equals(LogLevel.SPAM)) {
+ return LogProtocol.LogMessage.Level.SPAM;
+ } else {
+ return LogProtocol.LogMessage.Level.UNKNOWN;
+ }
+ }
+
+}
diff --git a/logserver/src/main/java/ai/vespa/logserver/protocol/RpcServer.java b/logserver/src/main/java/ai/vespa/logserver/protocol/RpcServer.java
new file mode 100644
index 00000000000..10ea00bbbac
--- /dev/null
+++ b/logserver/src/main/java/ai/vespa/logserver/protocol/RpcServer.java
@@ -0,0 +1,44 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.logserver.protocol;
+
+import com.yahoo.jrt.Acceptor;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Transport;
+import com.yahoo.logserver.LogDispatcher;
+
+/**
+ * A JRT based RPC server for handling log requests
+ *
+ * @author bjorncs
+ */
+public class RpcServer implements AutoCloseable {
+
+ private final Supervisor supervisor = new Supervisor(new Transport());
+ private final int listenPort;
+ private Acceptor acceptor;
+
+ public RpcServer(int listenPort, LogDispatcher logDispatcher) {
+ this.listenPort = listenPort;
+ supervisor.addMethod(new ArchiveLogMessagesMethod(logDispatcher).methodDefinition());
+ }
+
+ public void start() {
+ try {
+ acceptor = supervisor.listen(new Spec(listenPort));
+ } catch (ListenFailedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ int listenPort() {
+ return acceptor.port();
+ }
+
+ @Override
+ public void close() {
+ acceptor.shutdown().join();
+ supervisor.transport().shutdown().join();
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/Server.java b/logserver/src/main/java/com/yahoo/logserver/Server.java
index db01444ca67..efccf1d8f83 100644
--- a/logserver/src/main/java/com/yahoo/logserver/Server.java
+++ b/logserver/src/main/java/com/yahoo/logserver/Server.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.logserver;
+import ai.vespa.logserver.protocol.RpcServer;
import com.yahoo.io.FatalErrorHandler;
import com.yahoo.io.Listener;
import com.yahoo.log.LogLevel;
@@ -39,6 +40,7 @@ public class Server implements Runnable {
LogSetup.initVespaLogging("ADM");
}
+ private static final int DEFAULT_RPC_LISTEN_PORT = 19080;
// the port is a String because we want to use it as the default
// value of a System.getProperty().
private static final String LISTEN_PORT = "19081";
@@ -46,6 +48,7 @@ public class Server implements Runnable {
private int listenPort;
private Listener listener;
private final LogDispatcher dispatch;
+ private RpcServer rpcServer;
private final boolean isInitialized;
@@ -108,8 +111,9 @@ public class Server implements Runnable {
*
* @param listenPort The port on which the logserver accepts log
* messages.
+ * @param rpcListenPort
*/
- public void initialize(int listenPort) {
+ public void initialize(int listenPort, int rpcListenPort) {
if (isInitialized) {
throw new IllegalStateException(APPNAME + " already initialized");
}
@@ -123,6 +127,7 @@ public class Server implements Runnable {
listener = new Listener(APPNAME);
listener.addSelectLoopPostHook(dispatch);
listener.setFatalErrorHandler(fatalErrorHandler);
+ rpcServer = new RpcServer(rpcListenPort, dispatch);
}
/**
@@ -140,6 +145,8 @@ public class Server implements Runnable {
log.fine("Starting listener...");
listener.start();
+ log.fine("Starting rpc server...");
+ rpcServer.start();
Event.started(APPNAME);
try {
listener.join();
@@ -164,6 +171,7 @@ public class Server implements Runnable {
}
}
Event.stopping(APPNAME, "shutdown");
+ rpcServer.close();
dispatch.close();
Event.stopped(APPNAME, 0, 0);
System.exit(0);
@@ -176,6 +184,7 @@ public class Server implements Runnable {
static void help() {
System.out.println();
System.out.println("System properties:");
+ System.out.println(" - " + APPNAME + ".rpcListenPort (" + DEFAULT_RPC_LISTEN_PORT + ")");
System.out.println(" - " + APPNAME + ".listenport (" + LISTEN_PORT + ")");
System.out.println(" - " + APPNAME + ".queue.size (" + HandlerThread.DEFAULT_QUEUESIZE + ")");
System.out.println();
@@ -188,9 +197,10 @@ public class Server implements Runnable {
}
String portString = System.getProperty(APPNAME + ".listenport", LISTEN_PORT);
+ int rpcPort = Integer.parseInt(System.getProperty(APPNAME + ".rpcListenPort", Integer.toString(DEFAULT_RPC_LISTEN_PORT)));
Server server = Server.getInstance();
server.setupSignalHandler();
- server.initialize(Integer.parseInt(portString));
+ server.initialize(Integer.parseInt(portString), rpcPort);
Thread t = new Thread(server, "logserver main");
t.start();
diff --git a/logserver/src/test/java/ai/vespa/logserver/protocol/RpcServerTest.java b/logserver/src/test/java/ai/vespa/logserver/protocol/RpcServerTest.java
new file mode 100644
index 00000000000..321c21915fc
--- /dev/null
+++ b/logserver/src/test/java/ai/vespa/logserver/protocol/RpcServerTest.java
@@ -0,0 +1,83 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.logserver.protocol;
+
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int8Value;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Target;
+import com.yahoo.jrt.Transport;
+import com.yahoo.jrt.Values;
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.LogMessage;
+import com.yahoo.logserver.LogDispatcher;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+
+import static ai.vespa.logserver.protocol.ProtobufSerialization.fromLogResponse;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * @author bjorncs
+ */
+public class RpcServerTest {
+
+ private static final LogMessage MESSAGE_1 =
+ LogMessage.of(Instant.EPOCH.plus(1000, ChronoUnit.DAYS), "localhost", 12, 3456, "my-service", "my-component", LogLevel.ERROR, "My error message");
+ private static final LogMessage MESSAGE_2 =
+ LogMessage.of(Instant.EPOCH.plus(5005, ChronoUnit.DAYS), "localhost", 12, 6543, "my-service", "my-component", LogLevel.INFO, "My info message");
+
+ @Test
+ public void server_dispatches_log_messages_from_log_request() {
+ List<LogMessage> messages = List.of(MESSAGE_1, MESSAGE_2);
+ LogDispatcher logDispatcher = mock(LogDispatcher.class);
+ try (RpcServer server = new RpcServer(0, logDispatcher)) {
+ server.start();
+ try (TestClient client = new TestClient(server.listenPort())) {
+ client.logMessages(messages);
+ }
+ }
+ verify(logDispatcher).handle(new ArrayList<>(messages));
+ }
+
+ private static class TestClient implements AutoCloseable {
+
+ private final Supervisor supervisor;
+ private final Target target;
+
+ TestClient(int logserverPort) {
+ this.supervisor = new Supervisor(new Transport());
+ this.target = supervisor.connectSync(new Spec(logserverPort));
+ }
+
+ void logMessages(List<LogMessage> messages) {
+ byte[] requestPayload = ProtobufSerialization.toLogRequest(messages);
+ Request request = new Request(ArchiveLogMessagesMethod.METHOD_NAME);
+ request.parameters().add(new Int8Value((byte)0));
+ request.parameters().add(new Int32Value(requestPayload.length));
+ request.parameters().add(new DataValue(requestPayload));
+ target.invokeSync(request, 10/*seconds*/);
+ Values returnValues = request.returnValues();
+ assertEquals(3, returnValues.size());
+ assertEquals(0, returnValues.get(0).asInt8());
+ byte[] responsePayload = returnValues.get(2).asData();
+ assertEquals(responsePayload.length, returnValues.get(1).asInt32());
+ fromLogResponse(responsePayload); // 'void' return type as current response message contains no data
+ }
+
+ @Override
+ public void close() {
+ target.close();
+ supervisor.transport().shutdown().join();
+ }
+ }
+
+} \ No newline at end of file
diff --git a/logserver/src/test/java/com/yahoo/logserver/ServerTestCase.java b/logserver/src/test/java/com/yahoo/logserver/ServerTestCase.java
index a97eaf69f84..e3793ee3057 100644
--- a/logserver/src/test/java/com/yahoo/logserver/ServerTestCase.java
+++ b/logserver/src/test/java/com/yahoo/logserver/ServerTestCase.java
@@ -5,12 +5,13 @@ import com.yahoo.log.LogSetup;
import com.yahoo.logserver.handlers.LogHandler;
import com.yahoo.logserver.handlers.logmetrics.LogMetricsPlugin;
import com.yahoo.logserver.test.LogDispatcherTestCase;
+import org.junit.Test;
import java.io.IOException;
-import org.junit.*;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Unit tests for the Server class.
@@ -23,7 +24,7 @@ public class ServerTestCase {
public void testStartupAndRegHandlers() throws IOException, InterruptedException {
Server.help();
Server server = Server.getInstance();
- server.initialize(18322);
+ server.initialize(18322, 18323); // TODO Stop using hardcoded ports
LogSetup.clearHandlers();
Thread serverThread = new Thread(server);
serverThread.start();