summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2022-10-15 13:44:31 +0200
committerHarald Musum <musum@yahooinc.com>2022-10-15 13:44:31 +0200
commit682be49b09b9b5d2585c79f62d75305daa8054bd (patch)
treeaa9671dae9e751616b4986a581bec7d704540130
parente4443f6c23edcd520ca97a453086a22bc8f94131 (diff)
Write several entries into files using JSONL file format
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java14
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Spooler.java74
-rw-r--r--container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java60
3 files changed, 108 insertions, 40 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
index 692de3e6cef..69f37226dae 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java
@@ -8,6 +8,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.yahoo.search.Query;
import com.yahoo.slime.JsonDecoder;
import com.yahoo.slime.Slime;
+import com.yahoo.text.Utf8;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -76,13 +77,13 @@ public class LoggerEntry {
// TODO: Rename method here and above? (serialize/deserialize)
// TODO: Use Slime or Jackson for both
- public static LoggerEntry fromJson(byte[] content) throws IOException {
+ public static LoggerEntry fromJson(String content) throws IOException {
var decoder = new JsonDecoder();
- var slime = decoder.decode(new Slime(), content);
+ var slime = decoder.decode(new Slime(), Utf8.toBytes(content));
var timestamp = slime.get().field("timestamp").asLong();
var query = new Query(slime.get().field("query").asString());
- var blob = slime.get().field("blob").asData();
+ var blob = slime.get().field("blob").asString();
return new LoggerEntry(new Builder().timestamp(timestamp).query(query).blob(blob));
}
@@ -118,6 +119,13 @@ public class LoggerEntry {
return this;
}
+ public Builder blob(String blob) {
+ byte[] bytes = Utf8.toBytes(blob);
+ this.blob = ByteBuffer.allocate(bytes.length);
+ this.blob.put(bytes);
+ return this;
+ }
+
public boolean send() {
return logger.send(new LoggerEntry(this));
}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
index 0b50985d974..5533320c907 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
@@ -5,12 +5,12 @@ import ai.vespa.validation.Validation;
import com.yahoo.vespa.defaults.Defaults;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -19,7 +19,8 @@ import java.util.logging.Level;
import java.util.stream.Stream;
/**
- * Spooler that will write an entry to a file and read files that are ready to be sent
+ * Spooler that will write an entry to a file and read files that are ready to be sent.
+ * Files are written in JSON Lines text file format.
*
* @author hmusum
*/
@@ -28,22 +29,26 @@ public class Spooler {
private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName());
private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events"));
private static final Comparator<File> ordering = new TimestampCompare();
+ private static final int defaultMaxEntriesPerFile = 100;
private Path processingPath;
private Path readyPath;
private Path failuresPath;
private Path successesPath;
+ AtomicInteger entryCounter = new AtomicInteger(1);
AtomicInteger fileCounter = new AtomicInteger(1);
private final Path spoolPath;
+ private final int maxEntriesPerFile;
public Spooler() {
- this(defaultSpoolPath);
+ this(defaultSpoolPath, defaultMaxEntriesPerFile);
}
- public Spooler(Path spoolPath) {
+ public Spooler(Path spoolPath, int maxEntriesPerFile) {
this.spoolPath = spoolPath;
+ this.maxEntriesPerFile = maxEntriesPerFile;
createDirs(spoolPath);
}
@@ -77,16 +82,33 @@ public class Spooler {
return files;
}
- public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) throws IOException {
+ public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) {
for (File f : files) {
- log.log(Level.INFO, "Found file " + f);
- var content = Files.readAllBytes(f.toPath());
- var entry = LoggerEntry.fromJson(content);
-
- if (transport.apply(entry)) {
- Path file = f.toPath();
- Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
- Files.move(file, target);
+ log.log(Level.FINE, "Processing file " + f);
+ boolean succcess = false;
+ try {
+ List<String> lines = Files.readAllLines(f.toPath());
+ for (String line : lines) {
+ LoggerEntry entry = LoggerEntry.fromJson(line);
+ log.log(Level.FINE, "Read entry " + entry + " from " + f);
+ succcess = transport.apply(entry);
+ if (! succcess) {
+ log.log(Level.WARNING, "unsuccessful call to transport() for " + entry);
+ }
+ };
+ } catch (IOException e) {
+ throw new UncheckedIOException("Unable to process file " + f.toPath(), e);
+ // TODO: Move to failures path
+ } finally {
+ if (succcess) {
+ Path file = f.toPath();
+ Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
+ try {
+ Files.move(file, target);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}
}
}
@@ -96,19 +118,6 @@ public class Spooler {
public Path successesPath() { return successesPath; }
public Path failuresPath() { return failuresPath; }
- List<File> getDirectories(File[] files) {
- List<File> fileList = new ArrayList<>();
-
- for (File f : files) {
- if (f.isDirectory()) {
- fileList.add(f);
- }
- }
-
- Collections.sort(fileList);
- return fileList;
- }
-
List<File> getFiles(List<Path> files, int count) {
Validation.requireAtLeast(count, "count must be a positive number", 1);
List<File> fileList = new ArrayList<>();
@@ -133,11 +142,16 @@ public class Spooler {
String fileName = String.valueOf(fileCounter);
Path file = spoolPath.resolve(processingPath).resolve(fileName);
try {
- Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
+ log.log(Level.INFO, "Writing entry " + entryCounter.get() + " (" + entry.toJson() + ") to " + fileName);
+ Files.writeString(file, entry.toJson() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName);
- log.log(Level.INFO, "Moving file from " + file + " to " + target);
- Files.move(file, target);
- fileCounter.addAndGet(1);
+
+ if (entryCounter.incrementAndGet() > maxEntriesPerFile) {
+ log.log(Level.INFO, "Moving file from " + file + " to " + target);
+ Files.move(file, target);
+ entryCounter.set(1);
+ fileCounter.incrementAndGet();
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
index af371b7da93..2899f5b67a0 100644
--- a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
@@ -24,7 +24,8 @@ public class SpoolerTest {
public void testSpoolingLogger() throws IOException {
Path spoolDir = tempDir.resolve("spool");
- Spooler spooler = new Spooler(spoolDir);
+ int maxEntriesPerFile = 1;
+ Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile);
TestLogger logger = new TestLogger(spooler);
assertTrue(sendEntry(logger, "Yo entry"));
@@ -45,16 +46,61 @@ public class SpoolerTest {
assertEquals(2, logger.entriesSent());
// No files in processing or ready, 2 files in successes
- assertEquals(0, spooler.listFilesInPath(spooler.processingPath()).size());
- assertEquals(0, spooler.listFilesInPath(readyPath).size());
- assertEquals(2, spooler.listFilesInPath(spooler.successesPath()).size());
- assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size());
+ assertProcessedFiles(spooler, 0);
+ assertReadyFiles(spooler, 0);
+ assertSuccessFiles(spooler, 2);
+ assertFailureFiles(spooler, 0);
+ }
+
+ @Test
+ public void testSpoolingManyEntriesPerFile() throws IOException {
+ Path spoolDir = tempDir.resolve("spool");
+
+ int maxEntriesPerFile = 2;
+ Spooler spooler = new Spooler(spoolDir, maxEntriesPerFile);
+
+ TestLogger logger = new TestLogger(spooler);
+ assertTrue(sendEntry(logger, "Yo entry"));
+ assertTrue(sendEntry(logger, "Yo entry 2"));
+
+ Path readyPath = spooler.readyPath();
+ Path readyFile1 = readyPath.resolve("1");
+ waitUntilFileExists(readyFile1);
+
+ // Check content after being moved to ready path
+ assertContent(readyFile1, "Yo entry");
+
+ // Process files (read, transport files)
+ logger.manualRun();
+ assertEquals(2, logger.entriesSent());
+
+ // No files in processing or ready, 1 file in successes
+ assertProcessedFiles(spooler, 0);
+ assertReadyFiles(spooler, 0);
+ assertSuccessFiles(spooler, 1);
+ assertFailureFiles(spooler, 0);
+ }
+
+ private void assertProcessedFiles(Spooler spooler, int expected) throws IOException {
+ assertEquals(expected, spooler.listFilesInPath(spooler.processingPath()).size());
+ }
+
+ private void assertReadyFiles(Spooler spooler, int expected) throws IOException {
+ assertEquals(expected, spooler.listFilesInPath(spooler.readyPath()).size());
+ }
+
+ private void assertSuccessFiles(Spooler spooler, int expected) throws IOException {
+ assertEquals(expected, spooler.listFilesInPath(spooler.successesPath()).size());
+ }
+
+ private void assertFailureFiles(Spooler spooler, int expected) throws IOException {
+ assertEquals(expected, spooler.listFilesInPath(spooler.failuresPath()).size());
}
@Test
public void failingToTransportIsRetried() throws IOException {
Path spoolDir = tempDir.resolve("spool");
- Spooler spooler = new Spooler(spoolDir);
+ Spooler spooler = new Spooler(spoolDir, 1);
FailingToTransportSecondEntryLogger logger = new FailingToTransportSecondEntryLogger(spooler);
assertTrue(sendEntry(logger, "Yo entry"));
@@ -85,7 +131,7 @@ public class SpoolerTest {
}
}
- assertTrue(path.toFile().exists());
+ assertTrue(path.toFile().exists(), path.toFile() + " does not exits");
}
private void assertContent(Path file, String expectedContent) throws IOException {