summaryrefslogtreecommitdiffstats
path: root/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java
blob: f7828b4dc9fe499848bd4c3d77b206d1ec67cf97 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.logserver;

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

import com.yahoo.io.SelectLoopHook;
import com.yahoo.log.LogLevel;
import com.yahoo.log.LogMessage;
import com.yahoo.logserver.handlers.LogHandler;


/**
 * This is the central point from which LogMessage objects are
 * propagated throughout the logserver architecture.
 *
 * <p>
 * Every method that reads or writes the handler list, the pending
 * batch, the batched-mode flag or the message counters holds this
 * instance's monitor.  Note that registered handlers are invoked while
 * that monitor is held.
 *
 * @author Bjorn Borud
 */
public class LogDispatcher implements LogHandler, SelectLoopHook {
    private static final Logger log = Logger.getLogger(LogDispatcher.class.getName());

    /** Number of messages collected before a pending batch is handed to the handlers. */
    private static final int BATCH_SIZE = 5000;

    // All mutable fields below are guarded by "this".
    private final List<LogHandler> handlers = new ArrayList<>();
    private int messageCount = 0;
    private boolean hasBeenShutDown = false;
    private boolean batchedMode = false;
    private List<LogMessage> currentBatchList;
    private int roundCount = 0;
    @SuppressWarnings("unused")
    private int lastRoundCount = 0;

    public LogDispatcher() {
    }

    /**
     * Dispatches a message to all the LogHandler instances we've
     * got registered.  The main entry point for LogMessage instances
     * into the log server.
     * <p>
     * In batched mode the message is only appended to the pending
     * batch; the handlers see it when the batch is flushed.
     *
     * @param msg The LogMessage instance we wish to dispatch to the
     *            plugins
     * @throws NullPointerException if msg is null
     */
    public synchronized void handle(LogMessage msg) {
        if (msg == null) {
            throw new NullPointerException("LogMessage was null");
        }

        if (batchedMode) {
            addToBatch(msg);
        } else {
            for (LogHandler h : handlers) {
                h.handle(msg);
            }
        }
        messageCount++;
    }

    /**
     * Appends a message to the pending batch, creating the batch if
     * there is none, and flushes it once it holds BATCH_SIZE messages.
     * Caller must hold the monitor of this instance.
     */
    private void addToBatch(LogMessage msg) {
        if (currentBatchList == null) {
            currentBatchList = new ArrayList<>(BATCH_SIZE);
        }
        currentBatchList.add(msg);

        if (currentBatchList.size() >= BATCH_SIZE) {
            flushBatch();
        }
    }

    /**
     * Hands the pending batch (if any) to every registered handler and
     * then drops the reference, so the next message starts a new list.
     * Caller must hold the monitor of this instance.
     */
    private void flushBatch() {
        if (currentBatchList == null) {
            return;
        }

        for (LogHandler ht : handlers) {
            ht.handle(currentBatchList);
        }
        currentBatchList = null;
    }


    /**
     * Not supported by the dispatcher; it only accepts single messages.
     *
     * @throws IllegalStateException always
     */
    public void handle(List<LogMessage> messages) {
        throw new IllegalStateException("method not supported");
    }

    /**
     * Set the batched mode.  Note that this should only be set
     * at initialization time because it radically changes the
     * behavior of the dispatcher.  When in batched mode, the
     * dispatcher will not enqueue single LogMessage instances
     * but lists of same.
     */
    public synchronized void setBatchedMode(boolean batchedMode) {
        // Synchronized so the flag is published under the same lock
        // that handle(), flush() and selectLoopHook() read it under.
        this.batchedMode = batchedMode;
    }

    /**
     * Flushes the pending batch (when in batched mode) and then asks
     * every registered handler to flush itself.
     */
    public synchronized void flush() {
        if (batchedMode) {
            flushBatch();
        }

        for (LogHandler h : handlers) {
            if (log.isLoggable(LogLevel.DEBUG)) {
                log.log(LogLevel.DEBUG, "Flushing " + h.toString());
            }
            h.flush();
        }
    }

    /**
     * Shuts the dispatcher down: interrupts every registered handler
     * that is a Thread and then forgets all handlers.
     *
     * @throws IllegalStateException if close() has already been called
     */
    public synchronized void close() {
        if (hasBeenShutDown) {
            throw new IllegalStateException("Shutdown already in progress");
        }
        hasBeenShutDown = true;

        for (LogHandler ht : handlers) {
            if (ht instanceof Thread) {
                log.fine("Stopping " + ht);
                ((Thread) ht).interrupt();
            }
        }
        handlers.clear();

        log.log(LogLevel.DEBUG, "Logdispatcher shut down.  Handled " + messageCount + " messages");
    }

    /**
     * Register handler thread with the dispatcher.  If the handler
     * thread has already been registered, we log a warning and
     * just do nothing.
     * <p>
     * If the handler is a Thread which is not alive it will be start()'ed.
     *
     * @throws IllegalStateException if the dispatcher has been shut down
     */
    public synchronized void registerLogHandler(LogHandler ht) {
        if (hasBeenShutDown) {
            throw new IllegalStateException("Tried to register LogHandler on" +
                                                    " LogDispatcher which was shut down");
        }

        if (handlers.contains(ht)) {
            log.warning("LogHandler was already registered: " + ht);
            return;
        }
        handlers.add(ht);

        if ((ht instanceof Thread) && (! ((Thread) ht).isAlive())) {
            ((Thread) ht).start();
        }

        log.fine("Added (and possibly started) LogHandler " + ht);
    }

    /**
     * Make defensive copy and return array of LogHandlers.
     *
     * @return a newly allocated array holding the currently registered handlers
     */
    public synchronized LogHandler[] getLogHandlers() {
        // Synchronized so the snapshot cannot interleave with
        // registerLogHandler() or close() mutating the list.
        return handlers.toArray(new LogHandler[0]);
    }

    /**
     * Return message counter.
     *
     * @return Returns the number of messages that we have seen.
     */
    public synchronized int getMessageCount() {
        return messageCount;
    }

    /**
     * Hook which is called when the select loop has finished.
     * Flushes the pending batch when in batched mode and records how
     * many messages arrived since the previous invocation.
     *
     * @param before not used by this implementation
     */
    public synchronized void selectLoopHook(boolean before) {
        // Synchronized because flushBatch() and the counters touch state
        // that handle(), flush() and close() modify under this monitor.
        if (batchedMode) {
            flushBatch();
        }

        lastRoundCount = messageCount - roundCount;
        roundCount = messageCount;
    }
}