diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-02-12 15:05:57 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-02-12 15:05:57 +0100 |
commit | 16fd92c6221ef0233bd176e7ce8d8b85e8588e59 (patch) | |
tree | 5e90831c2641bc3b024dcd7f3dd529fc85559c75 /logserver | |
parent | 7d8fdd685c3fedb2b263b356c67f84d92a2a6f89 (diff) |
1 - Avoid concurrent access to 'handlers' array by using a CopyOnWriteArrayList.
2 - Reduce synchronizations, but make it consistent by using Atomic primitives.
Diffstat (limited to 'logserver')
-rw-r--r-- | logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java | 118 |
1 files changed, 65 insertions, 53 deletions
diff --git a/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java b/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java index 26abdd43815..7b22e744083 100644 --- a/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java +++ b/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java @@ -3,6 +3,9 @@ package com.yahoo.logserver; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import com.yahoo.io.SelectLoopHook; @@ -20,18 +23,14 @@ import com.yahoo.logserver.handlers.LogHandler; public class LogDispatcher implements LogHandler, SelectLoopHook { private static final Logger log = Logger.getLogger(LogDispatcher.class.getName()); - private final List<LogHandler> handlers = new ArrayList<>(); - private int messageCount = 0; - private boolean hasBeenShutDown = false; - private boolean batchedMode = false; + private final List<LogHandler> handlers = new CopyOnWriteArrayList<>(); + private final AtomicInteger messageCount = new AtomicInteger(0); + private final AtomicBoolean batchedMode = new AtomicBoolean(false); private final int batchSize = 5000; - private List<LogMessage> currentBatchList; - private int roundCount = 0; - @SuppressWarnings("unused") - private int lastRoundCount = 0; + private final AtomicBoolean hasBeenShutDown = new AtomicBoolean(false); + private List<LogMessage> currentBatchList = null; - public LogDispatcher() { - } + public LogDispatcher() { } /** * Dispatches a message to all the LogHandler instances we've @@ -41,47 +40,53 @@ public class LogDispatcher implements LogHandler, SelectLoopHook { * @param msg The LogMessage instance we wish to dispatch to the * plugins */ - public synchronized void handle(LogMessage msg) { + public void handle(LogMessage msg) { if (msg == null) { throw new NullPointerException("LogMessage was null"); } - if (batchedMode) { + if (batchedMode.get()) { addToBatch(msg); } else { - for (LogHandler h : handlers) { - h.handle(msg); - } + send(msg); } - messageCount++; + messageCount.incrementAndGet(); } private void addToBatch(LogMessage msg) { - if (currentBatchList == null) { - currentBatchList = new ArrayList<LogMessage>(batchSize); - currentBatchList.add(msg); - return; - } + List<LogMessage> toSend = null; + synchronized (this) { + if (currentBatchList == null) { + currentBatchList = new ArrayList<LogMessage>(batchSize); + currentBatchList.add(msg); + return; + } - currentBatchList.add(msg); + currentBatchList.add(msg); - if (currentBatchList.size() == batchSize) { - flushBatch(); + if (currentBatchList.size() == batchSize) { + toSend = stealBatch(); + } } + flushBatch(toSend); } - private void flushBatch() { - List<LogMessage> todo; - synchronized(this) { - todo = currentBatchList; - currentBatchList = null; + private void send(List<LogMessage> messages) { + for (LogHandler ht : handlers) { + ht.handle(messages); } - if (todo == null) return; + } + private void send(LogMessage message) { for (LogHandler ht : handlers) { - ht.handle(todo); + ht.handle(message); } } + private void flushBatch(List<LogMessage> todo) { + if (todo == null) { return; } + send(todo); + } + public void handle(List<LogMessage> messages) { throw new IllegalStateException("method not supported"); } @@ -94,12 +99,20 @@ public class LogDispatcher implements LogHandler, SelectLoopHook { * but lists of same. */ public void setBatchedMode(boolean batchedMode) { - this.batchedMode = batchedMode; + this.batchedMode.set(batchedMode); } - public synchronized void flush() { - if (batchedMode) { - flushBatch(); + private List<LogMessage> stealBatch() { + List<LogMessage> toSend = null; + synchronized (this) { + toSend = currentBatchList; + currentBatchList = null; + } + return toSend; + } + public void flush() { + if (batchedMode.get()) { + flushBatch(stealBatch()); } for (LogHandler h : handlers) { @@ -110,15 +123,15 @@ public class LogDispatcher implements LogHandler, SelectLoopHook { } } - public synchronized void close() { - if (hasBeenShutDown) { + public void close() { + if (hasBeenShutDown.getAndSet(true)) { throw new IllegalStateException("Shutdown already in progress"); } - hasBeenShutDown = true; for (LogHandler ht : handlers) { if (ht instanceof Thread) { log.fine("Stopping " + ht); + // Todo: Very bad, never do.... ((Thread) ht).interrupt(); } } @@ -134,17 +147,18 @@ public class LogDispatcher implements LogHandler, SelectLoopHook { * <p> * If the thread is not alive it will be start()'ed. */ - public synchronized void registerLogHandler(LogHandler ht) { - if (hasBeenShutDown) { - throw new IllegalStateException("Tried to register LogHandler on" + - " LogDispatcher which was shut down"); + public void registerLogHandler(LogHandler ht) { + if (hasBeenShutDown.get()) { + throw new IllegalStateException("Tried to register LogHandler on LogDispatcher which was shut down"); } - if (handlers.contains(ht)) { - log.warning("LogHandler was already registered: " + ht); - return; + synchronized (this) { + if (handlers.contains(ht)) { + log.warning("LogHandler was already registered: " + ht); + return; + } + handlers.add(ht); } - handlers.add(ht); if ((ht instanceof Thread) && (! ((Thread) ht).isAlive())) { ((Thread) ht).start(); @@ -157,6 +171,7 @@ public class LogDispatcher implements LogHandler, SelectLoopHook { * Make defensive copy and return array of LogHandlers. */ public LogHandler[] getLogHandlers() { + handlers.toArray(); LogHandler[] h = new LogHandler[handlers.size()]; return handlers.toArray(h); } @@ -166,19 +181,16 @@ public class LogDispatcher implements LogHandler, SelectLoopHook { * * @return Returns the number of messages that we have seen. */ - public synchronized int getMessageCount() { - return messageCount; + public int getMessageCount() { + return messageCount.get(); } /** * Hook which is called when the select loop has finished. */ public void selectLoopHook(boolean before) { - if (batchedMode) { - flushBatch(); + if (batchedMode.get()) { + flushBatch(stealBatch()); } - - lastRoundCount = messageCount - roundCount; - roundCount = messageCount; } } |