aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2022-10-15 13:44:31 +0200
committerHarald Musum <musum@yahooinc.com>2022-10-15 13:44:31 +0200
commit682be49b09b9b5d2585c79f62d75305daa8054bd (patch)
treeaa9671dae9e751616b4986a581bec7d704540130 /container-search/src/main/java/com/yahoo/search/logging/Spooler.java
parente4443f6c23edcd520ca97a453086a22bc8f94131 (diff)
Write several entries into files using JSONL file format
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/logging/Spooler.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Spooler.java74
1 files changed, 44 insertions, 30 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
index 0b50985d974..5533320c907 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
@@ -5,12 +5,12 @@ import ai.vespa.validation.Validation;
import com.yahoo.vespa.defaults.Defaults;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -19,7 +19,8 @@ import java.util.logging.Level;
import java.util.stream.Stream;
/**
- * Spooler that will write an entry to a file and read files that are ready to be sent
+ * Spooler that will write an entry to a file and read files that are ready to be sent.
+ * Files are written in JSON Lines text file format.
*
* @author hmusum
*/
@@ -28,22 +29,26 @@ public class Spooler {
private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName());
private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events"));
private static final Comparator<File> ordering = new TimestampCompare();
+ private static final int defaultMaxEntriesPerFile = 100;
private Path processingPath;
private Path readyPath;
private Path failuresPath;
private Path successesPath;
+ AtomicInteger entryCounter = new AtomicInteger(1);
AtomicInteger fileCounter = new AtomicInteger(1);
private final Path spoolPath;
+ private final int maxEntriesPerFile;
public Spooler() {
- this(defaultSpoolPath);
+ this(defaultSpoolPath, defaultMaxEntriesPerFile);
}
- public Spooler(Path spoolPath) {
+ public Spooler(Path spoolPath, int maxEntriesPerFile) {
this.spoolPath = spoolPath;
+ this.maxEntriesPerFile = maxEntriesPerFile;
createDirs(spoolPath);
}
@@ -77,16 +82,33 @@ public class Spooler {
return files;
}
- public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) throws IOException {
+ public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) {
for (File f : files) {
- log.log(Level.INFO, "Found file " + f);
- var content = Files.readAllBytes(f.toPath());
- var entry = LoggerEntry.fromJson(content);
-
- if (transport.apply(entry)) {
- Path file = f.toPath();
- Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
- Files.move(file, target);
+ log.log(Level.FINE, "Processing file " + f);
+ boolean succcess = false;
+ try {
+ List<String> lines = Files.readAllLines(f.toPath());
+ for (String line : lines) {
+ LoggerEntry entry = LoggerEntry.fromJson(line);
+ log.log(Level.FINE, "Read entry " + entry + " from " + f);
+ succcess = transport.apply(entry);
+ if (! succcess) {
+ log.log(Level.WARNING, "unsuccessful call to transport() for " + entry);
+ }
+ };
+ } catch (IOException e) {
+ throw new UncheckedIOException("Unable to process file " + f.toPath(), e);
+ // TODO: Move to failures path
+ } finally {
+ if (succcess) {
+ Path file = f.toPath();
+ Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
+ try {
+ Files.move(file, target);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}
}
}
@@ -96,19 +118,6 @@ public class Spooler {
public Path successesPath() { return successesPath; }
public Path failuresPath() { return failuresPath; }
- List<File> getDirectories(File[] files) {
- List<File> fileList = new ArrayList<>();
-
- for (File f : files) {
- if (f.isDirectory()) {
- fileList.add(f);
- }
- }
-
- Collections.sort(fileList);
- return fileList;
- }
-
List<File> getFiles(List<Path> files, int count) {
Validation.requireAtLeast(count, "count must be a positive number", 1);
List<File> fileList = new ArrayList<>();
@@ -133,11 +142,16 @@ public class Spooler {
String fileName = String.valueOf(fileCounter);
Path file = spoolPath.resolve(processingPath).resolve(fileName);
try {
- Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
+ log.log(Level.INFO, "Writing entry " + entryCounter.get() + " (" + entry.toJson() + ") to " + fileName);
+ Files.writeString(file, entry.toJson() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName);
- log.log(Level.INFO, "Moving file from " + file + " to " + target);
- Files.move(file, target);
- fileCounter.addAndGet(1);
+
+ if (entryCounter.incrementAndGet() > maxEntriesPerFile) {
+ log.log(Level.INFO, "Moving file from " + file + " to " + target);
+ Files.move(file, target);
+ entryCounter.set(1);
+ fileCounter.incrementAndGet();
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}