From 682be49b09b9b5d2585c79f62d75305daa8054bd Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Sat, 15 Oct 2022 13:44:31 +0200 Subject: Write several entries into files using JSONL file format --- .../java/com/yahoo/search/logging/LoggerEntry.java | 14 +++- .../java/com/yahoo/search/logging/Spooler.java | 74 +++++++++++++--------- 2 files changed, 55 insertions(+), 33 deletions(-) (limited to 'container-search/src/main/java/com/yahoo/search/logging') diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java index 692de3e6cef..69f37226dae 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java +++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.yahoo.search.Query; import com.yahoo.slime.JsonDecoder; import com.yahoo.slime.Slime; +import com.yahoo.text.Utf8; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; @@ -76,13 +77,13 @@ public class LoggerEntry { // TODO: Rename method here and above? (serialize/deserialize) // TODO: Use Slime or Jackson for both - public static LoggerEntry fromJson(byte[] content) throws IOException { + public static LoggerEntry fromJson(String content) throws IOException { var decoder = new JsonDecoder(); - var slime = decoder.decode(new Slime(), content); + var slime = decoder.decode(new Slime(), Utf8.toBytes(content)); var timestamp = slime.get().field("timestamp").asLong(); var query = new Query(slime.get().field("query").asString()); - var blob = slime.get().field("blob").asData(); + var blob = slime.get().field("blob").asString(); return new LoggerEntry(new Builder().timestamp(timestamp).query(query).blob(blob)); } @@ -118,6 +119,13 @@ public class LoggerEntry { return this; } + public Builder blob(String blob) { + byte[] bytes = Utf8.toBytes(blob); + this.blob = ByteBuffer.allocate(bytes.length); + this.blob.put(bytes); + return this; + } + public boolean send() { return logger.send(new LoggerEntry(this)); } diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java index 0b50985d974..5533320c907 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -5,12 +5,12 @@ import ai.vespa.validation.Validation; import com.yahoo.vespa.defaults.Defaults; import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -19,7 +19,8 @@ import java.util.logging.Level; import java.util.stream.Stream; /** - * Spooler that will write an entry to a file and read files that are ready to be sent + * Spooler that will write an entry to a file and read files that are ready to be sent. + * Files are written in JSON Lines text file format. * * @author hmusum */ @@ -28,22 +29,26 @@ public class Spooler { private static final java.util.logging.Logger log = java.util.logging.Logger.getLogger(Spooler.class.getName()); private static final Path defaultSpoolPath = Path.of(Defaults.getDefaults().underVespaHome("var/spool/vespa/events")); private static final Comparator ordering = new TimestampCompare(); + private static final int defaultMaxEntriesPerFile = 100; private Path processingPath; private Path readyPath; private Path failuresPath; private Path successesPath; + AtomicInteger entryCounter = new AtomicInteger(1); AtomicInteger fileCounter = new AtomicInteger(1); private final Path spoolPath; + private final int maxEntriesPerFile; public Spooler() { - this(defaultSpoolPath); + this(defaultSpoolPath, defaultMaxEntriesPerFile); } - public Spooler(Path spoolPath) { + public Spooler(Path spoolPath, int maxEntriesPerFile) { this.spoolPath = spoolPath; + this.maxEntriesPerFile = maxEntriesPerFile; createDirs(spoolPath); } @@ -77,16 +82,33 @@ public class Spooler { return files; } - public void processFiles(List files, Function transport) throws IOException { + public void processFiles(List files, Function transport) { for (File f : files) { - log.log(Level.INFO, "Found file " + f); - var content = Files.readAllBytes(f.toPath()); - var entry = LoggerEntry.fromJson(content); - - if (transport.apply(entry)) { - Path file = f.toPath(); - Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); - Files.move(file, target); + log.log(Level.FINE, "Processing file " + f); + boolean succcess = false; + try { + List lines = Files.readAllLines(f.toPath()); + for (String line : lines) { + LoggerEntry entry = LoggerEntry.fromJson(line); + log.log(Level.FINE, "Read entry " + entry + " from " + f); + succcess = transport.apply(entry); + if (! succcess) { + log.log(Level.WARNING, "unsuccessful call to transport() for " + entry); + } + }; + } catch (IOException e) { + throw new UncheckedIOException("Unable to process file " + f.toPath(), e); + // TODO: Move to failures path + } finally { + if (succcess) { + Path file = f.toPath(); + Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName()); + try { + Files.move(file, target); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } } } @@ -96,19 +118,6 @@ public class Spooler { public Path successesPath() { return successesPath; } public Path failuresPath() { return failuresPath; } - List getDirectories(File[] files) { - List fileList = new ArrayList<>(); - - for (File f : files) { - if (f.isDirectory()) { - fileList.add(f); - } - } - - Collections.sort(fileList); - return fileList; - } - List getFiles(List files, int count) { Validation.requireAtLeast(count, "count must be a positive number", 1); List fileList = new ArrayList<>(); @@ -133,11 +142,16 @@ public class Spooler { String fileName = String.valueOf(fileCounter); Path file = spoolPath.resolve(processingPath).resolve(fileName); try { - Files.writeString(file, entry.toJson(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + log.log(Level.INFO, "Writing entry " + entryCounter.get() + " (" + entry.toJson() + ") to " + fileName); + Files.writeString(file, entry.toJson() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); - log.log(Level.INFO, "Moving file from " + file + " to " + target); - Files.move(file, target); - fileCounter.addAndGet(1); + + if (entryCounter.incrementAndGet() > maxEntriesPerFile) { + log.log(Level.INFO, "Moving file from " + file + " to " + target); + Files.move(file, target); + entryCounter.set(1); + fileCounter.incrementAndGet(); + } } catch (IOException e) { throw new RuntimeException(e); } -- cgit v1.2.3 From 08d13bef3a30a49c280421cf6688bc80989eb349 Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Mon, 17 Oct 2022 08:39:53 +0200 Subject: Rename to serialize and deserialize, use Slime everywhere --- .../com/yahoo/search/logging/LocalDiskLogger.java | 2 +- .../java/com/yahoo/search/logging/LoggerEntry.java | 34 ++++++++-------------- .../java/com/yahoo/search/logging/Spooler.java | 6 ++-- 3 files changed, 16 insertions(+), 26 deletions(-) (limited to 'container-search/src/main/java/com/yahoo/search/logging') diff --git a/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java index f5d8c31be5d..7602b9d53e7 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java +++ b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java @@ -15,7 +15,7 @@ public class LocalDiskLogger extends AbstractThreadedLogger { @Override boolean transport(LoggerEntry entry) { - String json = entry.toJson(); + String json = entry.serialize(); try (FileWriter fw = new FileWriter(logFilePath, true)) { fw.write(json); fw.write(System.getProperty("line.separator")); diff --git a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java index 69f37226dae..e02bb8da8c0 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java +++ b/container-search/src/main/java/com/yahoo/search/logging/LoggerEntry.java @@ -2,14 +2,11 @@ package com.yahoo.search.logging; -import com.fasterxml.jackson.core.JsonEncoding; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; import com.yahoo.search.Query; -import com.yahoo.slime.JsonDecoder; +import com.yahoo.slime.Cursor; import com.yahoo.slime.Slime; +import com.yahoo.slime.SlimeUtils; import com.yahoo.text.Utf8; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; @@ -53,33 +50,26 @@ public class LoggerEntry { } public String toString() { - return toJson(); + return serialize(); } - public String toJson() { + public String serialize() { try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8); - g.writeStartObject(); + Slime slime = new Slime(); + Cursor root = slime.setObject(); - g.writeNumberField("timestamp", timestamp == null ? 0 : timestamp); - g.writeStringField("query", queryString()); - g.writeStringField("blob", Base64.getEncoder().encodeToString(blob.array())); - - g.writeEndObject(); - g.close(); - return out.toString(); + root.setLong("timestamp", timestamp == null ? 0 : timestamp); + root.setString("query", queryString()); + root.setString("blob", Base64.getEncoder().encodeToString(blob.array())); + return Utf8.toString(SlimeUtils.toJsonBytes(slime)); // TODO } catch (IOException e) { throw new UncheckedIOException(e); } } - // TODO: Rename method here and above? (serialize/deserialize) - // TODO: Use Slime or Jackson for both - public static LoggerEntry fromJson(String content) throws IOException { - var decoder = new JsonDecoder(); - var slime = decoder.decode(new Slime(), Utf8.toBytes(content)); + public static LoggerEntry deserialize(String content) throws IOException { + var slime = SlimeUtils.jsonToSlime(content); var timestamp = slime.get().field("timestamp").asLong(); var query = new Query(slime.get().field("query").asString()); diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java index 5533320c907..2193d414458 100644 --- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java +++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java @@ -89,7 +89,7 @@ public class Spooler { try { List lines = Files.readAllLines(f.toPath()); for (String line : lines) { - LoggerEntry entry = LoggerEntry.fromJson(line); + LoggerEntry entry = LoggerEntry.deserialize(line); log.log(Level.FINE, "Read entry " + entry + " from " + f); succcess = transport.apply(entry); if (! succcess) { @@ -142,8 +142,8 @@ public class Spooler { String fileName = String.valueOf(fileCounter); Path file = spoolPath.resolve(processingPath).resolve(fileName); try { - log.log(Level.INFO, "Writing entry " + entryCounter.get() + " (" + entry.toJson() + ") to " + fileName); - Files.writeString(file, entry.toJson() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + log.log(Level.FINE, "Writing entry " + entryCounter.get() + " (" + entry.serialize() + ") to file " + fileName); + Files.writeString(file, entry.serialize() + "\n", StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); Path target = spoolPath.resolve(readyPath).resolve(file.relativize(file)).resolve(fileName); if (entryCounter.incrementAndGet() > maxEntriesPerFile) { -- cgit v1.2.3