diff options
author | Harald Musum <musum@yahooinc.com> | 2023-06-06 13:30:30 +0200 |
---|---|---|
committer | Harald Musum <musum@yahooinc.com> | 2023-06-06 13:30:30 +0200 |
commit | 09d00888d1cb60a92ba75b4707262e08987f1bad (patch) | |
tree | 599f0fefd708120031c51921e86f55e206623fba /container-search/src/main/java/com/yahoo/search/logging/Spooler.java | |
parent | d0c8f93a7b9dae9acae144857176c1dd1a375c6f (diff) |
Track failures and move file to failures path if more than max failures
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/logging/Spooler.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/logging/Spooler.java | 53 |
1 files changed, 36 insertions, 17 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java index 46f7fbb0b3c..862be3848f2 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -5,7 +5,6 @@ import ai.vespa.validation.Validation; import com.yahoo.vespa.defaults.Defaults; import java.io.File; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -16,6 +15,8 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -53,21 +54,24 @@ public class Spooler { private final Clock clock; private final AtomicReference<Instant> firstWriteTimestamp = new AtomicReference<>(); private final boolean keepSuccessFiles; + private final int maxFailures; + private final Map<File, Integer> failures = new ConcurrentHashMap<>(); public Spooler(Clock clock) { - this(clock, false); + this(clock, false, 1000); } - public Spooler(Clock clock, boolean keepSuccessFiles) { - this(defaultSpoolPath, defaultMaxEntriesPerFile, clock, keepSuccessFiles); + public Spooler(Clock clock, boolean keepSuccessFiles, int maxFailures) { + this(defaultSpoolPath, defaultMaxEntriesPerFile, clock, keepSuccessFiles, maxFailures); } - public Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock, boolean keepSuccessFiles) { + Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock, boolean keepSuccessFiles, int maxFailures) { this.spoolPath = spoolPath; this.maxEntriesPerFile = maxEntriesPerFile; this.clock = clock; this.fileNameBase.set(newFileNameBase(clock)); this.keepSuccessFiles = keepSuccessFiles; + this.maxFailures = maxFailures; firstWriteTimestamp.set(Instant.EPOCH); createDirs(spoolPath); } @@ -113,26 +117,39 @@ public class Spooler { log.log(Level.FINE, "Read entry " + entry + " from " + f); succcess = transport.apply(entry); if (! succcess) { - log.log(Level.WARNING, "unsuccessful call to transport() for " + entry); + throw new RuntimeException("Unable to process file " + f + ": unsuccessful call to transport() for " + entry); } - }; - } catch (IOException e) { - throw new UncheckedIOException("Unable to process file " + f.toPath(), e); - // TODO: Move to failures path + } + failures.remove(f); + } catch (Exception e) { + handleFailure(f); } finally { if (succcess && keepSuccessFiles) { - Path file = f.toPath(); - Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); - try { - Files.move(file, target); - } catch (IOException e) { - log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target, e); - } + moveProcessedFile(f, successesPath); } } } } + private void handleFailure(File file) { + failures.putIfAbsent(file, 0); + var failCount = failures.compute(file, (f, count) -> count + 1); + if (failCount > maxFailures) { + log.log(Level.WARNING, "Unable to process file " + file + " after trying " + maxFailures + " times, moving it to " + failuresPath); + moveProcessedFile(file, failuresPath); + } + } + + private void moveProcessedFile(File f, Path path) { + Path file = f.toPath(); + Path target = spoolPath.resolve(path).resolve(f.toPath().relativize(file)).resolve(f.getName()); + try { + Files.move(file, target); + } catch (IOException e) { + log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target, e); + } + } + public Path processingPath() { return processingPath; } public Path readyPath() { return readyPath; } public Path successesPath() { return successesPath; } @@ -178,6 +195,8 @@ public class Spooler { switchFileIfNeeded(file, fileName); } + Map<File, Integer> failures() { return failures; } + private synchronized void switchFileIfNeeded(Path file, String fileName) throws IOException { if (file.toFile().exists() && (entryCounter.get() >= maxEntriesPerFile || firstWriteTimestamp.get().plus(maxDelayAfterFirstWrite).isBefore(clock.instant()))) { |