diff options
author | Harald Musum <musum@yahooinc.com> | 2022-10-15 13:44:31 +0200 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2022-10-15 13:44:31 +0200 |
commit | 682be49b09b9b5d2585c79f62d75305daa8054bd (patch) | |
tree | aa9671dae9e751616b4986a581bec7d704540130 /container-search/src/main/java/com/yahoo/search/logging/Spooler.java | |
parent | e4443f6c23edcd520ca97a453086a22bc8f94131 (diff) |
Write several entries into files using JSONL file format
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/logging/Spooler.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/logging/Spooler.java | 74 |
1 files changed, 44 insertions, 30 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java index 0b50985d974..5533320c907 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -5,12 +5,12 @@ import ai.vespa.validation.Validation; import com.yahoo.vespa.defaults.Defaults; import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -19,7 +19,8 @@ import java.util.logging.Level; import java.util.stream.Stream; /** - * Spooler that will write an entry to a file and read files that are ready to be sent + * Spooler that will write an entry to a file and read files that are ready to be sent. + * Files are written in JSON Lines text file format. * * @author hmusum */ @@ -28,22 +29,26 @@ public class Spooler { private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName()); private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events")); private static final Comparator<File> ordering = new TimestampCompare(); + private static final int defaultMaxEntriesPerFile = 100; private Path processingPath; private Path readyPath; private Path failuresPath; private Path successesPath; + AtomicInteger entryCounter = new AtomicInteger(1); AtomicInteger fileCounter = new AtomicInteger(1); private final Path spoolPath; + private final int maxEntriesPerFile; public Spooler() { - this(defaultSpoolPath); + this(defaultSpoolPath, defaultMaxEntriesPerFile); } - public Spooler(Path spoolPath) { + public Spooler(Path spoolPath, int maxEntriesPerFile) { this.spoolPath = spoolPath; + this.maxEntriesPerFile = maxEntriesPerFile; createDirs(spoolPath); } @@ -77,16 +82,33 @@ public class Spooler { return files; } - public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) throws IOException { + public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) { for (File f : files) { - log.log(Level.INFO, "Found file " + f); - var content = Files.readAllBytes(f.toPath()); - var entry = LoggerEntry.fromJson(content); - - if (transport.apply(entry)) { - Path file = f.toPath(); - Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); - Files.move(file, target); + log.log(Level.FINE, "Processing file " + f); + boolean succcess = false; + try { + List<String> lines = Files.readAllLines(f.toPath()); + for (String line : lines) { + LoggerEntry entry = LoggerEntry.fromJson(line); + log.log(Level.FINE, "Read entry " + entry + " from " + f); + succcess = transport.apply(entry); + if (! succcess) { + log.log(Level.WARNING, "unsuccessful call to transport() for " + entry); + } + }; + } catch (IOException e) { + throw new UncheckedIOException("Unable to process file " + f.toPath(), e); + // TODO: Move to failures path + } finally { + if (succcess) { + Path file = f.toPath(); + Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); + try { + Files.move(file, target); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } } } @@ -96,19 +118,6 @@ public class Spooler { public Path successesPath() { return successesPath; } public Path failuresPath() { return failuresPath; } - List<File> getDirectories(File[] files) { - List<File> fileList = new ArrayList<>(); - - for (File f : files) { - if (f.isDirectory()) { - fileList.add(f); - } - } - - Collections.sort(fileList); - return fileList; - } - List<File> getFiles(List<Path> files, int count) { Validation.requireAtLeast(count, "count must be a positive number", 1); List<File> fileList = new ArrayList<>(); @@ -133,11 +142,16 @@ public class Spooler { String fileName = String.valueOf(fileCounter); Path file = spoolPath.resolve(processingPath).resolve(fileName); try { - Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + log.log(Level.INFO, "Writing entry " + entryCounter.get() + " (" + entry.toJson() + ") to " + fileName); + Files.writeString(file, entry.toJson() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); - log.log(Level.INFO, "Moving file from " + file + " to " + target); - Files.move(file, target); - fileCounter.addAndGet(1); + + if (entryCounter.incrementAndGet() > maxEntriesPerFile) { + log.log(Level.INFO, "Moving file from " + file + " to " + target); + Files.move(file, target); + entryCounter.set(1); + fileCounter.incrementAndGet(); + } } catch (IOException e) { throw new RuntimeException(e); } |