summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-20 15:34:25 +0200
committerGitHub <noreply@github.com>2019-04-20 15:34:25 +0200
commitb2421dbec063b9be46611b08e69c5407a151f778 (patch)
tree6d1291f5856c1c0d38d4f5d9516d3ac53c9b7fed
parentae46707c055f3bd67319d7ba774bf34713291fae (diff)
parent607e3cb63d4e5b1abd36a5519b4e08b2af391622 (diff)
Merge pull request #9146 from vespa-engine/balder/use-sender-thread-all-the-way
Balder/use sender thread all the way
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/Messenger.java64
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java80
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java12
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java12
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java11
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java4
-rw-r--r--vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java30
-rw-r--r--vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java16
8 files changed, 42 insertions, 187 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
index 63e5dbb2d04..7211e4cead0 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
-import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.log.LogLevel;
import java.util.ArrayDeque;
@@ -9,9 +8,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -27,7 +23,6 @@ public class Messenger implements Runnable {
private static final Logger log = Logger.getLogger(Messenger.class.getName());
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final List<Task> children = new ArrayList<>();
- private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("messenger.send"));
private final Queue<Task> queue = new ArrayDeque<>();
private final Thread thread = new Thread(this, "Messenger");
@@ -69,13 +64,8 @@ public class Messenger implements Runnable {
public void deliverMessage(final Message msg, final MessageHandler handler) {
if (destroyed.get()) {
msg.discard();
- return;
- }
- try {
- sendExecutor.execute(new MessageTask(msg, handler));
- } catch (RejectedExecutionException e) {
- msg.discard();
- log.warning("Execution rejected " + e.getMessage());
+ } else {
+ handler.handleMessage(msg);
}
}
@@ -88,7 +78,11 @@ public class Messenger implements Runnable {
* @param handler The handler to return to.
*/
public void deliverReply(final Reply reply, final ReplyHandler handler) {
- enqueue(new ReplyTask(reply, handler));
+ if (destroyed.get()) {
+ reply.discard();
+ } else {
+ handler.handleReply(reply);
+ }
}
/**
@@ -136,7 +130,6 @@ public class Messenger implements Runnable {
boolean done = false;
enqueue(Terminate.INSTANCE);
if (!destroyed.getAndSet(true)) {
- sendExecutor.shutdownNow().forEach((Runnable task) -> {((MessageTask) task).msg.discard();});
try {
synchronized (this) {
while (!queue.isEmpty()) {
@@ -219,49 +212,6 @@ public class Messenger implements Runnable {
void destroy();
}
- private static class MessageTask implements Runnable {
-
- final MessageHandler handler;
- Message msg;
-
- MessageTask(final Message msg, final MessageHandler handler) {
- this.msg = msg;
- this.handler = handler;
- }
-
- @Override
- public void run() {
- final Message msg = this.msg;
- this.msg = null;
- handler.handleMessage(msg);
- }
- }
-
- private static class ReplyTask implements Task {
-
- final ReplyHandler handler;
- Reply reply;
-
- ReplyTask(final Reply reply, final ReplyHandler handler) {
- this.reply = reply;
- this.handler = handler;
- }
-
- @Override
- public void run() {
- final Reply reply = this.reply;
- this.reply = null;
- handler.handleReply(reply);
- }
-
- @Override
- public void destroy() {
- if (reply != null) {
- reply.discard();
- }
- }
- }
-
private static class SyncTask implements Task {
final CountDownLatch latch = new CountDownLatch(1);
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java
index dd1e5858aee..c644b551a79 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java
@@ -2,8 +2,6 @@
package com.yahoo.feedapi;
import com.yahoo.document.Document;
-import com.yahoo.documentapi.VisitorParameters;
-import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.jdisc.Metric;
import com.yahoo.messagebus.EmptyReply;
@@ -24,97 +22,41 @@ public class DummySessionFactory implements SessionFactory {
}
public final List<Message> messages;
- private boolean autoReply = false;
- private ReplyFactory autoReplyFactory = null;
- private Error autoError;
- private int sessionsCreated = 0;
- OutputStream output = null;
+ private boolean autoReply;
+ private OutputStream output = null;
- protected DummySessionFactory() {
- messages = new ArrayList<>();
- }
-
- public static DummySessionFactory createDefault() {
- return new DummySessionFactory();
- }
-
- protected DummySessionFactory(boolean autoReply) {
+ private DummySessionFactory(boolean autoReply) {
this.autoReply = autoReply;
messages = new ArrayList<>();
}
- protected DummySessionFactory(ReplyFactory autoReplyFactory) {
- this.autoReply = true;
- this.autoReplyFactory = autoReplyFactory;
- messages = new ArrayList<>();
- }
-
- public static DummySessionFactory createWithAutoReplyFactory(ReplyFactory autoReplyFactory) {
- return new DummySessionFactory(autoReplyFactory);
- }
-
- protected DummySessionFactory(Error e) {
- autoReply = true;
- this.autoError = e;
- messages = new ArrayList<>();
- }
-
- public static DummySessionFactory createWithErrorAutoReply(Error e) {
- return new DummySessionFactory(e);
- }
-
public static DummySessionFactory createWithAutoReply() {
return new DummySessionFactory(true);
}
- public DummySessionFactory(Error e, OutputStream out) {
+ public DummySessionFactory(OutputStream out) {
messages = null;
autoReply = true;
output = out;
}
- public int sessionsCreated() {
- return sessionsCreated;
- }
- void add(Message m) {
+ private void add(Message m) {
if (messages != null) {
messages.add(m);
}
-
}
@Override
public SendSession createSendSession(ReplyHandler r, Metric metric) {
- ++sessionsCreated;
-
if (output != null) {
return new DumpDocuments(output, r, this);
}
if (autoReply) {
- return new AutoReplySession(r, autoReplyFactory, autoError, this);
+ return new AutoReplySession(r, null, null, this);
}
return new DummySession(r, this);
}
- @Override
- public VisitorSession createVisitorSession(VisitorParameters p) {
- return null;
- }
-
- public void sendReply(Message m, Error error) {
- MyContext ctxt = (MyContext) m.getContext();
-
- Reply r = new EmptyReply();
- r.setMessage(m);
- r.setContext(ctxt.oldContext);
-
- if (error != null) {
- r.addError(error);
- }
-
- ctxt.handler.handleReply(r);
- }
-
private class MyContext {
MyContext(ReplyHandler handler, Object ctxt) {
this.handler = handler;
@@ -132,7 +74,7 @@ public class DummySessionFactory implements SessionFactory {
Error e;
DummySessionFactory owner;
- public AutoReplySession(ReplyHandler handler, ReplyFactory replyFactory,
+ AutoReplySession(ReplyHandler handler, ReplyFactory replyFactory,
Error e, DummySessionFactory owner) {
this.handler = handler;
this.replyFactory = replyFactory;
@@ -145,7 +87,7 @@ public class DummySessionFactory implements SessionFactory {
}
@Override
- protected Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException {
+ protected Result onSend(Message m, boolean blockIfQueueFull) {
owner.add(m);
handleMessage(m);
Reply r;
@@ -174,7 +116,7 @@ public class DummySessionFactory implements SessionFactory {
private class DumpDocuments extends AutoReplySession {
final OutputStream out;
- public DumpDocuments(OutputStream out, ReplyHandler r, DummySessionFactory factory) {
+ DumpDocuments(OutputStream out, ReplyHandler r, DummySessionFactory factory) {
super(r, null, null, factory);
this.out = out;
}
@@ -191,13 +133,13 @@ public class DummySessionFactory implements SessionFactory {
ReplyHandler handler;
DummySessionFactory owner;
- public DummySession(ReplyHandler handler, DummySessionFactory owner) {
+ DummySession(ReplyHandler handler, DummySessionFactory owner) {
this.handler = handler;
this.owner = owner;
}
@Override
- protected Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException {
+ protected Result onSend(Message m, boolean blockIfQueueFull) {
m.setContext(new MyContext(handler, m.getContext()));
owner.add(m);
return Result.ACCEPTED;
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java
index 54e638717e0..12a4ecde493 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java
@@ -3,8 +3,6 @@ package com.yahoo.feedapi;
import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.documentapi.VisitorParameters;
-import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
import com.yahoo.documentapi.messagebus.MessageBusParams;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
@@ -30,7 +28,6 @@ public class MessageBusSessionFactory implements SessionFactory {
String NUM_UPDATES = "num_updates";
}
- @SuppressWarnings("unused") // used from extensions
public MessageBusSessionFactory(MessagePropertyProcessor processor) {
this(processor, null, null);
}
@@ -66,15 +63,6 @@ public class MessageBusSessionFactory implements SessionFactory {
access.shutdown();
}
- @Override
- public synchronized VisitorSession createVisitorSession(VisitorParameters params) {
- try {
- return access.createVisitorSession(params);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
private class SourceSessionWrapper extends SendSession {
private final SourceSession session;
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java
index 3897f1d7d2a..11688ba62e4 100644
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java
@@ -40,10 +40,6 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib
private LoadTypeSet loadTypes = null;
private boolean configChanged = false;
- public MessagePropertyProcessor(String configId, String loadTypeConfig) {
- new ConfigSubscriber().subscribe(this, FeederConfig.class, configId);
- loadTypes = new LoadTypeSet(loadTypeConfig);
- }
public MessagePropertyProcessor(FeederConfig config, LoadTypeConfig loadTypeCfg) {
loadTypes = new LoadTypeSet();
@@ -282,14 +278,6 @@ public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscrib
this.priority = priority;
}
- public LoadType getLoadType() {
- return loadType;
- }
-
- public void setLoadType(LoadType loadType) {
- this.loadType = loadType;
- }
-
public boolean getAbortOnDocumentError() {
return abortOnDocumentError;
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java
index 0bba3964bf7..6dce2b6f315 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java
@@ -1,8 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.feedapi;
-import com.yahoo.documentapi.VisitorParameters;
-import com.yahoo.documentapi.VisitorSession;
import com.yahoo.jdisc.Metric;
import com.yahoo.messagebus.ReplyHandler;
@@ -20,13 +18,4 @@ public interface SessionFactory {
* @return The session to use for sending messages.
*/
SendSession createSendSession(ReplyHandler handler, Metric metric);
-
- /**
- * Creates a messagebus session for visiting data.
- *
- * @param params Parameters to the visitor
- * @return A visitor session.
- */
- VisitorSession createVisitorSession(VisitorParameters params);
-
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
index e0e12b26ae6..9d0c740789e 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
@@ -27,10 +27,6 @@ public class SingleSender implements SimpleFeedAccess {
this.blockingQueue = blockingQueue;
}
- public SingleSender(SharedSender.ResultCallback owner, SharedSender sender) {
- this(owner, sender, true);
- }
-
@Override
public void put(Document doc) {
send(new PutDocumentMessage(new DocumentPut(doc)));
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java
index 0d23af1fec5..fa28f56e34a 100644
--- a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java
+++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java
@@ -22,19 +22,19 @@ import static java.lang.System.out;
* Argument parsing class for the vespa feeder.
*/
public class Arguments {
- public FeederConfig getFeederConfig() {
+ FeederConfig getFeederConfig() {
return new FeederConfig(feederConfigBuilder);
}
- public List<String> getFiles() {
+ List<String> getFiles() {
return files;
}
- public String getMode() {
+ String getMode() {
return mode;
}
- public boolean isVerbose() {
+ boolean isVerbose() {
return verbose;
}
@@ -44,12 +44,12 @@ public class Arguments {
private String mode = "standard";
private boolean validateOnly = false;
private boolean verbose = false;
- SessionFactory sessionFactory;
- MessagePropertyProcessor propertyProcessor = null;
+ private SessionFactory sessionFactory;
+ private MessagePropertyProcessor propertyProcessor = null;
private String priority = null;
private int numThreads = 1;
- public MessagePropertyProcessor getPropertyProcessor() {
+ MessagePropertyProcessor getPropertyProcessor() {
return propertyProcessor;
}
@@ -89,11 +89,11 @@ public class Arguments {
" -v [ --verbose ] Enable verbose output of progress.\n");
}
- public class HelpShownException extends Exception {
+ class HelpShownException extends Exception {
}
- public Arguments(String[] argList, SessionFactory factory) throws HelpShownException, FileNotFoundException {
+ Arguments(String[] argList, SessionFactory factory) throws HelpShownException, FileNotFoundException {
parse(argList);
if (factory != null) {
@@ -101,17 +101,17 @@ public class Arguments {
} else if (validateOnly) {
if (dumpDocumentsFile != null) {
BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(dumpDocumentsFile));
- sessionFactory = new DummySessionFactory(null, out);
+ sessionFactory = new DummySessionFactory(out);
} else {
- sessionFactory = new DummySessionFactory(null, null);
+ sessionFactory = new DummySessionFactory(null);
}
} else {
sessionFactory = new MessageBusSessionFactory(propertyProcessor);
}
}
- void parse(String[] argList) throws HelpShownException {
- List<String> args = new LinkedList<String>();
+ private void parse(String[] argList) throws HelpShownException {
+ List<String> args = new LinkedList<>();
args.addAll(Arrays.asList(argList));
while (!args.isEmpty()) {
@@ -187,11 +187,11 @@ public class Arguments {
return priority;
}
- public int getNumThreads() {
+ int getNumThreads() {
return numThreads;
}
- public SessionFactory getSessionFactory() {
+ SessionFactory getSessionFactory() {
return sessionFactory;
}
diff --git a/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java b/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java
index 4de286398e9..0fc0cdf017c 100644
--- a/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java
+++ b/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java
@@ -1,12 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespafeeder;
-import static org.junit.Assert.*;
-
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
@@ -16,17 +13,23 @@ import com.yahoo.clientmetrics.RouteMetricSet;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentTypeManagerConfigurer;
import com.yahoo.document.DocumentUpdate;
-import com.yahoo.documentapi.messagebus.protocol.*;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
import com.yahoo.feedapi.DummySessionFactory;
import com.yahoo.feedhandler.VespaFeedHandler;
import com.yahoo.text.Utf8;
import com.yahoo.vespaclient.config.FeederConfig;
-import com.yahoo.vespafeeder.Arguments.HelpShownException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
public class VespaFeederTestCase {
@Rule
@@ -203,8 +206,7 @@ public class VespaFeederTestCase {
feed("src/test/files/malformedfeed.json", false);
}
- protected FeedFixture feed(String feed, boolean abortOnDataError) throws HelpShownException,
- FileNotFoundException, Exception {
+ protected FeedFixture feed(String feed, boolean abortOnDataError) throws Exception {
String abortOnDataErrorArgument = abortOnDataError ? "" : " --abortondataerror no";
FeedFixture feedFixture = new FeedFixture();
Arguments arguments = new Arguments(("--file "