diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/logging/Spooler.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/logging/Spooler.java | 69 |
1 files changed, 48 insertions, 21 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java index 46f7fbb0b3c..e6d5bfc59ff 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -5,7 +5,6 @@ import ai.vespa.validation.Validation; import com.yahoo.vespa.defaults.Defaults; import java.io.File; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -16,6 +15,8 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -53,21 +54,25 @@ public class Spooler { private final Clock clock; private final AtomicReference<Instant> firstWriteTimestamp = new AtomicReference<>(); private final boolean keepSuccessFiles; + private final int maxFailures; + private final Map<File, Integer> failures = new ConcurrentHashMap<>(); public Spooler(Clock clock) { - this(clock, false); + this(clock, false, 1000); } - public Spooler(Clock clock, boolean keepSuccessFiles) { - this(defaultSpoolPath, defaultMaxEntriesPerFile, clock, keepSuccessFiles); + public Spooler(Clock clock, boolean keepSuccessFiles, int maxFailures) { + this(defaultSpoolPath, defaultMaxEntriesPerFile, clock, keepSuccessFiles, maxFailures); } - public Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock, boolean keepSuccessFiles) { + // Note: Needs to be public, used in system tests + public Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock, boolean keepSuccessFiles, int maxFailures) { this.spoolPath = spoolPath; this.maxEntriesPerFile = maxEntriesPerFile; this.clock = clock; this.fileNameBase.set(newFileNameBase(clock)); this.keepSuccessFiles = keepSuccessFiles; + this.maxFailures = maxFailures; firstWriteTimestamp.set(Instant.EPOCH); createDirs(spoolPath); } @@ -105,34 +110,54 @@ public class Spooler { public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) { for (File f : files) { log.log(Level.FINE, "Processing file " + f); - boolean succcess = false; + boolean success = false; try { List<String> lines = Files.readAllLines(f.toPath()); for (String line : lines) { LoggerEntry entry = LoggerEntry.deserialize(line); log.log(Level.FINE, "Read entry " + entry + " from " + f); - succcess = transport.apply(entry); - if (! succcess) { - log.log(Level.WARNING, "unsuccessful call to transport() for " + entry); + success = transport.apply(entry); + if (! success) { + throw new RuntimeException("Unable to process file " + f + ": unsuccessful call to transport() for " + entry); } - }; - } catch (IOException e) { - throw new UncheckedIOException("Unable to process file " + f.toPath(), e); - // TODO: Move to failures path + } + failures.remove(f); + } catch (Exception e) { + handleFailure(f); } finally { - if (succcess && keepSuccessFiles) { - Path file = f.toPath(); - Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); - try { - Files.move(file, target); - } catch (IOException e) { - log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target, e); - } + if (success) { + if (keepSuccessFiles) + moveProcessedFile(f, successesPath); + else + try { + Files.delete(f.toPath()); + } catch (IOException e) { + log.log(Level.WARNING, "Unable to delete file " + f, e); + } } } } } + private void handleFailure(File file) { + failures.putIfAbsent(file, 0); + var failCount = failures.compute(file, (f, count) -> count + 1); + if (failCount > maxFailures) { + log.log(Level.WARNING, "Unable to process file " + file + " after trying " + maxFailures + " times, moving it to " + failuresPath); + moveProcessedFile(file, failuresPath); + } + } + + private void moveProcessedFile(File f, Path path) { + Path file = f.toPath(); + Path target = spoolPath.resolve(path).resolve(f.toPath().relativize(file)).resolve(f.getName()); + try { + Files.move(file, target); + } catch (IOException e) { + log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target, e); + } + } + public Path processingPath() { return processingPath; } public Path readyPath() { return readyPath; } public Path successesPath() { return successesPath; } @@ -178,6 +203,8 @@ public class Spooler { switchFileIfNeeded(file, fileName); } + Map<File, Integer> failures() { return failures; } + private synchronized void switchFileIfNeeded(Path file, String fileName) throws IOException { if (file.toFile().exists() && (entryCounter.get() >= maxEntriesPerFile || firstWriteTimestamp.get().plus(maxDelayAfterFirstWrite).isBefore(clock.instant()))) { |