aboutsummaryrefslogtreecommitdiffstats
path: root/logserver/src/main/java/com/yahoo
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2019-03-20 11:28:51 +0100
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2019-03-20 13:15:39 +0100
commitcddf79bc63fdb5213eed6a51e5fc6c7200539e0f (patch)
tree7a5a6f46a15afa37abf7c8a79903cf4ae0c26bfd /logserver/src/main/java/com/yahoo
parentb5ffed4ce1dcad6a3c1e206c4357f95740ad8e7b (diff)
Remove replicator plugin from logserver
Diffstat (limited to 'logserver/src/main/java/com/yahoo')
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/BuiltinPluginLoader.java6
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/replicator/FormattedBufferCache.java77
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/replicator/Replicator.java125
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorConnection.java407
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorPlugin.java55
5 files changed, 2 insertions, 668 deletions
diff --git a/logserver/src/main/java/com/yahoo/logserver/BuiltinPluginLoader.java b/logserver/src/main/java/com/yahoo/logserver/BuiltinPluginLoader.java
index 22453528777..32427604bd6 100644
--- a/logserver/src/main/java/com/yahoo/logserver/BuiltinPluginLoader.java
+++ b/logserver/src/main/java/com/yahoo/logserver/BuiltinPluginLoader.java
@@ -1,12 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.logserver;
-import java.util.logging.Logger;
-
import com.yahoo.log.LogLevel;
import com.yahoo.logserver.handlers.archive.ArchiverPlugin;
import com.yahoo.logserver.handlers.logmetrics.LogMetricsPlugin;
-import com.yahoo.logserver.handlers.replicator.ReplicatorPlugin;
+
+import java.util.logging.Logger;
/**
* Load a set of builtin plugins
@@ -21,7 +20,6 @@ public class BuiltinPluginLoader extends AbstractPluginLoader {
log.log(LogLevel.DEBUG, "starting to load builtin plugins");
loadFromClass(ArchiverPlugin.class);
- loadFromClass(ReplicatorPlugin.class);
loadFromClass(LogMetricsPlugin.class);
log.log(LogLevel.DEBUG, "done loading builtin plugins");
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/FormattedBufferCache.java b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/FormattedBufferCache.java
deleted file mode 100644
index 483e875e03a..00000000000
--- a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/FormattedBufferCache.java
+++ /dev/null
@@ -1,77 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.logserver.handlers.replicator;
-
-import java.util.Map;
-import java.util.IdentityHashMap;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-
-import com.yahoo.log.LogMessage;
-import com.yahoo.logserver.formatter.LogFormatter;
-import com.yahoo.logserver.formatter.LogFormatterManager;
-
-/**
- * This class is used to cache log messages that have been
- * formatted into ByteBuffers. The purpose of this class
- * is to make it easier to support multiple message formats while
- * still ensuring we don't format more messages than we strictly need
- * to and that we don't keep around more buffers that we ought to.
- * <p>
- * This is not a general purpose class, I think, so please
- * refer to the source code of the Replicator class for
- * information on how to use this.
- * <p>
- * This class is not threadsafe.
- *
- * @author Bjorn Borud
- */
-public class FormattedBufferCache {
- // the documentation says " All of the methods defined in this
- // class are safe for use by multiple concurrent threads." so
- // we have only one instance of the charset for this class.
- //
- static private final Charset charset = Charset.forName("utf-8");
-
- private final IdentityHashMap<LogFormatter, ByteBuffer> buffers;
-
- public FormattedBufferCache() {
- // hope this is a good hash size
- int initialSize = LogFormatterManager.getFormatterNames().length * 2;
- buffers = new IdentityHashMap<LogFormatter, ByteBuffer>(initialSize);
- }
-
- /**
- * Return a ByteBuffer slice of a buffer containing the
- * LogMessage formatted by the LogFormatter. If one didn't
- * exist in the cache from before, it will after this
- * method returns.
- *
- * @param msg The log message you wish to format
- * @param formatter The log formatter you wish to use for formatting
- * @return Returns a ByteBuffer slice
- */
- public ByteBuffer getFormatted(LogMessage msg, LogFormatter formatter) {
- ByteBuffer bb = buffers.get(formatter);
- if (bb == null) {
- bb = charset.encode(formatter.format(msg));
- buffers.put(formatter, bb);
- }
- return bb.slice();
- }
-
- /**
- * After we're done distributing the log message to all the
- * clients we clear the cache so we are ready for the next
- * message.
- */
- public void reset() {
- buffers.clear();
- }
-
- /**
- * This is here for test purposes. Don't get any bright ideas.
- */
- public Map<LogFormatter, ByteBuffer> getUnderlyingMapOnlyForTesting() {
- return buffers;
- }
-}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/Replicator.java b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/Replicator.java
deleted file mode 100644
index d9cb2c82a49..00000000000
--- a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/Replicator.java
+++ /dev/null
@@ -1,125 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.logserver.handlers.replicator;
-
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.logging.Logger;
-
-import com.yahoo.io.Connection;
-import com.yahoo.io.ConnectionFactory;
-import com.yahoo.io.Listener;
-import com.yahoo.log.LogLevel;
-import com.yahoo.log.LogMessage;
-import com.yahoo.logserver.handlers.AbstractLogHandler;
-
-/**
- * The Replicator plugin is used for replicating log messages sent
- * to the logserver.
- * <p>
- * Per default the replicator will start dropping messages enqueued
- * to a client if the outbound message queue reaches 5000 messages.
- * This limit can be configured by setting the system property
- * <code>logserver.replicator.maxqueuelength</code> to the desired
- * value.
- *
- * @author Bjorn Borud
- */
-public class Replicator extends AbstractLogHandler implements ConnectionFactory {
- private static final Logger log = Logger.getLogger(Replicator.class.getName());
-
- private int port;
- private Listener listener;
- private final Set<ReplicatorConnection> connections = new HashSet<ReplicatorConnection>();
- private final FormattedBufferCache bufferCache = new FormattedBufferCache();
-
- /**
- * @param port The port to which the replicator listens.
- */
- public Replicator(int port) throws IOException {
- this.port = port;
- listen(port);
- }
-
- public Replicator() {
- }
-
- public void listen(int port) throws IOException {
- if (listener != null) {
- throw new IllegalStateException("already listening to port " + this.port);
- }
- listener = new Listener("replicator");
- listener.listen(this, port);
- listener.start();
- log.log(LogLevel.CONFIG, "port=" + port);
- }
-
- public synchronized boolean doHandle(LogMessage msg) {
- boolean logged = false;
- bufferCache.reset();
- for (ReplicatorConnection c : connections) {
- try {
- if (c.isLoggable(msg)) {
- c.enqueue(bufferCache.getFormatted(msg, c.formatter));
- logged = true;
- }
- } catch (IOException e) {
- log.log(LogLevel.DEBUG, "Writing failed", e);
- }
- }
- return logged;
- }
-
- public void close() {
- // kill the listener thread, then wait for it to
- // shut down.
- try {
- listener.interrupt();
- listener.join();
- log.log(LogLevel.DEBUG, "Replicator listener stopped");
- } catch (InterruptedException e) {
- log.log(LogLevel.WARNING,
- "Replicator listener was interrupted",
- e);
- }
- }
-
- /**
- * Currently a NOP, but we might want to have some best-effort
- * mechanism for trying to flush all connections within some
- * time-frame.
- */
- public void flush() {}
-
- /**
- * Factory method for wrapping new connections in the proper
- * (Replicator)Connection objects.
- *
- * @param socket The new SocketChannel
- * @param listener The Listener instance we want to use
- */
- public synchronized Connection newConnection(SocketChannel socket,
- Listener listener) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.fine("New replicator connection: " + socket);
- }
- ReplicatorConnection n =
- new ReplicatorConnection(socket, listener, this);
- connections.add(n);
- return n;
- }
-
- /**
- * Removes a ReplicatorConnection from the set of active
- * connections.
- */
- protected synchronized void deRegisterConnection(ReplicatorConnection conn) {
- connections.remove(conn);
- }
-
- public String toString() {
- return Replicator.class.getName();
- }
-
-}
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorConnection.java b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorConnection.java
deleted file mode 100644
index 7393257edef..00000000000
--- a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorConnection.java
+++ /dev/null
@@ -1,407 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.logserver.handlers.replicator;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.LinkedList;
-import java.util.StringTokenizer;
-import java.util.logging.Logger;
-
-import com.yahoo.io.Connection;
-import com.yahoo.io.IOUtils;
-import com.yahoo.io.Listener;
-import com.yahoo.io.ReadLine;
-import com.yahoo.log.LogLevel;
-import com.yahoo.log.LogMessage;
-import com.yahoo.logserver.filter.LogFilter;
-import com.yahoo.logserver.filter.LogFilterManager;
-import com.yahoo.logserver.formatter.LogFormatter;
-import com.yahoo.logserver.formatter.LogFormatterManager;
-
-/**
- * Replication client connection.
- *
- * @author Bjorn Borud
- */
-public class ReplicatorConnection implements Connection, LogFilter {
- private static final Logger log = Logger.getLogger(ReplicatorConnection.class.getName());
-
- /**
- * The maximum number of queued messages before we start dropping
- */
- private static final int maxQueueLength;
- /**
- * The maximum number of times we go over maxQueueLength before we log a warning
- */
- private static final int maxRetriesBeforeWarning = 10;
- /**
- * Count of how many times we have received a message while the queue is full
- */
- private static int queueFullCount = 0;
-
- static {
- String maxQueue = System.getProperty("logserver.replicator.maxqueuelength",
- "5000");
- maxQueueLength = Integer.parseInt(maxQueue);
- }
-
- private final SocketChannel socket;
- private final String remoteHost;
- private final Listener listener;
- private final Replicator replicator;
- private final LinkedList<ByteBuffer> writeBufferList = new LinkedList<ByteBuffer>();
- private ByteBuffer writeBuffer;
- private final ByteBuffer readBuffer = ByteBuffer.allocate(4096);
- private LogFilter filter = null;
- protected LogFormatter formatter = null;
- private String filterName = "system.mute";
- private String formatterName = "system.nullformatter";
- private boolean droppingMode = false;
- private int numHandled = 0;
- private int numQueued = 0;
- private int numDropped = 0;
- private long totalBytesWritten = 0;
-
- /**
- * Constructs a ReplicatorConnection. Note that initially the
- * filter of this connection is set to MuteFilter, which mutes
- * all log messages.
- */
- public ReplicatorConnection(SocketChannel socket, Listener listener, Replicator replicator) {
- this.socket = socket;
- this.listener = listener;
- this.replicator = replicator;
- this.filter = LogFilterManager.getLogFilter(filterName);
- this.formatter = LogFormatterManager.getLogFormatter(formatterName);
-
- // this might take some time
- remoteHost = socket.socket().getInetAddress().getHostName();
-
- }
-
- /**
- * Returns the remote hostname of this replicator connection.
- *
- * @return Returns the remote host name
- */
- public String getRemoteHost() {
- return remoteHost;
- }
-
- /**
- * Check if the message is wanted by this particular replicator
- * connection. The reason we provide this method is because we
- * want to be able to determine of a message is wanted by any
- * client before committing resources to creating a ByteBuffer to
- * serialize it into.
- *
- * @param msg The log message offered
- */
- public boolean isLoggable(LogMessage msg) {
- if (filter == null) {
- return true;
- }
- return filter.isLoggable(msg);
- }
-
- /**
- * Return the description of the currently active filter.
- */
- public String description() {
- if (filter == null) {
- return "No filter defined";
- }
- return filter.description();
- }
-
-
- /**
- * Enqueues a ByteBuffer containing the message destined
- * for the client.
- *
- * @param buffer the ByteBuffer into which the log message is
- * serialized.
- */
- public synchronized void enqueue(ByteBuffer buffer) throws IOException {
- if (writeBuffer == null) {
- writeBuffer = buffer;
- } else {
- // if we've reached the max we bail out
- if (writeBufferList.size() > maxQueueLength) {
- queueFullCount++;
- if (! droppingMode) {
- droppingMode = true;
- String message = "client at " + remoteHost + " can't keep up, dropping messages";
- if (queueFullCount > maxRetriesBeforeWarning) {
- log.log(LogLevel.WARNING, message);
- queueFullCount = 0;
- } else {
- log.log(LogLevel.DEBUG, message);
- }
- }
- numDropped++;
- return;
- }
- writeBufferList.addLast(buffer);
- listener.modifyInterestOps(this, SelectionKey.OP_WRITE, true);
- droppingMode = false;
- numQueued++;
- }
- numHandled++;
- write();
- }
-
-
- public void read() throws IOException {
- if (! readBuffer.hasRemaining()) {
- log.warning("Log message too long. Message exceeds "
- + readBuffer.capacity()
- + " bytes. Connection dropped.");
- close();
- return;
- }
-
-
- int ret = socket.read(readBuffer);
- if (ret == - 1) {
- close();
- return;
- }
-
- if (ret == 0) {
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.fine("zero byte read occurred");
- }
- }
-
- readBuffer.flip();
-
- String s;
- while ((s = ReadLine.readLine(readBuffer)) != null) {
- onCommand(s);
- }
-
- }
-
- public synchronized void write() throws IOException {
- if (! socket.isOpen()) {
- // throw new IllegalStateException("SocketChannel not open in write()");
- close();
- }
-
- int bytesWritten;
- do {
- // if writeBufferList is not set we need to fetch the next buffer
- if (writeBuffer == null) {
-
- // if the list is empty, signal the selector we do not need
- // to do any writing for a while yet and bail
- if (writeBufferList.isEmpty()) {
- listener.modifyInterestOpsBatch(this,
- SelectionKey.OP_WRITE,
- false);
- return;
- }
- writeBuffer = writeBufferList.removeFirst();
- }
-
-
- // invariants: we have a writeBuffer
-
- // when the client drops off we actually need
- // to handle that here and close the connection
- // XXX: I am not sure why this works and the
- // close method call on IOException in
- // Listener doesn't. this should be investigated!
- //
- try {
- bytesWritten = socket.write(writeBuffer);
- } catch (IOException e) {
- close();
- return;
- }
- totalBytesWritten += bytesWritten;
-
- // buffer drained so we forget it and see what happens when we
- // go around. if indeed we go around
- if ((writeBuffer != null) && (! writeBuffer.hasRemaining())) {
- writeBuffer = null;
- }
- } while (bytesWritten > 0);
- }
-
- public synchronized void close() throws IOException {
- replicator.deRegisterConnection(this);
- socket.close();
- writeBuffer = null;
- writeBufferList.clear();
- log.log(LogLevel.DEBUG, "closing connection to " + remoteHost);
- }
-
- public int selectOps() {
- return SelectionKey.OP_READ;
- }
-
- public SocketChannel socketChannel() {
- return socket;
- }
-
- public void connect() {
- }
-
-
- // ========================================================
- // ==== command processing
- // ========================================================
-
- void onCommand(String s) {
- log.log(LogLevel.DEBUG, "COMMAND: '" + s + "' from " + remoteHost);
- StringTokenizer st = new StringTokenizer(s.toLowerCase());
- while (st.hasMoreTokens()) {
- String tok = st.nextToken();
- if ("ping".equals(tok)) {
- if (st.hasMoreTokens()) {
- print("# 202 pong " + st.nextToken() + "\n");
- } else {
- print("# 202 pong\n");
- }
- return;
- }
-
- if ("use".equals(tok)) {
- if (st.hasMoreTokens()) {
- onUse(st.nextToken());
- }
- return;
- }
-
- if ("formatter".equals(tok)) {
- if (st.hasMoreTokens()) {
- onFormatter(st.nextToken());
- }
- return;
- }
-
- if ("quit".equals(tok)) {
- print("# 201 bye\n");
- try { close(); } catch (IOException e) {e.printStackTrace();}
- return;
- }
-
- if ("list".equals(tok)) {
- onList();
- return;
- }
-
- if ("listformatters".equals(tok)) {
- onListFormatters();
- return;
- }
-
- if ("stats".equals(tok)) {
- onStats();
- return;
- }
- }
- }
-
- void onFormatter(String formatterName) {
- LogFormatter newFormatter = LogFormatterManager.getLogFormatter(formatterName);
- if (newFormatter == null) {
- print("# 405 formatter not found '" + formatterName + "'\n");
- return;
- }
- formatter = newFormatter;
- this.formatterName = formatterName;
- print("# 202 using '" + formatter + "'\n");
- }
-
- void onUse(String filterName) {
- LogFilter newFilter = LogFilterManager.getLogFilter(filterName);
- if (newFilter == null) {
- print("# 404 filter not found '" + filterName + "'\n");
- return;
- }
- filter = newFilter;
- this.filterName = filterName;
- print("# 200 using '" + filter + "'\n");
- }
-
-
- void onList() {
- print("# 203 filter list\n");
- String filterNames[] = LogFilterManager.getFilterNames();
- for (int i = 0; i < filterNames.length; i++) {
- LogFilter f = LogFilterManager.getLogFilter(filterNames[i]);
- print("# 204 " + filterNames[i] + " - " + f.description() + "\n");
- }
- print("# 205 end filter list\n");
- }
-
- void onListFormatters() {
- print("# 206 formatter list\n");
- String formatterNames[] = LogFormatterManager.getFormatterNames();
- for (int i = 0; i < formatterNames.length; i++) {
- LogFormatter fmt = LogFormatterManager.getLogFormatter(formatterNames[i]);
- print("# 207 " + formatterNames[i] + " - " + fmt.description() + "\n");
- }
- print("# 208 end formatter list\n");
- }
-
- private void print(String s) {
- try {
- enqueue(IOUtils.utf8ByteBuffer(s));
- } catch (IOException e) {
- log.log(LogLevel.WARNING, "error printing", e);
- try {
- close();
- } catch (IOException e2) {
- // ignore
- }
- }
- }
-
- void onStats() {
-
- print(new StringBuilder(80)
- .append("# 206 stats start (this connection)\n")
- .append("# 207 ").append(numHandled).append(" handled\n")
- .append("# 208 ").append(numDropped).append(" dropped\n")
- .append("# 209 ").append(numQueued)
- .append(" handled and queued\n")
- .append("# 210 ").append(totalBytesWritten)
- .append(" total bytes written\n")
- .append("# 211 stats end\n")
- .toString()
- );
- }
-
-
- public int getNumHandled() {
- return numHandled;
- }
-
- public int getNumQueued() {
- return numQueued;
- }
-
- public int getNumDropped() {
- return numDropped;
- }
-
- public long getTotalBytesWritten() {
- return totalBytesWritten;
- }
-
- public String getLogFilterName() {
- return filterName;
- }
-
- void setFilter(LogFilter filter) {
- this.filter = filter;
- }
-
-}
-
diff --git a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorPlugin.java b/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorPlugin.java
deleted file mode 100644
index 0aa976c0361..00000000000
--- a/logserver/src/main/java/com/yahoo/logserver/handlers/replicator/ReplicatorPlugin.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.logserver.handlers.replicator;
-
-import java.io.IOException;
-import java.util.logging.Logger;
-
-import com.yahoo.log.LogLevel;
-import com.yahoo.logserver.Server;
-import com.yahoo.plugin.Config;
-import com.yahoo.plugin.Plugin;
-
-public class ReplicatorPlugin implements Plugin {
- private static final String DEFAULT_PORT = "19083";
- private static final Logger log = Logger.getLogger(ReplicatorPlugin.class.getName());
-
- private Replicator replicator;
- private final Server server = Server.getInstance();
-
- public String getPluginName() {
- return "replicator";
- }
-
- /**
- * Initialize the replicator plugin
- */
- public void initPlugin(Config config) {
- if (replicator != null) {
- throw new IllegalStateException(
- "plugin already initialized: " + getPluginName());
- }
- int listenPort = config.getInt("port", DEFAULT_PORT);
- String threadName = config.get("thread", getPluginName());
- try {
- replicator = new Replicator(listenPort);
- } catch (IOException e) {
- log.log(LogLevel.WARNING, "init failed: " + e);
- return;
- }
- server.registerLogHandler(replicator, threadName);
- }
-
- /**
- * Shut down the replicator plugin.
- */
- public void shutdownPlugin() {
-
- if (replicator == null) {
- throw new IllegalStateException(
- "plugin not initialized: " + getPluginName());
- }
- server.unregisterLogHandler(replicator);
- replicator = null;
- }
-
-}