diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-20 15:34:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-20 15:34:25 +0200 |
commit | b2421dbec063b9be46611b08e69c5407a151f778 (patch) | |
tree | 6d1291f5856c1c0d38d4f5d9516d3ac53c9b7fed | |
parent | ae46707c055f3bd67319d7ba774bf34713291fae (diff) | |
parent | 607e3cb63d4e5b1abd36a5519b4e08b2af391622 (diff) |
Merge pull request #9146 from vespa-engine/balder/use-sender-thread-all-the-way
Balder/use sender thread all the way
8 files changed, 42 insertions, 187 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java index 63e5dbb2d04..7211e4cead0 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; -import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.log.LogLevel; import java.util.ArrayDeque; @@ -9,9 +8,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -27,7 +23,6 @@ public class Messenger implements Runnable { private static final Logger log = Logger.getLogger(Messenger.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final List<Task> children = new ArrayList<>(); - private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send")); private final Queue<Task> queue = new ArrayDeque<>(); private final Thread thread = new Thread(this, "Messenger"); @@ -69,13 +64,8 @@ public class Messenger implements Runnable { public void deliverMessage(final Message msg, final MessageHandler handler) { if (destroyed.get()) { msg.discard(); - return; - } - try { - sendExecutor.execute(new MessageTask(msg, handler)); - } catch (RejectedExecutionException e) { - msg.discard(); - log.warning("Execution rejected " + e.getMessage()); + } else { + handler.handleMessage(msg); } } @@ -88,7 +78,11 @@ public class Messenger implements Runnable { * @param handler The handler to return to. */ public void deliverReply(final Reply reply, final ReplyHandler handler) { - enqueue(new ReplyTask(reply, handler)); + if (destroyed.get()) { + reply.discard(); + } else { + handler.handleReply(reply); + } } /** @@ -136,7 +130,6 @@ public class Messenger implements Runnable { boolean done = false; enqueue(Terminate.INSTANCE); if (!destroyed.getAndSet(true)) { - sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();}); try { synchronized (this) { while (!queue.isEmpty()) { @@ -219,49 +212,6 @@ public class Messenger implements Runnable { void destroy(); } - private static class MessageTask implements Runnable { - - final MessageHandler handler; - Message msg; - - MessageTask(final Message msg, final MessageHandler handler) { - this.msg = msg; - this.handler = handler; - } - - @Override - public void run() { - final Message msg = this.msg; - this.msg = null; - handler.handleMessage(msg); - } - } - - private static class ReplyTask implements Task { - - final ReplyHandler handler; - Reply reply; - - ReplyTask(final Reply reply, final ReplyHandler handler) { - this.reply = reply; - this.handler = handler; - } - - @Override - public void run() { - final Reply reply = this.reply; - this.reply = null; - handler.handleReply(reply); - } - - @Override - public void destroy() { - if (reply != null) { - reply.discard(); - } - } - } - private static class SyncTask implements Task { final CountDownLatch latch = new CountDownLatch(1); diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java index dd1e5858aee..c644b551a79 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java @@ -2,8 +2,6 @@ package com.yahoo.feedapi; import com.yahoo.document.Document; -import com.yahoo.documentapi.VisitorParameters; -import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.jdisc.Metric; import com.yahoo.messagebus.EmptyReply; @@ -24,97 +22,41 @@ public class DummySessionFactory implements SessionFactory { } public final List<Message> messages; - private boolean autoReply = false; - private ReplyFactory autoReplyFactory = null; - private Error autoError; - private int sessionsCreated = 0; - OutputStream output = null; + private boolean autoReply; + private OutputStream output = null; - protected DummySessionFactory() { - messages = new ArrayList<>(); - } - - public static DummySessionFactory createDefault() { - return new DummySessionFactory(); - } - - protected DummySessionFactory(boolean autoReply) { + private DummySessionFactory(boolean autoReply) { this.autoReply = autoReply; messages = new ArrayList<>(); } - protected DummySessionFactory(ReplyFactory autoReplyFactory) { - this.autoReply = true; - this.autoReplyFactory = autoReplyFactory; - messages = new ArrayList<>(); - } - - public static DummySessionFactory createWithAutoReplyFactory(ReplyFactory autoReplyFactory) { - return new DummySessionFactory(autoReplyFactory); - } - - protected DummySessionFactory(Error e) { - autoReply = true; - this.autoError = e; - messages = new ArrayList<>(); - } - - public static DummySessionFactory createWithErrorAutoReply(Error e) { - return new DummySessionFactory(e); - } - public static DummySessionFactory createWithAutoReply() { return new DummySessionFactory(true); } - public DummySessionFactory(Error e, OutputStream out) { + public DummySessionFactory(OutputStream out) { messages = null; autoReply = true; output = out; } - public int sessionsCreated() { - return sessionsCreated; - } - void add(Message m) { + private void add(Message m) { if (messages != null) { messages.add(m); } - } @Override public SendSession createSendSession(ReplyHandler r, Metric metric) { - ++sessionsCreated; - if (output != null) { return new DumpDocuments(output, r, this); } if (autoReply) { - return new AutoReplySession(r, autoReplyFactory, autoError, this); + return new AutoReplySession(r, null, null, this); } return new DummySession(r, this); } - @Override - public VisitorSession createVisitorSession(VisitorParameters p) { - return null; - } - - public void sendReply(Message m, Error error) { - MyContext ctxt = (MyContext) m.getContext(); - - Reply r = new EmptyReply(); - r.setMessage(m); - r.setContext(ctxt.oldContext); - - if (error != null) { - r.addError(error); - } - - ctxt.handler.handleReply(r); - } - private class MyContext { MyContext(ReplyHandler handler, Object ctxt) { this.handler = handler; @@ -132,7 +74,7 @@ public class DummySessionFactory implements SessionFactory { Error e; DummySessionFactory owner; - public AutoReplySession(ReplyHandler handler, ReplyFactory replyFactory, + AutoReplySession(ReplyHandler handler, ReplyFactory replyFactory, Error e, DummySessionFactory owner) { this.handler = handler; this.replyFactory = replyFactory; @@ -145,7 +87,7 @@ public class DummySessionFactory implements SessionFactory { } @Override - protected Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException { + protected Result onSend(Message m, boolean blockIfQueueFull) { owner.add(m); handleMessage(m); Reply r; @@ -174,7 +116,7 @@ public class DummySessionFactory implements SessionFactory { private class DumpDocuments extends AutoReplySession { final OutputStream out; - public DumpDocuments(OutputStream out, ReplyHandler r, DummySessionFactory factory) { + DumpDocuments(OutputStream out, ReplyHandler r, DummySessionFactory factory) { super(r, null, null, factory); this.out = out; } @@ -191,13 +133,13 @@ public class DummySessionFactory implements SessionFactory { ReplyHandler handler; DummySessionFactory owner; - public DummySession(ReplyHandler handler, DummySessionFactory owner) { + DummySession(ReplyHandler handler, DummySessionFactory owner) { this.handler = handler; this.owner = owner; } @Override - protected Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException { + protected Result onSend(Message m, boolean blockIfQueueFull) { m.setContext(new MyContext(handler, m.getContext())); owner.add(m); return Result.ACCEPTED; diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java index 54e638717e0..12a4ecde493 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java @@ -3,8 +3,6 @@ package com.yahoo.feedapi; import com.yahoo.cloud.config.SlobroksConfig; import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.documentapi.VisitorParameters; -import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; import com.yahoo.documentapi.messagebus.MessageBusParams; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; @@ -30,7 +28,6 @@ public class MessageBusSessionFactory implements SessionFactory { String NUM_UPDATES = "num_updates"; } - @SuppressWarnings("unused") // used from extensions public MessageBusSessionFactory(MessagePropertyProcessor processor) { this(processor, null, null); } @@ -66,15 +63,6 @@ public class MessageBusSessionFactory implements SessionFactory { access.shutdown(); } - @Override - public synchronized VisitorSession createVisitorSession(VisitorParameters params) { - try { - return access.createVisitorSession(params); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - private class SourceSessionWrapper extends SendSession { private final SourceSession session; diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java index 3897f1d7d2a..11688ba62e4 100644 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java @@ -40,10 +40,6 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib private LoadTypeSet loadTypes = null; private boolean configChanged = false; - public MessagePropertyProcessor(String configId, String loadTypeConfig) { - new ConfigSubscriber().subscribe(this, FeederConfig.class, configId); - loadTypes = new LoadTypeSet(loadTypeConfig); - } public MessagePropertyProcessor(FeederConfig config, LoadTypeConfig loadTypeCfg) { loadTypes = new LoadTypeSet(); @@ -282,14 +278,6 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib this.priority = priority; } - public LoadType getLoadType() { - return loadType; - } - - public void setLoadType(LoadType loadType) { - this.loadType = loadType; - } - public boolean getAbortOnDocumentError() { return abortOnDocumentError; } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java index 0bba3964bf7..6dce2b6f315 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java @@ -1,8 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.feedapi; -import com.yahoo.documentapi.VisitorParameters; -import com.yahoo.documentapi.VisitorSession; import com.yahoo.jdisc.Metric; import com.yahoo.messagebus.ReplyHandler; @@ -20,13 +18,4 @@ public interface SessionFactory { * @return The session to use for sending messages. */ SendSession createSendSession(ReplyHandler handler, Metric metric); - - /** - * Creates a messagebus session for visiting data. - * - * @param params Parameters to the visitor - * @return A visitor session. - */ - VisitorSession createVisitorSession(VisitorParameters params); - } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java index e0e12b26ae6..9d0c740789e 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java @@ -27,10 +27,6 @@ public class SingleSender implements SimpleFeedAccess { this.blockingQueue = blockingQueue; } - public SingleSender(SharedSender.ResultCallback owner, SharedSender sender) { - this(owner, sender, true); - } - @Override public void put(Document doc) { send(new PutDocumentMessage(new DocumentPut(doc))); diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java index 0d23af1fec5..fa28f56e34a 100644 --- a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java @@ -22,19 +22,19 @@ import static java.lang.System.out; * Argument parsing class for the vespa feeder. */ public class Arguments { - public FeederConfig getFeederConfig() { + FeederConfig getFeederConfig() { return new FeederConfig(feederConfigBuilder); } - public List<String> getFiles() { + List<String> getFiles() { return files; } - public String getMode() { + String getMode() { return mode; } - public boolean isVerbose() { + boolean isVerbose() { return verbose; } @@ -44,12 +44,12 @@ public class Arguments { private String mode = "standard"; private boolean validateOnly = false; private boolean verbose = false; - SessionFactory sessionFactory; - MessagePropertyProcessor propertyProcessor = null; + private SessionFactory sessionFactory; + private MessagePropertyProcessor propertyProcessor = null; private String priority = null; private int numThreads = 1; - public MessagePropertyProcessor getPropertyProcessor() { + MessagePropertyProcessor getPropertyProcessor() { return propertyProcessor; } @@ -89,11 +89,11 @@ public class Arguments { " -v [ --verbose ] Enable verbose output of progress.\n"); } - public class HelpShownException extends Exception { + class HelpShownException extends Exception { } - public Arguments(String[] argList, SessionFactory factory) throws HelpShownException, FileNotFoundException { + Arguments(String[] argList, SessionFactory factory) throws HelpShownException, FileNotFoundException { parse(argList); if (factory != null) { @@ -101,17 +101,17 @@ public class Arguments { } else if (validateOnly) { if (dumpDocumentsFile != null) { BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(dumpDocumentsFile)); - sessionFactory = new DummySessionFactory(null, out); + sessionFactory = new DummySessionFactory(out); } else { - sessionFactory = new DummySessionFactory(null, null); + sessionFactory = new DummySessionFactory(null); } } else { sessionFactory = new MessageBusSessionFactory(propertyProcessor); } } - void parse(String[] argList) throws HelpShownException { - List<String> args = new LinkedList<String>(); + private void parse(String[] argList) throws HelpShownException { + List<String> args = new LinkedList<>(); args.addAll(Arrays.asList(argList)); while (!args.isEmpty()) { @@ -187,11 +187,11 @@ public class Arguments { return priority; } - public int getNumThreads() { + int getNumThreads() { return numThreads; } - public SessionFactory getSessionFactory() { + SessionFactory getSessionFactory() { return sessionFactory; } diff --git a/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java b/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java index 4de286398e9..0fc0cdf017c 100644 --- a/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java +++ b/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java @@ -1,12 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespafeeder; -import static org.junit.Assert.*; - import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; @@ -16,17 +13,23 @@ import com.yahoo.clientmetrics.RouteMetricSet; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.DocumentTypeManagerConfigurer; import com.yahoo.document.DocumentUpdate; -import com.yahoo.documentapi.messagebus.protocol.*; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; import com.yahoo.feedapi.DummySessionFactory; import com.yahoo.feedhandler.VespaFeedHandler; import com.yahoo.text.Utf8; import com.yahoo.vespaclient.config.FeederConfig; -import com.yahoo.vespafeeder.Arguments.HelpShownException; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class VespaFeederTestCase { @Rule @@ -203,8 +206,7 @@ public class VespaFeederTestCase { feed("src/test/files/malformedfeed.json", false); } - protected FeedFixture feed(String feed, boolean abortOnDataError) throws HelpShownException, - FileNotFoundException, Exception { + protected FeedFixture feed(String feed, boolean abortOnDataError) throws Exception { String abortOnDataErrorArgument = abortOnDataError ? "" : " --abortondataerror no"; FeedFixture feedFixture = new FeedFixture(); Arguments arguments = new Arguments(("--file " |