aboutsummaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorHarald Musum <musum@yahooinc.com>2022-10-06 07:58:48 +0200
committerHarald Musum <musum@yahooinc.com>2022-10-06 07:58:48 +0200
commit9b291caa34a2b61be45e63d5bb539c136df616e0 (patch)
tree464877d718b4c14d90fa8e39000cffb6c35b6617 /container-search
parentb61e190f896f217f54a71e14b33b0589b8d87a32 (diff)
Make transport return a boolean and use a function when processing files
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java4
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java6
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/Spooler.java30
-rw-r--r--container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java30
5 files changed, 37 insertions, 35 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
index 5cb3aa24f98..3db854a6888 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractSpoolingLogger.java
@@ -35,9 +35,7 @@ abstract class AbstractSpoolingLogger extends AbstractThreadedLogger implements
public void run() {
try {
- var entries = spooler.processFiles();
- log.log(Level.INFO, "Entries: " + entries.size());
- entries.forEach(this::transport);
+ spooler.processFiles(this::transport);
} catch (IOException e) {
e.printStackTrace();
}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
index 1df236a037f..17895a9ce57 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
@@ -53,7 +53,7 @@ abstract class AbstractThreadedLogger implements Logger {
/**
* Actually transports the entry to it's destination
*/
- abstract void transport(LoggerEntry entry);
+ abstract boolean transport(LoggerEntry entry);
private static class WorkerThread extends Thread {
diff --git a/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java
index 6d815059066..f5d8c31be5d 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/LocalDiskLogger.java
@@ -7,20 +7,22 @@ import java.io.IOException;
public class LocalDiskLogger extends AbstractThreadedLogger {
- private String logFilePath;
+ private final String logFilePath;
public LocalDiskLogger(LocalDiskLoggerConfig config) {
logFilePath = config.path();
}
@Override
- void transport(LoggerEntry entry) {
+ boolean transport(LoggerEntry entry) {
String json = entry.toJson();
try (FileWriter fw = new FileWriter(logFilePath, true)) {
fw.write(json);
fw.write(System.getProperty("line.separator"));
+ return true;
} catch (IOException e) {
e.printStackTrace();
+ return false;
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
index 8fa51a0b47c..0b50985d974 100644
--- a/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
+++ b/container-search/src/main/java/com/yahoo/search/logging/Spooler.java
@@ -14,6 +14,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.logging.Level;
import java.util.stream.Stream;
@@ -46,32 +47,27 @@ public class Spooler {
createDirs(spoolPath);
}
- public boolean write(LoggerEntry entry) {
+ void write(LoggerEntry entry) {
writeEntry(entry);
-
- return true;
}
- public List<LoggerEntry> processFiles() throws IOException {
+ public void processFiles(Function<LoggerEntry, Boolean> transport) throws IOException {
List<Path> files = listFilesInPath(readyPath);
-
if (files.size() == 0) {
log.log(Level.INFO, "No files in ready path " + readyPath.toFile().getAbsolutePath());
- return List.of();
+ return;
}
log.log(Level.FINE, "Files in ready path: " + files.size());
List<File> fileList = getFiles(files, 50); // TODO
if ( ! fileList.isEmpty()) {
- return processFiles(fileList);
+ processFiles(fileList, transport);
}
- return List.of();
}
List<Path> listFilesInPath(Path path) throws IOException {
List<Path> files;
- System.out.println("Path " + path + " exists: " + path.toFile().exists());
try (Stream<Path> stream = Files.list(path)) {
files = stream.toList();
// TODO: Or check if stream is empty
@@ -81,18 +77,18 @@ public class Spooler {
return files;
}
- public ArrayList<LoggerEntry> processFiles(List<File> files) throws IOException {
- ArrayList<LoggerEntry> entries = new ArrayList<>();
+ public void processFiles(List<File> files, Function<LoggerEntry, Boolean> transport) throws IOException {
for (File f : files) {
log.log(Level.INFO, "Found file " + f);
var content = Files.readAllBytes(f.toPath());
var entry = LoggerEntry.fromJson(content);
- Path file = f.toPath();
- Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
- Files.move(file, target);
- entries.add(entry);
+
+ if (transport.apply(entry)) {
+ Path file = f.toPath();
+ Path target = spoolPath.resolve(successesPath).resolve(f.toPath().relativize(file)).resolve(f.getName());
+ Files.move(file, target);
+ }
}
- return entries;
}
public Path processingPath() { return processingPath; }
@@ -159,7 +155,7 @@ public class Spooler {
if (file.exists() && file.canRead() && file.canWrite()) {
log.log(Level.INFO, "Directory " + path + " already exists");
} else if (file.mkdirs()) {
- log.log(Level.INFO, "Created " + path);
+ log.log(Level.FINE, "Created " + path);
} else {
log.log(Level.WARNING, "Could not create " + path + ", please check permissions");
}
diff --git a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
index abd3b27b70b..2413dac18d9 100644
--- a/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/logging/SpoolerTest.java
@@ -27,12 +27,8 @@ public class SpoolerTest {
Spooler spooler = new Spooler(spoolDir);
TestLogger logger = new TestLogger(spooler);
- assertTrue(logger.newEntry()
- .blob("Yo entry".getBytes())
- .send());
- assertTrue(logger.newEntry()
- .blob("Yo entry 2".getBytes())
- .send());
+ assertTrue(sendEntry(logger, "Yo entry"));
+ assertTrue(sendEntry(logger, "Yo entry 2"));
Path readyPath = spooler.readyPath();
Path readyFile1 = readyPath.resolve("1");
@@ -41,9 +37,8 @@ public class SpoolerTest {
waitUntilFileExists(readyFile2);
// Check content after being moved to ready path
- String content = Files.readString(readyFile1);
- assertTrue(content.contains(Base64.getEncoder().encodeToString("Yo entry".getBytes())));
- assertTrue(Files.readString(readyFile2).contains(Base64.getEncoder().encodeToString("Yo entry 2".getBytes())));
+ assertContent(readyFile1, "Yo entry");
+ assertContent(readyFile2, "Yo entry 2");
// Process files (read, transport files)
logger.manualRun();
@@ -56,6 +51,12 @@ public class SpoolerTest {
assertEquals(0, spooler.listFilesInPath(spooler.failuresPath()).size());
}
+ private boolean sendEntry(Logger logger, String x) {
+ return logger.newEntry()
+ .blob(x.getBytes())
+ .send();
+ }
+
private void waitUntilFileExists(Path path) {
Instant end = Instant.now().plus(Duration.ofSeconds(1));
while (!path.toFile().exists() && Instant.now().isBefore(end)) {
@@ -69,6 +70,10 @@ public class SpoolerTest {
assertTrue(path.toFile().exists());
}
+ private void assertContent(Path file, String expectedContent) throws IOException {
+ String content = Files.readString(file);
+ assertTrue(content.contains(Base64.getEncoder().encodeToString(expectedContent.getBytes())));
+ }
private static class TestLogger extends AbstractSpoolingLogger {
@@ -79,9 +84,9 @@ public class SpoolerTest {
}
@Override
- void transport(LoggerEntry entry) {
- System.out.println("Called transport()");
+ boolean transport(LoggerEntry entry) {
entriesSent.add(entry);
+ return true;
}
@Override
@@ -91,7 +96,8 @@ public class SpoolerTest {
@Override
public boolean send(LoggerEntry entry) {
- return spooler.write(entry);
+ spooler.write(entry);
+ return true;
}
public void manualRun() {