aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com')
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java14
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Spooler.java74
2 files changed, 55 insertions, 33 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
index 692de3e6cef..69f37226dae 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
@@ -8,6 +8,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.yahoo.search.Query;
import com.yahoo.slime.JsonDecoder;
import com.yahoo.slime.Slime;
+import com.yahoo.text.Utf8;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -76,13 +77,13 @@ public class LoggerEntry {
// TODO: Rename method here and above? (serialize/deserialize)
// TODO: Use Slime or Jackson for both
- public static LoggerEntry fromJson(byte[] content) throws IOException {
+ public static LoggerEntry fromJson(String content) throws IOException {
var decoder = new JsonDecoder();
- var slime = decoder.decode(new Slime(), content);
+ var slime = decoder.decode(new Slime(), Utf8.toBytes(content));
var timestamp = slime.get().field("timestamp").asLong();
var query = new Query(slime.get().field("query").asString());
- var blob = slime.get().field("blob").asData();
+ var blob = slime.get().field("blob").asString();
return new LoggerEntry(new Builder().timestamp(timestamp).query(query).blob(blob));
}
@@ -118,6 +119,13 @@ public class LoggerEntry {
return this;
}
+ public Builder blob(String blob) {
+ byte[] bytes = Utf8.toBytes(blob);
+ this.blob = ByteBuffer.allocate(bytes.length);
+ this.blob.put(bytes);
+ return this;
+ }
+
public boolean send() {
return logger.send(new LoggerEntry(this));
}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
index 0b50985d974..5533320c907 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
@@ -5,12 +5,12 @@ import ai.vespa.validation.Validation;
import com.yahoo.vespa.defaults.Defaults;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -19,7 +19,8 @@ import java.util.logging.Level;
import java.util.stream.Stream;
/**
- * Spooler that will write an entry to a file and read files that are ready to be sent
+ * Spooler that will write an entry to a file and read files that are ready to be sent.
+ * Files are written in JSON Lines text file format.
*
* @author hmusum
*/
@@ -28,22 +29,26 @@ public class Spooler {
private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName());
private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events"));
private static final Comparator<File> ordering = new TimestampCompare();
+ private static final int defaultMaxEntriesPerFile = 100;
private Path processingPath;
private Path readyPath;
private Path failuresPath;
private Path successesPath;
+ AtomicInteger entryCounter = new AtomicInteger(1);
AtomicInteger fileCounter = new AtomicInteger(1);
private final Path spoolPath;
+ private final int maxEntriesPerFile;
public Spooler() {
- this(defaultSpoolPath);
+ this(defaultSpoolPath, defaultMaxEntriesPerFile);
}
- public Spooler(Path spoolPath) {
+ public Spooler(Path spoolPath, int maxEntriesPerFile) {
this.spoolPath = spoolPath;
+ this.maxEntriesPerFile = maxEntriesPerFile;
createDirs(spoolPath);
}
@@ -77,16 +82,33 @@ public class Spooler {
return files;
}
- public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) throws IOException {
+ public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) {
for (File f : files) {
- log.log(Level.INFO, "Found file " + f);
- var content = Files.readAllBytes(f.toPath());
- var entry = LoggerEntry.fromJson(content);
-
- if (transport.apply(entry)) {
- Path file = f.toPath();
- Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
- Files.move(file, target);
+ log.log(Level.FINE, "Processing file " + f);
+ boolean succcess = false;
+ try {
+ List<String> lines = Files.readAllLines(f.toPath());
+ for (String line : lines) {
+ LoggerEntry entry = LoggerEntry.fromJson(line);
+ log.log(Level.FINE, "Read entry " + entry + " from " + f);
+ succcess = transport.apply(entry);
+ if (! succcess) {
+ log.log(Level.WARNING, "unsuccessful call to transport() for " + entry);
+ }
+ };
+ } catch (IOException e) {
+ throw new UncheckedIOException("Unable to process file " + f.toPath(), e);
+ // TODO: Move to failures path
+ } finally {
+ if (succcess) {
+ Path file = f.toPath();
+ Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
+ try {
+ Files.move(file, target);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}
}
}
@@ -96,19 +118,6 @@ public class Spooler {
public Path successesPath() { return successesPath; }
public Path failuresPath() { return failuresPath; }
- List<File> getDirectories(File[] files) {
- List<File> fileList = new ArrayList<>();
-
- for (File f : files) {
- if (f.isDirectory()) {
- fileList.add(f);
- }
- }
-
- Collections.sort(fileList);
- return fileList;
- }
-
List<File> getFiles(List<Path> files, int count) {
Validation.requireAtLeast(count, "count must be a positive number", 1);
List<File> fileList = new ArrayList<>();
@@ -133,11 +142,16 @@ public class Spooler {
String fileName = String.valueOf(fileCounter);
Path file = spoolPath.resolve(processingPath).resolve(fileName);
try {
- Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
+ log.log(Level.INFO, "Writing entry " + entryCounter.get() + " (" + entry.toJson() + ") to " + fileName);
+ Files.writeString(file, entry.toJson() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName);
- log.log(Level.INFO, "Moving file from " + file + " to " + target);
- Files.move(file, target);
- fileCounter.addAndGet(1);
+
+ if (entryCounter.incrementAndGet() > maxEntriesPerFile) {
+ log.log(Level.INFO, "Moving file from " + file + " to " + target);
+ Files.move(file, target);
+ entryCounter.set(1);
+ fileCounter.incrementAndGet();
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}