summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo')
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java68
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java110
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java27
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Logger.java13
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java127
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Spooler.java175
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/package-info.java6
7 files changed, 526 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
new file mode 100644
index 00000000000..5cb3aa24f98
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
@@ -0,0 +1,68 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.logging;
+
+import com.yahoo.concurrent.DaemonThreadFactory;
+import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+
+/**
+ * Abstract class that deals with storing event entries on disk and making sure all stored
+ * entries are eventually sent
+ *
+ * @author hmusum
+ */
+abstract class AbstractSpoolingLogger extends AbstractThreadedLogger implements Runnable {
+
+ protected static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName());
+
+ private static final ScheduledExecutorService executorService =
+ new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("AbstractSpoolingLogger-send-"));
+
+ protected final Spooler spooler;
+
+ public AbstractSpoolingLogger() {
+ this(new Spooler());
+ }
+
+ public AbstractSpoolingLogger(Spooler spooler) {
+ this.spooler = spooler;
+ executorService.scheduleWithFixedDelay(this, 0, 10L, TimeUnit.MILLISECONDS);
+ }
+
+ public void run() {
+ try {
+ var entries = spooler.processFiles();
+ log.log(Level.INFO, "Entries: " + entries.size());
+ entries.forEach(this::transport);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public boolean send(LoggerEntry entry) {
+ log.log(Level.INFO, "Sending");
+ try {
+ executor.execute(() -> spooler.write(entry));
+ } catch (RejectedExecutionException e) {
+ return false;
+ }
+ return true;
+ }
+
+ // TODO Call from a component or make this class a component
+ public void shutdown() {
+ executorService.shutdown();
+ try {
+ if ( ! executorService.awaitTermination(10, TimeUnit.SECONDS))
+ log.log(Level.WARNING, "Timeout elapsed waiting for termination");
+ } catch (InterruptedException e) {
+ log.log(Level.WARNING, "Failure when waiting for termination: " + e.getMessage());
+ }
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
new file mode 100644
index 00000000000..1df236a037f
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
@@ -0,0 +1,110 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+
+abstract class AbstractThreadedLogger implements Logger {
+
+ private final static java.util.logging.Logger log = java.util.logging.Logger.getLogger(AbstractThreadedLogger.class.getName());
+
+ final static int DEFAULT_MAX_THREADS = 1;
+ final static int DEFAULT_QUEUE_SIZE = 1000;
+
+ protected final WorkerThreadExecutor executor;
+
+ AbstractThreadedLogger() {
+ this(DEFAULT_MAX_THREADS, DEFAULT_QUEUE_SIZE);
+ }
+
+ AbstractThreadedLogger(int threads, int queueSize) {
+ executor = new WorkerThreadExecutor(threads, queueSize);
+ }
+
+ AbstractThreadedLogger(int threads, int queueSize, ThreadFactory factory) {
+ executor = new WorkerThreadExecutor(threads, queueSize, factory);
+ }
+
+ @Override
+ public boolean send(LoggerEntry entry) {
+ return enqueue(entry);
+ }
+
+ protected boolean enqueue(LoggerEntry entry) {
+ // Todo: metric things
+ try {
+ executor.execute(() -> dequeue(entry));
+ } catch (RejectedExecutionException e) {
+ return false;
+ }
+ return true;
+ }
+
+ protected void dequeue(LoggerEntry entry) {
+ transport(entry); // This happens in worker thread
+ }
+
+ /**
+ * Actually transports the entry to it's destination
+ */
+ abstract void transport(LoggerEntry entry);
+
+
+ private static class WorkerThread extends Thread {
+
+ public WorkerThread(Runnable r) {
+ super(r);
+ }
+
+ @Override
+ public void run() {
+ try {
+ super.run();
+ } catch (Exception e) {
+ log.log(Level.SEVERE, String.format("Error while sending logger entry: %s", e), e);
+ }
+ }
+
+ }
+
+ protected static class WorkerThreadExecutor implements Executor {
+
+ protected final ThreadPoolExecutor executor;
+
+ WorkerThreadExecutor(int threads, int queueSize) {
+ this(threads, queueSize, WorkerThread::new);
+ }
+
+ WorkerThreadExecutor(int threads, int queueSize, ThreadFactory threadFactory) {
+ executor = new ThreadPoolExecutor(
+ threads, threads,
+ 0L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(queueSize),
+ threadFactory);
+ }
+
+ public void close() {
+ try {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ //
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Override
+ public void execute(Runnable r) {
+ executor.execute(r);
+ }
+
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java
new file mode 100644
index 00000000000..6d815059066
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java
@@ -0,0 +1,27 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import java.io.FileWriter;
+import java.io.IOException;
+
+public class LocalDiskLogger extends AbstractThreadedLogger {
+
+ private String logFilePath;
+
+ public LocalDiskLogger(LocalDiskLoggerConfig config) {
+ logFilePath = config.path();
+ }
+
+ @Override
+ void transport(LoggerEntry entry) {
+ String json = entry.toJson();
+ try (FileWriter fw = new FileWriter(logFilePath, true)) {
+ fw.write(json);
+ fw.write(System.getProperty("line.separator"));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Logger.java b/container-search/src/main/java/com/yahoo/search/logging/Logger.java
new file mode 100644
index 00000000000..3938b01c7b7
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/Logger.java
@@ -0,0 +1,13 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+public interface Logger {
+
+ default LoggerEntry.Builder newEntry() {
+ return new LoggerEntry.Builder(this);
+ }
+
+ boolean send(LoggerEntry entry);
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
new file mode 100644
index 00000000000..692de3e6cef
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
@@ -0,0 +1,127 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.yahoo.search.Query;
+import com.yahoo.slime.JsonDecoder;
+import com.yahoo.slime.Slime;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+
+public class LoggerEntry {
+
+ private final Long timestamp;
+ private final Query query;
+ private final ByteBuffer blob;
+
+ private LoggerEntry(Builder builder) {
+ timestamp = builder.timestamp; // or set automatically if not set
+ query = builder.query;
+ blob = builder.blob;
+ }
+
+ public Long timestamp() {
+ return timestamp;
+ }
+
+ public Query query() {
+ return query;
+ }
+
+ public String queryString() {
+ String queryString = null;
+ if (query != null) {
+ if (query.getHttpRequest() != null && query.getHttpRequest().getUri() != null) {
+ queryString = query.getHttpRequest().getUri().getPath();
+ if (query.getHttpRequest().getUri().getQuery() != null) {
+ queryString += "?" + query.getHttpRequest().getUri().getRawQuery();
+ }
+ }
+ }
+ return queryString;
+ }
+
+ public ByteBuffer blob() {
+ return blob;
+ }
+
+ public String toString() {
+ return toJson();
+ }
+
+ public String toJson() {
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
+ g.writeStartObject();
+
+ g.writeNumberField("timestamp", timestamp == null ? 0 : timestamp);
+ g.writeStringField("query", queryString());
+ g.writeStringField("blob", Base64.getEncoder().encodeToString(blob.array()));
+
+ g.writeEndObject();
+ g.close();
+ return out.toString();
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ // TODO: Rename method here and above? (serialize/deserialize)
+ // TODO: Use Slime or Jackson for both
+ public static LoggerEntry fromJson(byte[] content) throws IOException {
+ var decoder = new JsonDecoder();
+ var slime = decoder.decode(new Slime(), content);
+
+ var timestamp = slime.get().field("timestamp").asLong();
+ var query = new Query(slime.get().field("query").asString());
+ var blob = slime.get().field("blob").asData();
+
+ return new LoggerEntry(new Builder().timestamp(timestamp).query(query).blob(blob));
+ }
+
+ public static class Builder {
+
+ private final Logger logger;
+
+ private Long timestamp;
+ private Query query;
+ private ByteBuffer blob;
+
+ // For testing
+ public Builder() { this(entry -> false); }
+
+ public Builder(Logger logger) {
+ this.logger = logger;
+ }
+
+ public Builder timestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public Builder query(Query query) {
+ this.query = query;
+ return this;
+ }
+
+ public Builder blob(byte[] bytes) {
+ blob = ByteBuffer.allocate(bytes.length);
+ blob.put(bytes).limit(blob.position()).position(0);
+ return this;
+ }
+
+ public boolean send() {
+ return logger.send(new LoggerEntry(this));
+ }
+
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
new file mode 100644
index 00000000000..8fa51a0b47c
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
@@ -0,0 +1,175 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.logging;
+
+import ai.vespa.validation.Validation;
+import com.yahoo.vespa.defaults.Defaults;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.stream.Stream;
+
+/**
+ * Spooler that will write an entry to a file and read files that are ready to be sent
+ *
+ * @author hmusum
+ */
+public class Spooler {
+
+ private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName());
+ private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events"));
+ private static final Comparator<File> ordering = new TimestampCompare();
+
+ private Path processingPath;
+ private Path readyPath;
+ private Path failuresPath;
+ private Path successesPath;
+
+ AtomicInteger fileCounter = new AtomicInteger(1);
+
+ private final Path spoolPath;
+
+ public Spooler() {
+ this(defaultSpoolPath);
+ }
+
+ public Spooler(Path spoolPath) {
+ this.spoolPath = spoolPath;
+ createDirs(spoolPath);
+ }
+
+ public boolean write(LoggerEntry entry) {
+ writeEntry(entry);
+
+ return true;
+ }
+
+ public List<LoggerEntry> processFiles() throws IOException {
+ List<Path> files = listFilesInPath(readyPath);
+
+ if (files.size() == 0) {
+ log.log(Level.INFO, "No files in ready path " + readyPath.toFile().getAbsolutePath());
+ return List.of();
+ }
+ log.log(Level.FINE, "Files in ready path: " + files.size());
+
+ List<File> fileList = getFiles(files, 50); // TODO
+ if ( ! fileList.isEmpty()) {
+ return processFiles(fileList);
+ }
+
+ return List.of();
+ }
+
+ List<Path> listFilesInPath(Path path) throws IOException {
+ List<Path> files;
+ System.out.println("Path " + path + " exists: " + path.toFile().exists());
+ try (Stream<Path> stream = Files.list(path)) {
+ files = stream.toList();
+ // TODO: Or check if stream is empty
+ } catch (NoSuchFileException e) {
+ return List.of(); // No files, this is OK
+ }
+ return files;
+ }
+
+ public ArrayList<LoggerEntry> processFiles(List<File> files) throws IOException {
+ ArrayList<LoggerEntry> entries = new ArrayList<>();
+ for (File f : files) {
+ log.log(Level.INFO, "Found file " + f);
+ var content = Files.readAllBytes(f.toPath());
+ var entry = LoggerEntry.fromJson(content);
+ Path file = f.toPath();
+ Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
+ Files.move(file, target);
+ entries.add(entry);
+ }
+ return entries;
+ }
+
+ public Path processingPath() { return processingPath; }
+ public Path readyPath() { return readyPath; }
+ public Path successesPath() { return successesPath; }
+ public Path failuresPath() { return failuresPath; }
+
+ List<File> getDirectories(File[] files) {
+ List<File> fileList = new ArrayList<>();
+
+ for (File f : files) {
+ if (f.isDirectory()) {
+ fileList.add(f);
+ }
+ }
+
+ Collections.sort(fileList);
+ return fileList;
+ }
+
+ List<File> getFiles(List<Path> files, int count) {
+ Validation.requireAtLeast(count, "count must be a positive number", 1);
+ List<File> fileList = new ArrayList<>();
+
+ for (Path p : files) {
+ File f = p.toFile();
+ if (!f.isDirectory()) {
+ fileList.add(f);
+ }
+
+ // Grab only some files
+ if (fileList.size() > count) {
+ break;
+ }
+ }
+
+ fileList.sort(ordering);
+ return fileList;
+ }
+
+ private void writeEntry(LoggerEntry entry) {
+ String fileName = String.valueOf(fileCounter);
+ Path file = spoolPath.resolve(processingPath).resolve(fileName);
+ try {
+ Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
+ Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName);
+ log.log(Level.INFO, "Moving file from " + file + " to " + target);
+ Files.move(file, target);
+ fileCounter.addAndGet(1);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void createDirs(Path spoolerPath) {
+ processingPath = createDir(spoolerPath.resolve("processing"));
+ readyPath = createDir(spoolerPath.resolve("ready"));
+ failuresPath = createDir(spoolerPath.resolve("failures"));
+ successesPath = createDir(spoolerPath.resolve("successes"));
+ }
+
+ private static Path createDir(Path path) {
+ File file = path.toFile();
+ if (file.exists() && file.canRead() && file.canWrite()) {
+ log.log(Level.INFO, "Directory " + path + " already exists");
+ } else if (file.mkdirs()) {
+ log.log(Level.INFO, "Created " + path);
+ } else {
+ log.log(Level.WARNING, "Could not create " + path + ", please check permissions");
+ }
+ return path;
+ }
+
+ private static class TimestampCompare implements Comparator<File> {
+ public int compare(File a, File b) {
+ return (int) (a.lastModified() - b.lastModified());
+ }
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/package-info.java b/container-search/src/main/java/com/yahoo/search/logging/package-info.java
new file mode 100644
index 00000000000..e2bc14f4faf
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/package-info.java
@@ -0,0 +1,6 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+@ExportPackage
+package com.yahoo.search.logging;
+
+import com.yahoo.osgi.annotation.ExportPackage;