aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2022-10-05 13:16:55 +0200
committerGitHub <noreply@github.com>2022-10-05 13:16:55 +0200
commit3533daf30eb3fd99a74a3c6c4edd895b8888fc72 (patch)
tree12e8994d61a47d60ed0b9cc43683732968651ba2
parent302a5fe305ffde6594668d60385f8643e523a91f (diff)
parente7f127984207ffeacef5c5a18d0bfae976b98e6a (diff)
Merge pull request #24311 from vespa-engine/hmusum/add-spooler
Add spooler for log entries
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java64
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java11
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java20
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Spooler.java175
-rw-r--r--container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java107
5 files changed, 365 insertions, 12 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
index cfcbf2cab45..5cb3aa24f98 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
@@ -1,12 +1,68 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
package com.yahoo.search.logging;
-abstract class AbstractSpoolingLogger extends AbstractThreadedLogger {
+import com.yahoo.concurrent.DaemonThreadFactory;
+import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+
+/**
+ * Abstract class that deals with storing event entries on disk and making sure all stored
+ * entries are eventually sent
+ *
+ * @author hmusum
+ */
+abstract class AbstractSpoolingLogger extends AbstractThreadedLogger implements Runnable {
+
+ protected static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName());
+
+ private static final ScheduledExecutorService executorService =
+ new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("AbstractSpoolingLogger-send-"));
+
+ protected final Spooler spooler;
+
+ public AbstractSpoolingLogger() {
+ this(new Spooler());
+ }
+
+ public AbstractSpoolingLogger(Spooler spooler) {
+ this.spooler = spooler;
+ executorService.scheduleWithFixedDelay(this, 0, 10L, TimeUnit.MILLISECONDS);
+ }
+
+ public void run() {
+ try {
+ var entries = spooler.processFiles();
+ log.log(Level.INFO, "Entries: " + entries.size());
+ entries.forEach(this::transport);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
@Override
- protected void dequeue(LoggerEntry entry) {
- // Todo: add to spooler etc
+ public boolean send(LoggerEntry entry) {
+ log.log(Level.INFO, "Sending");
+ try {
+ executor.execute(() -> spooler.write(entry));
+ } catch (RejectedExecutionException e) {
+ return false;
+ }
+ return true;
+ }
+
+ // TODO Call from a component or make this class a component
+ public void shutdown() {
+ executorService.shutdown();
+ try {
+ if ( ! executorService.awaitTermination(10, TimeUnit.SECONDS))
+ log.log(Level.WARNING, "Timeout elapsed waiting for termination");
+ } catch (InterruptedException e) {
+ log.log(Level.WARNING, "Failure when waiting for termination: " + e.getMessage());
+ }
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
index 5c14109b26e..1df236a037f 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
@@ -2,23 +2,22 @@
package com.yahoo.search.logging;
-import org.slf4j.LoggerFactory;
-
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
abstract class AbstractThreadedLogger implements Logger {
- private final static org.slf4j.Logger log = LoggerFactory.getLogger(AbstractThreadedLogger.class);
+ private final static java.util.logging.Logger log = java.util.logging.Logger.getLogger(AbstractThreadedLogger.class.getName());
final static int DEFAULT_MAX_THREADS = 1;
final static int DEFAULT_QUEUE_SIZE = 1000;
- private final WorkerThreadExecutor executor;
+ protected final WorkerThreadExecutor executor;
AbstractThreadedLogger() {
this(DEFAULT_MAX_THREADS, DEFAULT_QUEUE_SIZE);
@@ -68,13 +67,13 @@ abstract class AbstractThreadedLogger implements Logger {
try {
super.run();
} catch (Exception e) {
- log.error(String.format("Error while sending logger entry: %s", e), e);
+ log.log(Level.SEVERE, String.format("Error while sending logger entry: %s", e), e);
}
}
}
- private static class WorkerThreadExecutor implements Executor {
+ protected static class WorkerThreadExecutor implements Executor {
protected final ThreadPoolExecutor executor;
diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
index 44eec6e64a0..692de3e6cef 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
@@ -6,7 +6,8 @@ import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.yahoo.search.Query;
-
+import com.yahoo.slime.JsonDecoder;
+import com.yahoo.slime.Slime;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -73,6 +74,19 @@ public class LoggerEntry {
}
}
+ // TODO: Rename method here and above? (serialize/deserialize)
+ // TODO: Use Slime or Jackson for both
+ public static LoggerEntry fromJson(byte[] content) throws IOException {
+ var decoder = new JsonDecoder();
+ var slime = decoder.decode(new Slime(), content);
+
+ var timestamp = slime.get().field("timestamp").asLong();
+ var query = new Query(slime.get().field("query").asString());
+ var blob = slime.get().field("blob").asData();
+
+ return new LoggerEntry(new Builder().timestamp(timestamp).query(query).blob(blob));
+ }
+
public static class Builder {
private final Logger logger;
@@ -81,6 +95,9 @@ public class LoggerEntry {
private Query query;
private ByteBuffer blob;
+ // For testing
+ public Builder() { this(entry -> false); }
+
public Builder(Logger logger) {
this.logger = logger;
}
@@ -107,5 +124,4 @@ public class LoggerEntry {
}
-
}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
new file mode 100644
index 00000000000..8fa51a0b47c
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
@@ -0,0 +1,175 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.logging;
+
+import ai.vespa.validation.Validation;
+import com.yahoo.vespa.defaults.Defaults;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.stream.Stream;
+
+/**
+ * Spooler that will write an entry to a file and read files that are ready to be sent
+ *
+ * @author hmusum
+ */
+public class Spooler {
+
+ private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName());
+ private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events"));
+ private static final Comparator<File> ordering = new TimestampCompare();
+
+ private Path processingPath;
+ private Path readyPath;
+ private Path failuresPath;
+ private Path successesPath;
+
+ AtomicInteger fileCounter = new AtomicInteger(1);
+
+ private final Path spoolPath;
+
+ public Spooler() {
+ this(defaultSpoolPath);
+ }
+
+ public Spooler(Path spoolPath) {
+ this.spoolPath = spoolPath;
+ createDirs(spoolPath);
+ }
+
+ public boolean write(LoggerEntry entry) {
+ writeEntry(entry);
+
+ return true;
+ }
+
+ public List<LoggerEntry> processFiles() throws IOException {
+ List<Path> files = listFilesInPath(readyPath);
+
+ if (files.size() == 0) {
+ log.log(Level.INFO, "No files in ready path " + readyPath.toFile().getAbsolutePath());
+ return List.of();
+ }
+ log.log(Level.FINE, "Files in ready path: " + files.size());
+
+ List<File> fileList = getFiles(files, 50); // TODO
+ if ( ! fileList.isEmpty()) {
+ return processFiles(fileList);
+ }
+
+ return List.of();
+ }
+
+ List<Path> listFilesInPath(Path path) throws IOException {
+ List<Path> files;
+ System.out.println("Path " + path + " exists: " + path.toFile().exists());
+ try (Stream<Path> stream = Files.list(path)) {
+ files = stream.toList();
+ // TODO: Or check if stream is empty
+ } catch (NoSuchFileException e) {
+ return List.of(); // No files, this is OK
+ }
+ return files;
+ }
+
+ public ArrayList<LoggerEntry> processFiles(List<File> files) throws IOException {
+ ArrayList<LoggerEntry> entries = new ArrayList<>();
+ for (File f : files) {
+ log.log(Level.INFO, "Found file " + f);
+ var content = Files.readAllBytes(f.toPath());
+ var entry = LoggerEntry.fromJson(content);
+ Path file = f.toPath();
+ Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
+ Files.move(file, target);
+ entries.add(entry);
+ }
+ return entries;
+ }
+
+ public Path processingPath() { return processingPath; }
+ public Path readyPath() { return readyPath; }
+ public Path successesPath() { return successesPath; }
+ public Path failuresPath() { return failuresPath; }
+
+ List<File> getDirectories(File[] files) {
+ List<File> fileList = new ArrayList<>();
+
+ for (File f : files) {
+ if (f.isDirectory()) {
+ fileList.add(f);
+ }
+ }
+
+ Collections.sort(fileList);
+ return fileList;
+ }
+
+ List<File> getFiles(List<Path> files, int count) {
+ Validation.requireAtLeast(count, "count must be a positive number", 1);
+ List<File> fileList = new ArrayList<>();
+
+ for (Path p : files) {
+ File f = p.toFile();
+ if (!f.isDirectory()) {
+ fileList.add(f);
+ }
+
+ // Grab only some files
+ if (fileList.size() > count) {
+ break;
+ }
+ }
+
+ fileList.sort(ordering);
+ return fileList;
+ }
+
+ private void writeEntry(LoggerEntry entry) {
+ String fileName = String.valueOf(fileCounter);
+ Path file = spoolPath.resolve(processingPath).resolve(fileName);
+ try {
+ Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
+ Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName);
+ log.log(Level.INFO, "Moving file from " + file + " to " + target);
+ Files.move(file, target);
+ fileCounter.addAndGet(1);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void createDirs(Path spoolerPath) {
+ processingPath = createDir(spoolerPath.resolve("processing"));
+ readyPath = createDir(spoolerPath.resolve("ready"));
+ failuresPath = createDir(spoolerPath.resolve("failures"));
+ successesPath = createDir(spoolerPath.resolve("successes"));
+ }
+
+ private static Path createDir(Path path) {
+ File file = path.toFile();
+ if (file.exists() && file.canRead() && file.canWrite()) {
+ log.log(Level.INFO, "Directory " + path + " already exists");
+ } else if (file.mkdirs()) {
+ log.log(Level.INFO, "Created " + path);
+ } else {
+ log.log(Level.WARNING, "Could not create " + path + ", please check permissions");
+ }
+ return path;
+ }
+
+ private static class TimestampCompare implements Comparator<File> {
+ public int compare(File a, File b) {
+ return (int) (a.lastModified() - b.lastModified());
+ }
+ }
+
+}
diff --git a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
new file mode 100644
index 00000000000..abd3b27b70b
--- /dev/null
+++ b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
@@ -0,0 +1,107 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.logging;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SpoolerTest {
+
+ @TempDir
+ Path tempDir;
+
+ @Test
+ public void testSpoolingLogger() throws IOException {
+ Path spoolDir = tempDir.resolve("spool");
+
+ Spooler spooler = new Spooler(spoolDir);
+
+ TestLogger logger = new TestLogger(spooler);
+ assertTrue(logger.newEntry()
+ .blob("Yo entry".getBytes())
+ .send());
+ assertTrue(logger.newEntry()
+ .blob("Yo entry 2".getBytes())
+ .send());
+
+ Path readyPath = spooler.readyPath();
+ Path readyFile1 = readyPath.resolve("1");
+ waitUntilFileExists(readyFile1);
+ Path readyFile2 = readyPath.resolve("2");
+ waitUntilFileExists(readyFile2);
+
+ // Check content after being moved to ready path
+ String content = Files.readString(readyFile1);
+ assertTrue(content.contains(Base64.getEncoder().encodeToString("Yo entry".getBytes())));
+ assertTrue(Files.readString(readyFile2).contains(Base64.getEncoder().encodeToString("Yo entry 2".getBytes())));
+
+ // Process files (read, transport files)
+ logger.manualRun();
+ assertEquals(2, logger.entriesSent());
+
+ // No files in processing or ready, 2 files in successes
+ assertEquals(0, spooler.listFilesInPath(spooler.processingPath()).size());
+ assertEquals(0, spooler.listFilesInPath(readyPath).size());
+ assertEquals(2, spooler.listFilesInPath(spooler.successesPath()).size());
+ assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size());
+ }
+
+ private void waitUntilFileExists(Path path) {
+ Instant end = Instant.now().plus(Duration.ofSeconds(1));
+ while (!path.toFile().exists() && Instant.now().isBefore(end)) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ assertTrue(path.toFile().exists());
+ }
+
+
+ private static class TestLogger extends AbstractSpoolingLogger {
+
+ private final List<LoggerEntry> entriesSent = new ArrayList<>();
+
+ public TestLogger(Spooler spooler) {
+ super(spooler);
+ }
+
+ @Override
+ void transport(LoggerEntry entry) {
+ System.out.println("Called transport()");
+ entriesSent.add(entry);
+ }
+
+ @Override
+ public void run() {
+ // Do nothing, use manualRun
+ }
+
+ @Override
+ public boolean send(LoggerEntry entry) {
+ return spooler.write(entry);
+ }
+
+ public void manualRun() {
+ super.run();
+ }
+
+ int entriesSent() {
+ return entriesSent.size();
+ }
+
+ }
+
+}