diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/logging/Spooler.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/logging/Spooler.java | 48 |
1 files changed, 36 insertions, 12 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java index 2193d414458..e80437de0d3 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -10,10 +10,14 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.logging.Level; import java.util.stream.Stream; @@ -30,25 +34,32 @@ public class Spooler { private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events")); private static final Comparator<File> ordering = new TimestampCompare(); private static final int defaultMaxEntriesPerFile = 100; + // Max delay between first write to a file and when we should close file and move it for further processing + static final Duration maxDelayAfterFirstWrite = Duration.ofSeconds(10); private Path processingPath; private Path readyPath; private Path failuresPath; private Path successesPath; + // Count of entries for the file that is currently being written to AtomicInteger entryCounter = new AtomicInteger(1); AtomicInteger fileCounter = new AtomicInteger(1); private final Path spoolPath; private final int maxEntriesPerFile; + private final Clock clock; + private final AtomicReference<Instant> firstWriteTimestamp = new AtomicReference<>(); - public Spooler() { - this(defaultSpoolPath, defaultMaxEntriesPerFile); + public Spooler(Clock clock) { + this(defaultSpoolPath, defaultMaxEntriesPerFile, clock); } - public Spooler(Path spoolPath, int maxEntriesPerFile) { + public Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock) { this.spoolPath = spoolPath; this.maxEntriesPerFile = maxEntriesPerFile; + this.clock = clock; + firstWriteTimestamp.set(Instant.EPOCH); createDirs(spoolPath); } @@ -106,7 +117,7 @@ public class Spooler { try { Files.move(file, target); } catch (IOException e) { - throw new UncheckedIOException(e); + log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target); } } } @@ -144,19 +155,32 @@ public class Spooler { try { log.log(Level.FINE, "Writing entry " + entryCounter.get() + " (" + entry.serialize() + ") to file " + fileName); Files.writeString(file, entry.serialize() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); - Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); - - if (entryCounter.incrementAndGet() > maxEntriesPerFile) { - log.log(Level.INFO, "Moving file from " + file + " to " + target); - Files.move(file, target); - entryCounter.set(1); - fileCounter.incrementAndGet(); - } + firstWriteTimestamp.compareAndExchange(Instant.EPOCH, clock.instant()); + entryCounter.incrementAndGet(); + switchFileIfNeeded(file, fileName); } catch (IOException e) { throw new RuntimeException(e); } } + void switchFileIfNeeded() throws IOException { + String fileName = String.valueOf(fileCounter); + Path file = spoolPath.resolve(processingPath).resolve(fileName); + switchFileIfNeeded(file, fileName); + } + + private void switchFileIfNeeded(Path file, String fileName) throws IOException { + if (file.toFile().exists() + && (entryCounter.get() > maxEntriesPerFile || firstWriteTimestamp.get().plus(maxDelayAfterFirstWrite).isBefore(clock.instant()))) { + Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); + log.log(Level.INFO, "Finished writing file " + file + " moving it to " + target); + Files.move(file, target); + entryCounter.set(1); + fileCounter.incrementAndGet(); + firstWriteTimestamp.set(Instant.EPOCH); + } + } + private void createDirs(Path spoolerPath) { processingPath = createDir(spoolerPath.resolve("processing")); readyPath = createDir(spoolerPath.resolve("ready")); |