summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-02-12 15:53:03 +0100
committerGitHub <noreply@github.com>2018-02-12 15:53:03 +0100
commit523573ae6d5fef655dbee931869eace6a943bc41 (patch)
tree44709d659f7a555e693c087bd3abf1596b48cb29
parentd25e7ac69ad61f952269e0d5a63006f6e5337ca2 (diff)
parent1368bd66f6a1198498850d8b437ec88ca368a986 (diff)
Merge pull request #5015 from vespa-engine/balder/avoid-concurrent-modification
1 - Avoid concurrent access to 'handlers' array by using a CopyOnWrit…
-rw-r--r--logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java117
1 files changed, 64 insertions, 53 deletions
diff --git a/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java b/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java
index 26abdd43815..65fa83598b6 100644
--- a/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java
+++ b/logserver/src/main/java/com/yahoo/logserver/LogDispatcher.java
@@ -3,6 +3,9 @@ package com.yahoo.logserver;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import com.yahoo.io.SelectLoopHook;
@@ -20,18 +23,14 @@ import com.yahoo.logserver.handlers.LogHandler;
public class LogDispatcher implements LogHandler, SelectLoopHook {
private static final Logger log = Logger.getLogger(LogDispatcher.class.getName());
- private final List<LogHandler> handlers = new ArrayList<>();
- private int messageCount = 0;
- private boolean hasBeenShutDown = false;
- private boolean batchedMode = false;
+ private final List<LogHandler> handlers = new CopyOnWriteArrayList<>();
+ private final AtomicInteger messageCount = new AtomicInteger(0);
+ private final AtomicBoolean batchedMode = new AtomicBoolean(false);
private final int batchSize = 5000;
- private List<LogMessage> currentBatchList;
- private int roundCount = 0;
- @SuppressWarnings("unused")
- private int lastRoundCount = 0;
+ private final AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
+ private List<LogMessage> currentBatchList = null;
- public LogDispatcher() {
- }
+ public LogDispatcher() { }
/**
* Dispatches a message to all the LogHandler instances we've
@@ -41,47 +40,53 @@ public class LogDispatcher implements LogHandler, SelectLoopHook {
* @param msg The LogMessage instance we wish to dispatch to the
* plugins
*/
- public synchronized void handle(LogMessage msg) {
+ public void handle(LogMessage msg) {
if (msg == null) {
throw new NullPointerException("LogMessage was null");
}
- if (batchedMode) {
+ if (batchedMode.get()) {
addToBatch(msg);
} else {
- for (LogHandler h : handlers) {
- h.handle(msg);
- }
+ send(msg);
}
- messageCount++;
+ messageCount.incrementAndGet();
}
private void addToBatch(LogMessage msg) {
- if (currentBatchList == null) {
- currentBatchList = new ArrayList<LogMessage>(batchSize);
- currentBatchList.add(msg);
- return;
- }
+ List<LogMessage> toSend = null;
+ synchronized (this) {
+ if (currentBatchList == null) {
+ currentBatchList = new ArrayList<LogMessage>(batchSize);
+ currentBatchList.add(msg);
+ return;
+ }
- currentBatchList.add(msg);
+ currentBatchList.add(msg);
- if (currentBatchList.size() == batchSize) {
- flushBatch();
+ if (currentBatchList.size() == batchSize) {
+ toSend = stealBatch();
+ }
}
+ flushBatch(toSend);
}
- private void flushBatch() {
- List<LogMessage> todo;
- synchronized(this) {
- todo = currentBatchList;
- currentBatchList = null;
+ private void send(List<LogMessage> messages) {
+ for (LogHandler ht : handlers) {
+ ht.handle(messages);
}
- if (todo == null) return;
+ }
+ private void send(LogMessage message) {
for (LogHandler ht : handlers) {
- ht.handle(todo);
+ ht.handle(message);
}
}
+ private void flushBatch(List<LogMessage> todo) {
+ if (todo == null) { return; }
+ send(todo);
+ }
+
public void handle(List<LogMessage> messages) {
throw new IllegalStateException("method not supported");
}
@@ -94,12 +99,20 @@ public class LogDispatcher implements LogHandler, SelectLoopHook {
* but lists of same.
*/
public void setBatchedMode(boolean batchedMode) {
- this.batchedMode = batchedMode;
+ this.batchedMode.set(batchedMode);
}
- public synchronized void flush() {
- if (batchedMode) {
- flushBatch();
+ private List<LogMessage> stealBatch() {
+ List<LogMessage> toSend = null;
+ synchronized (this) {
+ toSend = currentBatchList;
+ currentBatchList = null;
+ }
+ return toSend;
+ }
+ public void flush() {
+ if (batchedMode.get()) {
+ flushBatch(stealBatch());
}
for (LogHandler h : handlers) {
@@ -110,15 +123,15 @@ public class LogDispatcher implements LogHandler, SelectLoopHook {
}
}
- public synchronized void close() {
- if (hasBeenShutDown) {
+ public void close() {
+ if (hasBeenShutDown.getAndSet(true)) {
throw new IllegalStateException("Shutdown already in progress");
}
- hasBeenShutDown = true;
for (LogHandler ht : handlers) {
if (ht instanceof Thread) {
log.fine("Stopping " + ht);
+ // Todo: Very bad, never do....
((Thread) ht).interrupt();
}
}
@@ -134,17 +147,18 @@ public class LogDispatcher implements LogHandler, SelectLoopHook {
* <p>
* If the thread is not alive it will be start()'ed.
*/
- public synchronized void registerLogHandler(LogHandler ht) {
- if (hasBeenShutDown) {
- throw new IllegalStateException("Tried to register LogHandler on" +
- " LogDispatcher which was shut down");
+ public void registerLogHandler(LogHandler ht) {
+ if (hasBeenShutDown.get()) {
+ throw new IllegalStateException("Tried to register LogHandler on LogDispatcher which was shut down");
}
- if (handlers.contains(ht)) {
- log.warning("LogHandler was already registered: " + ht);
- return;
+ synchronized (this) {
+ if (handlers.contains(ht)) {
+ log.warning("LogHandler was already registered: " + ht);
+ return;
+ }
+ handlers.add(ht);
}
- handlers.add(ht);
if ((ht instanceof Thread) && (! ((Thread) ht).isAlive())) {
((Thread) ht).start();
@@ -166,19 +180,16 @@ public class LogDispatcher implements LogHandler, SelectLoopHook {
*
* @return Returns the number of messages that we have seen.
*/
- public synchronized int getMessageCount() {
- return messageCount;
+ public int getMessageCount() {
+ return messageCount.get();
}
/**
* Hook which is called when the select loop has finished.
*/
public void selectLoopHook(boolean before) {
- if (batchedMode) {
- flushBatch();
+ if (batchedMode.get()) {
+ flushBatch(stealBatch());
}
-
- lastRoundCount = messageCount - roundCount;
- roundCount = messageCount;
}
}