summaryrefslogtreecommitdiffstats
path: root/logserver/src/main/java/com/yahoo
diff options
context:
space:
mode:
Diffstat (limited to 'logserver/src/main/java/com/yahoo')
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/AbstractPluginLoader.java46
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/BuiltinPluginLoader.java31
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/Flusher.java57
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java188
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/PluginLoader.java22
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/Server.java208
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/demo/.gitignore0
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/filter/LevelFilter.java30
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/filter/LogFilter.java23
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/filter/LogFilterManager.java99
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/filter/MetricsFilter.java50
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/filter/MuteFilter.java30
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/filter/NoMetricsFilter.java21
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/filter/NullFilter.java19
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/formatter/LogFormatter.java28
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/formatter/LogFormatterManager.java70
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/formatter/NullFormatter.java29
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/formatter/TextFormatter.java51
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/AbstractLogHandler.java137
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/HandlerThread.java241
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/LogHandler.java51
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolder.java186
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolderConnection.java141
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolderPlugin.java51
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/logmetrics/LogMetricsHandler.java233
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/logmetrics/LogMetricsPlugin.java54
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/replicator/FormattedBufferCache.java78
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/replicator/Replicator.java130
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorConnection.java411
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorPlugin.java58
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/net/LogConnection.java272
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/net/LogConnectionFactory.java41
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/net/control/Levels.java125
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/net/control/State.java54
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/testutils/VerifyLogfile.java65
-rw-r--r--logserver/src/main/java/com/yahoo/plugin/Config.java23
-rw-r--r--logserver/src/main/java/com/yahoo/plugin/Plugin.java33
-rw-r--r--logserver/src/main/java/com/yahoo/plugin/SystemPropertyConfig.java35
38 files changed, 3421 insertions, 0 deletions
diff --git a/logserver/src/main/java/com/yahoo/logserver/AbstractPluginLoader.java b/logserver/src/main/java/com/yahoo/logserver/AbstractPluginLoader.java
new file mode 100644
index 00000000000..6d215b9e831
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/AbstractPluginLoader.java
@@ -0,0 +1,46 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.plugin.Plugin;
+import com.yahoo.plugin.SystemPropertyConfig;
+
+import java.util.logging.Logger;
+
+/**
+ * TODO: describe class
+ *
+ * @author <a href="mailto:stig@yahoo-inc.com">Stig Bakken</a>
+ */
+public abstract class AbstractPluginLoader implements PluginLoader {
+ private static final Logger log = Logger.getLogger(AbstractPluginLoader.class.getName());
+
+ public abstract void loadPlugins();
+
+ protected void loadFromClass (Class<? extends Plugin> pluginClass) {
+ Plugin plugin;
+ try {
+ plugin = (Plugin)pluginClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ log.log(LogLevel.ERROR, pluginClass.getName() + ": load failed: " + e);
+ throw new RuntimeException(e);
+ }
+
+ String pname = plugin.getPluginName();
+ String prefix = Server.APPNAME + "." + pname + ".";
+ SystemPropertyConfig config = new SystemPropertyConfig(prefix);
+ String enable = config.get("enable", "true");
+
+ if (!enable.equals("true")) {
+ log.log(LogLevel.INFO, pname + ": plugin disabled by config");
+ return;
+ }
+
+ try {
+ plugin.initPlugin(config);
+ log.log(LogLevel.DEBUG, pname + ": plugin loaded");
+ } catch (Exception e) {
+ log.log(LogLevel.ERROR, pname + ": init failed", e);
+ }
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/BuiltinPluginLoader.java b/logserver/src/main/java/com/yahoo/logserver/BuiltinPluginLoader.java
new file mode 100644
index 00000000000..f2ef60267fe
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/BuiltinPluginLoader.java
@@ -0,0 +1,31 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver;
+
+import java.util.logging.Logger;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.logserver.handlers.archive.ArchiverPlugin;
+import com.yahoo.logserver.handlers.lasterrorsholder.LastErrorsHolderPlugin;
+import com.yahoo.logserver.handlers.logmetrics.LogMetricsPlugin;
+import com.yahoo.logserver.handlers.replicator.ReplicatorPlugin;
+
+/**
+ * Load a set of builtin plugins
+ *
+ * @author <a href="mailto:stig@yahoo-inc.com">Stig Bakken</a>
+ */
+public class BuiltinPluginLoader extends AbstractPluginLoader {
+ private static final Logger log = Logger.getLogger(BuiltinPluginLoader.class.getName());
+
+ public void loadPlugins() {
+ log.log(LogLevel.DEBUG, "starting to load builtin plugins");
+
+ loadFromClass(ArchiverPlugin.class);
+ loadFromClass(ReplicatorPlugin.class);
+ loadFromClass(LogMetricsPlugin.class);
+ loadFromClass(LastErrorsHolderPlugin.class);
+
+ log.log(LogLevel.DEBUG, "done loading builtin plugins");
+ }
+
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/Flusher.java b/logserver/src/main/java/com/yahoo/logserver/Flusher.java
new file mode 100644
index 00000000000..ee2b317f573
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/Flusher.java
@@ -0,0 +1,57 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.logging.Logger;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.logserver.handlers.LogHandler;
+
+/**
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class Flusher extends Thread {
+ private static final Logger log = Logger.getLogger(Flusher.class.getName());
+ private static final Flusher instance;
+ private static final List<WeakReference<LogHandler>> logHandlers =
+ new ArrayList<WeakReference<LogHandler>>();
+
+ static {
+ instance = new Flusher();
+ instance.start();
+ }
+
+ Flusher() {
+ super("flusher");
+ }
+
+ public static synchronized void register(LogHandler logHandler) {
+ logHandlers.add(new WeakReference<LogHandler>(logHandler));
+ }
+
+ public synchronized void run() {
+ try {
+ while(!isInterrupted()) {
+ Thread.sleep(2000);
+ Iterator<WeakReference<LogHandler>> it = logHandlers.iterator();
+ while (it.hasNext()) {
+ WeakReference<LogHandler> r = it.next();
+ LogHandler h = r.get();
+ if (h == null) {
+ it.remove();
+ } else {
+ h.flush();
+ }
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Flushing " + h);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ log.log(LogLevel.WARNING, "flusher was interrupted", e);
+ }
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java b/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java
new file mode 100644
index 00000000000..6de34d4a899
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java
@@ -0,0 +1,188 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import com.yahoo.io.SelectLoopHook;
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.LogMessage;
+import com.yahoo.logserver.handlers.LogHandler;
+
+
+/**
+ * This is the central point from which LogMessage objects are
+ * propagated throughout the logserver architecture.
+ *
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class LogDispatcher implements LogHandler, SelectLoopHook
+{
+ private static final Logger log = Logger.getLogger(LogDispatcher.class.getName());
+
+ private final List<LogHandler> handlers = new ArrayList<LogHandler>();
+ private int messageCount = 0;
+ private boolean hasBeenShutDown = false;
+ private boolean batchedMode = false;
+ private final int batchSize = 5000;
+ private List<LogMessage> currentBatchList;
+ private int roundCount = 0;
+ @SuppressWarnings("unused")
+ private int lastRoundCount = 0;
+
+ public LogDispatcher () {
+ }
+
+ /**
+ * Dispatches a message to all the LogHandler instances we've
+ * got registered. The main entry point for LogMessage instances
+ * into the log server.
+ *
+ * @param msg The LogMessage instance we wish to dispatch to the
+ * plugins
+ */
+ public synchronized void handle (LogMessage msg) {
+ if (msg == null) {
+ throw new NullPointerException("LogMessage was null");
+ }
+
+ if (batchedMode) {
+ addToBatch(msg);
+ } else {
+ for (LogHandler h : handlers) {
+ h.handle(msg);
+ }
+ }
+ messageCount++;
+ }
+
+ private void addToBatch (LogMessage msg) {
+ if (currentBatchList == null) {
+ currentBatchList = new ArrayList<LogMessage>(batchSize);
+ currentBatchList.add(msg);
+ return;
+ }
+
+ currentBatchList.add(msg);
+
+ if (currentBatchList.size() == batchSize) {
+ flushBatch();
+ }
+ }
+
+ private void flushBatch () {
+ if (currentBatchList == null) {
+ return;
+ }
+
+ for (LogHandler ht : handlers) {
+ ht.handle(currentBatchList);
+ }
+ currentBatchList = null;
+ }
+
+
+ public void handle (List<LogMessage> messages) {
+ throw new IllegalStateException("method not supported");
+ }
+
+ /**
+ * Set the batched mode. Note that this should only be set
+ * at initialization time because it radically changes the
+ * behavior of the dispatcher. When in batched mode, the
+ * dispatcher will not enqueue single LogMessage instances
+ * but lists of same.
+ */
+ public void setBatchedMode (boolean batchedMode) {
+ this.batchedMode = batchedMode;
+ }
+
+ public synchronized void flush () {
+ if (batchedMode) {
+ flushBatch();
+ }
+
+ for (LogHandler h : handlers) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Flushing " + h.toString());
+ }
+ h.flush();
+ }
+ }
+
+ public synchronized void close () {
+ if (hasBeenShutDown) {
+ throw new IllegalStateException("Shutdown already in progress");
+ }
+ hasBeenShutDown = true;
+
+ for (LogHandler ht : handlers) {
+ if (ht instanceof Thread) {
+ log.fine("Stopping " + ht);
+ ((Thread)ht).interrupt();
+ }
+ }
+ handlers.clear();
+
+ log.log(LogLevel.DEBUG, "Logdispatcher shut down. Handled " + messageCount + " messages");
+ }
+
+ /**
+ * Register handler thread with the dispatcher. If the handler
+ * thread has already been registered, we log a warning and
+ * just do nothing.
+ *
+ * <P>
+ * If the thread is not alive it will be start()'ed.
+ *
+ */
+ public synchronized void registerLogHandler (LogHandler ht) {
+ if (hasBeenShutDown) {
+ throw new IllegalStateException("Tried to register LogHandler on"+
+ " LogDispatcher which was shut down");
+ }
+
+ if (handlers.contains(ht)) {
+ log.warning("LogHandler was already registered: " + ht);
+ return;
+ }
+ handlers.add(ht);
+
+ if ((ht instanceof Thread) && (! ((Thread)ht).isAlive())) {
+ ((Thread)ht).start();
+ }
+
+ log.fine("Added (and possibly started) LogHandler " + ht);
+ }
+
+ /**
+ * Make defensive copy and return array of LogHandlers.
+ */
+ public LogHandler[] getLogHandlers () {
+ LogHandler[] h = new LogHandler[handlers.size()];
+ return handlers.toArray(h);
+ }
+
+ /**
+ * Return message counter.
+ *
+ * @return Returns the number of messages that we have seen.
+ */
+ public synchronized int getMessageCount () {
+ return messageCount;
+ }
+
+ /**
+ * Hook which is called when the select loop has finished.
+ */
+ public void selectLoopHook (boolean before) {
+ if (batchedMode) {
+ flushBatch();
+ }
+
+ lastRoundCount = messageCount - roundCount;
+ roundCount = messageCount;
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/PluginLoader.java b/logserver/src/main/java/com/yahoo/logserver/PluginLoader.java
new file mode 100644
index 00000000000..2357eefb804
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/PluginLoader.java
@@ -0,0 +1,22 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver;
+
+/**
+ * This interface specifies an API for implementing logserver plugin
+ * loaders. A plugin loader has two basic tasks: to load or unload
+ * all of its knows plugins. In addition, if a plugin loader's
+ * canReload() method returns <code>true</code>, plugins may be loaded
+ * again after they are unloaded.
+ *
+ * <p> Plugins loaded through such reload-capable plugin loaders may
+ * be upgraded without restarting the server.
+ *
+ * @author <a href="mailto:stig@yahoo-inc.com">Stig Bakken</a>
+ */
+public interface PluginLoader
+{
+ /**
+ * Load all plugins known to this loader.
+ */
+ public void loadPlugins();
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/Server.java b/logserver/src/main/java/com/yahoo/logserver/Server.java
new file mode 100644
index 00000000000..d09ae305dc2
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/Server.java
@@ -0,0 +1,208 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver;
+
+import com.yahoo.io.FatalErrorHandler;
+import com.yahoo.io.Listener;
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.LogSetup;
+import com.yahoo.log.event.Event;
+import com.yahoo.logserver.handlers.HandlerThread;
+import com.yahoo.logserver.handlers.LogHandler;
+import com.yahoo.logserver.net.LogConnectionFactory;
+import com.yahoo.logserver.net.control.Levels;
+import com.yahoo.system.CatchSigTerm;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This class implements the log server itself. At present there is
+ * no runtime configuration; the server starts up, binds port 19081
+ * and loads builtin plugins.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ * @author <a href="mailto:stig@yahoo-inc.com">Stig Bakken</a>
+ */
+
+public class Server implements Runnable {
+ private final AtomicBoolean signalCaught = new AtomicBoolean(false);
+ public static final String APPNAME = "logserver";
+ private static final Server instance = new Server();
+ private static final Logger log = Logger.getLogger(Server.class.getName());
+ private static final FatalErrorHandler fatalErrorHandler =
+ new FatalErrorHandler();
+ private static final HashMap<String,HandlerThread> handlerThreads =
+ new HashMap<String,HandlerThread>();
+ private static final HashMap<LogHandler,String> threadNameForHandler =
+ new HashMap<LogHandler,String>();
+
+ static {
+ LogSetup.initVespaLogging("ADM");
+ }
+
+ // the port is a String because we want to use it as the default
+ // value of a System.getProperty().
+ private static final String LISTEN_PORT = "19081";
+
+ private int listenPort;
+ private Listener listener;
+ private final LogDispatcher dispatch;
+
+ private final boolean isInitialized;
+
+ /**
+ * Server constructor
+ */
+ private Server () {
+ dispatch = new LogDispatcher();
+ dispatch.setBatchedMode(true);
+ isInitialized = false;
+ }
+
+ public static Server getInstance () {
+ return instance;
+ }
+
+ private HandlerThread getHandlerThread (String threadName) {
+ threadName += " handler thread";
+ HandlerThread ht = handlerThreads.get(threadName);
+ if (ht == null) {
+ ht = new HandlerThread(threadName);
+ handlerThreads.put(threadName, ht);
+ ht.setFatalErrorHandler(fatalErrorHandler);
+ dispatch.registerLogHandler(ht);
+ }
+ return ht;
+ }
+
+ public void registerPluginLoader (PluginLoader loader) {
+ loader.loadPlugins();
+ }
+
+ public void registerLogHandler (LogHandler lh, String threadName) {
+ HandlerThread ht = getHandlerThread(threadName);
+ ht.registerHandler(lh);
+ threadNameForHandler.put(lh, threadName);
+ }
+
+ public void unregisterLogHandler (LogHandler lh) {
+ String threadName = threadNameForHandler.get(lh);
+ unregisterLogHandler(lh, threadName);
+ }
+
+ public void unregisterLogHandler (LogHandler lh, String threadName) {
+ HandlerThread ht = getHandlerThread(threadName);
+ ht.unregisterHandler(lh);
+ threadNameForHandler.remove(lh);
+ }
+
+ public void registerFlusher (LogHandler lh) {
+ Flusher.register(lh);
+ }
+
+ /** Included only for consistency */
+ public void unregisterFlusher (LogHandler lh) {
+ /* NOP */
+ }
+
+ /**
+ * Initialize the server and start up all its plugins,
+ *
+ * @param listenPort The port on which the logserver accepts log
+ * messages.
+ */
+ public void initialize (int listenPort) {
+ if (isInitialized) {
+ throw new IllegalStateException(APPNAME + " already initialized");
+ }
+
+ this.listenPort = listenPort;
+
+ // plugins
+ registerPluginLoader(new BuiltinPluginLoader());
+
+ // main listener
+ listener = new Listener(APPNAME);
+ listener.addSelectLoopPostHook(dispatch);
+ listener.setFatalErrorHandler(fatalErrorHandler);
+ }
+
+ /**
+ * Sets up the listen port and starts the Listener. Then waits for
+ * Listener to exit.
+ */
+ public void run() {
+ try {
+ listener.listen(new LogConnectionFactory(dispatch), listenPort);
+ log.log(LogLevel.CONFIG, APPNAME + ".listenport=" + listenPort);
+ } catch (IOException e) {
+ log.log(LogLevel.ERROR, "Unable to initialize", e);
+ return;
+ }
+
+ log.fine("Starting listener...");
+ listener.start();
+ Event.started(APPNAME);
+ try {
+ listener.join();
+ log.fine("listener thread exited");
+ }
+ catch (InterruptedException e) {
+ log.log(Level.WARNING, "Server was interrupted", e);
+ }
+ }
+ private void setupSigTermHandler() {
+ CatchSigTerm.setup(signalCaught); // catch termination signal
+ }
+
+ private void waitForShutdown() {
+ synchronized (signalCaught) {
+ while (!signalCaught.get()) {
+ try {
+ signalCaught.wait();
+ } catch (InterruptedException e) {
+ // empty
+ }
+ }
+ }
+ Event.stopping(APPNAME, "shutdown");
+ dispatch.close();
+ Event.stopped(APPNAME, 0, 0);
+ System.exit(0);
+ }
+
+ public static HashMap<LogHandler, String> threadNameForHandler() {
+ return threadNameForHandler;
+ }
+
+ public static void help () {
+ System.out.println();
+ System.out.println("System properties:");
+ System.out.println(" - " + APPNAME + ".listenport (" + LISTEN_PORT + ")");
+ System.out.println(" - " + APPNAME + ".queue.size ("
+ + HandlerThread.DEFAULT_QUEUESIZE
+ + ")");
+ System.out.println(" - logserver.default.loglevels ("
+ + (new Levels()).toString()
+ + ")");
+ System.out.println();
+ }
+
+ public static void main (String[] args) {
+ if (args.length > 0 && "-help".equals(args[0])) {
+ help();
+ System.exit(0);
+ }
+
+ String portString = System.getProperty(APPNAME + ".listenport", LISTEN_PORT);
+ Server server = Server.getInstance();
+ server.setupSigTermHandler();
+ server.initialize(Integer.parseInt(portString));
+
+ Thread t = new Thread(server, "logserver main");
+ t.start();
+ server.waitForShutdown();
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/demo/.gitignore b/logserver/src/main/java/com/yahoo/logserver/demo/.gitignore
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/demo/.gitignore
diff --git a/logserver/src/main/java/com/yahoo/logserver/filter/LevelFilter.java b/logserver/src/main/java/com/yahoo/logserver/filter/LevelFilter.java
new file mode 100644
index 00000000000..b229af522ab
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/filter/LevelFilter.java
@@ -0,0 +1,30 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.filter;
+
+import com.yahoo.log.LogMessage;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.logging.Level;
+/**
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class LevelFilter implements LogFilter {
+ private final Set<Level> levels = new HashSet<Level>();
+
+ public void addLevel (Level level) {
+ levels.add(level);
+ }
+
+ public void removeLevel (Level level) {
+ levels.remove(level);
+ }
+
+ public boolean isLoggable (LogMessage msg) {
+ return levels.contains(msg.getLevel());
+ }
+
+ public String description () {
+ return "Match specific log levels";
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/filter/LogFilter.java b/logserver/src/main/java/com/yahoo/logserver/filter/LogFilter.java
new file mode 100644
index 00000000000..8c4c3911c9e
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/filter/LogFilter.java
@@ -0,0 +1,23 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.filter;
+
+import com.yahoo.log.LogMessage;
+
+/**
+ * This interface is analogous to the java.util.logging.Filter
+ * interface. Classes implementing this interface should be
+ * <b>stateless/immutable if possible so filters can be
+ * shared</b>.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public interface LogFilter {
+ /**
+ * Determine if this log message is loggable.
+ *
+ * @param msg The log message
+ *
+ */
+ public boolean isLoggable (LogMessage msg);
+ public String description ();
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/filter/LogFilterManager.java b/logserver/src/main/java/com/yahoo/logserver/filter/LogFilterManager.java
new file mode 100644
index 00000000000..a68aff86e10
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/filter/LogFilterManager.java
@@ -0,0 +1,99 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.filter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.yahoo.log.LogLevel;
+
+/**
+ * The LogFilterManager keeps track of associations between
+ * LogFilter names and instances, so that access to filters
+ * is truly global. It also manages the LogFilter namespace
+ * to ensure that system-defined filters are not tampered with.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+
+public class LogFilterManager {
+ private static final LogFilterManager instance;
+
+ static {
+ instance = new LogFilterManager();
+
+ LevelFilter allEvents = new LevelFilter();
+ allEvents.addLevel(LogLevel.EVENT);
+ instance.addLogFilterInternal("system.allevents", allEvents);
+ instance.addLogFilterInternal("system.metricsevents", new MetricsFilter());
+ instance.addLogFilterInternal("system.nometricsevents", new NoMetricsFilter());
+ instance.addLogFilterInternal("system.all", new NullFilter());
+ instance.addLogFilterInternal("system.mute", MuteFilter.getInstance());
+ }
+
+ private final Map<String,LogFilter> filters = new HashMap<String,LogFilter>();
+
+ private LogFilterManager () {
+ }
+
+ /**
+ * Public interface for adding a name-logfilter mapping. If
+ * there exists a mapping already the old mapping is replaced
+ * with the new mapping.
+ *
+ * If the name is within the namespace reserved for internal
+ * built-in filters it will throw an exception
+ */
+ public static void addLogFilter (String name, LogFilter filter) {
+ if (filter == null) {
+ throw new NullPointerException("filter cannot be null");
+ }
+
+ if (name == null) {
+ throw new NullPointerException("name cannot be null");
+ }
+
+ String n = name.toLowerCase();
+
+ if (n.startsWith("system.")) {
+ throw new IllegalArgumentException("'system' namespace is reserved");
+ }
+
+ instance.addLogFilterInternal(n, filter);
+ }
+
+ /**
+ * LogFilter lookup function
+ *
+ * @param name The name of the LogFilter to be looked up.
+ * @return Returns the LogFilter associated with this name or
+ * <code>null</code> if not found.
+ */
+ public static LogFilter getLogFilter (String name) {
+ return instance.filters.get(name);
+ }
+
+
+ /**
+ * Get the names of the defined filters.
+ *
+ * @return Returns an array containing the names of filters that
+ * have been registered.
+ *
+ */
+ public static String[] getFilterNames () {
+ synchronized(instance.filters) {
+ String[] filterNames= new String[instance.filters.keySet().size()];
+ instance.filters.keySet().toArray(filterNames);
+ return filterNames;
+ }
+ }
+
+ /**
+ * Internal method which takes care of the job of adding
+ * LogFilter mappings but doesn't perform any of the checks
+ * performed by the public method for adding mappings.
+ */
+ private void addLogFilterInternal (String name, LogFilter filter) {
+ filters.put(name, filter);
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/filter/MetricsFilter.java b/logserver/src/main/java/com/yahoo/logserver/filter/MetricsFilter.java
new file mode 100644
index 00000000000..fa626de1d08
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/filter/MetricsFilter.java
@@ -0,0 +1,50 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.filter;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.event.Count;
+import com.yahoo.log.event.CountGroup;
+import com.yahoo.log.event.Event;
+import com.yahoo.log.event.Histogram;
+import com.yahoo.log.event.MalformedEventException;
+import com.yahoo.log.event.Value;
+import com.yahoo.log.event.ValueGroup;
+import com.yahoo.log.LogMessage;
+
+/**
+ * This filter matches events that are used for monitoring, specificly
+ * the Count and Value events.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class MetricsFilter implements LogFilter {
+ public boolean isLoggable (LogMessage msg) {
+ if (msg.getLevel() != LogLevel.EVENT) {
+ return false;
+ }
+
+ Event event;
+ try {
+ event = msg.getEvent();
+ }
+ catch (MalformedEventException e) {
+ return false;
+ }
+
+ // if it is not Count, Value or something which will generate
+ // Count or Value we don't care
+ if (! ((event instanceof Count)
+ || (event instanceof Value)
+ || (event instanceof Histogram)
+ || (event instanceof CountGroup)
+ || (event instanceof ValueGroup))) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public String description () {
+ return "Match all events representing system metrics (Counts, Values, etc).";
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/filter/MuteFilter.java b/logserver/src/main/java/com/yahoo/logserver/filter/MuteFilter.java
new file mode 100644
index 00000000000..a360cba6b8f
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/filter/MuteFilter.java
@@ -0,0 +1,30 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.filter;
+
+import com.yahoo.log.LogMessage;
+
+/**
+ * Filter which always returns false.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class MuteFilter implements LogFilter {
+ private static final MuteFilter instance = new MuteFilter();
+
+ /**
+ * Singleton, private constructor.
+ */
+ private MuteFilter () {}
+
+ public static MuteFilter getInstance() {
+ return instance;
+ }
+
+ public boolean isLoggable (LogMessage msg) {
+ return false;
+ }
+
+ public String description () {
+ return "Matches no messages. Mute.";
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/filter/NoMetricsFilter.java b/logserver/src/main/java/com/yahoo/logserver/filter/NoMetricsFilter.java
new file mode 100644
index 00000000000..5621ea60d0c
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/filter/NoMetricsFilter.java
@@ -0,0 +1,21 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.filter;
+
+import com.yahoo.log.LogMessage;
+
+/**
+ * This filter is the complement of MetricsFilter
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class NoMetricsFilter implements LogFilter {
+ final MetricsFilter filter = new MetricsFilter();
+
+ public boolean isLoggable (LogMessage msg) {
+ return (! filter.isLoggable(msg));
+ }
+
+ public String description () {
+ return "Matches all log messages except Count and Value events";
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/filter/NullFilter.java b/logserver/src/main/java/com/yahoo/logserver/filter/NullFilter.java
new file mode 100644
index 00000000000..a62b2f7171b
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/filter/NullFilter.java
@@ -0,0 +1,19 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.filter;
+
+import com.yahoo.log.LogMessage;
+
+/**
+ *
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class NullFilter implements LogFilter {
+ public boolean isLoggable (LogMessage msg) {
+ return true;
+ }
+
+ public String description () {
+ return "Match all log messages";
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/formatter/LogFormatter.java b/logserver/src/main/java/com/yahoo/logserver/formatter/LogFormatter.java
new file mode 100644
index 00000000000..ae217634e1c
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/formatter/LogFormatter.java
@@ -0,0 +1,28 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.formatter;
+
+import com.yahoo.log.LogMessage;
+
+/**
+ * This interface is analogous to the java.util.logging.Formatter
+ * interface. Classes implementing this interface should be
+ * <b>stateless/immutable if possible so formatters can be
+ * shared</b>. If it does have state it must not prevent
+ * concurrent use.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public interface LogFormatter {
+ /**
+ * Format log message as a string.
+ *
+ * @param msg The log message
+ *
+ */
+ public String format (LogMessage msg);
+
+ /**
+ * Returns a textual description of the formatter
+ */
+ public String description ();
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/formatter/LogFormatterManager.java b/logserver/src/main/java/com/yahoo/logserver/formatter/LogFormatterManager.java
new file mode 100644
index 00000000000..fb81b35dcf0
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/formatter/LogFormatterManager.java
@@ -0,0 +1,70 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/*
+ * $Id$
+ *
+ */
+
+package com.yahoo.logserver.formatter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This singleton class implements a central registry of LogFormatter
+ * instances.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class LogFormatterManager {
+ private static final LogFormatterManager instance;
+ static {
+ instance = new LogFormatterManager();
+ instance.addLogFormatterInternal("system.textformatter", new TextFormatter());
+ instance.addLogFormatterInternal("system.nullformatter", new NullFormatter());
+ }
+
+ private final Map<String,LogFormatter> logFormatters =
+ new HashMap<String,LogFormatter>();
+
+ private LogFormatterManager () {}
+
+ /**
+ * LogFormatter lookup function
+ *
+ * @param name The name of the LogFormatter to be looked up.
+ * @return Returns the LogFormatter associated with this name or
+ * <code>null</code> if not found.
+ */
+ public static LogFormatter getLogFormatter (String name) {
+ synchronized(instance.logFormatters) {
+ return instance.logFormatters.get(name);
+ }
+ }
+
+ /**
+ * Get the names of the defined formatters.
+ *
+ * @return Returns an array containing the names of formatters that
+ * have been registered.
+ *
+ */
+ public static String[] getFormatterNames () {
+ synchronized(instance.logFormatters) {
+ String[] formatterNames = new String[instance.logFormatters.keySet().size()];
+ instance.logFormatters.keySet().toArray(formatterNames);
+ return formatterNames;
+ }
+ }
+
+ /**
+ * Internal method which takes care of the job of adding
+ * LogFormatter mappings but doesn't perform any of the checks
+ * performed by the public method for adding mappings.
+ */
+ private void addLogFormatterInternal (String name, LogFormatter logFormatter) {
+ synchronized(logFormatters) {
+ logFormatters.put(name, logFormatter);
+ }
+ }
+
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/formatter/NullFormatter.java b/logserver/src/main/java/com/yahoo/logserver/formatter/NullFormatter.java
new file mode 100644
index 00000000000..646ab0ce423
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/formatter/NullFormatter.java
@@ -0,0 +1,29 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/*
+ * $Id$
+ *
+ */
+
+package com.yahoo.logserver.formatter;
+
+import com.yahoo.log.LogMessage;
+
+/**
+ * This formatter doesn't really format anything. It just
+ * calls the LogMessage toString() method. This is kind of
+ * pointless and silly, but we include it for symmetry...
+ * or completeness....or...whatever.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class NullFormatter implements LogFormatter {
+
+ public String format (LogMessage msg) {
+ return msg.toString();
+ }
+
+ public String description() {
+ return "Format message in native VESPA format";
+ }
+
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/formatter/TextFormatter.java b/logserver/src/main/java/com/yahoo/logserver/formatter/TextFormatter.java
new file mode 100644
index 00000000000..d94dcceaf29
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/formatter/TextFormatter.java
@@ -0,0 +1,51 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/*
+ * $Id$
+ *
+ */
+
+package com.yahoo.logserver.formatter;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import com.yahoo.log.LogMessage;
+
+/**
+ * Creates human-readable text representation of log message.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class TextFormatter implements LogFormatter {
+ static final SimpleDateFormat dateFormat;
+
+ static {
+ dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ }
+
+ public String format (LogMessage msg) {
+ StringBuffer sbuf = new StringBuffer(150);
+ sbuf.append(dateFormat.format(new Date(msg.getTime())))
+ .append(" ")
+ .append(msg.getHost())
+ .append(" ")
+ .append(msg.getThreadProcess())
+ .append(" ")
+ .append(msg.getService())
+ .append(" ")
+ .append(msg.getComponent())
+ .append(" ")
+ .append(msg.getLevel().toString())
+ .append(" ")
+ .append(msg.getPayload())
+ .append("\n");
+
+ return sbuf.toString();
+ }
+
+ public String description() {
+ return "Format log-message as human readable text";
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/AbstractLogHandler.java b/logserver/src/main/java/com/yahoo/logserver/handlers/AbstractLogHandler.java
new file mode 100644
index 00000000000..f498568e7f2
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/AbstractLogHandler.java
@@ -0,0 +1,137 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.handlers;
+
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.List;
+
+import com.yahoo.log.LogMessage;
+import com.yahoo.logserver.filter.LogFilter;
+
+/**
+ * This abstract class is the one you would usually want to
+ * extend when you are writing a LogHandler since it takes care
+ * of quite a bit of tedious work for you (log message counting,
+ * handling of lists of messages versus single instances etc).
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public abstract class AbstractLogHandler implements LogHandler {
+ private long count = 0;
+ private long filtered = 0;
+ private LogFilter filter = null;
+ private String name;
+
+ /**
+ * This is the entry point for each log handler. Takes care
+ * of calling the actual doHandle() method. Provided to make
+ * it possible to extend what a handler does in a uniform way.
+ *
+ * @param msg The message we are about to handle
+ */
+ public final void handle (LogMessage msg) {
+ if ((filter != null) && (! filter.isLoggable(msg))) {
+ filtered++;
+ return;
+ }
+
+ if (doHandle(msg)) {
+ count++;
+ }
+ }
+
+ /**
+ * Handle a list of LogMessage instances
+ *
+ * @param messages List of LogMessage instances.
+ */
+ public final void handle (List<LogMessage> messages) {
+ Iterator<LogMessage> it = messages.iterator();
+ while (it.hasNext()) {
+ handle(it.next());
+ }
+ }
+
+ /**
+ * Set LogFilter for this LogHandler. If the LogFilter is
+ * <code>null</code>, filtering has in effect been turned
+ * off.
+ *
+ * @param filter The filter to be used for this handler
+ *
+ */
+ public void setLogFilter (LogFilter filter) {
+ this.filter = filter;
+ }
+
+ /**
+ * @return Returns the log filter for this handler or
+ * <code>null</code> if no filter is in effect.
+ */
+ public LogFilter getLogFilter () {
+ return filter;
+ }
+
+ /**
+ * Returns the internal counter which keeps track of the number
+ * of times doHandle has been called.
+ *
+ * @return Returns the number of times doHandle has been called.
+ */
+ public final long getCount () {
+ return count;
+ }
+
+ public String getName () {
+ if (name == null) {
+ String n = this.getClass().getName();
+ int x = n.lastIndexOf('.');
+ if (x != -1) {
+ n = n.substring(x+1);
+ }
+ name = n;
+ }
+ return name;
+ }
+
+ public void setName (String name) {
+ this.name = name;
+ }
+
+ /**
+ * The method which actually handles the log message and
+ * does something to it. This is the one you wish to
+ * override when you write a new handler.
+ *
+ * <P>
+ * <em>
+ * If your handle method is slow you should document this fact
+ * so that decisions can be made with regard to configuration.
+ * </em>
+ *
+ * @param msg The LogMessage we are about to handle
+ * @return Returns <code>true</code> if the message was
+ * handled and <code>false</code> if it was ignored.
+ */
+ public abstract boolean doHandle (LogMessage msg);
+
+ /**
+ * Flush LogMessages.
+ */
+ public abstract void flush ();
+
+ /**
+ * Close this loghandler. After a loghandler is closed calling
+ * the #handle() has undefined behavior, but it should be assumed
+ * that log messages will be silently dropped.
+ *
+ * <P>
+ * #close() usually implies #flush() but don't bet on it.
+ */
+ public abstract void close ();
+
+ /**
+ * Force implementation of (hopefully meaningful) toString()
+ */
+ public abstract String toString();
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/HandlerThread.java b/logserver/src/main/java/com/yahoo/logserver/handlers/HandlerThread.java
new file mode 100644
index 00000000000..8af5202739b
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/HandlerThread.java
@@ -0,0 +1,241 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.handlers;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.logging.Logger;
+
+import com.yahoo.io.FatalErrorHandler;
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.LogMessage;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This handler implements a dispatcher which runs in its own
+ * thread. The purpose of this handler is to isolate execution
+ * of handlers from the main server IO threads.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+
+public class HandlerThread extends Thread implements LogHandler
+{
+ private static final Logger log = Logger.getLogger(HandlerThread.class.getName());
+
+ // default queue size is 200
+ public static final int DEFAULT_QUEUESIZE = 200;
+ private static int queueSize = DEFAULT_QUEUESIZE;
+
+ private FatalErrorHandler fatalErrorHandler;
+
+ // set other queue size if specified
+ static {
+ String queueSizeStr = System.getProperty("logserver.queue.size");
+ if (queueSizeStr != null) {
+ queueSize = Integer.parseInt(queueSizeStr);
+
+ // should never be smaller than 50
+ if (queueSize < 50) {
+ queueSize = 50;
+ }
+
+ log.info("set queue size to " + queueSize);
+ }
+ }
+
+ private static class ItemOrList {
+ final LogMessage item;
+ final List<LogMessage> list;
+ ItemOrList(LogMessage i) {
+ this.item = i;
+ this.list = null;
+ }
+ ItemOrList(List<LogMessage> l) {
+ this.item = null;
+ this.list = l;
+ }
+ public String toString() {
+ return "item="+item+", list="+list;
+ }
+ }
+
+ private final BlockingQueue<ItemOrList> queue;
+ private final List<LogHandler> handlers = new ArrayList<LogHandler>();
+ private long count;
+ @SuppressWarnings("unused")
+ private long droppedCount = 0;
+ @SuppressWarnings("unused")
+ private boolean queueWasFull = false;
+ @SuppressWarnings("unused")
+ private long lastDropLogMessage = 0;
+ @SuppressWarnings("unused")
+ private long lastAcceptingLogMessage = 0;
+
+ public HandlerThread (String name) {
+ super(name);
+ queue = new LinkedBlockingQueue<>(queueSize);
+ log.log(LogLevel.CONFIG, "logserver.queue.size=" + queueSize);
+ }
+
+ /**
+ * Register a handler for fatal errors.
+ *
+ * @param f The FatalErrorHandler instance to be registered
+ */
+ public synchronized void setFatalErrorHandler (FatalErrorHandler f) {
+ fatalErrorHandler = f;
+ }
+
+ /**
+ * Called by the LogDispatch to put a LogMessage onto the Queue
+ *
+ * @param message The LogMessage we wish to dispatch to this
+ * handler thread.
+ */
+ public void handle (LogMessage message) {
+ handleInternal(new ItemOrList(message));
+ }
+
+ /**
+ * Called by the LogDispatch to put a list of LogMessage
+ * instances onto the Queue.
+ */
+ public void handle (List<LogMessage> messages) {
+ handleInternal(new ItemOrList(messages));
+ }
+
+ private void handleInternal (ItemOrList o) {
+ boolean done = false;
+ while (! done) {
+ try {
+ queue.put(o);
+ done = true;
+ }
+ catch (InterruptedException e) {
+ // NOP
+ }
+ }
+ }
+
+ public void flush () {
+ Iterator<LogHandler> it = handlers.iterator();
+ while (it.hasNext()) {
+ LogHandler handler = (LogHandler)it.next();
+ handler.flush();
+ }
+ }
+
+ public void close () {
+ Iterator<LogHandler> it = handlers.iterator();
+ while (it.hasNext()) {
+ LogHandler handler = (LogHandler)it.next();
+ handler.close();
+ }
+ }
+
+ public long getCount () {
+ return count;
+ }
+
+ /**
+ * Register a LogHandler
+ */
+ public synchronized void registerHandler (LogHandler handler) {
+ log.fine("Registering handler " + handler);
+ handlers.add(handler);
+ }
+
+ /**
+ * Unregister a Loghandler
+ */
+ public synchronized void unregisterHandler (LogHandler handler) {
+ int idx;
+ while ((idx = handlers.indexOf(handler)) != -1) {
+ try {
+ handlers.remove(idx);
+ } catch (IndexOutOfBoundsException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Return an array of the registered handlers.
+ *
+ * @return Returns an array of the handlers registered
+ */
+ public LogHandler[] getHandlers () {
+ LogHandler[] h = new LogHandler[handlers.size()];
+ return handlers.toArray(h);
+ }
+
+
+ /**
+ * Return the underlying queue used to send LogMessage instances
+ * to this handler thread.
+ */
+ public BlockingQueue<ItemOrList> getQueue () {
+ return queue;
+ }
+
+ /**
+ * Consume messages from the incoming queue and hand
+ * them off to the handlers.
+ */
+ public void run () {
+ if (queue == null) {
+ throw new NullPointerException("channel is not allowed to be null");
+ }
+
+ // TODO: Make the legmessage elements some kind of composite structure to handle both individual messages and lists uniformly.
+ List<ItemOrList> drainList = new ArrayList<ItemOrList>(queue.size() + 1);
+ try {
+ for (;;) {
+ drainList.clear();
+ // block in take(), then see if there is more
+ // to be had with drainTo()
+ drainList.add(queue.take());
+ queue.drainTo(drainList);
+
+ for (ItemOrList o : drainList) {
+ // we can get two types of elements here: single log
+ // messages or lists of log messages, so we need to
+ // handle them accordingly.
+
+ if (o.item != null) {
+ for (LogHandler handler : handlers) {
+ handler.handle(o.item);
+ }
+ } else if (o.list != null) {
+ for (LogHandler handler : handlers) {
+ handler.handle(o.list);
+ }
+ } else {
+ throw new IllegalArgumentException("not LogMessage or List: " + o);
+ }
+ count++;
+ }
+ }
+ } catch (InterruptedException e) {
+ // NOP
+ } catch (Throwable t) {
+ if (fatalErrorHandler != null) {
+ fatalErrorHandler.handle(t, null);
+ }
+ } finally {
+ log.fine("Handler thread "
+ + getName()
+ + " exiting, removing handlers");
+ for (LogHandler handler : handlers) {
+ log.fine("Removing handler " + handler);
+ handler.close();
+ }
+ handlers.clear();
+ log.fine("Handler thread " + getName() + " done");
+ }
+
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/LogHandler.java b/logserver/src/main/java/com/yahoo/logserver/handlers/LogHandler.java
new file mode 100644
index 00000000000..6fb009fcda2
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/LogHandler.java
@@ -0,0 +1,51 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.handlers;
+
+import com.yahoo.log.LogMessage;
+import java.util.List;
+
+/**
+ * The LogHandler interface defines the interface used for all
+ * parts of the logserver which consume log messages.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public interface LogHandler {
+ /**
+ * This is the entry point for the log handling. This method
+ * should return as quickly as possible in implementations
+ * so if you need to initiate time consuming processing you
+ * should look into some design alternatives.
+ *
+ * @param msg The log message
+ */
+ public void handle (LogMessage msg);
+
+ /**
+ * Instead of taking a single log message, this method can take
+ * a List of them. The List abstraction was chosen because the
+ * order needs to be preserved.
+ *
+ * @param messages a List containing zero or more LogMessage
+ * instances.
+ */
+ public void handle (List<LogMessage> messages);
+
+ /**
+ * Any log messages received so far should be dealt with
+ * before this method returns -- within reason ,of course.
+ * (<em>Within reason is loosely defined to be 2-5 seconds</em>)
+ */
+ public void flush ();
+
+ /**
+ * Signals that we want to end logging and should close down the
+ * unerlying logging mechanism -- whatever this maps to
+ * semantically for the underlying implementation. After this
+ * method has been called it is considered an error to submit more
+ * log messages to the handle() methods and an implementation
+ * may elect to throw runtime exceptions.
+ *
+ */
+ public void close ();
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolder.java b/logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolder.java
new file mode 100644
index 00000000000..93552bf9f00
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolder.java
@@ -0,0 +1,186 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.handlers.lasterrorsholder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.yahoo.io.Connection;
+import com.yahoo.io.ConnectionFactory;
+import com.yahoo.io.Listener;
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.LogMessage;
+import com.yahoo.logserver.handlers.AbstractLogHandler;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+/**
+ * The LastErrorsHolder handler is used for holding the last n
+ * messages at level error or higher. Connecting to this handler
+ * will return a Json object with the last errors (default is last 100 errors)
+ *
+ * @author musum
+ */
+public class LastErrorsHolder extends AbstractLogHandler implements ConnectionFactory {
+ private static final Logger log = Logger.getLogger(LastErrorsHolder.class.getName());
+ private static final int maxNumErrors = 100;
+ private final Object lock = new Object();
+
+ private int port;
+ private Listener listener;
+
+ private final ArrayList<LogMessage> errors = new ArrayList<>();
+ private int numberOfErrors = 0;
+
+ /**
+ * @param port The port to which this handler listens to.
+ */
+ public LastErrorsHolder(int port) throws IOException {
+ this.port = port;
+ listen(port);
+ }
+
+ public void listen(int port) throws IOException {
+ if (listener != null) {
+ throw new IllegalStateException("already listening to port " + this.port);
+ }
+ listener = new Listener("last-errors-holder");
+ listener.listen(this, port);
+ listener.start();
+ log.log(LogLevel.CONFIG, "port=" + port);
+ }
+
+ public boolean doHandle(LogMessage msg) {
+ if (msg.getLevel().equals(LogLevel.ERROR) || msg.getLevel().equals(LogLevel.FATAL)) {
+ synchronized (lock) {
+ numberOfErrors++;
+ if (errors.size() < maxNumErrors) {
+ errors.add(msg);
+ } else if (numberOfErrors == maxNumErrors) {
+ log.log(LogLevel.DEBUG, String.format("Not storing errors, have reached maximum number of errors: %d, total number of errors received: %d",
+ maxNumErrors, numberOfErrors));
+ }
+ }
+ }
+ return true;
+ }
+
+ public void close() {
+ try {
+ listener.interrupt();
+ listener.join();
+ log.log(LogLevel.DEBUG, "listener stopped");
+ } catch (InterruptedException e) {
+ log.log(LogLevel.WARNING, "listener was interrupted", e);
+ }
+ }
+
+ public void flush() {
+ }
+
+ /**
+ * Factory method for creating new connections. Since we just return a result
+ * when client connection happens, we also write the result here
+ *
+ * @param socket The new SocketChannel
+ * @param listener The Listener instance we want to use
+ */
+ public Connection newConnection(SocketChannel socket, Listener listener) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "New last-errors-holder connection: " + socket);
+ }
+ LastErrorsHolderConnection connection = new LastErrorsHolderConnection(socket);
+ synchronized (lock) {
+ Messages messages = new Messages();
+ for (LogMessage error : errors) {
+ messages.addMessage(
+ new Message(error.getTime()/1000,
+ error.getHost(),
+ error.getService(),
+ error.getLevel().getName(),
+ error.getPayload()));
+ }
+ messages.setNumberOfErrors(numberOfErrors);
+
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ StringWriter stringWriter = new StringWriter();
+ mapper.writeValue(stringWriter, messages);
+ connection.enqueue(StandardCharsets.UTF_8.encode(stringWriter.toString()));
+ } catch (IOException e) {
+ log.log(LogLevel.WARNING, "Could not enqueue log message", e);
+ }
+
+ errors.clear();
+ numberOfErrors = 0;
+ }
+
+ return connection;
+ }
+
+ public String toString() {
+ return LastErrorsHolder.class.getName();
+ }
+
+
+ static class Messages {
+ private final List<Message> messages = new ArrayList<>();
+ private long errorCount = 0; // There might be more errors than number of messages
+
+ void addMessage(Message message) {
+ messages.add(message);
+ }
+
+ void setNumberOfErrors(long errorCount) {
+ this.errorCount = errorCount;
+ }
+
+ public List<Message> getMessages() {
+ return messages;
+ }
+
+ public long getErrorCount() {
+ return errorCount;
+ }
+ }
+
+ static class Message {
+ private final long time;
+ private final String hostname;
+ private final String service;
+ private final String logLevel;
+ private final String message;
+
+ Message(long time, String hostname, String service, String logLevel, String message) {
+ this.time = time;
+ this.hostname = hostname;
+ this.service = service;
+ this.logLevel = logLevel;
+ this.message = message;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public String getLogLevel() {
+ return logLevel;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public String getService() {
+ return service;
+ }
+ }
+
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolderConnection.java b/logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolderConnection.java
new file mode 100644
index 00000000000..0dc89e95d11
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolderConnection.java
@@ -0,0 +1,141 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.handlers.lasterrorsholder;
+
+import com.yahoo.io.Connection;
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.LogMessage;
+import com.yahoo.logserver.filter.LogFilter;
+import com.yahoo.logserver.filter.LogFilterManager;
+import com.yahoo.logserver.formatter.LogFormatter;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.logging.Logger;
+
+/**
+ * LastErrorsHandler client connection.
+ *
+ * @author musum
+ */
+public class LastErrorsHolderConnection implements Connection, LogFilter {
+ private static final Logger log = Logger.getLogger(LastErrorsHolderConnection.class.getName());
+
+ private final SocketChannel socket;
+ private ByteBuffer writeBuffer;
+ private final ByteBuffer readBuffer = ByteBuffer.allocate(4096);
+ private LogFilter filter = null;
+ protected LogFormatter formatter = null;
+ private static final String filterName = "system.mute";
+
+ /**
+ * Constructs a LastErrorsHolderConnection. Note that initially the
+ * filter of this connection is set to MuteFilter, which mutes
+ * all log messages.
+ */
+ public LastErrorsHolderConnection(SocketChannel socket) {
+ this.socket = socket;
+ this.filter = LogFilterManager.getLogFilter(filterName);
+ }
+
+ /**
+ * Check if the message is wanted by this particular replicator
+ * connection. The reason we provide this method is because we
+ * want to be able to determine if a message is wanted by any
+ * client before committing resources to creating a ByteBuffer to
+ * serialize it into.
+ *
+ * @param msg The log message offered
+ */
+ public boolean isLoggable(LogMessage msg) {
+ if (filter == null) {
+ return true;
+ }
+ return filter.isLoggable(msg);
+ }
+
+ /**
+ * Return the description of the currently active filter.
+ */
+ public String description() {
+ if (filter == null) {
+ return "No filter defined";
+ }
+ return filter.description();
+ }
+
+
+ /**
+ * Enqueues a ByteBuffer containing the message destined
+ * for the client.
+ *
+ * @param buffer the ByteBuffer into which the log message is
+ * serialized.
+ */
+ public synchronized void enqueue(ByteBuffer buffer) throws IOException {
+ writeBuffer = buffer;
+ write();
+ }
+
+ public void read() throws IOException {
+ if (!readBuffer.hasRemaining()) {
+ log.warning("Log message too long. Message exceeds "
+ + readBuffer.capacity()
+ + " bytes. Connection dropped.");
+ close();
+ return;
+ }
+
+
+ int ret = socket.read(readBuffer);
+ if (ret == -1) {
+ close();
+ return;
+ }
+
+ if (ret == 0) {
+ if (log.isLoggable(LogLevel.INFO)) {
+ log.log(LogLevel.INFO, "zero byte read occurred");
+ }
+ }
+
+ readBuffer.flip();
+ }
+
+ public synchronized void write() throws IOException {
+ if (!socket.isOpen()) {
+ close();
+ }
+ do {
+ try {
+ socket.write(writeBuffer);
+ } catch (IOException e) {
+ log.log(LogLevel.WARNING, "Error writing", e);
+ close();
+ return;
+ }
+ } while (writeBuffer.hasRemaining());
+ }
+
+ public synchronized void close() throws IOException {
+ socket.close();
+ writeBuffer = null;
+ }
+
+ public int selectOps() {
+ return SelectionKey.OP_READ;
+ }
+
+ public SocketChannel socketChannel() {
+ return socket;
+ }
+
+ public void connect() {
+ }
+
+ void setFilter(LogFilter filter) {
+ this.filter = filter;
+ }
+}
+
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolderPlugin.java b/logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolderPlugin.java
new file mode 100644
index 00000000000..7cfdbe2b578
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/lasterrorsholder/LastErrorsHolderPlugin.java
@@ -0,0 +1,51 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.handlers.lasterrorsholder;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.logserver.Server;
+import com.yahoo.plugin.Config;
+import com.yahoo.plugin.Plugin;
+
+import java.io.IOException;
+import java.util.logging.Logger;
+
+public class LastErrorsHolderPlugin implements Plugin {
+ private static final String DEFAULT_PORT = "19082";
+ private static final Logger log = Logger.getLogger(LastErrorsHolderPlugin.class.getName());
+ private LastErrorsHolder lastErrorsHolder;
+ private final Server server = Server.getInstance();
+
+ public String getPluginName() {
+ return "last-errors-holder";
+ }
+
+ /**
+ * Initialize the plugin
+ */
+ public void initPlugin(Config config) {
+ if (lastErrorsHolder != null) {
+ throw new IllegalStateException("plugin already initialized: " + getPluginName());
+ }
+ int listenPort = config.getInt("port", DEFAULT_PORT);
+ String threadName = config.get("thread", getPluginName());
+ try {
+ lastErrorsHolder = new LastErrorsHolder(listenPort);
+ } catch (IOException e) {
+ log.log(LogLevel.WARNING, "init failed: " + e);
+ return;
+ }
+ server.registerLogHandler(lastErrorsHolder, threadName);
+ }
+
+ /**
+ * Shut down the plugin.
+ */
+ public void shutdownPlugin() {
+
+ if (lastErrorsHolder == null) {
+ throw new IllegalStateException("plugin not initialized: " + getPluginName());
+ }
+ server.unregisterLogHandler(lastErrorsHolder);
+ lastErrorsHolder = null;
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/logmetrics/LogMetricsHandler.java b/logserver/src/main/java/com/yahoo/logserver/handlers/logmetrics/LogMetricsHandler.java
new file mode 100644
index 00000000000..fee835b8189
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/logmetrics/LogMetricsHandler.java
@@ -0,0 +1,233 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/*
+ * $Id$
+ */
+package com.yahoo.logserver.handlers.logmetrics;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.ArrayList;
+import java.util.logging.Logger;
+import java.util.logging.Level;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.event.Event;
+import com.yahoo.logserver.filter.LevelFilter;
+import com.yahoo.log.LogMessage;
+import com.yahoo.logserver.handlers.AbstractLogHandler;
+
+/**
+ * The LogMetricsHandler stores a count of the number of log messages
+ * per level per host and sends an event count for this five minutes.
+ *
+ *
+ * @author <a href="mailto:musum@yahoo-inc.com">Harald Musum</a>
+ */
+public class LogMetricsHandler extends AbstractLogHandler
+{
+ public static final long EVENTINTERVAL = 5 * 60; // in seconds
+
+ private static final Logger log =
+ Logger.getLogger(LogMetricsHandler.class.getName());
+
+ // A list of log metrics per host and per log level
+ private final List<LevelCount> logMetrics = new ArrayList<LevelCount>();
+
+ // The log levels that are handled by this plugin
+ private static final Level[] levels = {LogLevel.INFO,
+ LogLevel.WARNING,
+ LogLevel.SEVERE,
+ LogLevel.ERROR,
+ LogLevel.FATAL};
+
+
+ /**
+ * Constructor sets a default log filter ignoring the config,
+ * debug and spam levels.
+ */
+ public LogMetricsHandler() {
+ LevelFilter filter = new LevelFilter();
+
+ for (Level level : Arrays.asList(levels)) {
+ filter.addLevel(level);
+ }
+
+ setLogFilter(filter);
+
+ // Start thread that sends events.
+ EventGenerator eventThread = new EventGenerator();
+ new Thread(eventThread).start();
+ }
+
+ public boolean doHandle(LogMessage message) {
+ String host = message.getHost();
+ Level logLevel = message.getLevel();
+
+ boolean found = false;
+ if (logMetrics.size() > 0) {
+ LevelCount count;
+ // Loop through the list logMetrics and check if there
+ // exists an element with the same host and level.
+ for (int i = 0; i < logMetrics.size(); i++) {
+ count = logMetrics.get(i);
+ if (count.getHost().equals(host) &&
+ count.getLevel().getName().equals(logLevel.getName())) {
+ count.addCount(1);
+ found = true;
+ break;
+ }
+ }
+ }
+
+ // There is no element in logMetrics with the same host and
+ // level as in the message, so create a new object and add it
+ // to the list.
+ if (!found) {
+ for (Level level : Arrays.asList(levels)) {
+ LevelCount levelCount;
+ if (level.getName().equals(logLevel.getName())) {
+ levelCount = new LevelCount(host,
+ level,
+ 1);
+ } else {
+ levelCount = new LevelCount(host,
+ level,
+ 0);
+
+ }
+ logMetrics.add(levelCount);
+ }
+ }
+ return true;
+ }
+
+ /**
+ *
+ * Create event count for each log level and report it. For now we
+ * add up the numbers for all host on each level and report that.
+ *
+ */
+ private void sendEvents() {
+ Map<String,Long> levelCount = getMetricsPerLevel();
+ for (Map.Entry<String,Long> entry : levelCount.entrySet()) {
+ String key = entry.getKey();
+ Long count = entry.getValue();
+ Event.count("log_message." + key.toLowerCase(), count.longValue());
+ }
+ }
+
+ public void flush() {}
+
+ public void close() {}
+
+ public String toString() {
+ return LogMetricsHandler.class.getName();
+ }
+
+ /**
+ * Returns the total number of log messages processed by this
+ * plugin.
+ *
+ * @return A count of log messages
+ */
+ public long getMetricsCount() {
+ long count = 0;
+ for (LevelCount levelCount : logMetrics) {
+ count = count + levelCount.getCount();
+ }
+ return count;
+ }
+
+ /**
+ * Returns a Map of log level counts (level is key and count is
+ * value).
+ *
+ * @return A Map of log level counts
+ */
+ public Map<String,Long> getMetricsPerLevel() {
+ Map<String, Long> levelCounts = new TreeMap<String, Long>();
+ // Loop through all levels summing the count for all hosts.
+ for (Level level : Arrays.asList(levels)) {
+ String levelName = level.getName();
+ long count = 0;
+ for (LevelCount levelCount : logMetrics) {
+ if (levelName.equals(levelCount.getLevel().getName())) {
+ count += levelCount.getCount();
+ }
+ }
+ levelCounts.put(levelName, count);
+ }
+ return levelCounts;
+ }
+
+ /**
+ * The LevelCount class represents the number (count) of log
+ * messages with the same log level for a host.
+ *
+ */
+ private class LevelCount {
+ private final String host;
+ private final Level level;
+ private long count;
+
+ LevelCount(String host, Level level, long count) {
+ this.host = host;
+ this.level = level;
+ this.count = count;
+ }
+
+ LevelCount(String host, Level level) {
+ this(host, level, 0);
+ }
+
+ Level getLevel() {
+ return level;
+ }
+
+ String getHost() {
+ return host;
+ }
+
+ long getCount() {
+ return count;
+ }
+
+ void setCount(long count) {
+ this.count = count;
+ }
+
+ void addCount(long add) {
+ count += add;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Host=" + host + ", level = " + level.getName() +
+ ",count=" + count);
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Implements a thread that sends events every EVENTINTERVAL
+ * seconds.
+ *
+ */
+ private class EventGenerator implements Runnable {
+ public void run() {
+ // Send events every EVENTINTERVAL seconds
+ while (true) {
+ try {
+ Thread.sleep(EVENTINTERVAL * 1000);
+ } catch (InterruptedException e) {
+ log.log(LogLevel.WARNING, e.getMessage());
+ }
+ sendEvents();
+ }
+ }
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/logmetrics/LogMetricsPlugin.java b/logserver/src/main/java/com/yahoo/logserver/handlers/logmetrics/LogMetricsPlugin.java
new file mode 100644
index 00000000000..204e35d52bb
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/logmetrics/LogMetricsPlugin.java
@@ -0,0 +1,54 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/*
+ * $Id$
+ */
+package com.yahoo.logserver.handlers.logmetrics;
+
+import java.util.logging.Logger;
+
+import com.yahoo.logserver.Server;
+import com.yahoo.plugin.Config;
+import com.yahoo.plugin.Plugin;
+
+
+public class LogMetricsPlugin implements Plugin
+{
+ private static final Logger log =
+ Logger.getLogger(LogMetricsPlugin.class.getName());
+ private LogMetricsHandler logMetricsHandler;
+ private final Server server = Server.getInstance();
+
+ public String getPluginName() {
+ return "logmetrics";
+ }
+
+ /** Initialize the logmetrics plugin
+ *
+ * @param config Plugin config object, keys used:
+ * <code>thread</code> - name of handler thread this plugin runs in
+ *
+ */
+ public void initPlugin(Config config) {
+ if (logMetricsHandler != null) {
+ log.finer("LogMetricsPlugin doubly initialized");
+ throw new IllegalStateException(
+ "plugin already initialized: " + getPluginName());
+ }
+ String threadName = config.get("thread", getPluginName());
+ logMetricsHandler = new LogMetricsHandler();
+ server.registerLogHandler(logMetricsHandler, threadName);
+ }
+
+ /**
+ * Shut down the logmetrics plugin.
+ */
+ public void shutdownPlugin() {
+ if (logMetricsHandler == null) {
+ log.finer("LogMetricsPlugin shutdown before initialize");
+ throw new IllegalStateException("plugin not initialized: " +
+ getPluginName());
+ }
+ server.unregisterLogHandler(logMetricsHandler);
+ logMetricsHandler = null;
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/FormattedBufferCache.java b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/FormattedBufferCache.java
new file mode 100644
index 00000000000..f20adbe3c91
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/FormattedBufferCache.java
@@ -0,0 +1,78 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.handlers.replicator;
+
+import java.util.Map;
+import java.util.IdentityHashMap;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import com.yahoo.log.LogMessage;
+import com.yahoo.logserver.formatter.LogFormatter;
+import com.yahoo.logserver.formatter.LogFormatterManager;
+
+/**
+ * This class is used to cache log messages that have been
+ * formatted into ByteBuffers. The purpose of this class
+ * is to make it easier to support multiple message formats while
+ * still ensuring we don't format more messages than we strictly need
+ * to and that we don't keep around more buffers that we ought to.
+ *
+ * <P>
+ * This is not a general purpose class, I think, so please
+ * refer to the source code of the Replicator class for
+ * information on how to use this.
+ *
+ * <P>
+ * This class is not threadsafe.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class FormattedBufferCache {
+ // the documentation says " All of the methods defined in this
+ // class are safe for use by multiple concurrent threads." so
+ // we have only one instance of the charset for this class.
+ //
+ static private final Charset charset = Charset.forName("utf-8");
+
+ private final IdentityHashMap<LogFormatter, ByteBuffer> buffers;
+
+ public FormattedBufferCache () {
+ // hope this is a good hash size
+ int initialSize = LogFormatterManager.getFormatterNames().length * 2;
+ buffers = new IdentityHashMap<LogFormatter, ByteBuffer>(initialSize);
+ }
+
+ /**
+ * Return a ByteBuffer slice of a buffer containing the
+ * LogMessage formatted by the LogFormatter. If one didn't
+ * exist in the cache from before, it will after this
+ * method returns.
+ *
+ * @param msg The log message you wish to format
+ * @param formatter The log formatter you wish to use for formatting
+ * @return Returns a ByteBuffer slice
+ */
+ public ByteBuffer getFormatted (LogMessage msg, LogFormatter formatter) {
+ ByteBuffer bb = buffers.get(formatter);
+ if (bb == null) {
+ bb = charset.encode(formatter.format(msg));
+ buffers.put(formatter, bb);
+ }
+ return bb.slice();
+ }
+
+ /**
+ * After we're done distributing the log message to all the
+ * clients we clear the cache so we are ready for the next
+ * message.
+ */
+ public void reset () {
+ buffers.clear();
+ }
+
+ /**
+ * This is here for test purposes. Don't get any bright ideas.
+ */
+ public Map<LogFormatter,ByteBuffer> getUnderlyingMapOnlyForTesting() {
+ return buffers;
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/Replicator.java b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/Replicator.java
new file mode 100644
index 00000000000..f8dc700931f
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/Replicator.java
@@ -0,0 +1,130 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.handlers.replicator;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+import com.yahoo.io.Connection;
+import com.yahoo.io.ConnectionFactory;
+import com.yahoo.io.Listener;
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.LogMessage;
+import com.yahoo.logserver.handlers.AbstractLogHandler;
+
+/**
+ * The Replicator plugin is used for replicating log messages sent
+ * to the logserver.
+ *
+ * <P>
+ * Per default the replicator will start dropping messages enqueued
+ * to a client if the outbound message queue reaches 5000 messages.
+ * This limit can be configured by setting the system property
+ * <code>logserver.replicator.maxqueuelength</code> to the desired
+ * value.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class Replicator extends AbstractLogHandler
+implements ConnectionFactory
+{
+ private static final Logger log = Logger.getLogger(Replicator.class.getName());
+
+ private int port;
+ private Listener listener;
+ private final Set<ReplicatorConnection> connections = new HashSet<ReplicatorConnection>();
+ private final FormattedBufferCache bufferCache = new FormattedBufferCache();
+
+ /**
+ * @param port The port to which the replicator listens.
+ *
+ */
+ public Replicator (int port) throws IOException {
+ this.port = port;
+ listen(port);
+ }
+
+ public Replicator () {
+ }
+
+ public void listen (int port) throws IOException {
+ if (listener != null) {
+ throw new IllegalStateException("already listening to port " + this.port);
+ }
+ listener = new Listener("replicator");
+ listener.listen(this, port);
+ listener.start();
+ log.log(LogLevel.CONFIG, "port=" + port);
+ }
+
+ public synchronized boolean doHandle (LogMessage msg) {
+ boolean logged = false;
+ bufferCache.reset();
+ for (ReplicatorConnection c : connections) {
+ try {
+ if (c.isLoggable(msg)) {
+ c.enqueue(bufferCache.getFormatted(msg, c.formatter));
+ logged = true;
+ }
+ }
+ catch (IOException e) {
+ log.log(LogLevel.DEBUG, "Writing failed", e);
+ }
+ }
+ return logged;
+ }
+
+ public void close() {
+ // kill the listener thread, then wait for it to
+ // shut down.
+ try {
+ listener.interrupt();
+ listener.join();
+ log.log(LogLevel.DEBUG, "Replicator listener stopped");
+ }
+ catch (InterruptedException e) {
+ log.log(LogLevel.WARNING,
+ "Replicator listener was interrupted",
+ e);
+ }
+ }
+
+ /**
+ * Currently a NOP, but we might want to have some best-effort
+ * mechanism for trying to flush all connections within some
+ * time-frame.
+ */
+ public void flush() {}
+
+ /**
+ * Factory method for wrapping new connections in the proper
+ * (Replicator)Connection objects.
+ *
+ * @param socket The new SocketChannel
+ * @param listener The Listener instance we want to use
+ */
+ public synchronized Connection newConnection (SocketChannel socket,
+ Listener listener) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.fine("New replicator connection: " + socket);
+ }
+ ReplicatorConnection n =
+ new ReplicatorConnection(socket, listener, this);
+ connections.add(n);
+ return n;
+ }
+
+ /**
+ * Removes a ReplicatorConnection from the set of active
+ * connections.
+ */
+ protected synchronized void deRegisterConnection (ReplicatorConnection conn) {
+ connections.remove(conn);
+ }
+
+ public String toString () {
+ return Replicator.class.getName();
+ }
+
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorConnection.java b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorConnection.java
new file mode 100644
index 00000000000..2978f751665
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorConnection.java
@@ -0,0 +1,411 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.handlers.replicator;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.LinkedList;
+import java.util.StringTokenizer;
+import java.util.logging.Logger;
+
+import com.yahoo.io.Connection;
+import com.yahoo.io.IOUtils;
+import com.yahoo.io.Listener;
+import com.yahoo.io.ReadLine;
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.LogMessage;
+import com.yahoo.logserver.filter.LogFilter;
+import com.yahoo.logserver.filter.LogFilterManager;
+import com.yahoo.logserver.formatter.LogFormatter;
+import com.yahoo.logserver.formatter.LogFormatterManager;
+
+/**
+ * Replication client connection.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class ReplicatorConnection implements Connection, LogFilter
+{
+ private static final Logger log
+ = Logger.getLogger(ReplicatorConnection.class.getName());
+
+ /** The maximum number of queued messages before we start dropping */
+ private static final int maxQueueLength;
+ /** The maximum number of times we go over maxQueueLength before we log a warning */
+ private static final int maxRetriesBeforeWarning = 10;
+ /** Count of how many times we have received a message while the queue is full */
+ private static int queueFullCount = 0;
+
+ static {
+ String maxQueue = System.getProperty("logserver.replicator.maxqueuelength",
+ "5000");
+ maxQueueLength = Integer.parseInt(maxQueue);
+ }
+
+ private final SocketChannel socket;
+ private final String remoteHost;
+ private final Listener listener;
+ private final Replicator replicator;
+ private final LinkedList<ByteBuffer> writeBufferList = new LinkedList<ByteBuffer>();
+ private ByteBuffer writeBuffer;
+ private final ByteBuffer readBuffer = ByteBuffer.allocate(4096);
+ private LogFilter filter = null;
+ protected LogFormatter formatter = null;
+ private String filterName = "system.mute";
+ private String formatterName = "system.nullformatter";
+ private boolean droppingMode = false;
+ private int numHandled = 0;
+ private int numQueued = 0;
+ private int numDropped = 0;
+ private long totalBytesWritten = 0;
+
+ /**
+ * Constructs a ReplicatorConnection. Note that initially the
+ * filter of this connection is set to MuteFilter, which mutes
+ * all log messages.
+ *
+ */
+ public ReplicatorConnection (SocketChannel socket,
+ Listener listener,
+ Replicator replicator) {
+ this.socket = socket;
+ this.listener = listener;
+ this.replicator = replicator;
+ this.filter = LogFilterManager.getLogFilter(filterName);
+ this.formatter = LogFormatterManager.getLogFormatter(formatterName);
+
+ // this might take some time
+ remoteHost = socket.socket().getInetAddress().getHostName();
+
+ }
+
+ /**
+ * Returns the remote hostname of this replicator connection.
+ *
+ * @return Returns the remote host name
+ */
+ public String getRemoteHost() {
+ return remoteHost;
+ }
+
+ /**
+ * Check if the message is wanted by this particular replicator
+ * connection. The reason we provide this method is because we
+ * want to be able to determine of a message is wanted by any
+ * client before committing resources to creating a ByteBuffer to
+ * serialize it into.
+ *
+ * @param msg The log message offered
+ *
+ */
+ public boolean isLoggable (LogMessage msg) {
+ if (filter == null) {
+ return true;
+ }
+ return filter.isLoggable(msg);
+ }
+
+ /**
+ * Return the description of the currently active filter.
+ */
+ public String description () {
+ if (filter == null) {
+ return "No filter defined";
+ }
+ return filter.description();
+ }
+
+
+ /**
+ * Enqueues a ByteBuffer containing the message destined
+ * for the client.
+ *
+ * @param buffer the ByteBuffer into which the log message is
+ * serialized.
+ */
+ public synchronized void enqueue (ByteBuffer buffer) throws IOException {
+ if (writeBuffer == null) {
+ writeBuffer = buffer;
+ } else {
+ // if we've reached the max we bail out
+ if (writeBufferList.size() > maxQueueLength) {
+ queueFullCount++;
+ if (! droppingMode) {
+ droppingMode = true;
+ String message = "client at " + remoteHost + " can't keep up, dropping messages";
+ if (queueFullCount > maxRetriesBeforeWarning) {
+ log.log(LogLevel.WARNING, message);
+ queueFullCount = 0;
+ } else {
+ log.log(LogLevel.DEBUG, message);
+ }
+ }
+ numDropped++;
+ return;
+ }
+ writeBufferList.addLast(buffer);
+ listener.modifyInterestOps(this, SelectionKey.OP_WRITE, true);
+ droppingMode = false;
+ numQueued++;
+ }
+ numHandled++;
+ write();
+ }
+
+
+ public void read () throws IOException {
+ if (! readBuffer.hasRemaining()) {
+ log.warning("Log message too long. Message exceeds "
+ + readBuffer.capacity()
+ + " bytes. Connection dropped.");
+ close();
+ return;
+ }
+
+
+ int ret = socket.read(readBuffer);
+ if (ret == -1) {
+ close();
+ return;
+ }
+
+ if (ret == 0) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.fine("zero byte read occurred");
+ }
+ }
+
+ readBuffer.flip();
+
+ String s;
+ while ((s = ReadLine.readLine(readBuffer)) != null) {
+ onCommand(s);
+ }
+
+ }
+
+ public synchronized void write () throws IOException {
+ if (! socket.isOpen()) {
+ // throw new IllegalStateException("SocketChannel not open in write()");
+ close();
+ }
+
+ int bytesWritten;
+ do {
+ // if writeBufferList is not set we need to fetch the next buffer
+ if (writeBuffer == null) {
+
+ // if the list is empty, signal the selector we do not need
+ // to do any writing for a while yet and bail
+ if (writeBufferList.isEmpty()) {
+ listener.modifyInterestOpsBatch(this,
+ SelectionKey.OP_WRITE,
+ false);
+ return;
+ }
+ writeBuffer = writeBufferList.removeFirst();
+ }
+
+
+ // invariants: we have a writeBuffer
+
+ // when the client drops off we actually need
+ // to handle that here and close the connection
+ // XXX: I am not sure why this works and the
+ // close method call on IOException in
+ // Listener doesn't. this should be investigated!
+ //
+ try {
+ bytesWritten = socket.write(writeBuffer);
+ }
+ catch (IOException e) {
+ close();
+ return;
+ }
+ totalBytesWritten += bytesWritten;
+
+ // buffer drained so we forget it and see what happens when we
+ // go around. if indeed we go around
+ if ((writeBuffer != null) && (!writeBuffer.hasRemaining())) {
+ writeBuffer = null;
+ }
+ } while (bytesWritten > 0);
+ }
+
+ public synchronized void close () throws IOException {
+ replicator.deRegisterConnection(this);
+ socket.close();
+ writeBuffer = null;
+ writeBufferList.clear();
+ log.log(LogLevel.DEBUG, "closing connection to " + remoteHost);
+ }
+
+ public int selectOps () {
+ return SelectionKey.OP_READ;
+ }
+
+ public SocketChannel socketChannel () {
+ return socket;
+ }
+
+ public void connect () {
+ }
+
+
+ // ========================================================
+ // ==== command processing
+ // ========================================================
+
+ void onCommand (String s) {
+ log.log(LogLevel.DEBUG, "COMMAND: '" + s + "' from " + remoteHost);
+ StringTokenizer st = new StringTokenizer(s.toLowerCase());
+ while (st.hasMoreTokens()) {
+ String tok = st.nextToken();
+ if ("ping".equals(tok)) {
+ if (st.hasMoreTokens()) {
+ print("# 202 pong " + st.nextToken() + "\n");
+ } else {
+ print("# 202 pong\n");
+ }
+ return;
+ }
+
+ if ("use".equals(tok)) {
+ if (st.hasMoreTokens()) {
+ onUse(st.nextToken());
+ }
+ return;
+ }
+
+ if ("formatter".equals(tok)) {
+ if (st.hasMoreTokens()) {
+ onFormatter(st.nextToken());
+ }
+ return;
+ }
+
+ if ("quit".equals(tok)) {
+ print("# 201 bye\n");
+ try { close(); } catch (IOException e) {e.printStackTrace();}
+ return;
+ }
+
+ if ("list".equals(tok)) {
+ onList();
+ return;
+ }
+
+ if ("listformatters".equals(tok)) {
+ onListFormatters();
+ return;
+ }
+
+ if ("stats".equals(tok)) {
+ onStats();
+ return;
+ }
+ }
+ }
+
+ void onFormatter (String formatterName) {
+ LogFormatter newFormatter = LogFormatterManager.getLogFormatter(formatterName);
+ if (newFormatter == null) {
+ print("# 405 formatter not found '" + formatterName + "'\n");
+ return;
+ }
+ formatter = newFormatter;
+ this.formatterName = formatterName;
+ print("# 202 using '" + formatter + "'\n");
+ }
+
+ void onUse (String filterName) {
+ LogFilter newFilter = LogFilterManager.getLogFilter(filterName);
+ if (newFilter == null) {
+ print("# 404 filter not found '" + filterName + "'\n");
+ return;
+ }
+ filter = newFilter;
+ this.filterName = filterName;
+ print("# 200 using '" + filter + "'\n");
+ }
+
+
+
+ void onList () {
+ print("# 203 filter list\n");
+ String filterNames[] = LogFilterManager.getFilterNames();
+ for (int i = 0; i < filterNames.length; i++) {
+ LogFilter f = LogFilterManager.getLogFilter(filterNames[i]);
+ print("# 204 " + filterNames[i] + " - " + f.description() + "\n");
+ }
+ print("# 205 end filter list\n");
+ }
+
+ void onListFormatters () {
+ print("# 206 formatter list\n");
+ String formatterNames[] = LogFormatterManager.getFormatterNames();
+ for (int i = 0; i < formatterNames.length; i++) {
+ LogFormatter fmt = LogFormatterManager.getLogFormatter(formatterNames[i]);
+ print("# 207 " + formatterNames[i] + " - " + fmt.description() + "\n");
+ }
+ print("# 208 end formatter list\n");
+ }
+
+ private void print (String s) {
+ try {
+ enqueue(IOUtils.utf8ByteBuffer(s));
+ }
+ catch (IOException e) {
+ log.log(LogLevel.WARNING, "error printing", e);
+ try {
+ close();
+ }
+ catch (IOException e2) {
+ // ignore
+ }
+ }
+ }
+
+ void onStats () {
+
+ print(new StringBuilder(80)
+ .append("# 206 stats start (this connection)\n")
+ .append("# 207 ").append(numHandled).append(" handled\n")
+ .append("# 208 ").append(numDropped).append(" dropped\n")
+ .append("# 209 ").append(numQueued)
+ .append(" handled and queued\n")
+ .append("# 210 ").append(totalBytesWritten)
+ .append(" total bytes written\n")
+ .append("# 211 stats end\n")
+ .toString()
+ );
+ }
+
+
+ public int getNumHandled () {
+ return numHandled;
+ }
+
+ public int getNumQueued () {
+ return numQueued;
+ }
+
+ public int getNumDropped () {
+ return numDropped;
+ }
+
+ public long getTotalBytesWritten () {
+ return totalBytesWritten;
+ }
+
+ public String getLogFilterName () {
+ return filterName;
+ }
+
+ void setFilter(LogFilter filter) {
+ this.filter = filter;
+ }
+
+}
+
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorPlugin.java b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorPlugin.java
new file mode 100644
index 00000000000..03bc66c8262
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorPlugin.java
@@ -0,0 +1,58 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.handlers.replicator;
+
+import java.io.IOException;
+import java.util.logging.Logger;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.logserver.Server;
+import com.yahoo.plugin.Config;
+import com.yahoo.plugin.Plugin;
+
+public class ReplicatorPlugin implements Plugin
+{
+ private static final String DEFAULT_PORT = "19083";
+ private static final Logger log =
+ Logger.getLogger(ReplicatorPlugin.class.getName());
+ private Replicator replicator;
+ private final Server server = Server.getInstance();
+
+ public String getPluginName () {
+ return "replicator";
+ }
+
+ /**
+ * Initialize the replicator plugin
+ */
+ public void initPlugin (Config config) {
+
+ if (replicator != null) {
+ throw new IllegalStateException(
+ "plugin already initialized: " + getPluginName());
+ }
+ int listenPort = config.getInt("port", DEFAULT_PORT);
+ String threadName = config.get("thread", getPluginName());
+ try {
+ replicator = new Replicator(listenPort);
+ }
+ catch (IOException e) {
+ log.log(LogLevel.WARNING, "init failed: " + e);
+ return;
+ }
+ server.registerLogHandler(replicator, threadName);
+ }
+
+ /**
+ * Shut down the replicator plugin.
+ */
+ public void shutdownPlugin () {
+
+ if (replicator == null) {
+ throw new IllegalStateException(
+ "plugin not initialized: " + getPluginName());
+ }
+ server.unregisterLogHandler(replicator);
+ replicator = null;
+ }
+
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/net/LogConnection.java b/logserver/src/main/java/com/yahoo/logserver/net/LogConnection.java
new file mode 100644
index 00000000000..1a48c5fb0f4
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/net/LogConnection.java
@@ -0,0 +1,272 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.net;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.logserver.LogDispatcher;
+import com.yahoo.log.LogMessage;
+import com.yahoo.log.InvalidLogFormatException;
+
+import com.yahoo.io.Connection;
+import com.yahoo.io.Listener;
+import com.yahoo.io.ReadLine;
+
+import com.yahoo.logserver.net.control.Levels;
+
+import java.io.IOException;
+import java.util.logging.Logger;
+import java.util.logging.Level;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.HashSet;
+
+import java.nio.charset.Charset;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+
+/**
+ * TODO
+ * <UL>
+ * <LI> send invalid log messages to somewhere so they can be
+ * analyzed and errors can be corrected.
+ * </UL>
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+
+public class LogConnection implements Connection {
+ private static final Logger log = Logger.getLogger(LogConnection.class.getName());
+
+ public static final int READBUFFER_SIZE = (32 * 1024);
+
+ static private final Charset charset = Charset.forName("utf-8");
+
+ // the set of active connections
+ private static final Set<LogConnection> activeConnections = new HashSet<>();
+
+ private final SocketChannel socket;
+ private final Listener listener;
+ private final LogDispatcher dispatcher;
+
+ private final ByteBuffer readBuffer = ByteBuffer.allocateDirect(READBUFFER_SIZE);
+
+ private final LinkedList<ByteBuffer> writeBufferList = new LinkedList<>();
+ private ByteBuffer writeBuffer;
+
+ // counters
+ private long totalBytesRead = 0;
+ private long totalBytesWritten = 0;
+
+ // default log levels for logd
+ final Levels defaultLevels;
+
+ public LogConnection (SocketChannel socket,
+ Listener listener,
+ LogDispatcher dispatcher,
+ Levels defaultLevels) {
+ this.socket = socket;
+ this.listener = listener;
+ this.dispatcher = dispatcher;
+ this.defaultLevels = defaultLevels;
+
+ addToActiveSet(this);
+
+ // send the "setdefaultstate" command to logd. no better
+ // place to put it for now.
+ sendDefaultState();
+ }
+
+ /**
+ * Send the default state to the
+ */
+ public void sendDefaultState () {
+ if (defaultLevels == null) {
+ return;
+ }
+
+ try {
+ enqueue(charset.encode("setdefaultstate "
+ + defaultLevels.toString()
+ + "\n"));
+ enqueue(charset.encode("setallstates "
+ + defaultLevels.toString()
+ + "\n"));
+ }
+ catch (IOException e) {
+ log.log(LogLevel.WARNING, "Unable to send default state", e);
+ }
+ }
+
+ /**
+ * Return a shallow copy of the set of active connections.
+ *
+ */
+ public static Set<LogConnection> getActiveConnections () {
+ synchronized(activeConnections) {
+ return new HashSet<>(activeConnections);
+ }
+ }
+
+ /**
+ * @return Return total number of bytes written to connection
+ */
+ public long getTotalBytesWritten () {
+ return totalBytesWritten;
+ }
+
+ /**
+ * @return Return total number of bytes read from connection
+ */
+ public long getTotalBytesRead () {
+ return totalBytesRead;
+ }
+
+ /**
+ * Internal method for adding connection to the set
+ * of active connections.
+ *
+ * @param connection The connection to be added
+ */
+ private static void addToActiveSet (LogConnection connection) {
+ synchronized(activeConnections) {
+ activeConnections.add(connection);
+ }
+ }
+
+ /**
+ * Internal method to remove connection from the set of
+ * active connections.
+ *
+ * @param connection The connection to remove
+ * @throws IllegalStateException if the connection does not
+ * exist in the set
+ *
+ */
+ private static void removeFromActiveSet (LogConnection connection) {
+ synchronized(activeConnections) {
+ activeConnections.remove(connection);
+ }
+ }
+
+ /**
+ *
+ */
+ public synchronized void enqueue (ByteBuffer buffer) throws IOException {
+ if (writeBuffer == null) {
+ writeBuffer = buffer;
+ } else {
+ writeBufferList.addLast(buffer);
+ listener.modifyInterestOps(this, SelectionKey.OP_WRITE, true);
+ }
+ write();
+ }
+
+ public void connect () throws IOException {
+ throw new RuntimeException("connect() is not supposed to be called");
+ }
+
+ public synchronized void write () throws IOException {
+ int bytesWritten;
+ do {
+ // if writeBufferList is not set we need to fetch the next buffer
+ if (writeBuffer == null) {
+
+ // if the list is empty, signal the selector we do not need
+ // to do any writing for a while yet and bail
+ if (writeBufferList.isEmpty()) {
+ listener.modifyInterestOpsBatch(this,
+ SelectionKey.OP_WRITE,
+ false);
+ return;
+ }
+ writeBuffer = writeBufferList.removeFirst();
+ }
+
+ // invariants: we have a writeBuffer
+
+ bytesWritten = socket.write(writeBuffer);
+ totalBytesWritten += bytesWritten;
+
+ // buffer drained so we forget it and see what happens when we
+ // go around. if indeed we go around
+ if ((writeBuffer != null) && (!writeBuffer.hasRemaining())) {
+ writeBuffer = null;
+ }
+ } while (bytesWritten > 0);
+ }
+
+ public void read() throws IOException {
+ if (! readBuffer.hasRemaining()) {
+
+ try {
+ readBuffer.putChar(readBuffer.capacity() - 2, '\n');
+ readBuffer.flip();
+ String s = ReadLine.readLine(readBuffer);
+ if (s == null) {
+ return;
+ }
+ int count = 200;
+ log.log(LogLevel.WARNING, "Log message too long. Message from "
+ + socket.socket().getInetAddress() + " exceeds "
+ + readBuffer.capacity()
+ + ". Skipping buffer (might be part of same long message). Printing first " + count + " characters of line: " +
+ s.substring(0, count));
+
+ LogMessage msg = LogMessage.parseNativeFormat(s);
+ dispatcher.handle(msg);
+ }
+ catch (InvalidLogFormatException e) {
+ log.log(LogLevel.DEBUG, "Invalid log message", e);
+ }
+ finally {
+ readBuffer.clear();
+ }
+ return;
+ }
+
+ int ret = socket.read(readBuffer);
+ if (ret == -1) {
+ close();
+ return;
+ }
+
+ if (ret == 0) {
+ if (log.isLoggable(Level.FINE)) {
+ log.log(LogLevel.DEBUG, "zero byte read occurred");
+ }
+ }
+
+ // update global counter
+ totalBytesRead += ret;
+
+ readBuffer.flip();
+
+ String s;
+ while ((s = ReadLine.readLine(readBuffer)) != null) {
+ try {
+ LogMessage msg = LogMessage.parseNativeFormat(s);
+ dispatcher.handle(msg);
+ }
+ catch (InvalidLogFormatException e) {
+ log.log(LogLevel.DEBUG, "Invalid log message", e);
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ if (log.isLoggable(Level.FINE)) {
+ log.log(LogLevel.INFO, this + ": closing");
+ }
+ socket.close();
+ removeFromActiveSet(this);
+ }
+
+ public int selectOps() {
+ return SelectionKey.OP_READ;
+ }
+
+ public SocketChannel socketChannel() {
+ return socket;
+ }
+
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/net/LogConnectionFactory.java b/logserver/src/main/java/com/yahoo/logserver/net/LogConnectionFactory.java
new file mode 100644
index 00000000000..5d97420f30a
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/net/LogConnectionFactory.java
@@ -0,0 +1,41 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.net;
+
+import com.yahoo.logserver.LogDispatcher;
+import com.yahoo.io.Connection;
+import com.yahoo.io.Listener;
+import com.yahoo.io.ConnectionFactory;
+
+import com.yahoo.logserver.net.control.Levels;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.nio.channels.SocketChannel;
+
+/**
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+
+public class LogConnectionFactory implements ConnectionFactory
+{
+ private static final Logger log
+ = Logger.getLogger(LogConnectionFactory.class.getName());
+
+ final LogDispatcher dispatcher;
+ final Levels defaultLogLevels;
+
+ public LogConnectionFactory (LogDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ defaultLogLevels = Levels.parse(System.getProperty("logserver.default.loglevels", ""));
+ }
+
+ public Connection newConnection (SocketChannel socket, Listener listener) {
+ if (log.isLoggable(Level.FINE)) {
+ log.fine("New connection: " + socket);
+ }
+ return new LogConnection(socket,
+ listener,
+ dispatcher,
+ (Levels)defaultLogLevels.clone());
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/net/control/Levels.java b/logserver/src/main/java/com/yahoo/logserver/net/control/Levels.java
new file mode 100644
index 00000000000..c9b749f4417
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/net/control/Levels.java
@@ -0,0 +1,125 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.net.control;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * This class is used to represent the state of each log level
+ * in a set of states.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class Levels implements Cloneable {
+ private final Map<String, State> levelsMap = new LinkedHashMap<String, State>(10);
+
+ /**
+ * The constructor initializes the Levels object to its default
+ * state.
+ */
+ public Levels () {
+ levelsMap.put("event", State.FORWARD);
+ levelsMap.put("fatal", State.FORWARD);
+ levelsMap.put("error", State.FORWARD);
+ levelsMap.put("warning", State.FORWARD);
+ levelsMap.put("info", State.FORWARD);
+ levelsMap.put("config", State.FORWARD);
+ levelsMap.put("debug", State.NOFORWARD);
+ levelsMap.put("spam", State.NOFORWARD);
+ }
+
+ /**
+ * Parse a levels representation and return a Levels object
+ * representing the levels.
+ *
+ * @param levels A string representation of the levels
+ * @return new instance of Levels, possibly having no
+ * real values if none could be found in the
+ * <code>levels</code> parameter.
+ *
+ */
+ public static Levels parse (String levels) {
+ return (new Levels()).updateLevels(levels);
+ }
+
+
+ /**
+ * Update the levels given a string representation of the state;
+ * the levels mentioned here will be updated, the ones omitted
+ * will retain their state as before the function call.
+ *
+ * @param levels string representation of levels
+ *
+ */
+ public Levels updateLevels (String levels) {
+ String[] parts = levels.split(",");
+ if (parts.length < 1) {
+ return this;
+ }
+
+ for (int i = 0; i < parts.length; i++) {
+ String pair = parts[i];
+ int offset = pair.indexOf('=');
+ if (offset != -1) {
+ String name = pair.substring(0,offset).trim().toLowerCase();
+ String value = pair.substring(offset+1).trim().toLowerCase();
+ setLevelState(name, State.parse(value));
+ }
+ }
+ return this;
+ }
+
+
+ /**
+ * Set the state of a given level.
+ *
+ * @param level name of the level
+ * @param state the state
+ * @return returns reference to <code>this</code> for chaning
+ */
+ public Levels setLevelState(String level, State state) {
+ levelsMap.put(level, state);
+ return this;
+ }
+
+ /**
+ * Get the state of a given level.
+ *
+ */
+ public State getLevelState (String level) {
+ State s = levelsMap.get(level);
+ if (s == null) {
+ return State.UNKNOWN;
+ }
+ return s;
+ }
+
+ /**
+ * For equivalent configurations the toString method should
+ * emit equal strings.
+ *
+ */
+ public String toString () {
+ StringBuilder sbuf = new StringBuilder(80);
+ boolean first = true;
+ for (Map.Entry<String, State> me : levelsMap.entrySet()) {
+ // commas between
+ if (!first) {
+ sbuf.append(',');
+ } else {
+ first = false;
+ }
+ sbuf.append(me.getKey())
+ .append('=')
+ .append(me.getValue());
+ }
+
+ return sbuf.toString();
+ }
+
+ public Object clone() {
+ // quick and dirty, but easily verifiable to be correct
+ return parse(this.toString());
+ }
+
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/net/control/State.java b/logserver/src/main/java/com/yahoo/logserver/net/control/State.java
new file mode 100644
index 00000000000..5d2a4f0fbd4
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/net/control/State.java
@@ -0,0 +1,54 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.net.control;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * This value class represents the logging state of a component.
+ * the valid states are:
+ *
+ * <UL>
+ * <LI> forward - store locally and send to log server
+ * <LI> store - store locally only
+ * <LI> noforward - do not send to logserver
+ * <LI> off - do not generate the message in the program
+ * </UL>
+ *
+ * XXX This does not appear to be in use.
+ */
+public class State {
+ private static final Map<String, State> nameToState = new HashMap<String, State>();
+
+ public static final State FORWARD = new State("forward");
+ public static final State NOFORWARD = new State("noforward");
+ // public static final State STORE = new State("store");
+ // public static final State OFF = new State("off");
+ public static final State UNKNOWN = new State("unknown");
+
+ private String name;
+
+ /**
+ * Typesafe enum. Only able to instantiate self.
+ * TODO: Rewrite to enum
+ */
+ private State () {}
+
+ /**
+ * Creates state with given name
+ */
+ private State (String name) {
+ this.name = name;
+ synchronized (State.class) {
+ nameToState.put(name, this);
+ }
+ }
+
+ public static State parse (String s) {
+ return nameToState.containsKey(s) ? nameToState.get(s) : UNKNOWN;
+ }
+
+ public String toString () {
+ return name;
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/logserver/testutils/VerifyLogfile.java b/logserver/src/main/java/com/yahoo/logserver/testutils/VerifyLogfile.java
new file mode 100644
index 00000000000..623bf04aa36
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/logserver/testutils/VerifyLogfile.java
@@ -0,0 +1,65 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.logserver.testutils;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.log.event.MalformedEventException;
+import com.yahoo.log.InvalidLogFormatException;
+import com.yahoo.log.LogMessage;
+
+/**
+ * This utility is used to check that the log messages contained
+ * in a log file are correct. Any incorrectly formatted log
+ * message is output to stdout.
+ *
+ * @author <a href="mailto:borud@yahoo-inc.com">Bjorn Borud</a>
+ */
+public class VerifyLogfile {
+
+ public static void main (String[] args) throws IOException {
+ int messages = 0;
+ int events = 0;
+ int invalidLogMessages = 0;
+ int invalidEvents = 0;
+ int numFiles = 0;
+
+ if (args.length < 1) {
+ System.err.println("\nPlease provide name of log file(s)\n");
+ }
+
+ for (int i = 0; i < args.length; i++) {
+ BufferedReader br = new BufferedReader(new FileReader(args[i]));
+ numFiles++;
+ for (String line = br.readLine();
+ line != null;
+ line = br.readLine())
+ {
+ messages++;
+ LogMessage m;
+ try {
+ m = LogMessage.parseNativeFormat(line);
+ if (m.getLevel() == LogLevel.EVENT) {
+ events++;
+ m.getEvent();
+ }
+ } catch (MalformedEventException e) {
+ System.out.println("EVENT\t" + line);
+ invalidEvents++;
+ } catch (InvalidLogFormatException e) {
+ System.out.println("MESSAGE\t" + line);
+ invalidLogMessages++;
+ }
+ }
+ br.close();
+ }
+
+ System.err.println("numFiles: " + numFiles);
+ System.err.println("messages: " + messages);
+ System.err.println("events: " + events);
+ System.err.println("invalidLogMessages: " + invalidLogMessages);
+ System.err.println("invalidEvents: " + invalidEvents);
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/plugin/Config.java b/logserver/src/main/java/com/yahoo/plugin/Config.java
new file mode 100644
index 00000000000..a7ea2b44caf
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/plugin/Config.java
@@ -0,0 +1,23 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.plugin;
+
+/**
+ * This interface specifies an API for configuring runtime-loadable
+ * server plugins.
+ *
+ * @author <a href="mailto:stig@yahoo-inc.com">Stig Bakken</a>
+ */
+public abstract class Config
+{
+ /**
+ * @return a config value for the specified key
+ */
+ public abstract String get(String key, String defaultValue);
+
+ /**
+ * @return a config value as an integer
+ */
+ public int getInt(String key, String defaultValue) {
+ return Integer.parseInt(get(key, defaultValue));
+ }
+}
diff --git a/logserver/src/main/java/com/yahoo/plugin/Plugin.java b/logserver/src/main/java/com/yahoo/plugin/Plugin.java
new file mode 100644
index 00000000000..e53bc3a1faa
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/plugin/Plugin.java
@@ -0,0 +1,33 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.plugin;
+
+/**
+ * This interface specifies an API for runtime-loadable server
+ * plugins. The interface is deliberately simple to allow it to be
+ * used in different servers. Typically, the initPlugin() method
+ * calls application-specific registration methods to connect the
+ * plugin to the hosting application.
+ *
+ * @author <a href="mailto:stig@yahoo-inc.com">Stig Bakken</a>
+ */
+public interface Plugin
+{
+ /**
+ * @return a unique and simple name for the plugin
+ */
+ public String getPluginName();
+
+ /**
+ * Initialize the plugin.
+ *
+ * @param config Config object for this plugin
+ */
+ public void initPlugin(Config config);
+
+ /**
+ * Shut down the plugin. Must clean up all resources allocated by
+ * initPlugin() or any of the handler methods.
+ */
+ public void shutdownPlugin();
+
+}
diff --git a/logserver/src/main/java/com/yahoo/plugin/SystemPropertyConfig.java b/logserver/src/main/java/com/yahoo/plugin/SystemPropertyConfig.java
new file mode 100644
index 00000000000..763dc2d1f3e
--- /dev/null
+++ b/logserver/src/main/java/com/yahoo/plugin/SystemPropertyConfig.java
@@ -0,0 +1,35 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.plugin;
+
+/**
+ * This class implements plugin config through system properties.
+ * Each plugin typically has its own system property prefix, such as
+ * "logserver.archiver.". A request for the config key "foo" will
+ * then return the contents of the "logserver.archiver.foo" system
+ * property.
+ *
+ * @author <a href="mailto:stig@yahoo-inc.com">Stig Bakken</a>
+ */
+public class SystemPropertyConfig extends Config
+{
+ private final String prefix;
+
+ /**
+ * @param prefix Prefix string prepended to config keys
+ * as they are looked up as system properties.
+ */
+ public SystemPropertyConfig(String prefix) {
+ this.prefix = prefix;
+ }
+
+ /**
+ * @return a config value for the specified key
+ */
+ public String get(String key, String defaultValue) {
+ return System.getProperty(prefix + key, defaultValue);
+ }
+
+ public String toString () {
+ return "Prefix=" + prefix;
+ }
+}