summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/logging/Spooler.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Spooler.java48
1 files changed, 36 insertions, 12 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
index 2193d414458..e80437de0d3 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
@@ -10,10 +10,14 @@ import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.stream.Stream;
@@ -30,25 +34,32 @@ public class Spooler {
private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events"));
private static final Comparator<File> ordering = new TimestampCompare();
private static final int defaultMaxEntriesPerFile = 100;
+ // Max delay between first write to a file and when we should close file and move it for further processing
+ static final Duration maxDelayAfterFirstWrite = Duration.ofSeconds(10);
private Path processingPath;
private Path readyPath;
private Path failuresPath;
private Path successesPath;
+ // Count of entries for the file that is currently being written to
AtomicInteger entryCounter = new AtomicInteger(1);
AtomicInteger fileCounter = new AtomicInteger(1);
private final Path spoolPath;
private final int maxEntriesPerFile;
+ private final Clock clock;
+ private final AtomicReference<Instant> firstWriteTimestamp = new AtomicReference<>();
- public Spooler() {
- this(defaultSpoolPath, defaultMaxEntriesPerFile);
+ public Spooler(Clock clock) {
+ this(defaultSpoolPath, defaultMaxEntriesPerFile, clock);
}
- public Spooler(Path spoolPath, int maxEntriesPerFile) {
+ public Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock) {
this.spoolPath = spoolPath;
this.maxEntriesPerFile = maxEntriesPerFile;
+ this.clock = clock;
+ firstWriteTimestamp.set(Instant.EPOCH);
createDirs(spoolPath);
}
@@ -106,7 +117,7 @@ public class Spooler {
try {
Files.move(file, target);
} catch (IOException e) {
- throw new UncheckedIOException(e);
+ log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target);
}
}
}
@@ -144,19 +155,32 @@ public class Spooler {
try {
log.log(Level.FINE, "Writing entry " + entryCounter.get() + " (" + entry.serialize() + ") to file " + fileName);
Files.writeString(file, entry.serialize() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
- Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName);
-
- if (entryCounter.incrementAndGet() > maxEntriesPerFile) {
- log.log(Level.INFO, "Moving file from " + file + " to " + target);
- Files.move(file, target);
- entryCounter.set(1);
- fileCounter.incrementAndGet();
- }
+ firstWriteTimestamp.compareAndExchange(Instant.EPOCH, clock.instant());
+ entryCounter.incrementAndGet();
+ switchFileIfNeeded(file, fileName);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+ void switchFileIfNeeded() throws IOException {
+ String fileName = String.valueOf(fileCounter);
+ Path file = spoolPath.resolve(processingPath).resolve(fileName);
+ switchFileIfNeeded(file, fileName);
+ }
+
+ private void switchFileIfNeeded(Path file, String fileName) throws IOException {
+ if (file.toFile().exists()
+ && (entryCounter.get() > maxEntriesPerFile || firstWriteTimestamp.get().plus(maxDelayAfterFirstWrite).isBefore(clock.instant()))) {
+ Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName);
+ log.log(Level.INFO, "Finished writing file " + file + " moving it to " + target);
+ Files.move(file, target);
+ entryCounter.set(1);
+ fileCounter.incrementAndGet();
+ firstWriteTimestamp.set(Instant.EPOCH);
+ }
+ }
+
private void createDirs(Path spoolerPath) {
processingPath = createDir(spoolerPath.resolve("processing"));
readyPath = createDir(spoolerPath.resolve("ready"));