aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/logging/AbstractThreadedLogger.java
blob: cad6da7e1d8e7fd7bca6c3714c255eb3ed8df017 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

package com.yahoo.search.logging;

import com.yahoo.component.AbstractComponent;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

abstract class AbstractThreadedLogger extends AbstractComponent implements Logger {

    private final static java.util.logging.Logger log = java.util.logging.Logger.getLogger(AbstractThreadedLogger.class.getName());

    final static int DEFAULT_MAX_THREADS = 1;
    final static int DEFAULT_QUEUE_SIZE = 1000;

    protected final WorkerThreadExecutor executor;

    AbstractThreadedLogger() {
        this(DEFAULT_MAX_THREADS, DEFAULT_QUEUE_SIZE);
    }

    AbstractThreadedLogger(int threads, int queueSize) {
        executor = new WorkerThreadExecutor(threads, queueSize);
    }

    AbstractThreadedLogger(int threads, int queueSize, ThreadFactory factory) {
        executor = new WorkerThreadExecutor(threads, queueSize, factory);
    }

    @Override
    public boolean send(LoggerEntry entry) {
        return enqueue(entry);
    }

    protected boolean enqueue(LoggerEntry entry) {
        // Todo: metric things
        try {
            executor.execute(() -> dequeue(entry));
        } catch (RejectedExecutionException e) {
            return false;
        }
        return true;
    }

    protected void dequeue(LoggerEntry entry) {
        transport(entry);  // This happens in worker thread
    }

    /**
     * Actually transports the entry to its destination
     */
    public abstract boolean transport(LoggerEntry entry);

    /** Synchronously shuts down and waits for enqueued entries to be sent. */
    @Override
    public void deconstruct() {
        executor.close();
    }

    private static class WorkerThread extends Thread {

        public WorkerThread(Runnable r) {
            super(r);
        }

        @Override
        public void run() {
            try {
                super.run();
            } catch (Exception e) {
                log.log(Level.SEVERE, String.format("Error while sending logger entry: %s", e), e);
            }
        }

    }

    protected static class WorkerThreadExecutor implements Executor {

        protected final ThreadPoolExecutor executor;

        WorkerThreadExecutor(int threads, int queueSize) {
            this(threads, queueSize, WorkerThread::new);
        }

        WorkerThreadExecutor(int threads, int queueSize, ThreadFactory threadFactory) {
            executor = new ThreadPoolExecutor(
                    threads, threads,
                    0L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(queueSize),
                    threadFactory);
        }

        public void close() {
            try {
                executor.shutdown();
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                //
            } finally {
                executor.shutdownNow();
            }
        }

        @Override
        public void execute(Runnable r) {
            executor.execute(r);
        }

    }

}