summaryrefslogtreecommitdiffstats
path: root/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java114
1 files changed, 114 insertions, 0 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
new file mode 100755
index 00000000000..a9a08562c9d
--- /dev/null
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
@@ -0,0 +1,114 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.feedapi;
+
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.messagebus.Message;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Simplifies sending messages belonging to a single result callback. */
+public class SingleSender implements SimpleFeedAccess {
+
+ private final SharedSender.ResultCallback owner;
+ private final SharedSender sender;
+ private final List<MessageProcessor> messageProcessors = new ArrayList<>();
+ private boolean blockingQueue;
+
+ public SingleSender(SharedSender.ResultCallback owner, SharedSender sender, boolean blockingQueue) {
+ this.owner = owner;
+ this.sender = sender;
+ this.blockingQueue = blockingQueue;
+ }
+
+ public SingleSender(SharedSender.ResultCallback owner, SharedSender sender) {
+ this(owner, sender, true);
+ }
+
+ @Override
+ public void put(Document doc) {
+ send(new PutDocumentMessage(new DocumentPut(doc)));
+ }
+
+ @Override
+ public void remove(DocumentId docId) {
+ send(new RemoveDocumentMessage(docId));
+ }
+
+ @Override
+ public void update(DocumentUpdate update) {
+ send(new UpdateDocumentMessage(update));
+ }
+
+ @Override
+ public void put(Document doc, TestAndSetCondition condition) {
+ PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(doc));
+ message.setCondition(condition);
+ send(message);
+ }
+
+ @Override
+ public void remove(DocumentId docId, TestAndSetCondition condition) {
+ RemoveDocumentMessage message = new RemoveDocumentMessage(docId);
+ message.setCondition(condition);
+ send(message);
+ }
+
+ @Override
+ public void update(DocumentUpdate update, TestAndSetCondition condition) {
+ UpdateDocumentMessage message = new UpdateDocumentMessage(update);
+ message.setCondition(condition);
+ send(message);
+ }
+
+ @Override
+ public boolean isAborted() {
+ return owner.isAborted();
+ }
+
+ public void addMessageProcessor(MessageProcessor processor) {
+ messageProcessors.add(processor);
+ }
+
+ // Runs all message processors on the message and returns it.
+ private Message processMessage(Message m) {
+ for (MessageProcessor processor : messageProcessors) {
+ processor.process(m);
+ }
+ return m;
+ }
+
+ public void send(Message m) {
+ send(m, -1);
+ }
+
+ /**
+ * Sends the given message, allowing a maximum of maxPending messages to be
+ * sent for this sender.
+ *
+ * @param m The message to send
+ * @param maxPending The number of pending messages to block on for this sender.
+ */
+ public void send(Message m, int maxPending) {
+ sender.send(processMessage(m), owner, maxPending, blockingQueue);
+ }
+
+ public void done() {
+ // empty
+ }
+
+ public void waitForPending() {
+ waitForPending(-1);
+ }
+
+ public boolean waitForPending(long timeoutMs) {
+ return sender.waitForPending(owner, timeoutMs);
+ }
+
+}