aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java2
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java20
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchColumnPolicy.java183
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java85
-rwxr-xr-xdocumentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java195
-rw-r--r--documentapi/src/tests/messagebus/messagebus_test.cpp8
-rw-r--r--documentapi/src/tests/policies/policies_test.cpp367
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp5
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp137
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h52
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp63
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h30
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp14
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h8
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java4
-rw-r--r--messagebus/src/vespa/messagebus/errorcode.cpp1
-rw-r--r--messagebus/src/vespa/messagebus/errorcode.h3
-rw-r--r--messagebus_test/src/tests/errorcodes/DumpCodes.java1
-rw-r--r--messagebus_test/src/tests/errorcodes/dumpcodes.cpp1
-rw-r--r--messagebus_test/src/tests/errorcodes/ref-dump.txt1
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/returncode.cpp1
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java1
24 files changed, 73 insertions, 1113 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
index c401fc16a2f..e1e501d3e1b 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
@@ -291,8 +291,6 @@ public class DocumentProtocol implements Protocol {
putRoutingPolicyFactory("MessageType", new RoutingPolicyFactories.MessageTypePolicyFactory(cfg));
putRoutingPolicyFactory("RoundRobin", new RoutingPolicyFactories.RoundRobinPolicyFactory());
putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory());
- putRoutingPolicyFactory("SearchColumn", new RoutingPolicyFactories.SearchColumnPolicyFactory());
- putRoutingPolicyFactory("SearchRow", new RoutingPolicyFactories.SearchRowPolicyFactory());
putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.StoragePolicyFactory());
putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory());
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
index acd73a21a8a..6c13b7468c7 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
@@ -116,26 +116,6 @@ public abstract class RoutingPolicyFactories {
}
}
- static class SearchColumnPolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new SearchColumnPolicy(param);
- }
-
-
- public void destroy() {
- }
- }
-
- static class SearchRowPolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new SearchRowPolicy(param);
- }
-
-
- public void destroy() {
- }
- }
-
static class SubsetServicePolicyFactory implements RoutingPolicyFactory {
public DocumentProtocolRoutingPolicy createPolicy(String param) {
return new SubsetServicePolicy(param);
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchColumnPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchColumnPolicy.java
deleted file mode 100644
index aabb6407d14..00000000000
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchColumnPolicy.java
+++ /dev/null
@@ -1,183 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.document.BucketId;
-import com.yahoo.document.BucketIdFactory;
-import com.yahoo.document.DocumentId;
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.metrics.MetricSet;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingContext;
-import com.yahoo.messagebus.routing.RoutingNodeIterator;
-import com.yahoo.vdslib.BucketDistribution;
-
-import java.util.*;
-import java.util.logging.Logger;
-
-/**
- * <p>This policy implements the logic to select recipients for a single search column. It has 2 different modes of
- * operation;</p>
- *
- * <ol>
- * <li>If the "maxbadparts" parameter is 0, select recipient based on document id hash and use
- * shared merge logic. Do not allow any out-of-service replies.</li>
- * <li>Else do best-effort validation of system
- * state. This means;
- * <ol>
- * <li>if the message is sending to all recipients (typicall start- and
- * end-of-feed), allow at most "maxbadparts" out-of-service replies,</li>
- * <li>else always allow out-of-service reply by masking it with an empty
- * reply.</li>
- * </ol>
- * </li>
- * </ol>
- * <p>For systems that allow bad parts, one will not know whether or not feeding
- * was a success until the RTX attempts to set the new index live, because it is
- * only the RTX that is now able to verify that the service level requirements
- * are met. Feeding will still break if a message that was supposed to be sent
- * to all recipients receives more than "maxbadparts" out-of-service replies,
- * according to (2.a) above.</p>
- *
- * @author Simon Thoresen
- */
-public class SearchColumnPolicy implements DocumentProtocolRoutingPolicy {
-
- private static Logger log = Logger.getLogger(SearchColumnPolicy.class.getName());
- private BucketIdFactory factory = new BucketIdFactory();
- private Map<Integer, BucketDistribution> distributions = new HashMap<Integer, BucketDistribution>();
- private int maxOOS = 0; // The maximum OUT_OF_SERVICE replies to hide.
-
- public static final int DEFAULT_NUM_BUCKET_BITS = 16;
-
- /**
- * Constructs a new policy object for the given parameter string. The string can be null or empty, which is a
- * request to not allow any bad columns.
- *
- * @param param The maximum number of allowed bad columns.
- */
- public SearchColumnPolicy(String param) {
- if (param != null && param.length() > 0) {
- try {
- maxOOS = Integer.parseInt(param);
- } catch (NumberFormatException e) {
- log.log(LogLevel.WARNING, "Parameter '" + param + "' could not be parsed as an integer.", e);
- }
- if (maxOOS < 0) {
- log.log(LogLevel.WARNING, "Ignoring a request to set the maximum number of OOS replies to " + maxOOS +
- " because it makes no sense. This routing policy will not allow any recipient" +
- " to be out of service.");
- }
- }
- }
-
- @Override
- public void select(RoutingContext context) {
- List<Route> recipients = context.getMatchedRecipients();
- if (recipients == null || recipients.size() == 0) {
- return;
- }
- DocumentId id = null;
- BucketId bucketId = null;
- Message msg = context.getMessage();
- switch (msg.getType()) {
-
- case DocumentProtocol.MESSAGE_PUTDOCUMENT:
- id = ((PutDocumentMessage)msg).getDocumentPut().getDocument().getId();
- break;
-
- case DocumentProtocol.MESSAGE_GETDOCUMENT:
- id = ((GetDocumentMessage)msg).getDocumentId();
- break;
-
- case DocumentProtocol.MESSAGE_REMOVEDOCUMENT:
- id = ((RemoveDocumentMessage)msg).getDocumentId();
- break;
-
- case DocumentProtocol.MESSAGE_UPDATEDOCUMENT:
- id = ((UpdateDocumentMessage)msg).getDocumentUpdate().getId();
- break;
-
- case DocumentProtocol.MESSAGE_BATCHDOCUMENTUPDATE:
- bucketId = ((BatchDocumentUpdateMessage)msg).getBucketId();
- break;
-
- case DocumentProtocol.MESSAGE_GETBUCKETSTATE:
- bucketId = ((GetBucketStateMessage)msg).getBucketId();
- break;
-
- default:
- throw new UnsupportedOperationException("Message type '" + msg.getType() + "' not supported.");
- }
- if (bucketId == null && id != null) {
- bucketId = factory.getBucketId(id);
- }
- int recipient = getRecipient(bucketId, recipients.size());
- context.addChild(recipients.get(recipient));
- context.setSelectOnRetry(true);
- if (maxOOS > 0) {
- context.addConsumableError(ErrorCode.SERVICE_OOS);
- }
- }
-
- @Override
- public void merge(RoutingContext context) {
- if (maxOOS > 0) {
- if (context.getNumChildren() > 1) {
- Set<Integer> oosReplies = new HashSet<Integer>();
- int idx = 0;
- for (RoutingNodeIterator it = context.getChildIterator();
- it.isValid(); it.next())
- {
- Reply ref = it.getReplyRef();
- if (ref.hasErrors() && DocumentProtocol.hasOnlyErrorsOfType(ref, ErrorCode.SERVICE_OOS)) {
- oosReplies.add(idx);
- }
- ++idx;
- }
- if (oosReplies.size() <= maxOOS) {
- DocumentProtocol.merge(context, oosReplies);
- return; // may the rtx be with you
- }
- } else {
- Reply ref = context.getChildIterator().getReplyRef();
- if (ref.hasErrors() && DocumentProtocol.hasOnlyErrorsOfType(ref, ErrorCode.SERVICE_OOS)) {
- context.setReply(new EmptyReply());
- return; // god help us all
- }
- }
- }
- DocumentProtocol.merge(context);
- }
-
- /**
- * Returns the recipient index for the given bucket id. This updates the shared internal distribution map, so it
- * needs to be synchronized.
- *
- * @param bucketId The bucket whose recipient to return.
- * @param numRecipients The number of recipients being distributed to.
- * @return The recipient to use.
- */
- private synchronized int getRecipient(BucketId bucketId, int numRecipients) {
- BucketDistribution distribution = distributions.get(numRecipients);
- if (distribution == null) {
- distribution = new BucketDistribution(1, DEFAULT_NUM_BUCKET_BITS);
- distribution.setNumColumns(numRecipients);
- distributions.put(numRecipients, distribution);
- }
- return distribution.getColumn(bucketId);
- }
-
- @Override
- public void destroy() {
- // empty
- }
-
- @Override
- public MetricSet getMetrics() {
- return null;
- }
-}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java
deleted file mode 100755
index 4d94107eae7..00000000000
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java
+++ /dev/null
@@ -1,85 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.metrics.MetricSet;
-import com.yahoo.messagebus.routing.RoutingContext;
-import com.yahoo.messagebus.routing.RoutingNodeIterator;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.logging.Logger;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SearchRowPolicy implements DocumentProtocolRoutingPolicy {
-
- private static Logger log = Logger.getLogger(SearchRowPolicy.class.getName());
- private int minOk = 0; // Hide OUT_OF_SERVICE as long as this number of replies are something else.
-
- /**
- * Creates a search row policy that wraps the underlying search group policy in case the parameter is something
- * other than an empty string.
- *
- * @param param The number of minimum non-OOS replies that this policy requires.
- */
- public SearchRowPolicy(String param) {
- if (param != null && param.length() > 0) {
- try {
- minOk = Integer.parseInt(param);
- }
- catch (NumberFormatException e) {
- log.log(LogLevel.WARNING, "Parameter '" + param + "' could not be parsed as an integer.", e);
- }
- if (minOk <= 0) {
- log.log(LogLevel.WARNING, "Ignoring a request to set the minimum number of OK replies to " + minOk + " " +
- "because it makes no sense. This routing policy will not allow any recipient " +
- "to be out of service.");
- }
- }
- }
-
- @Override
- public void select(RoutingContext context) {
- context.addChildren(context.getMatchedRecipients());
- context.setSelectOnRetry(false);
- if (minOk > 0) {
- context.addConsumableError(ErrorCode.SERVICE_OOS);
- }
- }
-
- @Override
- public void merge(RoutingContext context) {
- if (minOk > 0) {
- Set<Integer> oosReplies = new HashSet<Integer>();
- int idx = 0;
- for (RoutingNodeIterator it = context.getChildIterator();
- it.isValid(); it.next())
- {
- Reply ref = it.getReplyRef();
- if (ref.hasErrors() && DocumentProtocol.hasOnlyErrorsOfType(ref, ErrorCode.SERVICE_OOS)) {
- oosReplies.add(idx);
- }
- ++idx;
- }
- if (context.getNumChildren() - oosReplies.size() >= minOk) {
- DocumentProtocol.merge(context, oosReplies);
- return;
- }
- }
- DocumentProtocol.merge(context);
- }
-
- @Override
- public void destroy() {
- // empty
- }
-
- @Override
- public MetricSet getMetrics() {
- return null;
- }
-}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java
index ff237d46b90..fba2edf5cfd 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java
@@ -12,8 +12,6 @@ import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.network.rpc.test.TestServer;
import com.yahoo.messagebus.routing.*;
import com.yahoo.messagebus.test.Receptor;
-import com.yahoo.vdslib.DocumentList;
-import com.yahoo.vdslib.Entry;
import org.junit.Before;
import org.junit.Test;
@@ -60,12 +58,6 @@ public class PolicyTestCase {
policy = new DocumentProtocol(manager).createPolicy("RoundRobin", null);
assertTrue(policy instanceof RoundRobinPolicy);
- policy = new DocumentProtocol(manager).createPolicy("SearchRow", null);
- assertTrue(policy instanceof SearchRowPolicy);
-
- policy = new DocumentProtocol(manager).createPolicy("SearchColumn", null);
- assertTrue(policy instanceof SearchColumnPolicy);
-
policy = new DocumentProtocol(manager).createPolicy("SubsetService", null);
assertTrue(policy instanceof SubsetServicePolicy);
@@ -330,125 +322,6 @@ public class PolicyTestCase {
}
@Test
- public void testSearchRow() {
- PolicyTestFrame frame = new PolicyTestFrame(manager);
- frame.setMessage(new PutDocumentMessage(new DocumentPut(new Document(manager.getDocumentType("testdoc"),
- new DocumentId("doc:scheme:")))));
- frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo"));
- frame.assertMergeOneReply("foo");
- frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo").addRecipient("bar"));
- frame.assertMergeTwoReplies("foo", "bar");
-
- frame.setHop(new HopSpec("test", "[SearchRow:1]").addRecipient("foo"));
- Map<String, Integer> replies = new HashMap<>();
- replies.put("foo", ErrorCode.SERVICE_OOS);
- frame.assertMergeError(replies, Arrays.asList(ErrorCode.SERVICE_OOS));
-
- frame.setHop(new HopSpec("test", "[SearchRow:1]").addRecipient("foo").addRecipient("bar"));
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.NONE);
- frame.assertMergeOk(replies, Arrays.asList("bar"));
-
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.SERVICE_OOS);
- frame.assertMergeError(replies, Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS));
-
- frame.setHop(new HopSpec("test", "[SearchRow:1]").addRecipient("foo").addRecipient("bar").addRecipient("baz"));
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.NONE);
- replies.put("baz", ErrorCode.NONE);
- frame.assertMergeOk(replies, Arrays.asList("bar", "baz"));
-
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.SERVICE_OOS);
- replies.put("baz", ErrorCode.NONE);
- frame.assertMergeOk(replies, Arrays.asList("baz"));
-
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.SERVICE_OOS);
- replies.put("baz", ErrorCode.SERVICE_OOS);
- frame.assertMergeError(replies,
- Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS));
-
- frame.setHop(new HopSpec("test", "[SearchRow:2]").addRecipient("foo").addRecipient("bar").addRecipient("baz"));
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.NONE);
- replies.put("baz", ErrorCode.NONE);
- frame.assertMergeOk(replies, Arrays.asList("bar", "baz"));
-
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.SERVICE_OOS);
- replies.put("baz", ErrorCode.NONE);
- frame.assertMergeError(replies, Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS));
-
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.SERVICE_OOS);
- replies.put("baz", ErrorCode.SERVICE_OOS);
- frame.assertMergeError(replies,
- Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS));
-
- frame.destroy();
- }
-
- @Test
- public void testSearchRowMerge() {
- PolicyTestFrame frame = new PolicyTestFrame(manager);
- frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo"));
- tryWasFound(frame, 1, 0x0, false);
- tryWasFound(frame, 1, 0x1, true);
-
- frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo").addRecipient("bar"));
- tryWasFound(frame, 2, 0x0, false);
- tryWasFound(frame, 2, 0x1, true);
- tryWasFound(frame, 2, 0x2, true);
- tryWasFound(frame, 2, 0x3, true);
-
- frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo").addRecipient("bar").addRecipient("baz"));
- tryWasFound(frame, 3, 0x0, false);
- tryWasFound(frame, 3, 0x1, true);
- tryWasFound(frame, 3, 0x2, true);
- tryWasFound(frame, 3, 0x3, true);
- tryWasFound(frame, 3, 0x4, true);
- tryWasFound(frame, 3, 0x5, true);
- tryWasFound(frame, 3, 0x6, true);
- tryWasFound(frame, 3, 0x7, true);
- frame.destroy();
- }
-
- private void tryWasFound(PolicyTestFrame frame, int expectedRecipients,
- int foundMask, boolean expectedFound)
- {
- {
- frame.setMessage(new RemoveDocumentMessage(new DocumentId("doc:scheme:69")));
- List<RoutingNode> selected = frame.select(expectedRecipients);
- for (int i = 0, len = selected.size(); i < len; ++i) {
- RemoveDocumentReply reply = new RemoveDocumentReply();
- reply.setWasFound(((1 << i) & foundMask) != 0);
- selected.get(i).handleReply(reply);
- }
- Reply reply = frame.getReceptor().getReply(TIMEOUT);
- assertNotNull(reply);
- assertEquals(DocumentProtocol.REPLY_REMOVEDOCUMENT, reply.getType());
- assertEquals(expectedFound, ((RemoveDocumentReply)reply).wasFound());
- }
- {
- DocumentUpdate upd = new DocumentUpdate(manager.getDocumentType("testdoc"),
- new DocumentId("doc:scheme:"));
- frame.setMessage(new UpdateDocumentMessage(upd));
- List<RoutingNode> selected = frame.select(expectedRecipients);
- for (int i = 0, len = selected.size(); i < len; ++i) {
- UpdateDocumentReply reply = new UpdateDocumentReply();
- reply.setWasFound(((1 << i) & foundMask) != 0);
- selected.get(i).handleReply(reply);
- }
- Reply reply = frame.getReceptor().getReply(TIMEOUT);
- assertNotNull(reply);
- assertEquals(DocumentProtocol.REPLY_UPDATEDOCUMENT, reply.getType());
- assertEquals(expectedFound, ((UpdateDocumentReply)reply).wasFound());
- }
- }
-
- @Test
public void multipleGetRepliesAreMergedToFoundDocument() {
PolicyTestFrame frame = new PolicyTestFrame(manager);
frame.setHop(new HopSpec("test", getDocumentRouteSelectorRawConfig())
@@ -483,74 +356,6 @@ public class PolicyTestCase {
"route[1].feed \"myfeed\"\n]";
}
- @Test
- public void testSearchColumn() {
- PolicyTestFrame frame = new PolicyTestFrame(manager);
- frame.setHop(new HopSpec("test", "[SearchColumn]")
- .addRecipient("c0").addRecipient("c1")
- .addRecipient("c2").addRecipient("c3"));
-
- // Test hash distribution.
- assertDistribution(frame, "doc:ns:3", "c0");
- assertDistribution(frame, "doc:ns:18", "c1");
- assertDistribution(frame, "doc:ns:0", "c2");
- assertDistribution(frame, "doc:ns:4", "c3");
-
- assertDistribution(frame, "userdoc:ns:49152:0", "c0");
- assertDistribution(frame, "userdoc:ns:49152:1", "c0");
- assertDistribution(frame, "userdoc:ns:16384:2", "c1");
- assertDistribution(frame, "userdoc:ns:16384:3", "c1");
- assertDistribution(frame, "userdoc:ns:5461:4", "c2");
- assertDistribution(frame, "userdoc:ns:5461:5", "c2");
- assertDistribution(frame, "userdoc:ns:0:6", "c3");
- assertDistribution(frame, "userdoc:ns:0:7", "c3");
-
- assertDistribution(frame, "groupdoc:ns:0:0", "c0");
- assertDistribution(frame, "groupdoc:ns:0:1", "c0");
- assertDistribution(frame, "groupdoc:ns:4:2", "c1");
- assertDistribution(frame, "groupdoc:ns:4:3", "c1");
- assertDistribution(frame, "groupdoc:ns:2:4", "c2");
- assertDistribution(frame, "groupdoc:ns:2:5", "c2");
- assertDistribution(frame, "groupdoc:ns:7:6", "c3");
- assertDistribution(frame, "groupdoc:ns:7:7", "c3");
-
- // Test routing based on message type.
- Message put = new PutDocumentMessage(new DocumentPut(new Document(manager.getDocumentType("testdoc"),
- new DocumentId("doc:scheme:"))));
- frame.setHop(new HopSpec("test", "[SearchColumn]").addRecipient("c0").addRecipient("c1"));
- frame.setMessage(put);
- frame.assertMergeOneReply("c0");
-
- // Test allowed bad parts.
- frame.setHop(new HopSpec("test", "[SearchColumn:1]").addRecipient("c0"));
- frame.setMessage(put);
- Map<String, Integer> replies = new HashMap<>();
- replies.put("c0", ErrorCode.SERVICE_OOS);
- frame.assertMergeOk(replies, null);
-
- replies.put("c0", ErrorCode.SERVICE_OOS);
- frame.assertMergeOk(replies, null);
-
- frame.setHop(new HopSpec("test", "[SearchColumn:1]").addRecipient("c0").addRecipient("c1"));
- frame.setMessage(put);
- replies.put("c0", ErrorCode.SERVICE_OOS);
- frame.assertMergeOk(replies, null);
-
- frame.setHop(new HopSpec("test", "[SearchColumn:1]").addRecipient("c0").addRecipient("c1").addRecipient("c2"));
- frame.setMessage(put);
- replies.clear();
- replies.put("c0", ErrorCode.SERVICE_OOS);
- frame.assertMergeOk(replies, null);
-
- frame.setHop(new HopSpec("test", "[SearchColumn:2]").addRecipient("c0").addRecipient("c1").addRecipient("c2"));
- frame.setMessage(put);
- replies.clear();
- replies.put("c0", ErrorCode.SERVICE_OOS);
- frame.assertMergeOk(replies, null);
-
- frame.destroy();
- }
-
private void assertDistribution(PolicyTestFrame frame, String id, String expected) {
frame.setMessage(new PutDocumentMessage(new DocumentPut(new Document(manager.getDocumentType("testdoc"),
new DocumentId(id)))));
diff --git a/documentapi/src/tests/messagebus/messagebus_test.cpp b/documentapi/src/tests/messagebus/messagebus_test.cpp
index d8920b0577b..919dc6b6570 100644
--- a/documentapi/src/tests/messagebus/messagebus_test.cpp
+++ b/documentapi/src/tests/messagebus/messagebus_test.cpp
@@ -92,13 +92,7 @@ void Test::testProtocol() {
DocumentProtocol protocol(set, _repo);
EXPECT_TRUE(protocol.getName() == "document");
- IRoutingPolicy::UP policy = protocol.createPolicy(string("SearchRow"),string(""));
- EXPECT_TRUE(policy.get() != NULL);
-
- policy = protocol.createPolicy(string("SearchColumn"),string(""));
- EXPECT_TRUE(policy.get() != NULL);
-
- policy = protocol.createPolicy(string("DocumentRouteSelector"), string("file:documentrouteselectorpolicy.cfg"));
+ IRoutingPolicy::UP policy = protocol.createPolicy(string("DocumentRouteSelector"), string("file:documentrouteselectorpolicy.cfg"));
EXPECT_TRUE(policy.get() != NULL);
policy = protocol.createPolicy(string(""),string(""));
diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp
index 9d38247ebfd..58db3079631 100644
--- a/documentapi/src/tests/policies/policies_test.cpp
+++ b/documentapi/src/tests/policies/policies_test.cpp
@@ -10,8 +10,6 @@
#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h>
#include <vespa/documentapi/messagebus/policies/localservicepolicy.h>
#include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h>
-#include <vespa/documentapi/messagebus/policies/searchcolumnpolicy.h>
-#include <vespa/documentapi/messagebus/policies/searchrowpolicy.h>
#include <vespa/documentapi/messagebus/policies/storagepolicy.h>
#include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h>
#include <vespa/messagebus/emptyreply.h>
@@ -39,6 +37,8 @@ using document::readDocumenttypesConfig;
using slobrok::api::IMirrorAPI;
using namespace documentapi;
using vespalib::make_string;
+using std::make_unique;
+using std::make_shared;
class Test : public vespalib::TestApp {
private:
@@ -48,16 +48,12 @@ private:
private:
bool trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string> &expected);
- bool tryDistribution(TestFrame &frame, const string &id, const string &expected);
- void tryWasFound(TestFrame &frame, uint32_t expectedRecipients, uint32_t foundMask, bool expectedFound);
- void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern,
- int32_t numEntries = -1);
+ void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries = -1);
StoragePolicy &setupStoragePolicy(TestFrame &frame, const string &param,
const string &pattern = "", int32_t numEntries = -1);
bool isErrorPolicy(const string &name, const string &param);
void assertMirrorReady(const IMirrorAPI &mirror);
- void assertMirrorContains(const IMirrorAPI &mirror, const string &pattern,
- uint32_t numEntries);
+ void assertMirrorContains(const IMirrorAPI &mirror, const string &pattern, uint32_t numEntries);
mbus::Message::UP newPutDocumentMessage(const string &documentId);
public:
@@ -75,9 +71,6 @@ public:
void testProtocol();
void testRoundRobin();
void testRoundRobinCache();
- void testSearchColumn();
- void testSearchRow();
- void testSearchRowMerge();
void multipleGetRepliesAreMergedToFoundDocument();
void testSubsetService();
void testSubsetServiceCache();
@@ -117,9 +110,6 @@ Test::Main() {
testLocalServiceCache(); TEST_FLUSH();
testRoundRobin(); TEST_FLUSH();
testRoundRobinCache(); TEST_FLUSH();
- testSearchColumn(); TEST_FLUSH();
- testSearchRow(); TEST_FLUSH();
- testSearchRowMerge(); TEST_FLUSH();
testSubsetService(); TEST_FLUSH();
testSubsetServiceCache(); TEST_FLUSH();
@@ -144,43 +134,34 @@ Test::testProtocol()
mbus::IProtocol::SP protocol(new DocumentProtocol(_loadTypes, _repo));
mbus::IRoutingPolicy::UP policy = protocol->createPolicy("AND", "");
- ASSERT_TRUE(dynamic_cast<ANDPolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<ANDPolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("DocumentRouteSelector", "raw:route[0]\n");
- ASSERT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("Extern", "foo;bar/baz");
- ASSERT_TRUE(dynamic_cast<ExternPolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<ExternPolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("LoadBalancer",
"cluster=docproc/cluster.default;"
"session=chain.default;syncinit");
- ASSERT_TRUE(dynamic_cast<LoadBalancerPolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<LoadBalancerPolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("LocalService", "");
- ASSERT_TRUE(dynamic_cast<LocalServicePolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<LocalServicePolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("RoundRobin", "");
- ASSERT_TRUE(dynamic_cast<RoundRobinPolicy*>(policy.get()) != NULL);
-
- policy = protocol->createPolicy("SearchRow", "");
- ASSERT_TRUE(dynamic_cast<SearchRowPolicy*>(policy.get()) != NULL);
-
- policy = protocol->createPolicy("SearchColumn", "");
- ASSERT_TRUE(dynamic_cast<SearchColumnPolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<RoundRobinPolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("SubsetService", "");
- ASSERT_TRUE(dynamic_cast<SubsetServicePolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<SubsetServicePolicy*>(policy.get()) != nullptr);
}
void
Test::testAND()
{
TestFrame frame(_repo);
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(
- document::Document::SP(
- new document::Document(*_docType,
- DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:"))));
frame.setHop(mbus::HopSpec("test", "[AND]")
.addRecipient("foo")
.addRecipient("bar"));
@@ -242,7 +223,7 @@ Test::requireThatExternPolicySelectsFromExternSlobrok()
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
EXPECT_EQUAL(servers.size(), lst.size());
for (uint32_t i = 0; i < servers.size(); ++i) {
@@ -267,15 +248,14 @@ mbus::Message::UP
Test::newPutDocumentMessage(const string &documentId)
{
Document::SP doc(new Document(*_docType, DocumentId(documentId)));
- return mbus::Message::UP(new PutDocumentMessage(doc));
+ return make_unique<PutDocumentMessage>(doc);
}
void
Test::setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern,
int32_t numEntries)
{
- string param = vespalib::make_string("tcp/localhost:%d;%s",
- slobrok.port(), pattern.c_str());
+ string param = vespalib::make_string("tcp/localhost:%d;%s", slobrok.port(), pattern.c_str());
frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Extern:%s]", param.c_str())));
mbus::MessageBus &mbus = frame.getMessageBus();
const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test");
@@ -340,7 +320,7 @@ Test::testExternSend()
mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr);
// Send message from local node to remote cluster and resolve route there.
- mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0));
+ mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0));
msg->getTrace().setLevel(9);
msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:tcp/localhost:%d;itr/session] default", slobrok.port())));
@@ -376,7 +356,7 @@ Test::testExternMultipleSlobroks()
std::make_shared<DocumentProtocol>(_loadTypes, _repo));
mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr);
- mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0));
+ mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0));
msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str())));
ASSERT_TRUE(ss->send(std::move(msg)).isAccepted());
ASSERT_TRUE((msg = dr.getMessage(600)));
@@ -392,7 +372,7 @@ Test::testExternMultipleSlobroks()
std::make_shared<DocumentProtocol>(_loadTypes, _repo));
mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr);
- mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0));
+ mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0));
msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str())));
ASSERT_TRUE(ss->send(std::move(msg)).isAccepted());
ASSERT_TRUE((msg = dr.getMessage(600)));
@@ -407,8 +387,7 @@ Test::testLocalService()
{
// Prepare message.
TestFrame frame(_repo, "docproc/cluster.default");
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP(
- new Document(*_docType, DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:"))));
// Test select with proper address.
for (uint32_t i = 0; i < 10; ++i) {
@@ -424,7 +403,7 @@ Test::testLocalService()
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
EXPECT_EQUAL(10u, lst.size());
@@ -437,7 +416,7 @@ Test::testLocalService()
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
EXPECT_EQUAL(1u, lst.size());
EXPECT_EQUAL("docproc/cluster.default/*/chain.default", *lst.begin());
@@ -452,12 +431,12 @@ Test::testLocalServiceCache()
{
TestFrame fooFrame(_repo, "docproc/cluster.default");
mbus::HopSpec fooHop("foo", "docproc/cluster.default/[LocalService]/chain.foo");
- fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo"))));
+ fooFrame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:foo")));
fooFrame.setHop(fooHop);
TestFrame barFrame(fooFrame);
mbus::HopSpec barHop("test", "docproc/cluster.default/[LocalService]/chain.bar");
- barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar"))));
+ barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar"))));
barFrame.setHop(barHop);
fooFrame.getMessageBus().setupRouting(
@@ -480,8 +459,8 @@ Test::testLocalServiceCache()
barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(barFrame.getReceptor().getReply(600).get() != NULL);
- ASSERT_TRUE(fooFrame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(barFrame.getReceptor().getReply(600));
+ ASSERT_TRUE(fooFrame.getReceptor().getReply(600));
}
void
@@ -489,9 +468,7 @@ Test::testRoundRobin()
{
// Prepare message.
TestFrame frame(_repo, "docproc/cluster.default");
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP(
- new Document(*_docType,
- DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:"))));
// Test select with proper address.
for (uint32_t i = 0; i < 10; ++i) {
@@ -530,13 +507,13 @@ Test::testRoundRobinCache()
TestFrame fooFrame(_repo, "docproc/cluster.default");
mbus::HopSpec fooHop("foo", "[RoundRobin]");
fooHop.addRecipient("docproc/cluster.default/0/chain.foo");
- fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo"))));
+ fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:foo"))));
fooFrame.setHop(fooHop);
TestFrame barFrame(fooFrame);
mbus::HopSpec barHop("bar", "[RoundRobin]");
barHop.addRecipient("docproc/cluster.default/0/chain.bar");
- barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar"))));
+ barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar"))));
barFrame.setHop(barHop);
fooFrame.getMessageBus().setupRouting(
@@ -559,155 +536,8 @@ Test::testRoundRobinCache()
barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(barFrame.getReceptor().getReply(600).get() != NULL);
- ASSERT_TRUE(fooFrame.getReceptor().getReply(600).get() != NULL);
-}
-
-void
-Test::testSearchRow()
-{
- TestFrame frame(_repo);
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP(
- new Document(*_docType,
- DocumentId("doc:scheme:"))))));
- frame.setHop(mbus::HopSpec("test", "[SearchRow]")
- .addRecipient("foo"));
- EXPECT_TRUE(frame.testMergeOneReply("foo"));
- frame.setHop(mbus::HopSpec("test", "[SearchRow]")
- .addRecipient("foo")
- .addRecipient("bar"));
- EXPECT_TRUE(frame.testMergeTwoReplies("foo", "bar"));
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow:1]")
- .addRecipient("foo"));
- TestFrame::ReplyMap replies;
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- EXPECT_TRUE(frame.testMergeError(replies, UIntList().add(mbus::ErrorCode::SERVICE_OOS)));
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow:1]")
- .addRecipient("foo")
- .addRecipient("bar"));
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::NONE;
- EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar")));
-
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::SERVICE_OOS;
- EXPECT_TRUE(frame.testMergeError(replies, UIntList()
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)));
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow:1]")
- .addRecipient("foo")
- .addRecipient("bar")
- .addRecipient("baz"));
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::NONE;
- replies["baz"] = mbus::ErrorCode::NONE;
- EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar").add("baz")));
-
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::SERVICE_OOS;
- replies["baz"] = mbus::ErrorCode::NONE;
- EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("baz")));
-
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::SERVICE_OOS;
- replies["baz"] = mbus::ErrorCode::SERVICE_OOS;
- EXPECT_TRUE(frame.testMergeError(replies, UIntList()
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)));
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow:2]")
- .addRecipient("foo")
- .addRecipient("bar")
- .addRecipient("baz"));
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::NONE;
- replies["baz"] = mbus::ErrorCode::NONE;
- EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar").add("baz")));
-
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::SERVICE_OOS;
- replies["baz"] = mbus::ErrorCode::NONE;
- EXPECT_TRUE(frame.testMergeError(replies, UIntList()
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)));
-
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::SERVICE_OOS;
- replies["baz"] = mbus::ErrorCode::SERVICE_OOS;
- EXPECT_TRUE(frame.testMergeError(replies, UIntList()
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)));
-}
-
-void
-Test::testSearchRowMerge()
-{
- TestFrame frame(_repo);
- frame.setHop(mbus::HopSpec("test", "[SearchRow]")
- .addRecipient("foo"));
- tryWasFound(frame, 1, 0x0, false);
- tryWasFound(frame, 1, 0x1, true);
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow]")
- .addRecipient("foo")
- .addRecipient("bar"));
- tryWasFound(frame, 2, 0x0, false);
- tryWasFound(frame, 2, 0x1, true);
- tryWasFound(frame, 2, 0x2, true);
- tryWasFound(frame, 2, 0x3, true);
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow]")
- .addRecipient("foo")
- .addRecipient("bar")
- .addRecipient("baz"));
- tryWasFound(frame, 3, 0x0, false);
- tryWasFound(frame, 3, 0x1, true);
- tryWasFound(frame, 3, 0x2, true);
- tryWasFound(frame, 3, 0x3, true);
- tryWasFound(frame, 3, 0x4, true);
- tryWasFound(frame, 3, 0x5, true);
- tryWasFound(frame, 3, 0x6, true);
- tryWasFound(frame, 3, 0x7, true);
-}
-
-void
-Test::tryWasFound(TestFrame &frame, uint32_t expectedRecipients,
- uint32_t foundMask, bool expectedFound)
-{
- {
- frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(DocumentId("doc:scheme:69"))));
- std::vector<mbus::RoutingNode*> selected;
- EXPECT_TRUE(frame.select(selected, expectedRecipients));
- for (uint32_t i = 0, len = selected.size(); i < len; ++i) {
- mbus::Reply::UP reply(new RemoveDocumentReply());
- static_cast<RemoveDocumentReply&>(*reply).setWasFound((1 << i) & foundMask);
- selected[i]->handleReply(std::move(reply));
- }
- mbus::Reply::UP reply = frame.getReceptor().getReply(600);
- EXPECT_TRUE(reply.get() != NULL);
- EXPECT_EQUAL((uint32_t)DocumentProtocol::REPLY_REMOVEDOCUMENT, reply->getType());
- EXPECT_EQUAL(expectedFound, static_cast<RemoveDocumentReply&>(*reply).wasFound());
- }
- {
- DocumentUpdate::SP upd(new DocumentUpdate(*_docType, DocumentId("doc:scheme:")));
- frame.setMessage(mbus::Message::UP(new UpdateDocumentMessage(upd)));
- std::vector<mbus::RoutingNode*> selected;
- EXPECT_TRUE(frame.select(selected, expectedRecipients));
- for (uint32_t i = 0, len = selected.size(); i < len; ++i) {
- mbus::Reply::UP reply(new UpdateDocumentReply());
- static_cast<UpdateDocumentReply&>(*reply).setWasFound((1 << i) & foundMask);
- selected[i]->handleReply(std::move(reply));
- }
- mbus::Reply::UP reply = frame.getReceptor().getReply(600);
- EXPECT_TRUE(reply.get() != NULL);
- EXPECT_EQUAL((uint32_t)DocumentProtocol::REPLY_UPDATEDOCUMENT, reply->getType());
- EXPECT_EQUAL(expectedFound, static_cast<UpdateDocumentReply&>(*reply).wasFound());
- }
+ ASSERT_TRUE(barFrame.getReceptor().getReply(600));
+ ASSERT_TRUE(fooFrame.getReceptor().getReply(600));
}
void
@@ -724,11 +554,11 @@ Test::multipleGetRepliesAreMergedToFoundDocument()
"route[1].feed \"myfeed\"\n]")
.addRecipient("foo")
.addRecipient("bar"));
- frame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:yarn"))));
+ frame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:yarn")));
std::vector<mbus::RoutingNode*> selected;
EXPECT_TRUE(frame.select(selected, 2));
for (uint32_t i = 0, len = selected.size(); i < len; ++i) {
- document::Document::SP doc;
+ Document::SP doc;
if (i == 0) {
doc.reset(new Document(*_docType, DocumentId("doc:scheme:yarn")));
doc->setLastModified(123456ULL);
@@ -737,62 +567,12 @@ Test::multipleGetRepliesAreMergedToFoundDocument()
selected[i]->handleReply(std::move(reply));
}
mbus::Reply::UP reply = frame.getReceptor().getReply(600);
- EXPECT_TRUE(reply.get() != NULL);
- EXPECT_EQUAL(static_cast<uint32_t>(DocumentProtocol::REPLY_GETDOCUMENT),
- reply->getType());
+ EXPECT_TRUE(reply);
+ EXPECT_EQUAL(static_cast<uint32_t>(DocumentProtocol::REPLY_GETDOCUMENT), reply->getType());
EXPECT_EQUAL(123456ULL, static_cast<GetDocumentReply&>(*reply).getLastModified());
}
void
-Test::testSearchColumn()
-{
- TestFrame frame(_repo);
- frame.setHop(mbus::HopSpec("test", "[SearchColumn]")
- .addRecipient("c0")
- .addRecipient("c1")
- .addRecipient("c2")
- .addRecipient("c3"));
-
- // Test hash distribution.
- EXPECT_TRUE(tryDistribution(frame, "doc:ns:3", "c0"));
- EXPECT_TRUE(tryDistribution(frame, "doc:ns:18", "c1"));
- EXPECT_TRUE(tryDistribution(frame, "doc:ns:0", "c2"));
- EXPECT_TRUE(tryDistribution(frame, "doc:ns:4", "c3"));
-
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:49152:0", "c0"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:49152:1", "c0"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:16384:2", "c1"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:16384:3", "c1"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:5461:4", "c2"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:5461:5", "c2"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:0:6", "c3"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:0:7", "c3"));
-
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:0:0", "c0"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:0:1", "c0"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:4:2", "c1"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:4:3", "c1"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:2:4", "c2"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:2:5", "c2"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:7:6", "c3"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:7:7", "c3"));
-
- // Test routing based on message type.
- mbus::Message::UP put(new PutDocumentMessage(Document::SP(
- new Document(*_docType,
- DocumentId("doc:scheme:")))));
-}
-
-bool
-Test::tryDistribution(TestFrame &frame, const string &id, const string &expected)
-{
- Document::SP doc(new Document(*_docType, DocumentId(id)));
- mbus::Message::UP msg(new PutDocumentMessage(doc));
- frame.setMessage(std::move(msg));
- return frame.testSelect(StringList().add(expected));
-}
-
-void
Test::testDocumentRouteSelector()
{
// Test policy with usage safeguard.
@@ -804,13 +584,13 @@ Test::testDocumentRouteSelector()
"route[0].feed \"baz\"\n";
{
DocumentProtocol protocol(_loadTypes, _repo, okConfig);
- EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != NULL);
- EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", errConfig).get()) != NULL);
+ EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != nullptr);
+ EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", errConfig).get()) != nullptr);
}
{
DocumentProtocol protocol(_loadTypes, _repo, errConfig);
- EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != NULL);
- EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", okConfig).get()) != NULL);
+ EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != nullptr);
+ EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", okConfig).get()) != nullptr);
}
// Test policy with proper config.
@@ -826,19 +606,17 @@ Test::testDocumentRouteSelector()
.addRecipient("foo")
.addRecipient("bar"));
- frame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0)));
+ frame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:"), 0));
EXPECT_TRUE(frame.testSelect(StringList().add("foo").add("bar")));
- mbus::Message::UP put(new PutDocumentMessage(Document::SP(
- new Document(*_docType,
- DocumentId("doc:scheme:")))));
+ mbus::Message::UP put = make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:")));
frame.setMessage(std::move(put));
EXPECT_TRUE(frame.testSelect( StringList().add("foo")));
{
vdslib::OperationList opList;
- document::DocumentId id("doc:scheme:");
+ DocumentId id("doc:scheme:");
Document::UP doc(new Document(*_docType, id));
opList.addPut(std::move(doc));
@@ -849,24 +627,20 @@ Test::testDocumentRouteSelector()
{
vdslib::OperationList opList;
- document::DocumentId id("doc:scheme:");
+ DocumentId id("doc:scheme:");
Document::UP doc(new Document(*_repo->getDocumentType("other"), id));
opList.addPut(std::move(doc));
document::BucketIdFactory factory;
- put = frame.setMessage(MultiOperationMessage::create(_repo,
- factory.getBucketId(id), opList));
+ put = frame.setMessage(MultiOperationMessage::create(_repo, factory.getBucketId(id), opList));
EXPECT_TRUE(frame.testSelect(StringList().add("bar")));
}
- frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(document::DocumentId("doc:scheme:"))));
+ frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(DocumentId("doc:scheme:"))));
EXPECT_TRUE(frame.testSelect(StringList().add("foo").add("bar")));
- frame.setMessage(mbus::Message::UP(new UpdateDocumentMessage(
- document::DocumentUpdate::SP(
- new document::DocumentUpdate(
- *_docType,
- DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<UpdateDocumentMessage>(
+ make_shared<DocumentUpdate>(*_docType, DocumentId("doc:scheme:"))));
EXPECT_TRUE(frame.testSelect(StringList().add("foo")));
frame.setMessage(std::move(put));
@@ -884,22 +658,17 @@ Test::testDocumentRouteSelectorIgnore()
"route[0].feed \"myfeed\"\n]")
.addRecipient("docproc/cluster.foo"));
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(
- document::Document::SP(
- new document::Document(*_docType,
- DocumentId("id:yarn:testdoc:n=1234:fluff"))))));
+ frame.setMessage(make_unique<PutDocumentMessage>(
+ make_shared<Document>(*_docType, DocumentId("id:yarn:testdoc:n=1234:fluff"))));
std::vector<mbus::RoutingNode*> leaf;
ASSERT_TRUE(frame.select(leaf, 0));
mbus::Reply::UP reply = frame.getReceptor().getReply(600);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(uint32_t(DocumentProtocol::REPLY_DOCUMENTIGNORED), reply->getType());
EXPECT_EQUAL(0u, reply->getNumErrors());
- frame.setMessage(mbus::Message::UP(new UpdateDocumentMessage(
- document::DocumentUpdate::SP(
- new document::DocumentUpdate(
- *_docType,
- DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<UpdateDocumentMessage>(
+ make_shared<DocumentUpdate>(*_docType, DocumentId("doc:scheme:"))));
EXPECT_TRUE(frame.testSelect(StringList().add("docproc/cluster.foo")));
}
@@ -999,7 +768,7 @@ Test::requireThatStoragePolicyIsRandomWithoutState()
StoragePolicy &policy = setupStoragePolicy(
frame, param,
"storage/cluster.mycluster/distributor/*/default", 5);
- ASSERT_TRUE(policy.getSystemState() == NULL);
+ ASSERT_TRUE(policy.getSystemState() == nullptr);
std::set<string> lst;
for (uint32_t i = 0; i < 666; i++) {
@@ -1022,10 +791,8 @@ Test::setupStoragePolicy(TestFrame &frame, const string &param,
mbus::MessageBus &mbus = frame.getMessageBus();
const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test");
const mbus::PolicyDirective dir = static_cast<mbus::PolicyDirective&>(*hop->getDirective(0));
- StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy(
- DocumentProtocol::NAME,
- dir.getName(),
- dir.getParam()));
+ StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME,
+ dir.getName(), dir.getParam()));
policy.initSynchronous();
assertMirrorReady(*policy.getMirror());
if (numEntries >= 0) {
@@ -1046,7 +813,7 @@ Test::requireThatStoragePolicyIsTargetedWithState()
mbus::TestServer *srv = new mbus::TestServer(
mbus::Identity(vespalib::make_string("storage/cluster.mycluster/distributor/%d", i)),
mbus::RoutingSpec(), slobrok,
- mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo)));
+ make_shared<DocumentProtocol>(_loadTypes, _repo));
servers.push_back(srv);
srv->net.registerSession("default");
}
@@ -1056,12 +823,12 @@ Test::requireThatStoragePolicyIsTargetedWithState()
StoragePolicy &policy = setupStoragePolicy(
frame, param,
"storage/cluster.mycluster/distributor/*/default", 5);
- ASSERT_TRUE(policy.getSystemState() == NULL);
+ ASSERT_TRUE(policy.getSystemState() == nullptr);
{
std::vector<mbus::RoutingNode*> leaf;
ASSERT_TRUE(frame.select(leaf, 1));
leaf[0]->handleReply(mbus::Reply::UP(new WrongDistributionReply("distributor:5 storage:5")));
- ASSERT_TRUE(policy.getSystemState() != NULL);
+ ASSERT_TRUE(policy.getSystemState() != nullptr);
EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:5 storage:5");
}
std::set<string> lst;
@@ -1086,7 +853,7 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState()
mbus::Slobrok slobrok;
mbus::TestServer server(mbus::Identity("storage/cluster.mycluster/distributor/0"),
mbus::RoutingSpec(), slobrok,
- mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo)));
+ make_shared<DocumentProtocol>(_loadTypes, _repo));
server.net.registerSession("default");
string param = vespalib::make_string(
@@ -1095,12 +862,12 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState()
StoragePolicy &policy = setupStoragePolicy(
frame, param,
"storage/cluster.mycluster/distributor/*/default", 1);
- ASSERT_TRUE(policy.getSystemState() == NULL);
+ ASSERT_TRUE(policy.getSystemState() == nullptr);
{
std::vector<mbus::RoutingNode*> leaf;
ASSERT_TRUE(frame.select(leaf, 1));
leaf[0]->handleReply(mbus::Reply::UP(new WrongDistributionReply("distributor:99 storage:99")));
- ASSERT_TRUE(policy.getSystemState() != NULL);
+ ASSERT_TRUE(policy.getSystemState() != nullptr);
EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:99 storage:99");
}
for (int i = 0; i < 666; i++) {
@@ -1113,9 +880,7 @@ Test::testSubsetService()
{
// Prepare message.
TestFrame frame(_repo, "docproc/cluster.default");
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP(
- new Document(*_docType,
- DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:"))));
// Test requerying for adding nodes.
frame.setHop(mbus::HopSpec("test", "docproc/cluster.default/[SubsetService:2]/chain.default"));
@@ -1128,7 +893,7 @@ Test::testSubsetService()
ASSERT_TRUE(frame.select(leaf, 1));
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
ASSERT_TRUE(lst.size() > 1); // must have requeried
@@ -1147,7 +912,7 @@ Test::testSubsetService()
prev = next;
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
// Test requerying for dropping nodes.
@@ -1164,7 +929,7 @@ Test::testSubsetService()
mbus::Reply::UP reply(new mbus::EmptyReply());
reply->addError(mbus::Error(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, route));
leaf[0]->handleReply(std::move(reply));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
EXPECT_EQUAL(10u, lst.size());
@@ -1178,12 +943,12 @@ Test::testSubsetServiceCache()
{
TestFrame fooFrame(_repo, "docproc/cluster.default");
mbus::HopSpec fooHop("foo", "docproc/cluster.default/[SubsetService:2]/chain.foo");
- fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo"))));
+ fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:foo"))));
fooFrame.setHop(fooHop);
TestFrame barFrame(fooFrame);
mbus::HopSpec barHop("bar", "docproc/cluster.default/[SubsetService:2]/chain.bar");
- barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar"))));
+ barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar"))));
barFrame.setHop(barHop);
fooFrame.getMessageBus().setupRouting(
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
index 95115d1c3c1..a1837b6bcd0 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
@@ -41,8 +41,6 @@ DocumentProtocol::DocumentProtocol(const LoadTypeSet& loadTypes,
putRoutingPolicyFactory("Extern", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::ExternPolicyFactory()));
putRoutingPolicyFactory("LocalService", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::LocalServicePolicyFactory()));
putRoutingPolicyFactory("RoundRobin", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::RoundRobinPolicyFactory()));
- putRoutingPolicyFactory("SearchColumn", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SearchColumnPolicyFactory()));
- putRoutingPolicyFactory("SearchRow", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SearchRowPolicyFactory()));
putRoutingPolicyFactory("Storage", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::StoragePolicyFactory()));
putRoutingPolicyFactory("SubsetService", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SubsetServicePolicyFactory()));
putRoutingPolicyFactory("LoadBalancer", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::LoadBalancerPolicyFactory()));
@@ -134,8 +132,7 @@ DocumentProtocol::encode(const vespalib::Version &version, const mbus::Routable
std::ostringstream message;
document::StringUtil::printAsHex(
message, blob.data(), blob.size());
- LOG(spam, "Encoded message of protocol %s type %u using version "
- "%s serialization:\n%s",
+ LOG(spam, "Encoded message of protocol %s type %u using version %s serialization:\n%s",
routable.getProtocol().c_str(), routable.getType(),
version.toString().c_str(), message.str().c_str());
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
index f1a691bc46d..26d51e702e9 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
@@ -11,8 +11,6 @@ vespa_add_library(documentapi_documentapipolicies OBJECT
externpolicy.cpp
localservicepolicy.cpp
roundrobinpolicy.cpp
- searchcolumnpolicy.cpp
- searchrowpolicy.cpp
subsetservicepolicy.cpp
loadbalancer.cpp
loadbalancerpolicy.cpp
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp
deleted file mode 100644
index 38610aca551..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp
+++ /dev/null
@@ -1,137 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "searchcolumnpolicy.h"
-#include <vespa/documentapi/messagebus/documentprotocol.h>
-#include <vespa/documentapi/messagebus/messages/getdocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/updatedocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/batchdocumentupdatemessage.h>
-#include <vespa/documentapi/messagebus/messages/multioperationmessage.h>
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vespalib/util/hashmap.h>
-#include <vespa/log/log.h>
-LOG_SETUP(".searchcolumnpolicy");
-
-namespace documentapi {
-
-SearchColumnPolicy::SearchColumnPolicy(const string &param) :
- _lock(),
- _factory(),
- _distributions(),
- _maxOOS(0)
-{
- if (param.length() > 0) {
- int maxOOS = atoi(param.c_str());
- if (maxOOS >= 0) {
- _maxOOS = (uint32_t)maxOOS;
- } else {
- LOG(warning,
- "Ignoring a request to set the maximum number of OOS replies to %d because it makes no "
- "sense. This routing policy will not allow any recipient to be out of service.", maxOOS);
- }
- }
-}
-
-SearchColumnPolicy::~SearchColumnPolicy()
-{
- // empty
-}
-
-void
-SearchColumnPolicy::select(mbus::RoutingContext &context)
-{
- std::vector<mbus::Route> recipients;
- context.getMatchedRecipients(recipients);
- if (recipients.empty()) {
- return;
- }
- const document::DocumentId *id = NULL;
- document::BucketId bucketId;
-
- const mbus::Message &msg = context.getMessage();
- switch(msg.getType()) {
- case DocumentProtocol::MESSAGE_PUTDOCUMENT:
- id = &static_cast<const PutDocumentMessage&>(msg).getDocument().getId();
- break;
-
- case DocumentProtocol::MESSAGE_GETDOCUMENT:
- id = &static_cast<const GetDocumentMessage&>(msg).getDocumentId();
- break;
-
- case DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
- id = &static_cast<const RemoveDocumentMessage&>(msg).getDocumentId();
- break;
-
- case DocumentProtocol::MESSAGE_UPDATEDOCUMENT:
- id = &static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId();
- break;
-
- case DocumentProtocol::MESSAGE_MULTIOPERATION:
- bucketId = (static_cast<const MultiOperationMessage&>(msg)).getBucketId();
- break;
-
- case DocumentProtocol::MESSAGE_BATCHDOCUMENTUPDATE:
- bucketId = (static_cast<const BatchDocumentUpdateMessage&>(msg)).getBucketId();
- break;
-
- default:
- LOG(error, "Message type '%d' not supported.", msg.getType());
- return;
- }
- if (bucketId.getRawId() == 0) {
- bucketId = _factory.getBucketId(*id);
- }
- uint32_t recipient = getRecipient(bucketId, recipients.size());
- context.addChild(recipients[recipient]);
- context.setSelectOnRetry(true);
- if (_maxOOS > 0) {
- context.addConsumableError(mbus::ErrorCode::SERVICE_OOS);
- }
-}
-
-void
-SearchColumnPolicy::merge(mbus::RoutingContext &context)
-{
- if (_maxOOS > 0) {
- if (context.getNumChildren() > 1) {
- std::set<uint32_t> oosReplies;
- uint32_t idx = 0;
- for (mbus::RoutingNodeIterator it = context.getChildIterator();
- it.isValid(); it.next())
- {
- const mbus::Reply &ref = it.getReplyRef();
- if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) {
- oosReplies.insert(idx);
- }
- ++idx;
- }
- if (oosReplies.size() <= _maxOOS) {
- DocumentProtocol::merge(context, oosReplies);
- return; // may the rtx be with you
- }
- } else {
- const mbus::Reply &ref = context.getChildIterator().getReplyRef();
- if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) {
- context.setReply(mbus::Reply::UP(new mbus::EmptyReply()));
- return; // god help us all
- }
- }
- }
- DocumentProtocol::merge(context);
-}
-
-uint32_t
-SearchColumnPolicy::getRecipient(const document::BucketId &bucketId, uint32_t numRecipients)
-{
- vespalib::LockGuard guard(_lock);
- DistributionCache::iterator it = _distributions.find(numRecipients);
- if (it == _distributions.end()) {
- it = _distributions.insert(DistributionCache::value_type(numRecipients, vdslib::BucketDistribution(1, 16u))).first;
- it->second.setNumColumns(numRecipients);
- }
- return it->second.getColumn(bucketId);
-}
-
-}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h
deleted file mode 100644
index a17824249c4..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/documentapi/common.h>
-#include <vespa/messagebus/routing/iroutingpolicy.h>
-#include <vespa/vdslib/bucketdistribution.h>
-#include <vespa/document/bucket/bucketidfactory.h>
-#include <vespa/vespalib/util/sync.h>
-#include <map>
-
-namespace documentapi {
-
-/**
- * This policy implements the logic to select recipients for a single search column.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- * @version $Id$
- */
-class SearchColumnPolicy : public mbus::IRoutingPolicy {
-private:
- typedef std::map<uint32_t, vdslib::BucketDistribution> DistributionCache;
-
- vespalib::Lock _lock;
- document::BucketIdFactory _factory;
- DistributionCache _distributions;
- uint32_t _maxOOS;
-
- /**
- * Returns the recipient index for the given bucket id. This updates the shared internal distribution map, so it
- * needs to be synchronized.
- *
- * @param bucketId The bucket whose recipient to return.
- * @param numRecipients The number of recipients being distributed to.
- * @return The recipient to use.
- */
- uint32_t getRecipient(const document::BucketId &bucketId, uint32_t numRecipients);
-
-public:
- /**
- * Constructs a new policy object for the given parameter string. The string can be null or empty, which is a
- * request to not allow any bad columns.
- *
- * @param param The maximum number of allowed bad columns.
- */
- SearchColumnPolicy(const string &param);
- ~SearchColumnPolicy();
-
- void select(mbus::RoutingContext &context) override;
- void merge(mbus::RoutingContext &context) override;
-};
-
-}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp
deleted file mode 100644
index 20f8398f1e6..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp
+++ /dev/null
@@ -1,63 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "searchrowpolicy.h"
-#include <vespa/documentapi/messagebus/documentprotocol.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".searchrowpolicy");
-
-namespace documentapi {
-
-SearchRowPolicy::SearchRowPolicy(const string &param) :
- _minOk(0)
-{
- if (param.length() > 0) {
- int minOk = atoi(param.c_str());
- if (minOk > 0) {
- _minOk = (uint32_t)minOk;
- } else {
- LOG(warning,
- "Ignoring a request to set the minimum number of OK replies to %d because it makes no sense. "
- "This routing policy will not allow any recipient to be out of service.", minOk);
- }
- }
-}
-
-SearchRowPolicy::~SearchRowPolicy() {}
-
-void
-SearchRowPolicy::select(mbus::RoutingContext &context)
-{
- std::vector<mbus::Route> recipients;
- context.getMatchedRecipients(recipients);
- context.addChildren(recipients);
- context.setSelectOnRetry(false);
- if (_minOk > 0) {
- context.addConsumableError(mbus::ErrorCode::SERVICE_OOS);
- }
-}
-
-void
-SearchRowPolicy::merge(mbus::RoutingContext &context)
-{
- if (_minOk > 0) {
- std::set<uint32_t> oosReplies;
- uint32_t idx = 0;
- for (mbus::RoutingNodeIterator it = context.getChildIterator();
- it.isValid(); it.next())
- {
- const mbus::Reply &ref = it.getReplyRef();
- if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) {
- oosReplies.insert(idx);
- }
- ++idx;
- }
- if (context.getNumChildren() - oosReplies.size() >= _minOk) {
- DocumentProtocol::merge(context, oosReplies);
- return;
- }
- }
- DocumentProtocol::merge(context);
-}
-
-}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h
deleted file mode 100644
index 2ca987ba4f7..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h
+++ /dev/null
@@ -1,30 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/documentapi/common.h>
-#include <vespa/messagebus/routing/iroutingpolicy.h>
-
-namespace documentapi {
-
-class SearchRowPolicy : public mbus::IRoutingPolicy {
-private:
- SearchRowPolicy(const SearchRowPolicy &);
- SearchRowPolicy &operator=(const SearchRowPolicy &);
-
-public:
- /**
- * Creates a search row policy that wraps the underlying search group policy in case the parameter is something
- * other than an empty string.
- *
- * @param param The number of minimum non-OOS replies that this policy requires.
- */
- SearchRowPolicy(const string &param);
- ~SearchRowPolicy();
-
- void select(mbus::RoutingContext &context) override;
- void merge(mbus::RoutingContext &context) override;
-private:
- uint32_t _minOk; // Hide OUT_OF_SERVICE as long as this number of replies are something else.
-};
-
-}
diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
index 741083009f5..2c244c63046 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
@@ -6,8 +6,6 @@
#include <vespa/documentapi/messagebus/policies/externpolicy.h>
#include <vespa/documentapi/messagebus/policies/localservicepolicy.h>
#include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h>
-#include <vespa/documentapi/messagebus/policies/searchcolumnpolicy.h>
-#include <vespa/documentapi/messagebus/policies/searchrowpolicy.h>
#include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h>
#include <vespa/documentapi/messagebus/policies/storagepolicy.h>
#include <vespa/documentapi/messagebus/policies/contentpolicy.h>
@@ -107,18 +105,6 @@ RoutingPolicyFactories::RoundRobinPolicyFactory::createPolicy(const string &para
}
mbus::IRoutingPolicy::UP
-RoutingPolicyFactories::SearchColumnPolicyFactory::createPolicy(const string &param) const
-{
- return mbus::IRoutingPolicy::UP(new SearchColumnPolicy(param));
-}
-
-mbus::IRoutingPolicy::UP
-RoutingPolicyFactories::SearchRowPolicyFactory::createPolicy(const string &param) const
-{
- return mbus::IRoutingPolicy::UP(new SearchRowPolicy(param));
-}
-
-mbus::IRoutingPolicy::UP
RoutingPolicyFactories::SubsetServicePolicyFactory::createPolicy(const string &param) const
{
return mbus::IRoutingPolicy::UP(new SubsetServicePolicy(param));
diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
index 95a65008a9b..003768aedda 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
+++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
@@ -52,14 +52,6 @@ public:
public:
mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
};
- class SearchColumnPolicyFactory : public IRoutingPolicyFactory {
- public:
- mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
- };
- class SearchRowPolicyFactory : public IRoutingPolicyFactory {
- public:
- mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
- };
class SubsetServicePolicyFactory : public IRoutingPolicyFactory {
public:
mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
index 91b269081bf..adbe07df8d0 100644
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
@@ -34,8 +34,6 @@ public class StatusCodes {
return Response.Status.BAD_REQUEST;
case ErrorCode.NO_SERVICES_FOR_ROUTE:
return Response.Status.NOT_FOUND;
- case ErrorCode.SERVICE_OOS:
- return Response.Status.SERVICE_UNAVAILABLE;
case ErrorCode.ENCODE_ERROR:
return Response.Status.BAD_REQUEST;
case ErrorCode.NETWORK_ERROR:
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java b/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java
index 7c645ec4e1b..ee8a4cc2b55 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java
@@ -50,7 +50,9 @@ public final class ErrorCode {
/** No services found for the message route. */
public static final int NO_SERVICES_FOR_ROUTE = FATAL_ERROR + 3;
- /** The selected service was out of service. */
+ /** The selected service was out of service.
+ */
+ @Deprecated // Unused and will be removed
public static final int SERVICE_OOS = FATAL_ERROR + 4;
/** An error occured while encoding the message. */
diff --git a/messagebus/src/vespa/messagebus/errorcode.cpp b/messagebus/src/vespa/messagebus/errorcode.cpp
index 67330580f7a..af9089b987e 100644
--- a/messagebus/src/vespa/messagebus/errorcode.cpp
+++ b/messagebus/src/vespa/messagebus/errorcode.cpp
@@ -30,7 +30,6 @@ ErrorCode::getName(uint32_t errorCode)
case SEND_QUEUE_CLOSED : return "SEND_QUEUE_CLOSED";
case SEND_QUEUE_FULL : return "SEND_QUEUE_FULL";
case SEQUENCE_ERROR : return "SEQUENCE_ERROR";
- case SERVICE_OOS : return "SERVICE_OOS";
case SESSION_BUSY : return "SESSION_BUSY";
case TIMEOUT : return "TIMEOUT";
case TRANSIENT_ERROR : return "TRANSIENT_ERROR";
diff --git a/messagebus/src/vespa/messagebus/errorcode.h b/messagebus/src/vespa/messagebus/errorcode.h
index 2cdc6952a67..f170e29ca8f 100644
--- a/messagebus/src/vespa/messagebus/errorcode.h
+++ b/messagebus/src/vespa/messagebus/errorcode.h
@@ -72,7 +72,8 @@ public:
NO_SERVICES_FOR_ROUTE = FATAL_ERROR + 3,
// The selected service was out of service.
- SERVICE_OOS = FATAL_ERROR + 4,
+ // Unused.....
+ // SERVICE_OOS = FATAL_ERROR + 4,
// An error occured while encoding the message.
ENCODE_ERROR = FATAL_ERROR + 5,
diff --git a/messagebus_test/src/tests/errorcodes/DumpCodes.java b/messagebus_test/src/tests/errorcodes/DumpCodes.java
index 01003876fa3..96c7fc57a3e 100644
--- a/messagebus_test/src/tests/errorcodes/DumpCodes.java
+++ b/messagebus_test/src/tests/errorcodes/DumpCodes.java
@@ -24,7 +24,6 @@ public class DumpCodes {
dump("SEND_QUEUE_CLOSED", ErrorCode.SEND_QUEUE_CLOSED);
dump("ILLEGAL_ROUTE", ErrorCode.ILLEGAL_ROUTE);
dump("NO_SERVICES_FOR_ROUTE", ErrorCode.NO_SERVICES_FOR_ROUTE);
- dump("SERVICE_OOS", ErrorCode.SERVICE_OOS);
dump("ENCODE_ERROR", ErrorCode.ENCODE_ERROR);
dump("NETWORK_ERROR", ErrorCode.NETWORK_ERROR);
dump("UNKNOWN_PROTOCOL", ErrorCode.UNKNOWN_PROTOCOL);
diff --git a/messagebus_test/src/tests/errorcodes/dumpcodes.cpp b/messagebus_test/src/tests/errorcodes/dumpcodes.cpp
index fdc8892b7c7..ecc5c9bc42c 100644
--- a/messagebus_test/src/tests/errorcodes/dumpcodes.cpp
+++ b/messagebus_test/src/tests/errorcodes/dumpcodes.cpp
@@ -36,7 +36,6 @@ App::Main()
dump("SEND_QUEUE_CLOSED", ErrorCode::SEND_QUEUE_CLOSED);
dump("ILLEGAL_ROUTE", ErrorCode::ILLEGAL_ROUTE);
dump("NO_SERVICES_FOR_ROUTE", ErrorCode::NO_SERVICES_FOR_ROUTE);
- dump("SERVICE_OOS", ErrorCode::SERVICE_OOS);
dump("ENCODE_ERROR", ErrorCode::ENCODE_ERROR);
dump("NETWORK_ERROR", ErrorCode::NETWORK_ERROR);
dump("UNKNOWN_PROTOCOL", ErrorCode::UNKNOWN_PROTOCOL);
diff --git a/messagebus_test/src/tests/errorcodes/ref-dump.txt b/messagebus_test/src/tests/errorcodes/ref-dump.txt
index b8038816897..70a10de7b82 100644
--- a/messagebus_test/src/tests/errorcodes/ref-dump.txt
+++ b/messagebus_test/src/tests/errorcodes/ref-dump.txt
@@ -10,7 +10,6 @@ first unused TRANSIENT_ERROR => 100008 => "UNKNOWN(100008)"
SEND_QUEUE_CLOSED => 200001 => "SEND_QUEUE_CLOSED"
ILLEGAL_ROUTE => 200002 => "ILLEGAL_ROUTE"
NO_SERVICES_FOR_ROUTE => 200003 => "NO_SERVICES_FOR_ROUTE"
-SERVICE_OOS => 200004 => "SERVICE_OOS"
ENCODE_ERROR => 200005 => "ENCODE_ERROR"
NETWORK_ERROR => 200006 => "NETWORK_ERROR"
UNKNOWN_PROTOCOL => 200007 => "UNKNOWN_PROTOCOL"
diff --git a/storageapi/src/vespa/storageapi/messageapi/returncode.cpp b/storageapi/src/vespa/storageapi/messageapi/returncode.cpp
index 9825aea47bc..79a768541b9 100644
--- a/storageapi/src/vespa/storageapi/messageapi/returncode.cpp
+++ b/storageapi/src/vespa/storageapi/messageapi/returncode.cpp
@@ -93,7 +93,6 @@ ReturnCode::isNodeDownOrNetwork() const
case mbus::ErrorCode::UNKNOWN_SESSION:
case mbus::ErrorCode::HANDSHAKE_FAILED:
case mbus::ErrorCode::NO_SERVICES_FOR_ROUTE:
- case mbus::ErrorCode::SERVICE_OOS:
case mbus::ErrorCode::NETWORK_ERROR:
case mbus::ErrorCode::UNKNOWN_PROTOCOL:
case Protocol::ERROR_NODE_NOT_READY:
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 7cea767f9ba..0a9fe72552c 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -131,7 +131,6 @@ public class OperationProcessor {
exceptionMessage.contains("SEND_QUEUE_CLOSED") ||
exceptionMessage.contains("ILLEGAL_ROUTE") ||
exceptionMessage.contains("NO_SERVICES_FOR_ROUTE") ||
- exceptionMessage.contains("SERVICE_OOS") ||
exceptionMessage.contains("NETWORK_ERROR") ||
exceptionMessage.contains("SEQUENCE_ERROR") ||
exceptionMessage.contains("NETWORK_SHUTDOWN") ||