aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2023-06-06 13:30:30 +0200
committerHarald Musum <musum@yahooinc.com>2023-06-06 13:30:30 +0200
commit09d00888d1cb60a92ba75b4707262e08987f1bad (patch)
tree599f0fefd708120031c51921e86f55e206623fba /container-search/src/main/java/com/yahoo/search/logging/Spooler.java
parentd0c8f93a7b9dae9acae144857176c1dd1a375c6f (diff)
Track failures and move file to failures path if more than max failures
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/logging/Spooler.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Spooler.java53
1 files changed, 36 insertions, 17 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
index 46f7fbb0b3c..862be3848f2 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
@@ -5,7 +5,6 @@ import ai.vespa.validation.Validation;
import com.yahoo.vespa.defaults.Defaults;
import java.io.File;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
@@ -16,6 +15,8 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -53,21 +54,24 @@ public class Spooler {
private final Clock clock;
private final AtomicReference<Instant> firstWriteTimestamp = new AtomicReference<>();
private final boolean keepSuccessFiles;
+ private final int maxFailures;
+ private final Map<File, Integer> failures = new ConcurrentHashMap<>();
public Spooler(Clock clock) {
- this(clock, false);
+ this(clock, false, 1000);
}
- public Spooler(Clock clock, boolean keepSuccessFiles) {
- this(defaultSpoolPath, defaultMaxEntriesPerFile, clock, keepSuccessFiles);
+ public Spooler(Clock clock, boolean keepSuccessFiles, int maxFailures) {
+ this(defaultSpoolPath, defaultMaxEntriesPerFile, clock, keepSuccessFiles, maxFailures);
}
- public Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock, boolean keepSuccessFiles) {
+ Spooler(Path spoolPath, int maxEntriesPerFile, Clock clock, boolean keepSuccessFiles, int maxFailures) {
this.spoolPath = spoolPath;
this.maxEntriesPerFile = maxEntriesPerFile;
this.clock = clock;
this.fileNameBase.set(newFileNameBase(clock));
this.keepSuccessFiles = keepSuccessFiles;
+ this.maxFailures = maxFailures;
firstWriteTimestamp.set(Instant.EPOCH);
createDirs(spoolPath);
}
@@ -113,26 +117,39 @@ public class Spooler {
log.log(Level.FINE, "Read entry " + entry + " from " + f);
succcess = transport.apply(entry);
if (! succcess) {
- log.log(Level.WARNING, "unsuccessful call to transport() for " + entry);
+ throw new RuntimeException("Unable to process file " + f + ": unsuccessful call to transport() for " + entry);
}
- };
- } catch (IOException e) {
- throw new UncheckedIOException("Unable to process file " + f.toPath(), e);
- // TODO: Move to failures path
+ }
+ failures.remove(f);
+ } catch (Exception e) {
+ handleFailure(f);
} finally {
if (succcess && keepSuccessFiles) {
- Path file = f.toPath();
- Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
- try {
- Files.move(file, target);
- } catch (IOException e) {
- log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target, e);
- }
+ moveProcessedFile(f, successesPath);
}
}
}
}
+ private void handleFailure(File file) {
+ failures.putIfAbsent(file, 0);
+ var failCount = failures.compute(file, (f, count) -> count + 1);
+ if (failCount > maxFailures) {
+ log.log(Level.WARNING, "Unable to process file " + file + " after trying " + maxFailures + " times, moving it to " + failuresPath);
+ moveProcessedFile(file, failuresPath);
+ }
+ }
+
+ private void moveProcessedFile(File f, Path path) {
+ Path file = f.toPath();
+ Path target = spoolPath.resolve(path).resolve(f.toPath().relativize(file)).resolve(f.getName());
+ try {
+ Files.move(file, target);
+ } catch (IOException e) {
+ log.log(Level.SEVERE, "Unable to move processed file " + file + " to " + target, e);
+ }
+ }
+
public Path processingPath() { return processingPath; }
public Path readyPath() { return readyPath; }
public Path successesPath() { return successesPath; }
@@ -178,6 +195,8 @@ public class Spooler {
switchFileIfNeeded(file, fileName);
}
+ Map<File, Integer> failures() { return failures; }
+
private synchronized void switchFileIfNeeded(Path file, String fileName) throws IOException {
if (file.toFile().exists()
&& (entryCounter.get() >= maxEntriesPerFile || firstWriteTimestamp.get().plus(maxDelayAfterFirstWrite).isBefore(clock.instant()))) {