aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java12
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java111
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java27
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Logger.java13
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java96
-rw-r--r--container-search/src/main/resources/configdefinitions/search.logging.local-disk-logger.def5
-rw-r--r--container-search/src/test/java/com/yahoo/search/logging/LocalDiskLoggerTest.java50
7 files changed, 314 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
new file mode 100644
index 00000000000..cfcbf2cab45
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
@@ -0,0 +1,12 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+abstract class AbstractSpoolingLogger extends AbstractThreadedLogger {
+
+ @Override
+ protected void dequeue(LoggerEntry entry) {
+ // Todo: add to spooler etc
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
new file mode 100644
index 00000000000..d061c2430cb
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
@@ -0,0 +1,111 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+abstract class AbstractThreadedLogger implements Logger {
+
+ private final static org.slf4j.Logger log = LoggerFactory.getLogger(AbstractThreadedLogger.class);
+
+ final static int DEFAULT_MAX_THREADS = 1;
+ final static int DEFAULT_QUEUE_SIZE = 1000;
+
+ private final WorkerThreadExecutor executor;
+
+ AbstractThreadedLogger() {
+ this(DEFAULT_MAX_THREADS, DEFAULT_QUEUE_SIZE);
+ }
+
+ AbstractThreadedLogger(int threads, int queueSize) {
+ executor = new WorkerThreadExecutor(threads, queueSize);
+ }
+
+ AbstractThreadedLogger(int threads, int queueSize, ThreadFactory factory) {
+ executor = new WorkerThreadExecutor(threads, queueSize, factory);
+ }
+
+ @Override
+ public boolean send(LoggerEntry entry) {
+ return enqueue(entry);
+ }
+
+ protected boolean enqueue(LoggerEntry entry) {
+ // Todo: metric things
+ try {
+ executor.execute(() -> dequeue(entry));
+ } catch (RejectedExecutionException e) {
+ return false;
+ }
+ return true;
+ }
+
+ protected void dequeue(LoggerEntry entry) {
+ transport(entry); // This happens in worker thread
+ }
+
+ /**
+ * Actually transports the entry to it's destination
+ */
+ abstract void transport(LoggerEntry entry);
+
+
+ private static class WorkerThread extends Thread {
+
+ public WorkerThread(Runnable r) {
+ super(r);
+ }
+
+ @Override
+ public void run() {
+ try {
+ super.run();
+ } catch (Exception e) {
+ log.error(String.format("Error while sending to DH: %s", e), e);
+ }
+ }
+
+ }
+
+ private static class WorkerThreadExecutor implements Executor {
+
+ protected final ThreadPoolExecutor executor;
+
+ WorkerThreadExecutor(int threads, int queueSize) {
+ this(threads, queueSize, WorkerThread::new);
+ }
+
+ WorkerThreadExecutor(int threads, int queueSize, ThreadFactory threadFactory) {
+ executor = new ThreadPoolExecutor(
+ threads, threads,
+ 0L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(queueSize),
+ threadFactory);
+ }
+
+ public void close() {
+ try {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ //
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Override
+ public void execute(Runnable r) {
+ executor.execute(r);
+ }
+
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java
new file mode 100644
index 00000000000..6d815059066
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java
@@ -0,0 +1,27 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import java.io.FileWriter;
+import java.io.IOException;
+
+public class LocalDiskLogger extends AbstractThreadedLogger {
+
+ private String logFilePath;
+
+ public LocalDiskLogger(LocalDiskLoggerConfig config) {
+ logFilePath = config.path();
+ }
+
+ @Override
+ void transport(LoggerEntry entry) {
+ String json = entry.toJson();
+ try (FileWriter fw = new FileWriter(logFilePath, true)) {
+ fw.write(json);
+ fw.write(System.getProperty("line.separator"));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Logger.java b/container-search/src/main/java/com/yahoo/search/logging/Logger.java
new file mode 100644
index 00000000000..3938b01c7b7
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/Logger.java
@@ -0,0 +1,13 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+public interface Logger {
+
+ default LoggerEntry.Builder newEntry() {
+ return new LoggerEntry.Builder(this);
+ }
+
+ boolean send(LoggerEntry entry);
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
new file mode 100644
index 00000000000..4498f666091
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
@@ -0,0 +1,96 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.search.Query;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+
+public class LoggerEntry {
+
+ private final Long timestamp;
+ private final Query query;
+ private final ByteBuffer blob;
+
+ private LoggerEntry(Builder builder) {
+ timestamp = builder.timestamp; // or set automatically if not set
+ query = builder.query;
+ blob = builder.blob;
+ }
+
+ public Long timestamp() {
+ return timestamp;
+ }
+
+ public Query query() {
+ return query;
+ }
+
+ public ByteBuffer blob() {
+ return blob;
+ }
+
+ public String toString() {
+ return toJson();
+ }
+
+ public String toJson() {
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+ g.writeStartObject();
+
+ g.writeStringField("blob", Base64.getEncoder().encodeToString(blob.array()));
+
+ g.writeEndObject();
+ g.close();
+ return out.toString();
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public static class Builder {
+
+ private final Logger logger;
+
+ private Long timestamp;
+ private Query query;
+ private ByteBuffer blob;
+
+ public Builder(Logger logger) {
+ this.logger = logger;
+ }
+
+ public Builder timestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public Builder query(Query query) {
+ this.query = query;
+ return this;
+ }
+
+ public Builder blob(byte[] bytes) {
+ blob = ByteBuffer.allocate(bytes.length);
+ blob.put(bytes).limit(blob.position()).position(0);
+ return this;
+ }
+
+ public boolean send() {
+ return logger.send(new LoggerEntry(this));
+ }
+
+ }
+
+
+}
diff --git a/container-search/src/main/resources/configdefinitions/search.logging.local-disk-logger.def b/container-search/src/main/resources/configdefinitions/search.logging.local-disk-logger.def
new file mode 100644
index 00000000000..fc42c0dbd20
--- /dev/null
+++ b/container-search/src/main/resources/configdefinitions/search.logging.local-disk-logger.def
@@ -0,0 +1,5 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+namespace=search.logging
+
+## Path where logs should be put
+path string default=/dev/null
diff --git a/container-search/src/test/java/com/yahoo/search/logging/LocalDiskLoggerTest.java b/container-search/src/test/java/com/yahoo/search/logging/LocalDiskLoggerTest.java
new file mode 100644
index 00000000000..bf6938424d7
--- /dev/null
+++ b/container-search/src/test/java/com/yahoo/search/logging/LocalDiskLoggerTest.java
@@ -0,0 +1,50 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import com.yahoo.io.IOUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Base64;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class LocalDiskLoggerTest {
+
+ @TempDir
+ Path tempDir;
+
+ @Test
+ public void testLocalDiskLogger() throws InterruptedException, IOException {
+ File logFile = tempDir.resolve("localdisklogger.log").toFile();
+
+ LocalDiskLoggerConfig.Builder builder = new LocalDiskLoggerConfig.Builder();
+ builder.path(logFile.getAbsolutePath());
+ LocalDiskLogger logger = new LocalDiskLogger(builder.build());
+
+ logger.newEntry()
+ .blob("Yo entry".getBytes())
+ .send();
+ waitForFile(logFile);
+
+ String test = IOUtils.readAll(new FileReader(logFile));
+ assertTrue(test.contains(Base64.getEncoder().encodeToString("Yo entry".getBytes())));
+ }
+
+ private void waitForFile(File file) throws InterruptedException {
+ int waitFor = 10;
+ while ( ! file.exists() && --waitFor > 0) {
+ Thread.sleep(10);
+ }
+ if ( ! file.exists()) {
+ fail("Local disk logger file was not created");
+ }
+ }
+
+}