summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /vespaclient-core
Publish
Diffstat (limited to 'vespaclient-core')
-rw-r--r--vespaclient-core/.gitignore2
-rw-r--r--vespaclient-core/OWNERS1
-rw-r--r--vespaclient-core/pom.xml59
-rw-r--r--vespaclient-core/src/main/assembly/.gitignore0
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/clientmetrics/ClientMetrics.java39
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/clientmetrics/MessageTypeMetricSet.java105
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/clientmetrics/RouteMetricSet.java67
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java79
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java211
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java111
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java109
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java364
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/feedapi/JsonFeeder.java24
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java102
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/MessageProcessor.java8
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java324
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SendSession.java21
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java32
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java246
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java19
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java114
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java39
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/XMLFeeder.java26
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java163
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/feedhandler/MetricResponse.java38
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/feedhandler/NullFeedMetric.java28
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java102
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java93
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/feedhandler/package-info.java8
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java15
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java40
-rw-r--r--vespaclient-core/src/main/resources/configdefinitions/feeder.def46
-rw-r--r--vespaclient-core/src/main/resources/configdefinitions/spooler.def34
-rw-r--r--vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java91
-rw-r--r--vespaclient-core/src/test/java/com/yahoo/vespafeeder/.gitignore0
35 files changed, 2760 insertions, 0 deletions
diff --git a/vespaclient-core/.gitignore b/vespaclient-core/.gitignore
new file mode 100644
index 00000000000..016c6f704f0
--- /dev/null
+++ b/vespaclient-core/.gitignore
@@ -0,0 +1,2 @@
+pom.xml.build
+/target
diff --git a/vespaclient-core/OWNERS b/vespaclient-core/OWNERS
new file mode 100644
index 00000000000..0e39145d8c3
--- /dev/null
+++ b/vespaclient-core/OWNERS
@@ -0,0 +1 @@
+dybdahl
diff --git a/vespaclient-core/pom.xml b/vespaclient-core/pom.xml
new file mode 100644
index 00000000000..01705141ee0
--- /dev/null
+++ b/vespaclient-core/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0"?>
+<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>parent</artifactId>
+ <version>6-SNAPSHOT</version>
+ <relativePath>../parent/pom.xml</relativePath>
+ </parent>
+ <artifactId>vespaclient-core</artifactId>
+ <version>6-SNAPSHOT</version>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>metrics</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>container-dev</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <compilerArgs>
+ <arg>-Xlint:all</arg>
+ <arg>-Werror</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>config-class-plugin</artifactId>
+ <version>${project.version}</version>
+ <executions>
+ <execution>
+ <id>config-gen</id>
+ <goals>
+ <goal>config-gen</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/vespaclient-core/src/main/assembly/.gitignore b/vespaclient-core/src/main/assembly/.gitignore
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/vespaclient-core/src/main/assembly/.gitignore
diff --git a/vespaclient-core/src/main/java/com/yahoo/clientmetrics/ClientMetrics.java b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/ClientMetrics.java
new file mode 100755
index 00000000000..87c1dfc7030
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/ClientMetrics.java
@@ -0,0 +1,39 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.clientmetrics;
+
+import com.yahoo.messagebus.Reply;
+import com.yahoo.metrics.*;
+import com.yahoo.text.XMLWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author thomasg
+ */
+public class ClientMetrics {
+
+ MetricSet topSet;
+ SumMetric sum;
+ List<String> routes = new ArrayList<String>();
+
+ public ClientMetrics() {
+ topSet = new SimpleMetricSet("routes", "", "", null);
+ sum = new SumMetric("total", "", "Messages sent to all routes", topSet);
+ }
+
+ public MetricSet getMetricSet() {
+ return topSet;
+ }
+
+ public void addRouteMetricSet(RouteMetricSet metric) {
+ topSet.registerMetric(metric);
+ sum.addMetricToSum(metric);
+ routes.add(metric.getRoute());
+ }
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/clientmetrics/MessageTypeMetricSet.java b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/MessageTypeMetricSet.java
new file mode 100644
index 00000000000..b55aa82b6ca
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/MessageTypeMetricSet.java
@@ -0,0 +1,105 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.clientmetrics;
+
+import com.yahoo.documentapi.messagebus.protocol.DocumentIgnoredReply;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.concurrent.SystemTimer;
+import com.yahoo.metrics.*;
+import com.yahoo.messagebus.Error;
+
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+* @author thomasg
+*/
+public class MessageTypeMetricSet extends MetricSet {
+ ValueMetric<Long> latency;
+ CountMetric count;
+ CountMetric ignored;
+
+ SumMetric errorSum;
+ MetricSet errors;
+ String msgName;
+
+ class ErrorMetric extends CountMetric {
+ ErrorMetric(String name, MetricSet owner) {
+ super(name, "", "Number of errors of type " + name, owner);
+ }
+
+ ErrorMetric(ErrorMetric other, CopyType copyType, MetricSet owner) {
+ super(other, copyType, owner);
+ }
+
+ @Override
+ public String getXMLTag() {
+ return "error";
+ }
+
+ @Override
+ public Metric clone(CopyType type, MetricSet owner, boolean includeUnused) {
+ return new ErrorMetric(this, type, owner);
+ }
+
+ }
+
+ public MessageTypeMetricSet(String msgName, MetricSet owner) {
+ super(msgName.toLowerCase(), "", "", owner);
+ this.msgName = msgName;
+ latency = new ValueMetric<Long>("latency", "", "Latency (in ms)", this).averageMetric();
+ count = new CountMetric("count", "", "Number received", this);
+ ignored = new CountMetric("ignored", "", "Number ignored due to no matching document routing selectors", this);
+ errors = new SimpleMetricSet("errors", "", "The errors returned", this);
+ errorSum = new SumMetric("total", "", "Total number of errors", errors);
+ }
+
+ public MessageTypeMetricSet(MessageTypeMetricSet source, CopyType copyType, MetricSet owner, boolean includeUnused) {
+ super(source, copyType, owner, includeUnused);
+ msgName = source.msgName;
+ }
+
+ public String getMessageName() {
+ return msgName;
+ }
+
+ public void addReply(Reply r) {
+ if (!r.hasErrors() || onlyTestAndSetConditionFailed(r.getErrors())) {
+ updateSuccessMetrics(r);
+ } else {
+ updateFailureMetrics(r);
+ }
+ }
+
+ private void updateFailureMetrics(Reply r) {
+ String error = DocumentProtocol.getErrorName(r.getError(0).getCode());
+ CountMetric s = (CountMetric)errors.getMetric(error);
+ if (s == null) {
+ s = new ErrorMetric(error, errors);
+ errorSum.addMetricToSum(s);
+ }
+ s.inc();
+ }
+
+ private void updateSuccessMetrics(Reply r) {
+ if (!(r instanceof DocumentIgnoredReply)) {
+ if (r.getMessage().getTimeReceived() != 0) {
+ latency.addValue(SystemTimer.INSTANCE.milliTime() - r.getMessage().getTimeReceived());
+ }
+ count.inc();
+ } else {
+ ignored.inc();
+ }
+ }
+
+ @Override
+ public Metric clone(CopyType type, MetricSet owner, boolean includeUnused)
+ { return new MessageTypeMetricSet(this, type, owner, includeUnused); }
+
+ /**
+ * Returns true if every error in a stream is a test and set condition failed
+ */
+ private static boolean onlyTestAndSetConditionFailed(Stream<Error> errors) {
+ return errors.allMatch(e -> e.getCode() == DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED);
+ }
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/clientmetrics/RouteMetricSet.java b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/RouteMetricSet.java
new file mode 100644
index 00000000000..f6cb5a872e6
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/clientmetrics/RouteMetricSet.java
@@ -0,0 +1,67 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.clientmetrics;
+
+import com.yahoo.messagebus.Reply;
+import com.yahoo.metrics.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+* @author thomasg
+*/
+public class RouteMetricSet extends MetricSet {
+
+ SumMetric sum;
+ ProgressCallback callback;
+ Map<Integer,MessageTypeMetricSet> typeMap = new HashMap<Integer,MessageTypeMetricSet>();
+
+ public interface ProgressCallback {
+ void onProgress(RouteMetricSet route);
+ void done(RouteMetricSet route);
+ }
+
+ public RouteMetricSet(String route, ProgressCallback callback) {
+ super(route, "", "Messages sent to the named route", null);
+ sum = new SumMetric("total", "", "All kinds of messages sent to the given route", this);
+ this.callback = callback;
+ }
+
+ @Override
+ public String getXMLTag() {
+ return "route";
+ }
+
+ public RouteMetricSet(RouteMetricSet source, CopyType copyType, MetricSet owner, boolean includeUnused) {
+ super(source, copyType, owner, includeUnused);
+ }
+
+ public void addReply(Reply r) {
+ MessageTypeMetricSet type = typeMap.get(r.getMessage().getType());
+ if (type == null) {
+ String msgName = r.getMessage().getClass().getSimpleName().replace("Message", "");
+ type = new MessageTypeMetricSet(msgName, this);
+ sum.addMetricToSum(type);
+ typeMap.put(r.getMessage().getType(), type);
+ }
+
+ type.addReply(r);
+ if (callback != null) {
+ callback.onProgress(this);
+ }
+ }
+
+ public void done() {
+ if (callback != null) {
+ callback.done(this);
+ }
+ }
+
+ @Override
+ public Metric clone(CopyType type, MetricSet owner, boolean includeUnused)
+ { return new RouteMetricSet(this, type, owner, includeUnused); }
+
+ String getRoute() {
+ return getName();
+ }
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java
new file mode 100755
index 00000000000..80825810837
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java
@@ -0,0 +1,79 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.docproc.CallStack;
+import com.yahoo.docproc.DocprocService;
+import com.yahoo.docproc.DocumentProcessor;
+import com.yahoo.docproc.Processing;
+import com.yahoo.document.*;
+import com.yahoo.documentapi.messagebus.protocol.BatchDocumentUpdateMessage;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.vdslib.Entry;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class DocprocMessageProcessor implements MessageProcessor {
+ private final DocprocService docproc;
+ private final ComponentRegistry<DocprocService> docprocServiceRegistry;
+
+ public DocprocMessageProcessor(DocprocService docproc, ComponentRegistry<DocprocService> docprocServiceRegistry) {
+ this.docproc = docproc;
+ this.docprocServiceRegistry = docprocServiceRegistry;
+ }
+
+ @Override
+ public void process(Message m) {
+ try {
+ List<DocumentOperation> documentBases = new ArrayList<DocumentOperation>();
+
+ if (m.getType() == DocumentProtocol.MESSAGE_PUTDOCUMENT) {
+ documentBases.add(((PutDocumentMessage) m).getDocumentPut());
+ } else if (m.getType() == DocumentProtocol.MESSAGE_UPDATEDOCUMENT) {
+ documentBases.add(((UpdateDocumentMessage) m).getDocumentUpdate());
+ } else if (m.getType() == DocumentProtocol.MESSAGE_REMOVEDOCUMENT) {
+ documentBases.add(((RemoveDocumentMessage) m).getDocumentRemove());
+ } else if (m.getType() == DocumentProtocol.MESSAGE_BATCHDOCUMENTUPDATE) {
+ for (DocumentUpdate update : ((BatchDocumentUpdateMessage) m).getUpdates()) {
+ documentBases.add(update);
+ }
+ }
+
+ if (docproc != null) {
+ processDocumentOperations(documentBases, m);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void processDocumentOperations(List<DocumentOperation> documentOperations, Message m) throws Exception {
+ Processing processing = Processing.createProcessingFromDocumentOperations(docproc.getName(), documentOperations, new CallStack(docproc.getCallStack()));
+ processing.setServiceName(docproc.getName());
+ processing.setDocprocServiceRegistry(docprocServiceRegistry);
+ processing.setVariable("route", m.getRoute());
+ processing.setVariable("timeout", m.getTimeRemaining());
+
+ DocumentProcessor.Progress progress = docproc.getExecutor().process(processing);
+ while (DocumentProcessor.Progress.LATER.equals(progress)) {
+ Thread.sleep(50);
+ progress = docproc.getExecutor().process(processing);
+ }
+
+ if (progress == DocumentProcessor.Progress.FAILED
+ || progress == DocumentProcessor.Progress.PERMANENT_FAILURE) {
+ throw new RuntimeException("Processing of " + documentOperations + " failed: " + progress + ".");
+ }
+
+ m.setRoute((Route) processing.getVariable("route"));
+ m.setTimeRemaining((Long) processing.getVariable("timeout"));
+ }
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java
new file mode 100755
index 00000000000..7b22abc8a76
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DummySessionFactory.java
@@ -0,0 +1,211 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.document.Document;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Result;
+
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DummySessionFactory implements SessionFactory {
+
+ public interface ReplyFactory {
+ Reply createReply(Message m);
+ }
+
+ public final List<Message> messages;
+ private boolean autoReply = false;
+ private ReplyFactory autoReplyFactory = null;
+ private Error autoError;
+ private int sessionsCreated = 0;
+ OutputStream output = null;
+
+ protected DummySessionFactory() {
+ messages = new ArrayList<>();
+ }
+
+ public static DummySessionFactory createDefault() {
+ return new DummySessionFactory();
+ }
+
+ protected DummySessionFactory(boolean autoReply) {
+ this.autoReply = autoReply;
+ messages = new ArrayList<>();
+ }
+
+ protected DummySessionFactory(ReplyFactory autoReplyFactory) {
+ this.autoReply = true;
+ this.autoReplyFactory = autoReplyFactory;
+ messages = new ArrayList<>();
+ }
+
+ public static DummySessionFactory createWithAutoReplyFactory(ReplyFactory autoReplyFactory) {
+ return new DummySessionFactory(autoReplyFactory);
+ }
+
+ protected DummySessionFactory(Error e) {
+ autoReply = true;
+ this.autoError = e;
+ messages = new ArrayList<>();
+ }
+
+ public static DummySessionFactory createWithErrorAutoReply(Error e) {
+ return new DummySessionFactory(e);
+ }
+
+ public static DummySessionFactory createWithAutoReply() {
+ return new DummySessionFactory(true);
+ }
+
+ public DummySessionFactory(Error e, OutputStream out) {
+ messages = null;
+ autoReply = true;
+ output = out;
+ }
+
+ public int sessionsCreated() {
+ return sessionsCreated;
+ }
+ void add(Message m) {
+ if (messages != null) {
+ messages.add(m);
+ }
+
+ }
+
+ @Override
+ public SendSession createSendSession(ReplyHandler r, Metric metric) {
+ ++sessionsCreated;
+
+ if (output != null) {
+ return new DumpDocuments(output, r, this);
+ }
+ if (autoReply) {
+ return new AutoReplySession(r, autoReplyFactory, autoError, this);
+ }
+ return new DummySession(r, this);
+ }
+
+ @Override
+ public VisitorSession createVisitorSession(VisitorParameters p) {
+ return null;
+ }
+
+ public void sendReply(Message m, Error error) {
+ MyContext ctxt = (MyContext) m.getContext();
+
+ Reply r = new EmptyReply();
+ r.setMessage(m);
+ r.setContext(ctxt.oldContext);
+
+ if (error != null) {
+ r.addError(error);
+ }
+
+ ctxt.handler.handleReply(r);
+ }
+
+ private class MyContext {
+ MyContext(ReplyHandler handler, Object ctxt) {
+ this.handler = handler;
+ this.oldContext = ctxt;
+ }
+
+ ReplyHandler handler;
+ Object oldContext;
+ }
+
+ private class AutoReplySession extends SendSession {
+
+ ReplyHandler handler;
+ ReplyFactory replyFactory;
+ Error e;
+ DummySessionFactory owner;
+
+ public AutoReplySession(ReplyHandler handler, ReplyFactory replyFactory,
+ Error e, DummySessionFactory owner) {
+ this.handler = handler;
+ this.replyFactory = replyFactory;
+ this.e = e;
+ this.owner = owner;
+ }
+
+ protected void handleMessage(Message m) {
+
+ }
+
+ @Override
+ protected Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException {
+ owner.add(m);
+ handleMessage(m);
+ Reply r;
+ if (replyFactory == null) {
+ r = new EmptyReply();
+ } else {
+ r = replyFactory.createReply(m);
+ }
+
+ m.setTimeReceivedNow();
+ r.setMessage(m);
+ r.setContext(m.getContext());
+
+ if (e != null) {
+ r.addError(e);
+ }
+ handler.handleReply(r);
+ return Result.ACCEPTED;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ }
+
+ private class DumpDocuments extends AutoReplySession {
+ final OutputStream out;
+ public DumpDocuments(OutputStream out, ReplyHandler r, DummySessionFactory factory) {
+ super(r, null, null, factory);
+ this.out = out;
+ }
+ protected void handleMessage(Message m) {
+ if (m instanceof PutDocumentMessage) {
+ PutDocumentMessage p = (PutDocumentMessage) m;
+ Document d = p.getDocumentPut().getDocument();
+ d.serialize(out);
+ }
+ }
+ }
+
+ private class DummySession extends SendSession {
+ ReplyHandler handler;
+ DummySessionFactory owner;
+
+ public DummySession(ReplyHandler handler, DummySessionFactory owner) {
+ this.handler = handler;
+ this.owner = owner;
+ }
+
+ @Override
+ protected Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException {
+ m.setContext(new MyContext(handler, m.getContext()));
+ owner.add(m);
+ return Result.ACCEPTED;
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java
new file mode 100755
index 00000000000..a26064cd98b
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeedContext.java
@@ -0,0 +1,111 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.jdisc.Metric;
+import com.yahoo.vespa.config.content.LoadTypeConfig;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.clientmetrics.ClientMetrics;
+import com.yahoo.vespaclient.ClusterList;
+import com.yahoo.vespaclient.config.FeederConfig;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class FeedContext {
+
+ private final SessionFactory factory;
+ private final MessagePropertyProcessor propertyProcessor;
+ private final DocumentTypeManager docTypeManager;
+ private final ClusterList clusterList;
+ private final ClientMetrics metrics;
+ private final Metric metric;
+ private Map<String, SharedSender> senders = new TreeMap<>();
+
+ public static final Object sync = new Object();
+ public static FeedContext instance = null;
+
+ public FeedContext(MessagePropertyProcessor propertyProcessor, SessionFactory factory, DocumentTypeManager manager, ClusterList clusterList, Metric metric) {
+ this.propertyProcessor = propertyProcessor;
+ this.factory = factory;
+ docTypeManager = manager;
+ this.clusterList = clusterList;
+ metrics = new ClientMetrics();
+ this.metric = metric;
+ }
+
+ public ClientMetrics getMetrics() {
+ return metrics;
+ }
+
+ public ClusterList getClusterList() {
+ return clusterList;
+ }
+
+ public SessionFactory getSessionFactory() {
+ return factory;
+ }
+
+ public void shutdownSenders() {
+ for (SharedSender s : senders.values()) {
+ s.shutdown();
+ }
+ }
+
+ public synchronized SharedSender getSharedSender(String route) {
+ if (propertyProcessor.configChanged()) {
+ Map<String, SharedSender> newSenders = new TreeMap<>();
+
+ for (Map.Entry<String, SharedSender> sender : senders.entrySet()) {
+ newSenders.put(sender.getKey(), new SharedSender(sender.getKey(), factory, sender.getValue(), metric));
+ }
+
+ shutdownSenders();
+ senders = newSenders;
+ propertyProcessor.setConfigChanged(false);
+ }
+
+ if (route == null) {
+ route = propertyProcessor.getFeederOptions().getRoute();
+ }
+
+ SharedSender sender = senders.get(route);
+
+ if (sender == null) {
+ sender = new SharedSender(route, factory, sender, metric);
+ senders.put(route, sender);
+ metrics.addRouteMetricSet(sender.getMetrics());
+ }
+
+ return sender;
+ }
+
+ public MessagePropertyProcessor getPropertyProcessor() {
+ return propertyProcessor;
+ }
+
+ public DocumentTypeManager getDocumentTypeManager() {
+ return docTypeManager;
+ }
+
+ public static FeedContext getInstance(FeederConfig feederConfig, LoadTypeConfig loadTypeConfig, Metric metric) {
+ synchronized (sync) {
+ try {
+ if (instance == null) {
+ MessagePropertyProcessor proc = new MessagePropertyProcessor(feederConfig, loadTypeConfig);
+ MessageBusSessionFactory mbusFactory = new MessageBusSessionFactory(proc);
+ instance = new FeedContext(proc,
+ mbusFactory,
+ mbusFactory.getAccess().getDocumentTypeManager(),
+ new ClusterList("client"), metric);
+ } else {
+ instance.getPropertyProcessor().configure(feederConfig, loadTypeConfig);
+ }
+
+ return instance;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java
new file mode 100644
index 00000000000..6fc7b31f648
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java
@@ -0,0 +1,109 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.xml.stream.XMLStreamException;
+
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.vespaxmlparser.FeedReader;
+import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
+
+/**
+ * Base class for unpacking document operation streams and pushing to feed
+ * access points.
+ *
+ * @author Thomas Gundersen
+ * @author steinar
+ */
+public abstract class Feeder {
+
+ protected final InputStream stream;
+ protected final DocumentTypeManager docMan;
+ protected List<String> errors = new LinkedList<String>();
+ protected boolean doAbort = true;
+ protected boolean createIfNonExistent = false;
+ protected final VespaFeedSender sender;
+ private final int MAX_ERRORS = 10;
+
+ protected Feeder(DocumentTypeManager docMan, VespaFeedSender sender, InputStream stream) {
+ this.docMan = docMan;
+ this.sender = sender;
+ this.stream = stream;
+ }
+
+ public void setAbortOnDocumentError(boolean doAbort) {
+ this.doAbort = doAbort;
+ }
+
+ public void setCreateIfNonExistent(boolean value) {
+ this.createIfNonExistent = value;
+ }
+
+ public void addException(Exception e) {
+ String message;
+ if (e.getMessage() != null) {
+ message = e.getMessage().replaceAll("\"", "'");
+ } else {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ e.printStackTrace(pw);
+ message = "(no message) " + sw.toString();
+ }
+
+ addError("ERROR: " + message);
+ }
+
+ private void addError(String error) {
+ if (errors.size() < MAX_ERRORS) {
+ errors.add(error);
+ } else if (errors.size() == MAX_ERRORS) {
+ errors.add("Reached maximum limit of errors (" + MAX_ERRORS + "). Not collecting any more.");
+ }
+ }
+
+ protected abstract FeedReader createReader() throws Exception;
+
+ public List<String> parse() {
+ FeedReader reader = null;
+
+ try {
+ reader = createReader();
+ } catch (Exception e) {
+ addError("ERROR: " + e.getClass().toString() + ": " + e.getMessage().replaceAll("\"", "'"));
+ return errors;
+ }
+
+ while (!sender.isAborted()) {
+ try {
+ VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation();
+ reader.read(op);
+ if (createIfNonExistent && op.getDocumentUpdate() != null) {
+ op.getDocumentUpdate().setCreateIfNonExistent(true);
+ }
+
+ // Done feeding.
+ if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) {
+ break;
+ } else {
+ sender.sendOperation(op);
+ }
+ } catch (XMLStreamException e) {
+ addException(e);
+ break;
+ } catch (Exception e) {
+ addException(e);
+ if (doAbort) {
+ break;
+ }
+ }
+ }
+
+ return errors;
+ }
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java
new file mode 100755
index 00000000000..2894993b983
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/FeederOptions.java
@@ -0,0 +1,364 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.RetryTransientErrorsPolicy;
+import com.yahoo.vespaclient.config.FeederConfig;
+
+
+/**
+ * Just a wrapper for feeder options, from config or HTTP parameters.
+ *
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class FeederOptions {
+ // These default values are here basically just for convenience in test cases,
+ // they are overridden by real config values in all other cases.
+ private boolean abortOnDocumentError = true;
+ private boolean abortOnSendError = true;
+ private boolean retryEnabled = true;
+ private double retryDelay = 1;
+ private double timeout = 60;
+ private int maxPendingBytes = 0;
+ private int maxPendingDocs = 0;
+ private double maxFeedRate = 0.0;
+ private String documentManagerConfigId = "client";
+ private String idPrefix = "";
+ private String route = "default";
+ private String routingConfigId;
+ private String slobrokConfigId;
+ private int traceLevel;
+ private int mbusPort;
+ private DocumentProtocol.Priority priority = DocumentProtocol.Priority.NORMAL_3;
+ private boolean priorityExplicitlySet = false;
+ private String docprocChain = "";
+
+ /** Constructs an options object with all default values. */
+ public FeederOptions() {
+ // empty
+ }
+
+ /**
+ * Implements the copy constructor.
+ *
+ * @param src The options to copy.
+ */
+ public FeederOptions(FeederOptions src) {
+ abortOnDocumentError = src.abortOnDocumentError;
+ abortOnSendError = src.abortOnSendError;
+ retryEnabled = src.retryEnabled;
+ retryDelay = src.retryDelay;
+ timeout = src.timeout;
+ maxPendingBytes = src.maxPendingBytes;
+ maxPendingDocs = src.maxPendingDocs;
+ maxFeedRate = src.maxFeedRate;
+ documentManagerConfigId = src.documentManagerConfigId;
+ idPrefix = src.idPrefix;
+ route = src.route;
+ routingConfigId = src.routingConfigId;
+ slobrokConfigId = src.slobrokConfigId;
+ traceLevel = src.traceLevel;
+ mbusPort = src.mbusPort;
+ priority = src.priority;
+ docprocChain = src.docprocChain;
+ }
+
+ /** Constructor that sets values from config. */
+ public FeederOptions(FeederConfig config) {
+ setAbortOnDocumentError(config.abortondocumenterror());
+ setAbortOnSendError(config.abortonsenderror());
+ setIdPrefix(config.idprefix());
+ setMaxPendingBytes(config.maxpendingbytes());
+ setMaxPendingDocs(config.maxpendingdocs());
+ setRetryEnabled(config.retryenabled());
+ setRetryDelay(config.retrydelay());
+ setRoute(config.route());
+ setTimeout(config.timeout());
+ setTraceLevel(config.tracelevel());
+ setMessageBusPort(config.mbusport());
+ setDocprocChain(config.docprocchain());
+ setMaxFeedRate(config.maxfeedrate());
+ }
+
+ public void setMaxFeedRate(double feedRate) {
+ maxFeedRate = feedRate;
+ }
+
+
+ public double getMaxFeedRate() {
+ return maxFeedRate;
+ }
+
+ public boolean getRetryEnabled() {
+ return retryEnabled;
+ }
+
+ public void setRetryEnabled(boolean retryEnabled) {
+ this.retryEnabled = retryEnabled;
+ }
+
+ public double getRetryDelay() {
+ return retryDelay;
+ }
+
+ public void setRetryDelay(double retryDelay) {
+ this.retryDelay = retryDelay;
+ }
+
+ public double getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(double timeout) {
+ this.timeout = timeout;
+ }
+
+ public int getMaxPendingBytes() {
+ return maxPendingBytes;
+ }
+
+ public void setMaxPendingBytes(int maxPendingBytes) {
+ this.maxPendingBytes = maxPendingBytes;
+ }
+
+ public int getMaxPendingDocs() {
+ return maxPendingDocs;
+ }
+
+ public void setMaxPendingDocs(int maxPendingDocs) {
+ this.maxPendingDocs = maxPendingDocs;
+ }
+
+ public boolean abortOnDocumentError() {
+ return abortOnDocumentError;
+ }
+
+ public void setAbortOnDocumentError(boolean abortOnDocumentError) {
+ this.abortOnDocumentError = abortOnDocumentError;
+ }
+
+ public boolean abortOnSendError() {
+ return abortOnSendError;
+ }
+
+ public void setAbortOnSendError(boolean abortOnSendError) {
+ this.abortOnSendError = abortOnSendError;
+ }
+
+ public String getIdPrefix() {
+ return idPrefix;
+ }
+
+ public void setIdPrefix(String idPrefix) {
+ this.idPrefix = idPrefix;
+ }
+
+ public void setRoute(String route) {
+ this.route = route;
+ }
+
+ public String getRoute() {
+ return route;
+ }
+
+ public DocumentProtocol.Priority getPriority() {
+ return priority;
+ }
+
+ public boolean isPriorityExplicitlySet() {
+ return priorityExplicitlySet;
+ }
+
+ public String getSlobrokConfigId() {
+ return slobrokConfigId;
+ }
+
+ public void setSlobrokConfigId(String slobrokConfigId) {
+ this.slobrokConfigId = slobrokConfigId;
+ }
+
+ public String getRoutingConfigId() {
+ return routingConfigId;
+ }
+
+ public void setRoutingConfigId(String routingConfigId) {
+ this.routingConfigId = routingConfigId;
+ }
+
+ public String getDocumentManagerConfigId() {
+ return documentManagerConfigId;
+ }
+
+ public void setDocumentManagerConfigId(String documentManagerConfigId) {
+ this.documentManagerConfigId = documentManagerConfigId;
+ }
+
+ public int getTraceLevel() {
+ return traceLevel;
+ }
+
+ public void setTraceLevel(int traceLevel) {
+ this.traceLevel = traceLevel;
+ }
+
+ public int getMessageBusPort() {
+ return mbusPort;
+ }
+
+ public void setMessageBusPort(int mbusPort) {
+ this.mbusPort = mbusPort;
+ }
+
+ public void setPriority(DocumentProtocol.Priority priority) {
+ this.priority = priority;
+ this.priorityExplicitlySet = true;
+ }
+
+ public String getDocprocChain() {
+ return docprocChain;
+ }
+
+ public void setDocprocChain(String chain) {
+ docprocChain = chain;
+ }
+
+ /**
+ * Creates a source session params object with parameters set as these options
+ * dictate.
+ */
+ public SourceSessionParams toSourceSessionParams() {
+ SourceSessionParams params = new SourceSessionParams();
+
+ StaticThrottlePolicy policy;
+ if (maxFeedRate > 0.0) {
+ policy = new RateThrottlingPolicy(maxFeedRate);
+ } else if ((maxPendingDocs == 0) && (maxPendingBytes == 0)) {
+ policy = new DynamicThrottlePolicy();
+ } else {
+ policy = new StaticThrottlePolicy();
+ }
+ if (maxPendingDocs > 0) {
+ policy.setMaxPendingCount(maxPendingDocs);
+ }
+ if (maxPendingBytes > 0) {
+ policy.setMaxPendingSize(maxPendingBytes);
+ }
+
+ params.setThrottlePolicy(policy);
+
+ params.setTimeout(getTimeout());
+ return params;
+ }
+
+ public MessageBusParams toMessageBusParams() {
+ MessageBusParams mbusParams = new MessageBusParams();
+ if (retryEnabled) {
+ RetryTransientErrorsPolicy retryPolicy = new RetryTransientErrorsPolicy();
+ retryPolicy.setBaseDelay(retryDelay);
+ mbusParams.setRetryPolicy(retryPolicy);
+ } else {
+ mbusParams.setRetryPolicy(null);
+ }
+ return mbusParams;
+ }
+
+ public RPCNetworkParams getNetworkParams() {
+ try {
+ RPCNetworkParams networkParams = new RPCNetworkParams();
+ if (mbusPort != -1) {
+ networkParams.setListenPort(mbusPort);
+ }
+ return networkParams;
+ } catch (Exception e) {
+ }
+
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "FeederOptions{" +
+ "abortOnDocumentError=" + abortOnDocumentError +
+ ", abortOnSendError=" + abortOnSendError +
+ ", retryEnabled=" + retryEnabled +
+ ", retryDelay=" + retryDelay +
+ ", timeout=" + timeout +
+ ", maxPendingBytes=" + maxPendingBytes +
+ ", maxPendingDocs=" + maxPendingDocs +
+ ", documentManagerConfigId='" + documentManagerConfigId + '\'' +
+ ", idPrefix='" + idPrefix + '\'' +
+ ", route='" + route + '\'' +
+ ", routingConfigId='" + routingConfigId + '\'' +
+ ", slobrokConfigId='" + slobrokConfigId + '\'' +
+ ", traceLevel=" + traceLevel +
+ ", mbusPort=" + mbusPort +
+ ", priority=" + priority.name() +
+ ", priorityExplicitlySet=" + priorityExplicitlySet +
+ ", docprocChain='" + docprocChain + '\'' +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof FeederOptions)) return false;
+
+ FeederOptions that = (FeederOptions) o;
+
+ if (abortOnDocumentError != that.abortOnDocumentError) return false;
+ if (abortOnSendError != that.abortOnSendError) return false;
+ if (maxPendingBytes != that.maxPendingBytes) return false;
+ if (maxPendingDocs != that.maxPendingDocs) return false;
+ if (maxFeedRate != that.maxFeedRate) return false;
+ if (mbusPort != that.mbusPort) return false;
+ if (priorityExplicitlySet != that.priorityExplicitlySet) return false;
+ if (Double.compare(that.retryDelay, retryDelay) != 0) return false;
+ if (retryEnabled != that.retryEnabled) return false;
+ if (Double.compare(that.timeout, timeout) != 0) return false;
+ if (traceLevel != that.traceLevel) return false;
+ if (docprocChain != null ? !docprocChain.equals(that.docprocChain) : that.docprocChain != null) return false;
+ if (documentManagerConfigId != null ? !documentManagerConfigId.equals(that.documentManagerConfigId) : that.documentManagerConfigId != null) {
+ return false;
+ }
+ if (idPrefix != null ? !idPrefix.equals(that.idPrefix) : that.idPrefix != null) return false;
+ if (priority != that.priority) return false;
+ if (route != null ? !route.equals(that.route) : that.route != null) return false;
+ if (routingConfigId != null ? !routingConfigId.equals(that.routingConfigId) : that.routingConfigId != null) {
+ return false;
+ }
+ if (slobrokConfigId != null ? !slobrokConfigId.equals(that.slobrokConfigId) : that.slobrokConfigId != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result;
+ long temp;
+ result = (abortOnDocumentError ? 1 : 0);
+ result = 31 * result + (abortOnSendError ? 1 : 0);
+ result = 31 * result + (retryEnabled ? 1 : 0);
+ temp = retryDelay != +0.0d ? Double.doubleToLongBits(retryDelay) : 0L;
+ result = 31 * result + (int) (temp ^ (temp >>> 32));
+ temp = timeout != +0.0d ? Double.doubleToLongBits(timeout) : 0L;
+ result = 31 * result + (int) (temp ^ (temp >>> 32));
+ result = 31 * result + maxPendingBytes;
+ result = 31 * result + maxPendingDocs;
+ result = 31 * result + ((int)(maxFeedRate * 1000));
+ result = 31 * result + (documentManagerConfigId != null ? documentManagerConfigId.hashCode() : 0);
+ result = 31 * result + (idPrefix != null ? idPrefix.hashCode() : 0);
+ result = 31 * result + (route != null ? route.hashCode() : 0);
+ result = 31 * result + (routingConfigId != null ? routingConfigId.hashCode() : 0);
+ result = 31 * result + (slobrokConfigId != null ? slobrokConfigId.hashCode() : 0);
+ result = 31 * result + traceLevel;
+ result = 31 * result + mbusPort;
+ result = 31 * result + (priority != null ? priority.hashCode() : 0);
+ result = 31 * result + (priorityExplicitlySet ? 1 : 0);
+ result = 31 * result + (docprocChain != null ? docprocChain.hashCode() : 0);
+ return result;
+ }
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/JsonFeeder.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/JsonFeeder.java
new file mode 100644
index 00000000000..06ba1ac8c10
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/JsonFeeder.java
@@ -0,0 +1,24 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import java.io.InputStream;
+
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.json.JsonFeedReader;
+import com.yahoo.vespaxmlparser.FeedReader;
+
+/**
+ * Unpack JSON document operations and push to a feed access point.
+ *
+ * @author steinar
+ */
+public class JsonFeeder extends Feeder {
+ public JsonFeeder(DocumentTypeManager docMan, SimpleFeedAccess sender, InputStream stream) {
+ super(docMan, new VespaFeedSender(sender), stream);
+ }
+
+ @Override
+ protected FeedReader createReader() throws Exception {
+ return new JsonFeedReader(stream, docMan);
+ }
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java
new file mode 100755
index 00000000000..8021ea86783
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageBusSessionFactory.java
@@ -0,0 +1,102 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
+import com.yahoo.documentapi.messagebus.MessageBusParams;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.SourceSession;
+
+import java.util.Collections;
+
+public class MessageBusSessionFactory implements SessionFactory {
+
+ private final MessageBusDocumentAccess access;
+ private final MessagePropertyProcessor processor;
+
+ private interface Metrics {
+ String NUM_OPERATIONS = "num_operations";
+ String NUM_PUTS = "num_puts";
+ String NUM_REMOVES = "num_removes";
+ String NUM_UPDATES = "num_updates";
+ }
+
+ public MessageBusSessionFactory(MessagePropertyProcessor processor) {
+ this.processor = processor;
+ MessageBusParams params = new MessageBusParams(processor.getLoadTypes());
+ params.setTraceLevel(processor.getFeederOptions().getTraceLevel());
+ params.setRPCNetworkParams(processor.getFeederOptions().getNetworkParams());
+ params.setDocumentManagerConfigId("client");
+ access = new MessageBusDocumentAccess(params);
+ }
+
+ public MessageBusDocumentAccess getAccess() {
+ return access;
+ }
+
+ @Override
+ public synchronized SendSession createSendSession(ReplyHandler handler, Metric metric) {
+ return new SourceSessionWrapper(
+ access.getMessageBus().createSourceSession(handler, processor.getFeederOptions().toSourceSessionParams()),
+ metric);
+ }
+
+ public void shutDown() {
+ access.shutdown();
+ }
+
+ @Override
+ public synchronized VisitorSession createVisitorSession(VisitorParameters params) {
+ try {
+ return access.createVisitorSession(params);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private class SourceSessionWrapper extends SendSession {
+ private final SourceSession session;
+ private final Metric metric;
+ private final Metric.Context context;
+
+ private SourceSessionWrapper(SourceSession session, Metric metric) {
+ this.session = session;
+ this.metric = metric;
+ this.context = metric.createContext(Collections.<String, String>emptyMap());
+ }
+
+ @Override
+ protected com.yahoo.messagebus.Result onSend(Message m, boolean blockIfQueueFull) throws InterruptedException {
+ updateCounters(m);
+ if (blockIfQueueFull) {
+ return session.sendBlocking(m);
+ } else {
+ return session.send(m);
+ }
+ }
+
+ private void updateCounters(Message m) {
+ metric.add(Metrics.NUM_OPERATIONS, 1, context);
+
+ if (m instanceof PutDocumentMessage) {
+ metric.add(Metrics.NUM_PUTS, 1, context);
+ } else if (m instanceof RemoveDocumentMessage) {
+ metric.add(Metrics.NUM_REMOVES, 1, context);
+ } else if (m instanceof UpdateDocumentMessage) {
+ metric.add(Metrics.NUM_UPDATES, 1, context);
+ }
+ }
+
+ @Override
+ public void close() {
+ session.close();
+ }
+ }
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageProcessor.java
new file mode 100755
index 00000000000..5ade035476d
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessageProcessor.java
@@ -0,0 +1,8 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.messagebus.Message;
+
+public interface MessageProcessor {
+ public void process(Message m);
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java
new file mode 100644
index 00000000000..f99a73ebd55
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/MessagePropertyProcessor.java
@@ -0,0 +1,324 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.config.subscription.ConfigSubscriber;
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.vespa.config.content.LoadTypeConfig;
+import com.yahoo.container.Container;
+import com.yahoo.docproc.DocprocService;
+import com.yahoo.docproc.jdisc.DocumentProcessingHandler;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.messagebus.loadtypes.LoadType;
+import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
+import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jdisc.handler.RequestHandler;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.vespaclient.config.FeederConfig;
+
+import java.util.logging.Logger;
+
+/**
+ * Utility class for assigning properties to messages, either from implicit
+ * config values or from explicit values in requests.
+ */
+public class MessagePropertyProcessor implements ConfigSubscriber.SingleSubscriber<FeederConfig> {
+
+ private static final Logger log = Logger.getLogger(MessagePropertyProcessor.class.getName());
+ private FeederOptions feederOptions = null;
+ private Route defaultRoute = null;
+ private long defaultTimeoutMillis = 0;
+ private boolean retryEnabled = true;
+ private String defaultDocprocChain = null;
+ private boolean defaultAbortOnDocumentError = true;
+ private boolean defaultAbortOnSendError = true;
+ private boolean defaultCreateIfNonExistent = false;
+ private LoadTypeSet loadTypes = null;
+ private boolean configChanged = false;
+
+ public MessagePropertyProcessor(String configId, String loadTypeConfig) {
+ new ConfigSubscriber().subscribe(this, FeederConfig.class, configId);
+ loadTypes = new LoadTypeSet(loadTypeConfig);
+ }
+
+ public MessagePropertyProcessor(FeederConfig config, LoadTypeConfig loadTypeCfg) {
+ loadTypes = new LoadTypeSet();
+ configure(config, loadTypeCfg);
+ }
+
+ public void setRoute(String routeOverride) {
+ defaultRoute = Route.parse(routeOverride);
+ }
+
+ private synchronized String getDocprocChainParameter(HttpRequest request) {
+ String docprocChainParam = request.getProperty("docprocchain");
+ return (docprocChainParam == null ? defaultDocprocChain : docprocChainParam);
+ }
+
+ public synchronized DocprocService getDocprocChain(HttpRequest request) {
+ ComponentRegistry<DocprocService> services = getDocprocServiceRegistry(request);
+
+ String docprocChain = getDocprocChainParameter(request);
+ if (docprocChain == null) {
+ return null;
+ }
+
+ return services.getComponent(docprocChain);
+ }
+
+ public synchronized ComponentRegistry<DocprocService> getDocprocServiceRegistry(HttpRequest request) {
+ String docprocChain = getDocprocChainParameter(request);
+ if (docprocChain == null) {
+ return null;
+ }
+
+ Container container = Container.get();
+ if (container == null) {
+ throw new IllegalStateException("Could not get Container instance.");
+ }
+
+ ComponentRegistry<RequestHandler> requestHandlerRegistry = container.getRequestHandlerRegistry();
+ if (requestHandlerRegistry == null) {
+ throw new IllegalStateException("Could not get requesthandlerregistry.");
+ }
+
+ DocumentProcessingHandler handler = (DocumentProcessingHandler) requestHandlerRegistry
+ .getComponent(DocumentProcessingHandler.class.getName());
+ if (handler == null) {
+ return null;
+ }
+ ComponentRegistry<DocprocService> services = handler.getDocprocServiceRegistry();
+ if (services == null) {
+ throw new IllegalStateException("Could not get DocprocServiceRegistry.");
+ }
+ return services;
+ }
+
+ public PropertySetter buildPropertySetter(HttpRequest request) {
+ String routeParam = null;
+ double timeoutParam = -1;
+ String priorityParam = null;
+ String abortOnDocErrorParam = null;
+ String abortOnFeedErrorParam = null;
+ String loadTypeStr = null;
+ String traceStr = null;
+ String createIfNonExistentParam = null;
+
+ if (request != null) {
+ routeParam = request.getProperty("route");
+
+ String timeoutStr = request.getProperty("timeout");
+ if (timeoutStr != null) {
+ timeoutParam = Double.parseDouble(timeoutStr);
+ }
+
+ priorityParam = request.getProperty("priority");
+ traceStr = request.getProperty("tracelevel");
+ abortOnDocErrorParam = request.getProperty("abortondocumenterror");
+ abortOnFeedErrorParam = request.getProperty("abortonfeederror");
+ loadTypeStr = request.getProperty("loadtype");
+ createIfNonExistentParam = request.getProperty("createifnonexistent");
+ }
+
+ Route route = (routeParam != null ? Route.parse(routeParam) : null);
+ long timeout;
+ boolean retry;
+ boolean abortOnDocumentError;
+ boolean abortOnFeedError;
+ boolean createIfNonExistent;
+
+ synchronized (this) {
+ if (route == null) {
+ route = defaultRoute;
+ }
+ timeout = (timeoutParam < 0 ? defaultTimeoutMillis : (long)(timeoutParam * 1000));
+ retry = retryEnabled;
+ abortOnDocumentError = (abortOnDocErrorParam == null ? defaultAbortOnDocumentError : (!"false".equals(abortOnDocErrorParam)));
+ abortOnFeedError = (abortOnFeedErrorParam == null ? defaultAbortOnSendError : (!"false".equals(abortOnFeedErrorParam)));
+ createIfNonExistent = (createIfNonExistentParam == null ? defaultCreateIfNonExistent : ("true".equals(createIfNonExistentParam)));
+ }
+ DocumentProtocol.Priority priority = null;
+ if (priorityParam != null) {
+ priority = DocumentProtocol.getPriorityByName(priorityParam);
+ }
+
+ LoadType loadType = null;
+ if (loadTypes != null && loadTypeStr != null) {
+ loadType = loadTypes.getNameMap().get(loadTypeStr);
+ }
+
+ if (loadType == null) {
+ loadType = LoadType.DEFAULT;
+ }
+
+ return new PropertySetter(route, timeout, priority, loadType, retry, abortOnDocumentError, abortOnFeedError, createIfNonExistent, traceStr != null ? Integer.parseInt(traceStr) : 0);
+ }
+
+ public long getDefaultTimeoutMillis() { return defaultTimeoutMillis; }
+
+ public synchronized boolean configChanged() {
+ return configChanged;
+ }
+
+ public synchronized void setConfigChanged(boolean configChanged) {
+ this.configChanged = configChanged;
+ }
+
+ public synchronized FeederOptions getFeederOptions() {
+ return feederOptions;
+ }
+
+ public synchronized void configure(FeederConfig config, LoadTypeConfig loadTypeConfig) {
+ loadTypes.configure(loadTypeConfig);
+ configure(config);
+ }
+
+ public LoadTypeSet getLoadTypes() {
+ return loadTypes;
+ }
+
+ public synchronized void configure(FeederConfig config) {
+ if (feederOptions != null) {
+ setConfigChanged(true);
+ }
+
+ feederOptions = new FeederOptions(config);
+ if (feederOptions.getRoute() != null) {
+ defaultRoute = Route.parse(feederOptions.getRoute());
+ } else {
+ defaultRoute = null;
+ }
+ defaultTimeoutMillis = (long) (feederOptions.getTimeout() * 1000);
+ retryEnabled = feederOptions.getRetryEnabled();
+ defaultAbortOnDocumentError = feederOptions.abortOnDocumentError();
+ defaultAbortOnSendError = feederOptions.abortOnSendError();
+
+ if (!"".equals(feederOptions.getDocprocChain())) {
+ defaultDocprocChain = feederOptions.getDocprocChain();
+ } else {
+ defaultDocprocChain = null;
+ }
+
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Received new config (" +
+ "route: " + (defaultRoute != null ? defaultRoute : "<none>") +
+ ", timeout: " + defaultTimeoutMillis + " ms, retry enabled: " + retryEnabled +
+ ", docproc chain: " + (defaultDocprocChain != null ? defaultDocprocChain : "<none>") +
+ ", abort on doc error: " + defaultAbortOnDocumentError +
+ ", abort on feed error: " + defaultAbortOnSendError + ")");
+ }
+ }
+
+ public class PropertySetter implements MessageProcessor {
+ /** Route either set by configuration or by explicit request override. May be null */
+ private Route route;
+ /** Timeout (in milliseconds) */
+ private long timeout;
+ /** Explicit priority set. May be null */
+ private DocumentProtocol.Priority priority;
+ private boolean retryEnabled;
+ private boolean abortOnDocumentError;
+ private boolean abortOnFeedError;
+ private boolean createIfNonExistent;
+ private LoadType loadType;
+ private int traceLevel;
+
+ public PropertySetter(Route route, long timeout, DocumentProtocol.Priority priority, LoadType loadType,
+ boolean retryEnabled, boolean abortOnDocumentError, boolean abortOnFeedError,
+ boolean createIfNonExistent, int traceLevel) {
+ this.route = route;
+ this.timeout = timeout;
+ this.priority = priority;
+ this.loadType = loadType;
+ this.retryEnabled = retryEnabled;
+ this.abortOnDocumentError = abortOnDocumentError;
+ this.abortOnFeedError = abortOnFeedError;
+ this.createIfNonExistent = createIfNonExistent;
+ this.traceLevel = traceLevel;
+ }
+
+ public Route getRoute() {
+ return route;
+ }
+
+ public void setRoute(Route route) {
+ this.route = route;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ public DocumentProtocol.Priority getPriority() {
+ return priority;
+ }
+
+ public void setPriority(DocumentProtocol.Priority priority) {
+ this.priority = priority;
+ }
+
+ public LoadType getLoadType() {
+ return loadType;
+ }
+
+ public void setLoadType(LoadType loadType) {
+ this.loadType = loadType;
+ }
+
+ public boolean getAbortOnDocumentError() {
+ return abortOnDocumentError;
+ }
+
+ public boolean getAbortOnFeedError() {
+ return abortOnFeedError;
+ }
+
+ public boolean getCreateIfNonExistent() {
+ return createIfNonExistent;
+ }
+
+ @Override
+ public void process(Message msg) {
+ if (route != null) {
+ msg.setRoute(route);
+ }
+ msg.setTimeRemaining(timeout);
+ msg.setRetryEnabled(retryEnabled);
+ msg.getTrace().setLevel(Math.max(getFeederOptions().getTraceLevel(), traceLevel));
+
+ if (loadType != null) {
+ ((DocumentMessage) msg).setLoadType(loadType);
+ ((DocumentMessage) msg).setPriority(loadType.getPriority());
+ }
+
+ if (priority != null) {
+ ((DocumentMessage) msg).setPriority(priority);
+ }
+ }
+
+ public void process(VisitorParameters params) {
+ if (route != null) {
+ params.setRoute(route);
+ }
+ params.setTimeoutMs(timeout);
+
+ params.setTraceLevel(Math.max(getFeederOptions().getTraceLevel(), traceLevel));
+
+ if (loadType != null) {
+ params.setLoadType(loadType);
+ params.setPriority(loadType.getPriority());
+ }
+
+ if (priority != null) {
+ params.setPriority(priority);
+ }
+ }
+ }
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SendSession.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SendSession.java
new file mode 100755
index 00000000000..dc2294832b1
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SendSession.java
@@ -0,0 +1,21 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Result;
+
+/**
+ * Wrapper class to send Messages. Used instead of using a MessageBus session directly
+ * so that unit tests can be more easily made.
+ */
+public abstract class SendSession {
+
+ protected abstract Result onSend(Message m, boolean blockIfQueueIsFull) throws InterruptedException;
+
+ public Result send(Message m, boolean blockIfQueueIsFull) throws InterruptedException {
+ return onSend(m, blockIfQueueIsFull);
+ }
+
+ public abstract void close();
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java
new file mode 100755
index 00000000000..307d8773049
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SessionFactory.java
@@ -0,0 +1,32 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.messagebus.ReplyHandler;
+
+/**
+ * Wraps the creation of messagebus source sessions to allow
+ * for unit testing of the components without involving messagebus itself.
+ */
+public interface SessionFactory {
+
+ /**
+ * Creates a messagebus session for sending regular messages.
+ *
+ *
+ * @param handler A replyhandler to callback when receiving replies from messagebus
+ * @return The session to use for sending messages.
+ */
+ SendSession createSendSession(ReplyHandler handler, Metric metric);
+
+ /**
+ * Creates a messagebus session for visiting data.
+ *
+ * @param params Parameters to the visitor
+ * @return A visitor session.
+ */
+ VisitorSession createVisitorSession(VisitorParameters params);
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
new file mode 100755
index 00000000000..48ff0eae36b
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SharedSender.java
@@ -0,0 +1,246 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.concurrent.SystemTimer;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.*;
+import com.yahoo.clientmetrics.RouteMetricSet;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+/**
+ * This class allows multiple clients to use one shared messagebus session.
+ * The user should create a ResultCallback, which acts as a "session" for that
+ * client, and send one or more messages using the send() methods.
+ * When done sending messages, the client can wait for all messages to be replied to
+ * using the waitForPending method.
+ */
+public class SharedSender implements ReplyHandler {
+
+ public static final Logger log = Logger.getLogger(SharedSender.class.getName());
+
+ private SendSession sender;
+ private final Object monitor = new Object();
+ private RouteMetricSet metrics;
+
+ // Maps from filename to number of pending requests
+ private Map<ResultCallback, Integer> activeOwners = new HashMap<>();
+
+ /**
+ * Creates a new shared sender.
+ * If oldsender != null, we copy that status information from that sender.
+ */
+ public SharedSender(String route, SessionFactory factory, SharedSender oldSender, Metric metric) {
+ if (factory != null) {
+ sender = factory.createSendSession(this, metric);
+ }
+
+ if (oldSender != null) {
+ this.metrics = oldSender.metrics;
+ } else {
+ metrics = new RouteMetricSet(route, null);
+ }
+ }
+
+ public RouteMetricSet getMetrics() {
+ return metrics;
+ }
+
+ public void remove(ResultCallback owner) {
+ synchronized (monitor) {
+ activeOwners.remove(owner);
+ }
+ }
+
+ public void shutdown() {
+ try {
+ synchronized (monitor) {
+ while ( ! activeOwners.isEmpty()) {
+ monitor.wait(180 * 1000);
+ }
+ }
+ } catch (InterruptedException e) {
+ }
+ sender.close();
+ }
+
+ /**
+ * Waits until there are no more pending documents
+ * for the given callback, or the timeout expires.
+ *
+ * @param owner The callback to check for.
+ * @param timeoutMs The number of milliseconds to wait, or -1 to wait indefinitely.
+ * @return true if there were no more pending, or false if the timeout expired.
+ */
+ public boolean waitForPending(ResultCallback owner, long timeoutMs) {
+ long timeStart = SystemTimer.INSTANCE.milliTime();
+ long timeLeft = timeoutMs;
+
+ try {
+ while (timeoutMs == -1 || timeLeft > 0) {
+ synchronized (monitor) {
+ Integer count = activeOwners.get(owner);
+ if (count == null || count == 0) {
+ return true;
+ } else if (timeLeft > 0) {
+ monitor.wait(timeLeft);
+ } else {
+ monitor.wait();
+ }
+ }
+
+ timeLeft = timeoutMs - (SystemTimer.INSTANCE.milliTime() - timeStart);
+ }
+ } catch (InterruptedException e) {
+ }
+
+ return false;
+ }
+
+ public int getPendingCount(ResultCallback owner) {
+ Integer count = activeOwners.get(owner);
+ if (count == null) {
+ return 0;
+ }
+ return count;
+ }
+
+ /**
+ * Returns true if the given result callback has any pending messages with this
+ * sender.
+ *
+ * @param owner The callback to check
+ * @return True if there are any pending, false if not.
+ */
+ public boolean hasPending(ResultCallback owner) {
+ return getPendingCount(owner) > 0;
+ }
+
+ /**
+ * Waits until the given file has no pending documents.
+ *
+ * @param owner the file to check for pending documents
+ */
+ public void waitForPending(ResultCallback owner) {
+ waitForPending(owner, -1);
+ }
+
+ /**
+ * Sends a message
+ *
+ * @param msg The message to send.
+ * @param owner A callback to send replies to when received from messagebus
+ */
+ public void send(Message msg, ResultCallback owner) {
+ send(msg, owner, -1, true);
+ }
+
+ /**
+ * Sends a message. Waits until the number of pending messages for this owner has
+ * become lower than the specified limit if necessary.
+ *
+ * @param msg The message to send
+ * @param owner The callback to send replies to when received from messagebus
+ * @param maxPendingPerOwner The maximum number of pending messages the callback
+ * @param blockingQueue If true, block until the message bus queue is available.
+ */
+ public void send(Message msg, ResultCallback owner, int maxPendingPerOwner, boolean blockingQueue) {
+ // Silently fail messages that are attempted sent after the callback aborted.
+ if (owner.isAborted()) {
+ return;
+ }
+
+ try {
+ synchronized (monitor) {
+ if (maxPendingPerOwner != -1 && blockingQueue) {
+ while (true) {
+ Integer count = activeOwners.get(owner);
+
+ if (count != null && count >= maxPendingPerOwner) {
+ log.log(LogLevel.INFO, "Owner " + owner + " already has " + count + " pending. Waiting for replies");
+ monitor.wait(10000);
+ } else {
+ break;
+ }
+ }
+ }
+
+ Integer count = activeOwners.get(owner);
+
+ if (count == null) {
+ activeOwners.put(owner, 1);
+ } else {
+ activeOwners.put(owner, count + 1);
+ }
+ }
+ } catch (InterruptedException e) {
+ return;
+ }
+
+ msg.setContext(owner);
+
+ try {
+ com.yahoo.messagebus.Result r = sender.send(msg, blockingQueue);
+ if (!r.isAccepted()) {
+ EmptyReply reply = new EmptyReply();
+ msg.swapState(reply);
+ reply.setMessage(msg);
+ reply.addError(r.getError());
+ handleReply(reply);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+
+ /**
+ * Implement replyHandler from messagebus. Called when a reply is received from messagebus.
+ * Tries to find the callback from the reply context and updates the pending state for the callback.
+ *
+ * @param r the reply to process.
+ */
+ @Override
+ public void handleReply(Reply r) {
+ synchronized (monitor) {
+ ResultCallback owner = (ResultCallback) r.getContext();
+
+ if (owner != null) {
+ metrics.addReply(r);
+
+ Integer count = activeOwners.get(owner);
+
+ if (count != null) {
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Received reply for file " + owner.toString() + " count was " + count);
+ }
+
+ if ( ! owner.handleReply(r, count - 1)) {
+ activeOwners.remove(owner);
+ } else {
+ activeOwners.put(owner, count - 1);
+ }
+ }
+ } else {
+ log.log(LogLevel.WARNING, "Received reply " + r + " for message " + r.getMessage() + " without context");
+ }
+
+ monitor.notifyAll();
+ }
+ }
+
+ public interface ResultCallback {
+
+ /** Return true if we should continue waiting for replies for this sender. */
+ boolean handleReply(Reply r, int numPending);
+
+ /**
+ * Returns true if feeding has been aborted. No more feeding is allowed with this
+ * callback after that.
+ */
+ boolean isAborted();
+
+ }
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java
new file mode 100755
index 00000000000..52f5add9f44
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java
@@ -0,0 +1,19 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
+
+public interface SimpleFeedAccess {
+
+ void put(Document doc);
+ void remove(DocumentId docId);
+ void update(DocumentUpdate update);
+ void put(Document doc, TestAndSetCondition condition);
+ void remove(DocumentId docId, TestAndSetCondition condition);
+ void update(DocumentUpdate update, TestAndSetCondition condition);
+ boolean isAborted();
+
+} \ No newline at end of file
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
new file mode 100755
index 00000000000..a9a08562c9d
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
@@ -0,0 +1,114 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.messagebus.Message;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Simplifies sending messages belonging to a single result callback. */
+public class SingleSender implements SimpleFeedAccess {
+
+ private final SharedSender.ResultCallback owner;
+ private final SharedSender sender;
+ private final List<MessageProcessor> messageProcessors = new ArrayList<>();
+ private boolean blockingQueue;
+
+ public SingleSender(SharedSender.ResultCallback owner, SharedSender sender, boolean blockingQueue) {
+ this.owner = owner;
+ this.sender = sender;
+ this.blockingQueue = blockingQueue;
+ }
+
+ public SingleSender(SharedSender.ResultCallback owner, SharedSender sender) {
+ this(owner, sender, true);
+ }
+
+ @Override
+ public void put(Document doc) {
+ send(new PutDocumentMessage(new DocumentPut(doc)));
+ }
+
+ @Override
+ public void remove(DocumentId docId) {
+ send(new RemoveDocumentMessage(docId));
+ }
+
+ @Override
+ public void update(DocumentUpdate update) {
+ send(new UpdateDocumentMessage(update));
+ }
+
+ @Override
+ public void put(Document doc, TestAndSetCondition condition) {
+ PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(doc));
+ message.setCondition(condition);
+ send(message);
+ }
+
+ @Override
+ public void remove(DocumentId docId, TestAndSetCondition condition) {
+ RemoveDocumentMessage message = new RemoveDocumentMessage(docId);
+ message.setCondition(condition);
+ send(message);
+ }
+
+ @Override
+ public void update(DocumentUpdate update, TestAndSetCondition condition) {
+ UpdateDocumentMessage message = new UpdateDocumentMessage(update);
+ message.setCondition(condition);
+ send(message);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return owner.isAborted();
+ }
+
+ public void addMessageProcessor(MessageProcessor processor) {
+ messageProcessors.add(processor);
+ }
+
+ // Runs all message processors on the message and returns it.
+ private Message processMessage(Message m) {
+ for (MessageProcessor processor : messageProcessors) {
+ processor.process(m);
+ }
+ return m;
+ }
+
+ public void send(Message m) {
+ send(m, -1);
+ }
+
+ /**
+ * Sends the given message, allowing a maximum of maxPending messages to be
+ * sent for this sender.
+ *
+ * @param m The message to send
+ * @param maxPending The number of pending messages to block on for this sender.
+ */
+ public void send(Message m, int maxPending) {
+ sender.send(processMessage(m), owner, maxPending, blockingQueue);
+ }
+
+ public void done() {
+ // empty
+ }
+
+ public void waitForPending() {
+ waitForPending(-1);
+ }
+
+ public boolean waitForPending(long timeoutMs) {
+ return sender.waitForPending(owner, timeoutMs);
+ }
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java
new file mode 100755
index 00000000000..7838f00642a
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java
@@ -0,0 +1,39 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+/**
+ * Wrapper class for SimpleFeedAccess to send various XML operations.
+ */
+public class VespaFeedSender {
+
+ private final SimpleFeedAccess sender;
+
+ public VespaFeedSender(SimpleFeedAccess sender) {
+ this.sender = sender;
+ }
+
+ public boolean isAborted() {
+ return sender.isAborted();
+ }
+
+ public void sendOperation(VespaXMLFeedReader.Operation op) {
+ switch (op.getType()) {
+ case DOCUMENT:
+ sender.put(op.getDocument(), op.getCondition());
+ break;
+ case REMOVE:
+ sender.remove(op.getRemove(), op.getCondition());
+ break;
+ case UPDATE:
+ sender.update(op.getDocumentUpdate(), op.getCondition());
+ break;
+ }
+ }
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/XMLFeeder.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/XMLFeeder.java
new file mode 100755
index 00000000000..3c989e6d42d
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/XMLFeeder.java
@@ -0,0 +1,26 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.vespaxmlparser.FeedReader;
+import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
+
+import java.io.InputStream;
+
+/**
+ * Unpack a stream of document operations represented as XML and push to a feed
+ * access point.
+ *
+ * @author Thomas Gundersen
+ * @author steinar
+ */
+public class XMLFeeder extends Feeder {
+ public XMLFeeder(DocumentTypeManager docMan, SimpleFeedAccess sender, InputStream stream) {
+ super(docMan, new VespaFeedSender(sender), stream);
+ }
+
+ @Override
+ protected FeedReader createReader() throws Exception {
+ return new VespaXMLFeedReader(stream, docMan);
+ }
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
new file mode 100755
index 00000000000..24d30c4aa0a
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/FeedResponse.java
@@ -0,0 +1,163 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedhandler;
+
+import com.yahoo.clientmetrics.RouteMetricSet;
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.container.jdisc.VespaHeaders;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.feedapi.SharedSender;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.text.Utf8String;
+import com.yahoo.text.XMLWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+@SuppressWarnings("deprecation")
+public final class FeedResponse extends HttpResponse implements SharedSender.ResultCallback {
+
+ private final static Logger log = Logger.getLogger(FeedResponse.class.getName());
+ private final List<ErrorMessage> errorMessages = new ArrayList<>();
+ private final List<String> errors = new ArrayList<>();
+ private final StringBuilder traces = new StringBuilder();
+ private final RouteMetricSet metrics;
+ private boolean abortOnError = false;
+ private boolean isAborted = false;
+
+ public FeedResponse(RouteMetricSet metrics) {
+ super(com.yahoo.jdisc.http.HttpResponse.Status.OK);
+ this.metrics = metrics;
+ }
+
+ public boolean isAborted() {
+ return isAborted;
+ }
+
+ public void setAbortOnFeedError(boolean abort) {
+ abortOnError = abort;
+ }
+
+ @Override
+ public void render(OutputStream outputStream) throws IOException {
+ if ( ! errorMessages.isEmpty())
+ setStatus(VespaHeaders.getStatus(false, errorMessages.get(0), errorMessages.iterator()));
+
+ XMLWriter writer = new XMLWriter(new OutputStreamWriter(outputStream));
+ writer.openTag("result");
+
+ if (metrics != null) {
+ metrics.printXml(writer, 0, 0);
+ }
+ if (traces.length() > 0) {
+ writer.openTag("trace");
+ writer.append(traces);
+ writer.closeTag();
+ }
+ if (!errors.isEmpty()) {
+ writer.openTag("errors");
+ writer.attribute(new Utf8String("count"), errors.size());
+
+ for (int i = 0; i < errors.size() && i < 10; ++i) {
+ writer.openTag("error");
+ writer.attribute(new Utf8String("message"), errors.get(i));
+ writer.closeTag();
+ }
+ writer.closeTag();
+ }
+
+ writer.closeTag();
+ writer.flush();
+ outputStream.close();
+ }
+
+ @Override
+ public java.lang.String getContentType() {
+ return "application/xml";
+ }
+
+ public String prettyPrint(Message m) {
+ if (m instanceof PutDocumentMessage) {
+ return "PUT[" + ((PutDocumentMessage)m).getDocumentPut().getDocument().getId() + "] ";
+ }
+ if (m instanceof RemoveDocumentMessage) {
+ return "REMOVE[" + ((RemoveDocumentMessage)m).getDocumentId() + "] ";
+ }
+ if (m instanceof UpdateDocumentMessage) {
+ return "UPDATE[" + ((UpdateDocumentMessage)m).getDocumentUpdate().getId() + "] ";
+ }
+
+ return "";
+ }
+
+ public boolean handleReply(Reply reply, int numPending) {
+ metrics.addReply(reply);
+ if (reply.getTrace().getLevel() > 0) {
+ String str = reply.getTrace().toString();
+ traces.append(str);
+ System.out.println(str);
+ }
+
+ if (containsFatalErrors(reply.getErrors())) {
+ for (int i = 0; i < reply.getNumErrors(); ++i) {
+ Error err = reply.getError(i);
+ StringBuilder out = new StringBuilder(prettyPrint(reply.getMessage()));
+ out.append("[").append(DocumentProtocol.getErrorName(err.getCode())).append("] ");
+ if (err.getService() != null) {
+ out.append("(").append(err.getService()).append(") ");
+ }
+ out.append(err.getMessage());
+
+ String str = out.toString();
+ log.finest(str);
+ addError(str);
+ }
+ isAborted = abortOnError;
+ return !abortOnError;
+ }
+ return numPending > 0;
+ }
+
+ public void done() {
+ metrics.done();
+ }
+
+ public FeedResponse addXMLParseError(String error) {
+ errorMessages.add(ErrorMessage.createBadRequest(error));
+ errors.add(error);
+ return this;
+ }
+
+ public FeedResponse addError(String error) {
+ errorMessages.add(ErrorMessage.createBadRequest(error));
+ errors.add(error);
+ return this;
+ }
+
+ public List<String> getErrorList() {
+ return errors;
+ }
+
+ public List<ErrorMessage> getErrorMessageList() {
+ return errorMessages;
+ }
+
+ private static boolean containsFatalErrors(Stream<Error> errors) {
+ return errors.anyMatch(e -> e.getCode() != DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED);
+ }
+
+ public boolean isSuccess() {
+ return errors.isEmpty();
+ }
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/MetricResponse.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/MetricResponse.java
new file mode 100644
index 00000000000..4e0896b6da4
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/MetricResponse.java
@@ -0,0 +1,38 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedhandler;
+
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.metrics.MetricSet;
+import com.yahoo.text.XMLWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+/**
+ * Response that generates metric output like a status page.
+ */
+public final class MetricResponse extends HttpResponse {
+
+ MetricSet set;
+
+ MetricResponse(MetricSet set) {
+ super(com.yahoo.jdisc.http.HttpResponse.Status.OK);
+ this.set = set;
+ }
+
+ @Override
+ public void render(OutputStream stream) throws IOException {
+ XMLWriter writer = new XMLWriter(new OutputStreamWriter(stream));
+ writer.openTag("status");
+ set.printXml(writer, 0, 2);
+ writer.closeTag();
+ writer.flush();
+ }
+
+ @Override
+ public java.lang.String getContentType() {
+ return "application/xml";
+ }
+
+} \ No newline at end of file
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/NullFeedMetric.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/NullFeedMetric.java
new file mode 100644
index 00000000000..22f75705334
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/NullFeedMetric.java
@@ -0,0 +1,28 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedhandler;
+
+import com.yahoo.jdisc.Metric;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.20
+ */
+public final class NullFeedMetric implements Metric {
+ @Override
+ public void set(String key, Number val, Context ctx) {
+ }
+
+ @Override
+ public void add(String key, Number val, Context ctx) {
+ }
+
+ @Override
+ public Context createContext(Map<String, ?> properties) {
+ return NullFeedContext.INSTANCE;
+ }
+
+ private static class NullFeedContext implements Context {
+ private static final NullFeedContext INSTANCE = new NullFeedContext();
+ }
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
new file mode 100755
index 00000000000..6e3facbdc98
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
@@ -0,0 +1,102 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedhandler;
+
+import com.google.inject.Inject;
+import com.yahoo.clientmetrics.RouteMetricSet;
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.container.jdisc.HttpResponse;
+import com.yahoo.feedapi.DocprocMessageProcessor;
+import com.yahoo.feedapi.FeedContext;
+import com.yahoo.feedapi.Feeder;
+import com.yahoo.feedapi.JsonFeeder;
+import com.yahoo.feedapi.MessagePropertyProcessor;
+import com.yahoo.feedapi.SingleSender;
+import com.yahoo.feedapi.XMLFeeder;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.vespa.config.content.LoadTypeConfig;
+import com.yahoo.vespaclient.config.FeederConfig;
+
+import java.util.List;
+import java.util.concurrent.Executor;
+
+/**
+ * Feed documents from a com.yahoo.container.handler.Request.
+ *
+ * @author Thomas Gundersen
+ * @author steinar
+ */
+public final class VespaFeedHandler extends VespaFeedHandlerBase {
+
+ public static final String JSON_INPUT = "jsonInput";
+
+ @Inject
+ public VespaFeedHandler(FeederConfig feederConfig, LoadTypeConfig loadTypeConfig, Executor executor,
+ Metric metric) throws Exception {
+ super(feederConfig, loadTypeConfig, executor, metric);
+ }
+
+ VespaFeedHandler(FeedContext context, Executor executor) throws Exception {
+ super(context, executor);
+ }
+
+ public static VespaFeedHandler createFromContext(FeedContext context, Executor executor) throws Exception {
+ return new VespaFeedHandler(context, executor);
+ }
+
+ @Override
+ public HttpResponse handle(HttpRequest request) {
+ return handle(request, (RouteMetricSet.ProgressCallback)null);
+ }
+
+ public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback) {
+ if (request.getProperty("status") != null) {
+ return new MetricResponse(context.getMetrics().getMetricSet());
+ }
+
+ boolean asynchronous = request.getBooleanProperty("asynchronous");
+
+ MessagePropertyProcessor.PropertySetter properties = getPropertyProcessor().buildPropertySetter(request);
+
+ String route = properties.getRoute().toString();
+ FeedResponse response = new FeedResponse(new RouteMetricSet(route, callback));
+
+ SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous);
+ sender.addMessageProcessor(properties);
+ sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request)));
+
+ Feeder feeder = createFeeder(sender, request);
+ feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
+ feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
+ response.setAbortOnFeedError(properties.getAbortOnFeedError());
+
+ List<String> errors = feeder.parse();
+ for (String s : errors) {
+ response.addXMLParseError(s);
+ }
+ if (errors.size() > 0 && feeder instanceof XMLFeeder) {
+ response.addXMLParseError("If you are trying to feed JSON, set the Content-Type header to application/json.");
+ }
+
+ sender.done();
+
+ if (asynchronous) {
+ return response;
+ }
+ long millis = getTimeoutMillis(request);
+ boolean completed = sender.waitForPending(millis);
+ if ( ! completed)
+ response.addError("Timed out after "+millis+" ms waiting for responses");
+ response.done();
+ return response;
+ }
+
+ private Feeder createFeeder(SingleSender sender, HttpRequest request) {
+ String contentType = request.getHeader("Content-Type");
+ if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) {
+ return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request));
+ } else {
+ return new XMLFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request));
+ }
+ }
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java
new file mode 100755
index 00000000000..fa1e6854593
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java
@@ -0,0 +1,93 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedhandler;
+
+import com.google.inject.Inject;
+import com.yahoo.clientmetrics.ClientMetrics;
+import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.container.jdisc.HttpRequest;
+import com.yahoo.container.jdisc.ThreadedHttpRequestHandler;
+import com.yahoo.docproc.DocprocService;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.feedapi.FeedContext;
+import com.yahoo.feedapi.MessagePropertyProcessor;
+import com.yahoo.feedapi.SharedSender;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.search.query.ParameterParser;
+import com.yahoo.vespa.config.content.LoadTypeConfig;
+import com.yahoo.vespaclient.config.FeederConfig;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.Executor;
+import java.util.zip.GZIPInputStream;
+
+public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler {
+
+ protected FeedContext context;
+ private final long defaultTimeoutMillis;
+
+ @Inject
+ public VespaFeedHandlerBase(FeederConfig feederConfig,
+ LoadTypeConfig loadTypeConfig,
+ Executor executor,
+ Metric metric) throws Exception {
+ this(FeedContext.getInstance(feederConfig, loadTypeConfig, metric), executor, (long)feederConfig.timeout() * 1000);
+ }
+
+ public VespaFeedHandlerBase(FeedContext context, Executor executor) throws Exception {
+ this(context, executor, context.getPropertyProcessor().getDefaultTimeoutMillis());
+ }
+
+ public VespaFeedHandlerBase(FeedContext context, Executor executor, long defaultTimeoutMillis) throws Exception {
+ super(executor);
+ this.context = context;
+ this.defaultTimeoutMillis = defaultTimeoutMillis;
+ }
+
+ public SharedSender getSharedSender(String route) {
+ return context.getSharedSender(route);
+ }
+
+ public DocprocService getDocprocChain(HttpRequest request) {
+ return context.getPropertyProcessor().getDocprocChain(request);
+ }
+
+ public ComponentRegistry<DocprocService> getDocprocServiceRegistry(HttpRequest request) {
+ return context.getPropertyProcessor().getDocprocServiceRegistry(request);
+ }
+
+ public MessagePropertyProcessor getPropertyProcessor() {
+ return context.getPropertyProcessor();
+ }
+
+ /**
+ * @param request Request object to get the POST data stream from
+ * @return An InputStream that either is a GZIP wrapper or simply the
+ * original data stream.
+ * @throws IllegalArgumentException if GZIP stream creation failed
+ */
+ public InputStream getRequestInputStream(HttpRequest request) {
+ if ("gzip".equals(request.getHeader("Content-Encoding"))) {
+ try {
+ return new GZIPInputStream(request.getData());
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to create GZIP input stream from content", e);
+ }
+ } else {
+ return request.getData();
+ }
+ }
+
+ protected DocumentTypeManager getDocumentTypeManager() {
+ return context.getDocumentTypeManager();
+ }
+
+ public ClientMetrics getMetrics() {
+ return context.getMetrics();
+ }
+
+ protected long getTimeoutMillis(HttpRequest request) {
+ return ParameterParser.asMilliSeconds(request.getProperty("timeout"), defaultTimeoutMillis);
+ }
+
+}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/package-info.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/package-info.java
new file mode 100644
index 00000000000..309adee673b
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/package-info.java
@@ -0,0 +1,8 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * Exported not as an API, but specificalle for SSBE.
+ */
+@ExportPackage
+package com.yahoo.feedhandler;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java
new file mode 100644
index 00000000000..ab404893c08
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterDef.java
@@ -0,0 +1,15 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespaclient;
+
+public class ClusterDef {
+ public ClusterDef(String name, String configId) {
+ this.name = name;
+ this.configId = configId;
+ }
+
+ String name;
+ String configId;
+
+ public String getName() { return name; }
+ public String getConfigId() { return configId; }
+} \ No newline at end of file
diff --git a/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java
new file mode 100644
index 00000000000..3ea3bb5cb9d
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/vespaclient/ClusterList.java
@@ -0,0 +1,40 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespaclient;
+
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.config.subscription.ConfigGetter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ClusterList {
+ List<ClusterDef> storageClusters = new ArrayList<ClusterDef>();
+
+ public ClusterList() {
+ this(null);
+ }
+
+ public ClusterList(String configId) {
+ if (configId != null) {
+ configure(new ConfigGetter<>(ClusterListConfig.class).getConfig(configId));
+ }
+ }
+
+ public List<ClusterDef> getStorageClusters() {
+ return storageClusters;
+ }
+
+ public void configure(ClusterListConfig cfg) {
+ storageClusters.clear();
+ for (int i = 0; i < cfg.storage().size(); i++) {
+ storageClusters.add(new ClusterDef(cfg.storage(i).name(),
+ cfg.storage(i).configid()));
+ }
+ }
+
+ public static ClusterList createMockedList(List<ClusterDef> clusters) {
+ ClusterList list = new ClusterList(null);
+ list.storageClusters = clusters;
+ return list;
+ }
+}
diff --git a/vespaclient-core/src/main/resources/configdefinitions/feeder.def b/vespaclient-core/src/main/resources/configdefinitions/feeder.def
new file mode 100644
index 00000000000..c5a77ef59e6
--- /dev/null
+++ b/vespaclient-core/src/main/resources/configdefinitions/feeder.def
@@ -0,0 +1,46 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+version=16
+namespace=vespaclient.config
+
+## Whether or not to abort if there are document-related errors.
+## Value 'false' will skip bad documents, but still abort on other errors.
+abortondocumenterror bool default=true
+
+## Whether or not to abort if there are errors sending messages to Vespa
+abortonsenderror bool default=true
+
+## Prefix each document id with this string.
+idprefix string default=""
+
+## Max number of pending operations.
+maxpendingdocs int default=0
+
+## Max number of bytes in pending operations.
+maxpendingbytes int default=0
+
+## Max number of operations to perform per second (0 == no max)
+maxfeedrate double default=0.0
+
+## Whether or not retrying is enabled.
+retryenabled bool default=true
+
+## Delay between retries.
+retrydelay double default=1
+
+## Timeout for messagebus operations.
+timeout double default=180
+
+## Route to feed documents on
+route string default="default"
+
+## Trace level for messages
+tracelevel int default=0
+
+## Messagebus port to start source session on
+mbusport int default=-1
+
+## Default docproc chain to run
+docprocchain string default=""
+
+## Whether or not to set create-if-non-existent flag on all document updates handled by a feeder.
+createifnonexistent bool default=false
diff --git a/vespaclient-core/src/main/resources/configdefinitions/spooler.def b/vespaclient-core/src/main/resources/configdefinitions/spooler.def
new file mode 100644
index 00000000000..7fef30cbfea
--- /dev/null
+++ b/vespaclient-core/src/main/resources/configdefinitions/spooler.def
@@ -0,0 +1,34 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+namespace=vespa.config.content.spooler
+
+# The root directory under which the spooler will scan for content files
+directory string default="var/spool/vespa"
+
+# If true, move successfully processed files to <directory>/success
+keepsuccess bool default=false
+
+# How many bytes to store maximum in the failures directory (delete files instead of moving to failures if this is exceeded)
+# If 0, don't restrict
+maxfailuresize int default=0
+maxfatalfailuresize int default=0
+
+# How many bytes to store maximum in the successes directory (delete files instead of moving to successes if this is exceeded)
+# If 0, don't restrict
+maxsuccesssize int default=0
+
+# How many worker threads to use
+threads int default=10
+
+# Which parsers to use and config for each of them.
+parsers[].classname string
+parsers[].parameters[].key string
+parsers[].parameters[].value string
+
+# The time spent in failures (seconds) until moving to fatalfailures (default 3 days)
+maxfailuretime int default=259200
+
+# The minimum number of retries before moving to fatalfailures
+minfailureretries int default=5
+
+# The maximum number of retries when getting transient errors sending messages to backend
+maxretries int default=5
diff --git a/vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java b/vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java
new file mode 100644
index 00000000000..fcb6cb6a48d
--- /dev/null
+++ b/vespaclient-core/src/test/java/com/yahoo/feedapi/FeederOptionsTestCase.java
@@ -0,0 +1,91 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class FeederOptionsTestCase {
+
+ @Test
+ public void testEqualsAndHashCode() {
+ FeederOptions f1 = new FeederOptions();
+ FeederOptions f2 = new FeederOptions();
+ assertTrue(f1.equals(f2));
+ assertTrue(f2.equals(f1));
+ assertTrue(f1.hashCode() == f2.hashCode());
+
+ f1.setAbortOnDocumentError(false);
+ assertFalse(f1.equals(f2));
+ assertFalse(f2.equals(f1));
+ assertFalse(f1.hashCode() == f2.hashCode());
+
+ f1.setAbortOnDocumentError(true);
+ assertTrue(f1.equals(f2));
+ assertTrue(f2.equals(f1));
+ assertTrue(f1.hashCode() == f2.hashCode());
+
+ f1.setRoutingConfigId("blabla");
+ assertFalse(f1.equals(f2));
+ assertFalse(f2.equals(f1));
+ assertFalse(f1.hashCode() == f2.hashCode());
+
+ f2.setRoutingConfigId("blabla");
+ assertTrue(f1.equals(f2));
+ assertTrue(f2.equals(f1));
+ assertTrue(f1.hashCode() == f2.hashCode());
+
+ f1.setRetryDelay(5000);
+ assertFalse(f1.equals(f2));
+ assertFalse(f2.equals(f1));
+ assertFalse(f1.hashCode() == f2.hashCode());
+
+ f2.setRetryDelay(5000);
+ assertTrue(f1.equals(f2));
+ assertTrue(f2.equals(f1));
+ assertTrue(f1.hashCode() == f2.hashCode());
+
+ f1.setRoute("all roads lead to rome");
+ assertFalse(f1.equals(f2));
+ assertFalse(f2.equals(f1));
+ assertFalse(f1.hashCode() == f2.hashCode());
+
+ f2.setRoute("all roads lead to trondheim");
+ assertFalse(f1.equals(f2));
+ assertFalse(f2.equals(f1));
+ assertFalse(f1.hashCode() == f2.hashCode());
+
+ f2.setRoute("all roads lead to rome");
+ assertTrue(f1.equals(f2));
+ assertTrue(f2.equals(f1));
+ assertTrue(f1.hashCode() == f2.hashCode());
+
+ f1.setTraceLevel(0);
+ assertTrue(f1.equals(f2));
+ assertTrue(f2.equals(f1));
+ assertTrue(f1.hashCode() == f2.hashCode());
+
+ f2.setTraceLevel(0);
+ assertTrue(f1.equals(f2));
+ assertTrue(f2.equals(f1));
+ assertTrue(f1.hashCode() == f2.hashCode());
+
+ f1.setTraceLevel(5);
+ assertFalse(f1.equals(f2));
+ assertFalse(f2.equals(f1));
+ assertFalse(f1.hashCode() == f2.hashCode());
+
+ f2.setTraceLevel(5);
+ assertTrue(f1.equals(f2));
+ assertTrue(f2.equals(f1));
+ assertTrue(f1.hashCode() == f2.hashCode());
+
+ f2.setMaxFeedRate(34.0);
+ assertFalse(f2.equals(f1));
+ assertFalse(f1.equals(f2));
+ assertTrue(f1.hashCode() != f2.hashCode());
+ }
+
+}
diff --git a/vespaclient-core/src/test/java/com/yahoo/vespafeeder/.gitignore b/vespaclient-core/src/test/java/com/yahoo/vespafeeder/.gitignore
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/vespaclient-core/src/test/java/com/yahoo/vespafeeder/.gitignore