diff options
author | Harald Musum <musum@yahooinc.com> | 2022-10-19 09:14:39 +0200 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2022-10-19 09:14:39 +0200 |
commit | ad237aba91d05c81d28be2bce185e1eab2ba9bd0 (patch) | |
tree | 5887fff189b15fc577bd5c3f31b84c6c5c2254d9 /container-search/src/main/java/com/yahoo/search/logging | |
parent | 733cb216bc1519e34749a5e358f7e12c962c96d6 (diff) |
Switch to a new file if some time after first write has passed
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/logging')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java | 4 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/logging/Spooler.java | 48 |
2 files changed, 39 insertions, 13 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java index c9c6546711a..3569a28f563 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java +++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java @@ -3,6 +3,7 @@ package com.yahoo.search.logging; import com.yahoo.concurrent.DaemonThreadFactory; import java.io.IOException; +import java.time.Clock; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -25,7 +26,7 @@ public abstract class AbstractSpoolingLogger extends AbstractThreadedLogger impl protected final Spooler spooler; public AbstractSpoolingLogger() { - this(new Spooler()); + this(new Spooler(Clock.systemUTC())); } public AbstractSpoolingLogger(Spooler spooler) { @@ -35,6 +36,7 @@ public abstract class AbstractSpoolingLogger extends AbstractThreadedLogger impl public void run() { try { + spooler.switchFileIfNeeded(); spooler.processFiles(this::transport); } catch (IOException e) { e.printStackTrace(); diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java index 2193d414458..e80437de0d3 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -10,10 +10,14 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.logging.Level; import java.util.stream.Stream; @@ -30,25 +34,32 @@ public class Spooler { private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events")); private static final Comparator<File> ordering = new TimestampCompare(); private static final int defaultMaxEntriesPerFile = 100; + // Max delay between first write to a file and when we should close file and move it for further processing + static final Duration maxDelayAfterFirstWrite = Duration.ofSeconds(10); private Path processingPath; private Path readyPath; private Path failuresPath; private Path successesPath; + // Count of entries for the file that is currently being written to AtomicInteger entryCounter = new AtomicInteger(1); AtomicInteger fileCounter = new AtomicInteger(1); private final Path spoolPath; private final int maxEntriesPerFile; + private final Clock clock; + private final AtomicReference<Instant> firstWriteTimestamp = new AtomicReference<>(); - public Spooler() { - this(defaultSpoolPath, defaultMaxEntriesPerFile); + public Spooler(Clock clock) { + this(defaultSpoolPath, defaultMaxEntriesPerFile, clock); } - public Spooler(Path spoolPath, int maxEntriesPerFile) { + public Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock) { this.spoolPath = spoolPath; this.maxEntriesPerFile = maxEntriesPerFile; + this.clock = clock; + firstWriteTimestamp.set(Instant.EPOCH); createDirs(spoolPath); } @@ -106,7 +117,7 @@ public class Spooler { try { Files.move(file, target); } catch (IOException e) { - throw new UncheckedIOException(e); + log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target); } } } @@ -144,19 +155,32 @@ public class Spooler { try { log.log(Level.FINE, "Writing entry " + entryCounter.get() + " (" + entry.serialize() + ") to file " + fileName); Files.writeString(file, entry.serialize() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); - Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); - - if (entryCounter.incrementAndGet() > maxEntriesPerFile) { - log.log(Level.INFO, "Moving file from " + file + " to " + target); - Files.move(file, target); - entryCounter.set(1); - fileCounter.incrementAndGet(); - } + firstWriteTimestamp.compareAndExchange(Instant.EPOCH, clock.instant()); + entryCounter.incrementAndGet(); + switchFileIfNeeded(file, fileName); } catch (IOException e) { throw new RuntimeException(e); } } + void switchFileIfNeeded() throws IOException { + String fileName = String.valueOf(fileCounter); + Path file = spoolPath.resolve(processingPath).resolve(fileName); + switchFileIfNeeded(file, fileName); + } + + private void switchFileIfNeeded(Path file, String fileName) throws IOException { + if (file.toFile().exists() + && (entryCounter.get() > maxEntriesPerFile || firstWriteTimestamp.get().plus(maxDelayAfterFirstWrite).isBefore(clock.instant()))) { + Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); + log.log(Level.INFO, "Finished writing file " + file + " moving it to " + target); + Files.move(file, target); + entryCounter.set(1); + fileCounter.incrementAndGet(); + firstWriteTimestamp.set(Instant.EPOCH); + } + } + private void createDirs(Path spoolerPath) { processingPath = createDir(spoolerPath.resolve("processing")); readyPath = createDir(spoolerPath.resolve("ready")); |