aboutsummaryrefslogtreecommitdiffstats
path: root/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java
blob: 167db6fe915b092adbcc9c5be9d45994f42044e8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.logserver;

import java.util.logging.Level;
import com.yahoo.log.LogMessage;
import com.yahoo.logserver.handlers.LogHandler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;


/**
 * This is the central point from which LogMessage objects are
 * propagated throughout the logserver architecture.
 * <p>
 * Messages are either forwarded one by one to every registered
 * {@link LogHandler}, or, when batched mode is enabled, collected
 * into lists of up to {@code batchSize} messages which are then
 * forwarded as a whole.
 * <p>
 * The handler list is a {@link CopyOnWriteArrayList}; the pending
 * batch is guarded by this object's monitor.
 *
 * @author Bjorn Borud
 */
public class LogDispatcher implements LogHandler {
    private static final Logger log = Logger.getLogger(LogDispatcher.class.getName());

    private final List<LogHandler> handlers = new CopyOnWriteArrayList<>();
    private final AtomicInteger messageCount = new AtomicInteger(0);
    private final AtomicBoolean batchedMode = new AtomicBoolean(false);
    // Number of messages collected before a batch is handed to the handlers.
    private final int batchSize = 5000;
    private final AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
    // Pending batch; null when no message is waiting. Guarded by synchronized (this).
    private List<LogMessage> currentBatchList = null;

    public LogDispatcher() { }

    /**
     * Dispatches a message to all the LogHandler instances we've
     * got registered.  The main entry point for LogMessage instances
     * into the log server.
     * <p>
     * In batched mode the message is appended to the pending batch
     * instead of being forwarded immediately.  The message counter
     * is incremented after the message has been forwarded or batched.
     *
     * @param msg The LogMessage instance we wish to dispatch to the
     *            plugins
     * @throws NullPointerException if msg is null
     */
    public void handle(LogMessage msg) {
        if (msg == null) {
            throw new NullPointerException("LogMessage was null");
        }

        if (batchedMode.get()) {
            addToBatch(msg);
        } else {
            send(msg);
        }
        messageCount.incrementAndGet();
    }

    /**
     * Appends a message to the pending batch, creating the batch if
     * needed, and forwards the batch once it has reached batchSize.
     */
    private void addToBatch(LogMessage msg) {
        List<LogMessage> toSend = null;
        synchronized (this) {
            if (currentBatchList == null) {
                currentBatchList = new ArrayList<>(batchSize);
            }
            currentBatchList.add(msg);

            // ">=" rather than "==" and no early return for the first
            // message, so that a full batch is always detected.
            if (currentBatchList.size() >= batchSize) {
                toSend = stealBatch();
            }
        }
        // Handlers are invoked outside the monitor.
        flushBatch(toSend);
    }

    /** Forwards a list of messages to every registered handler. */
    private void send(List<LogMessage> messages) {
        for (LogHandler ht : handlers) {
            ht.handle(messages);
        }
    }

    /** Forwards a single message to every registered handler. */
    private void send(LogMessage message) {
        for (LogHandler ht : handlers) {
            ht.handle(message);
        }
    }

    /** Forwards the given batch to the handlers; a null batch is ignored. */
    private void flushBatch(List<LogMessage> todo) {
        if (todo == null) { return; }
        send(todo);
    }

    /**
     * Dispatches each message in the list through
     * {@link #handle(LogMessage)}, in list order.
     *
     * @param messages the messages to dispatch
     */
    public void handle(List<LogMessage> messages) {
        for (var message : messages) {
            handle(message);
        }
    }

    /**
     * Set the batched mode.  Note that this should only be set
     * at initialization time because it radically changes the
     * behavior of the dispatcher.  When in batched mode, the
     * dispatcher will not enqueue single LogMessage instances
     * but lists of same.
     */
    public void setBatchedMode(boolean batchedMode) {
        this.batchedMode.set(batchedMode);
    }

    /**
     * Detaches and returns the pending batch, leaving no batch pending.
     *
     * @return the batch that was pending, or null if there was none
     */
    private List<LogMessage> stealBatch() {
        List<LogMessage> toSend;
        synchronized (this) {
            toSend = currentBatchList;
            currentBatchList = null;
        }
        return toSend;
    }

    /**
     * Forwards any pending batch to the handlers and then flushes
     * every registered handler.
     */
    public void flush() {
        // Drain regardless of the current mode: messages batched before
        // batched mode was switched off would otherwise never be
        // delivered.  With nothing pending this is a no-op.
        flushBatch(stealBatch());

        for (LogHandler h : handlers) {
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, "Flushing " + h.toString());
            }
            h.flush();
        }
    }

    /**
     * Shuts the dispatcher down: interrupts every registered handler
     * that is a Thread and removes all handlers.  A pending batch is
     * not forwarded by this method.
     *
     * @throws IllegalStateException if close has already been called
     */
    public void close() {
        if (hasBeenShutDown.getAndSet(true)) {
            throw new IllegalStateException("Shutdown already in progress");
        }

        for (LogHandler ht : handlers) {
            if (ht instanceof Thread) {
                log.fine("Stopping " + ht);
                // Todo: Very bad, never do....
                ((Thread) ht).interrupt();
            }
        }
        handlers.clear();

        log.log(Level.FINE, "Logdispatcher shut down.  Handled " + messageCount + " messages");
    }

    /**
     * Register handler thread with the dispatcher.  If the handler
     * thread has already been registered, we log a warning and
     * just do nothing.
     * <p>
     * If the thread is not alive it will be start()'ed.
     *
     * @param ht the handler to register
     * @throws NullPointerException  if ht is null
     * @throws IllegalStateException if the dispatcher has been shut down
     */
    public void registerLogHandler(LogHandler ht) {
        // Fail fast: a null entry in the handler list would make every
        // later handle() and flush() call throw.
        if (ht == null) {
            throw new NullPointerException("LogHandler was null");
        }
        if (hasBeenShutDown.get()) {
            throw new IllegalStateException("Tried to register LogHandler on LogDispatcher which was shut down");
        }

        // contains + add must be one step so that two concurrent
        // registrations of the same handler cannot both succeed.
        synchronized (this) {
            if (handlers.contains(ht)) {
                log.warning("LogHandler was already registered: " + ht);
                return;
            }
            handlers.add(ht);
        }

        if ((ht instanceof Thread) && (! ((Thread) ht).isAlive())) {
            ((Thread) ht).start();
        }

        log.fine("Added (and possibly started) LogHandler " + ht);
    }

    /**
     * Make defensive copy and return array of LogHandlers.
     *
     * @return a new array holding the currently registered handlers
     */
    public LogHandler[] getLogHandlers() {
        // A zero-length argument lets the list size the result from a
        // single snapshot; pre-sizing with handlers.size() could leave
        // trailing nulls if handlers are removed between the two calls.
        return handlers.toArray(new LogHandler[0]);
    }

    /**
     * Return message counter.
     *
     * @return Returns the number of messages that we have seen.
     */
    public int getMessageCount() {
        return messageCount.get();
    }

}