summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java110
1 files changed, 110 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
new file mode 100644
index 00000000000..1df236a037f
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
@@ -0,0 +1,110 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+package com.yahoo.search.logging;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+
+abstract class AbstractThreadedLogger implements Logger {
+
+ private final static java.util.logging.Logger log = java.util.logging.Logger.getLogger(AbstractThreadedLogger.class.getName());
+
+ final static int DEFAULT_MAX_THREADS = 1;
+ final static int DEFAULT_QUEUE_SIZE = 1000;
+
+ protected final WorkerThreadExecutor executor;
+
+ AbstractThreadedLogger() {
+ this(DEFAULT_MAX_THREADS, DEFAULT_QUEUE_SIZE);
+ }
+
+ AbstractThreadedLogger(int threads, int queueSize) {
+ executor = new WorkerThreadExecutor(threads, queueSize);
+ }
+
+ AbstractThreadedLogger(int threads, int queueSize, ThreadFactory factory) {
+ executor = new WorkerThreadExecutor(threads, queueSize, factory);
+ }
+
+ @Override
+ public boolean send(LoggerEntry entry) {
+ return enqueue(entry);
+ }
+
+ protected boolean enqueue(LoggerEntry entry) {
+ // Todo: metric things
+ try {
+ executor.execute(() -> dequeue(entry));
+ } catch (RejectedExecutionException e) {
+ return false;
+ }
+ return true;
+ }
+
+ protected void dequeue(LoggerEntry entry) {
+ transport(entry); // This happens in worker thread
+ }
+
+ /**
+ * Actually transports the entry to it's destination
+ */
+ abstract void transport(LoggerEntry entry);
+
+
+ private static class WorkerThread extends Thread {
+
+ public WorkerThread(Runnable r) {
+ super(r);
+ }
+
+ @Override
+ public void run() {
+ try {
+ super.run();
+ } catch (Exception e) {
+ log.log(Level.SEVERE, String.format("Error while sending logger entry: %s", e), e);
+ }
+ }
+
+ }
+
+ protected static class WorkerThreadExecutor implements Executor {
+
+ protected final ThreadPoolExecutor executor;
+
+ WorkerThreadExecutor(int threads, int queueSize) {
+ this(threads, queueSize, WorkerThread::new);
+ }
+
+ WorkerThreadExecutor(int threads, int queueSize, ThreadFactory threadFactory) {
+ executor = new ThreadPoolExecutor(
+ threads, threads,
+ 0L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(queueSize),
+ threadFactory);
+ }
+
+ public void close() {
+ try {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ //
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Override
+ public void execute(Runnable r) {
+ executor.execute(r);
+ }
+
+ }
+
+}