diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-17 11:29:25 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-10-17 11:29:25 +0200 |
commit | 00bc2918ee015f0fd41e1f52d540d5c723b7c55f (patch) | |
tree | b5f982eb0c93fe0a66c128feaba1ff239df387ad /documentapi/src | |
parent | 65eb08a7ffd6690e5b500ac5dddcc2e9b92e0f7a (diff) |
Remove SERVICE_OOS from c++
Diffstat (limited to 'documentapi/src')
9 files changed, 72 insertions, 570 deletions
diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp index 9d38247ebfd..2b554510f76 100644 --- a/documentapi/src/tests/policies/policies_test.cpp +++ b/documentapi/src/tests/policies/policies_test.cpp @@ -10,8 +10,6 @@ #include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> #include <vespa/documentapi/messagebus/policies/localservicepolicy.h> #include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h> -#include <vespa/documentapi/messagebus/policies/searchcolumnpolicy.h> -#include <vespa/documentapi/messagebus/policies/searchrowpolicy.h> #include <vespa/documentapi/messagebus/policies/storagepolicy.h> #include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h> #include <vespa/messagebus/emptyreply.h> @@ -39,6 +37,8 @@ using document::readDocumenttypesConfig; using slobrok::api::IMirrorAPI; using namespace documentapi; using vespalib::make_string; +using std::make_unique; +using std::make_shared; class Test : public vespalib::TestApp { private: @@ -50,14 +50,12 @@ private: bool trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string> &expected); bool tryDistribution(TestFrame &frame, const string &id, const string &expected); void tryWasFound(TestFrame &frame, uint32_t expectedRecipients, uint32_t foundMask, bool expectedFound); - void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, - int32_t numEntries = -1); + void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries = -1); StoragePolicy &setupStoragePolicy(TestFrame &frame, const string ¶m, const string &pattern = "", int32_t numEntries = -1); bool isErrorPolicy(const string &name, const string ¶m); void assertMirrorReady(const IMirrorAPI &mirror); - void assertMirrorContains(const IMirrorAPI &mirror, const string &pattern, - uint32_t numEntries); + void assertMirrorContains(const IMirrorAPI &mirror, const string &pattern, uint32_t numEntries); mbus::Message::UP newPutDocumentMessage(const string &documentId); public: @@ -75,9 +73,6 @@ public: void testProtocol(); void testRoundRobin(); void testRoundRobinCache(); - void testSearchColumn(); - void testSearchRow(); - void testSearchRowMerge(); void multipleGetRepliesAreMergedToFoundDocument(); void testSubsetService(); void testSubsetServiceCache(); @@ -117,9 +112,6 @@ Test::Main() { testLocalServiceCache(); TEST_FLUSH(); testRoundRobin(); TEST_FLUSH(); testRoundRobinCache(); TEST_FLUSH(); - testSearchColumn(); TEST_FLUSH(); - testSearchRow(); TEST_FLUSH(); - testSearchRowMerge(); TEST_FLUSH(); testSubsetService(); TEST_FLUSH(); testSubsetServiceCache(); TEST_FLUSH(); @@ -144,43 +136,34 @@ Test::testProtocol() mbus::IProtocol::SP protocol(new DocumentProtocol(_loadTypes, _repo)); mbus::IRoutingPolicy::UP policy = protocol->createPolicy("AND", ""); - ASSERT_TRUE(dynamic_cast<ANDPolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<ANDPolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("DocumentRouteSelector", "raw:route[0]\n"); - ASSERT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("Extern", "foo;bar/baz"); - ASSERT_TRUE(dynamic_cast<ExternPolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<ExternPolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("LoadBalancer", "cluster=docproc/cluster.default;" "session=chain.default;syncinit"); - ASSERT_TRUE(dynamic_cast<LoadBalancerPolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<LoadBalancerPolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("LocalService", ""); - ASSERT_TRUE(dynamic_cast<LocalServicePolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<LocalServicePolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("RoundRobin", ""); - ASSERT_TRUE(dynamic_cast<RoundRobinPolicy*>(policy.get()) != NULL); - - policy = protocol->createPolicy("SearchRow", ""); - ASSERT_TRUE(dynamic_cast<SearchRowPolicy*>(policy.get()) != NULL); - - policy = protocol->createPolicy("SearchColumn", ""); - ASSERT_TRUE(dynamic_cast<SearchColumnPolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<RoundRobinPolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("SubsetService", ""); - ASSERT_TRUE(dynamic_cast<SubsetServicePolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<SubsetServicePolicy*>(policy.get()) != nullptr); } void Test::testAND() { TestFrame frame(_repo); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage( - document::Document::SP( - new document::Document(*_docType, - DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:")))); frame.setHop(mbus::HopSpec("test", "[AND]") .addRecipient("foo") .addRecipient("bar")); @@ -242,7 +225,7 @@ Test::requireThatExternPolicySelectsFromExternSlobrok() lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } EXPECT_EQUAL(servers.size(), lst.size()); for (uint32_t i = 0; i < servers.size(); ++i) { @@ -267,15 +250,14 @@ mbus::Message::UP Test::newPutDocumentMessage(const string &documentId) { Document::SP doc(new Document(*_docType, DocumentId(documentId))); - return mbus::Message::UP(new PutDocumentMessage(doc)); + return make_unique<PutDocumentMessage>(doc); } void Test::setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries) { - string param = vespalib::make_string("tcp/localhost:%d;%s", - slobrok.port(), pattern.c_str()); + string param = vespalib::make_string("tcp/localhost:%d;%s", slobrok.port(), pattern.c_str()); frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Extern:%s]", param.c_str()))); mbus::MessageBus &mbus = frame.getMessageBus(); const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test"); @@ -340,7 +322,7 @@ Test::testExternSend() mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr); // Send message from local node to remote cluster and resolve route there. - mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0)); + mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0)); msg->getTrace().setLevel(9); msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:tcp/localhost:%d;itr/session] default", slobrok.port()))); @@ -376,7 +358,7 @@ Test::testExternMultipleSlobroks() std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr); - mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0)); + mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0)); msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str()))); ASSERT_TRUE(ss->send(std::move(msg)).isAccepted()); ASSERT_TRUE((msg = dr.getMessage(600))); @@ -392,7 +374,7 @@ Test::testExternMultipleSlobroks() std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr); - mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0)); + mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0)); msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str()))); ASSERT_TRUE(ss->send(std::move(msg)).isAccepted()); ASSERT_TRUE((msg = dr.getMessage(600))); @@ -407,8 +389,7 @@ Test::testLocalService() { // Prepare message. TestFrame frame(_repo, "docproc/cluster.default"); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP( - new Document(*_docType, DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:")))); // Test select with proper address. for (uint32_t i = 0; i < 10; ++i) { @@ -424,7 +405,7 @@ Test::testLocalService() lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } EXPECT_EQUAL(10u, lst.size()); @@ -437,7 +418,7 @@ Test::testLocalService() lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } EXPECT_EQUAL(1u, lst.size()); EXPECT_EQUAL("docproc/cluster.default/*/chain.default", *lst.begin()); @@ -452,12 +433,12 @@ Test::testLocalServiceCache() { TestFrame fooFrame(_repo, "docproc/cluster.default"); mbus::HopSpec fooHop("foo", "docproc/cluster.default/[LocalService]/chain.foo"); - fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo")))); + fooFrame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:foo"))); fooFrame.setHop(fooHop); TestFrame barFrame(fooFrame); mbus::HopSpec barHop("test", "docproc/cluster.default/[LocalService]/chain.bar"); - barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar")))); + barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar")))); barFrame.setHop(barHop); fooFrame.getMessageBus().setupRouting( @@ -480,8 +461,8 @@ Test::testLocalServiceCache() barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(barFrame.getReceptor().getReply(600).get() != NULL); - ASSERT_TRUE(fooFrame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(barFrame.getReceptor().getReply(600)); + ASSERT_TRUE(fooFrame.getReceptor().getReply(600)); } void @@ -489,9 +470,7 @@ Test::testRoundRobin() { // Prepare message. TestFrame frame(_repo, "docproc/cluster.default"); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP( - new Document(*_docType, - DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:")))); // Test select with proper address. for (uint32_t i = 0; i < 10; ++i) { @@ -530,13 +509,13 @@ Test::testRoundRobinCache() TestFrame fooFrame(_repo, "docproc/cluster.default"); mbus::HopSpec fooHop("foo", "[RoundRobin]"); fooHop.addRecipient("docproc/cluster.default/0/chain.foo"); - fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo")))); + fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:foo")))); fooFrame.setHop(fooHop); TestFrame barFrame(fooFrame); mbus::HopSpec barHop("bar", "[RoundRobin]"); barHop.addRecipient("docproc/cluster.default/0/chain.bar"); - barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar")))); + barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar")))); barFrame.setHop(barHop); fooFrame.getMessageBus().setupRouting( @@ -559,120 +538,8 @@ Test::testRoundRobinCache() barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(barFrame.getReceptor().getReply(600).get() != NULL); - ASSERT_TRUE(fooFrame.getReceptor().getReply(600).get() != NULL); -} - -void -Test::testSearchRow() -{ - TestFrame frame(_repo); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP( - new Document(*_docType, - DocumentId("doc:scheme:")))))); - frame.setHop(mbus::HopSpec("test", "[SearchRow]") - .addRecipient("foo")); - EXPECT_TRUE(frame.testMergeOneReply("foo")); - frame.setHop(mbus::HopSpec("test", "[SearchRow]") - .addRecipient("foo") - .addRecipient("bar")); - EXPECT_TRUE(frame.testMergeTwoReplies("foo", "bar")); - - frame.setHop(mbus::HopSpec("test", "[SearchRow:1]") - .addRecipient("foo")); - TestFrame::ReplyMap replies; - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - EXPECT_TRUE(frame.testMergeError(replies, UIntList().add(mbus::ErrorCode::SERVICE_OOS))); - - frame.setHop(mbus::HopSpec("test", "[SearchRow:1]") - .addRecipient("foo") - .addRecipient("bar")); - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::NONE; - EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar"))); - - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::SERVICE_OOS; - EXPECT_TRUE(frame.testMergeError(replies, UIntList() - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS))); - - frame.setHop(mbus::HopSpec("test", "[SearchRow:1]") - .addRecipient("foo") - .addRecipient("bar") - .addRecipient("baz")); - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::NONE; - replies["baz"] = mbus::ErrorCode::NONE; - EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar").add("baz"))); - - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::SERVICE_OOS; - replies["baz"] = mbus::ErrorCode::NONE; - EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("baz"))); - - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::SERVICE_OOS; - replies["baz"] = mbus::ErrorCode::SERVICE_OOS; - EXPECT_TRUE(frame.testMergeError(replies, UIntList() - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS))); - - frame.setHop(mbus::HopSpec("test", "[SearchRow:2]") - .addRecipient("foo") - .addRecipient("bar") - .addRecipient("baz")); - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::NONE; - replies["baz"] = mbus::ErrorCode::NONE; - EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar").add("baz"))); - - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::SERVICE_OOS; - replies["baz"] = mbus::ErrorCode::NONE; - EXPECT_TRUE(frame.testMergeError(replies, UIntList() - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS))); - - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::SERVICE_OOS; - replies["baz"] = mbus::ErrorCode::SERVICE_OOS; - EXPECT_TRUE(frame.testMergeError(replies, UIntList() - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS))); -} - -void -Test::testSearchRowMerge() -{ - TestFrame frame(_repo); - frame.setHop(mbus::HopSpec("test", "[SearchRow]") - .addRecipient("foo")); - tryWasFound(frame, 1, 0x0, false); - tryWasFound(frame, 1, 0x1, true); - - frame.setHop(mbus::HopSpec("test", "[SearchRow]") - .addRecipient("foo") - .addRecipient("bar")); - tryWasFound(frame, 2, 0x0, false); - tryWasFound(frame, 2, 0x1, true); - tryWasFound(frame, 2, 0x2, true); - tryWasFound(frame, 2, 0x3, true); - - frame.setHop(mbus::HopSpec("test", "[SearchRow]") - .addRecipient("foo") - .addRecipient("bar") - .addRecipient("baz")); - tryWasFound(frame, 3, 0x0, false); - tryWasFound(frame, 3, 0x1, true); - tryWasFound(frame, 3, 0x2, true); - tryWasFound(frame, 3, 0x3, true); - tryWasFound(frame, 3, 0x4, true); - tryWasFound(frame, 3, 0x5, true); - tryWasFound(frame, 3, 0x6, true); - tryWasFound(frame, 3, 0x7, true); + ASSERT_TRUE(barFrame.getReceptor().getReply(600)); + ASSERT_TRUE(fooFrame.getReceptor().getReply(600)); } void @@ -680,7 +547,7 @@ Test::tryWasFound(TestFrame &frame, uint32_t expectedRecipients, uint32_t foundMask, bool expectedFound) { { - frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(DocumentId("doc:scheme:69")))); + frame.setMessage(make_unique<RemoveDocumentMessage>(DocumentId("doc:scheme:69"))); std::vector<mbus::RoutingNode*> selected; EXPECT_TRUE(frame.select(selected, expectedRecipients)); for (uint32_t i = 0, len = selected.size(); i < len; ++i) { @@ -689,7 +556,7 @@ Test::tryWasFound(TestFrame &frame, uint32_t expectedRecipients, selected[i]->handleReply(std::move(reply)); } mbus::Reply::UP reply = frame.getReceptor().getReply(600); - EXPECT_TRUE(reply.get() != NULL); + EXPECT_TRUE(reply); EXPECT_EQUAL((uint32_t)DocumentProtocol::REPLY_REMOVEDOCUMENT, reply->getType()); EXPECT_EQUAL(expectedFound, static_cast<RemoveDocumentReply&>(*reply).wasFound()); } @@ -704,7 +571,7 @@ Test::tryWasFound(TestFrame &frame, uint32_t expectedRecipients, selected[i]->handleReply(std::move(reply)); } mbus::Reply::UP reply = frame.getReceptor().getReply(600); - EXPECT_TRUE(reply.get() != NULL); + EXPECT_TRUE(reply); EXPECT_EQUAL((uint32_t)DocumentProtocol::REPLY_UPDATEDOCUMENT, reply->getType()); EXPECT_EQUAL(expectedFound, static_cast<UpdateDocumentReply&>(*reply).wasFound()); } @@ -724,11 +591,11 @@ Test::multipleGetRepliesAreMergedToFoundDocument() "route[1].feed \"myfeed\"\n]") .addRecipient("foo") .addRecipient("bar")); - frame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:yarn")))); + frame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:yarn"))); std::vector<mbus::RoutingNode*> selected; EXPECT_TRUE(frame.select(selected, 2)); for (uint32_t i = 0, len = selected.size(); i < len; ++i) { - document::Document::SP doc; + Document::SP doc; if (i == 0) { doc.reset(new Document(*_docType, DocumentId("doc:scheme:yarn"))); doc->setLastModified(123456ULL); @@ -737,57 +604,16 @@ Test::multipleGetRepliesAreMergedToFoundDocument() selected[i]->handleReply(std::move(reply)); } mbus::Reply::UP reply = frame.getReceptor().getReply(600); - EXPECT_TRUE(reply.get() != NULL); - EXPECT_EQUAL(static_cast<uint32_t>(DocumentProtocol::REPLY_GETDOCUMENT), - reply->getType()); + EXPECT_TRUE(reply); + EXPECT_EQUAL(static_cast<uint32_t>(DocumentProtocol::REPLY_GETDOCUMENT), reply->getType()); EXPECT_EQUAL(123456ULL, static_cast<GetDocumentReply&>(*reply).getLastModified()); } -void -Test::testSearchColumn() -{ - TestFrame frame(_repo); - frame.setHop(mbus::HopSpec("test", "[SearchColumn]") - .addRecipient("c0") - .addRecipient("c1") - .addRecipient("c2") - .addRecipient("c3")); - - // Test hash distribution. - EXPECT_TRUE(tryDistribution(frame, "doc:ns:3", "c0")); - EXPECT_TRUE(tryDistribution(frame, "doc:ns:18", "c1")); - EXPECT_TRUE(tryDistribution(frame, "doc:ns:0", "c2")); - EXPECT_TRUE(tryDistribution(frame, "doc:ns:4", "c3")); - - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:49152:0", "c0")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:49152:1", "c0")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:16384:2", "c1")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:16384:3", "c1")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:5461:4", "c2")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:5461:5", "c2")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:0:6", "c3")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:0:7", "c3")); - - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:0:0", "c0")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:0:1", "c0")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:4:2", "c1")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:4:3", "c1")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:2:4", "c2")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:2:5", "c2")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:7:6", "c3")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:7:7", "c3")); - - // Test routing based on message type. - mbus::Message::UP put(new PutDocumentMessage(Document::SP( - new Document(*_docType, - DocumentId("doc:scheme:"))))); -} - bool Test::tryDistribution(TestFrame &frame, const string &id, const string &expected) { Document::SP doc(new Document(*_docType, DocumentId(id))); - mbus::Message::UP msg(new PutDocumentMessage(doc)); + mbus::Message::UP msg = make_unique<PutDocumentMessage>(doc); frame.setMessage(std::move(msg)); return frame.testSelect(StringList().add(expected)); } @@ -804,13 +630,13 @@ Test::testDocumentRouteSelector() "route[0].feed \"baz\"\n"; { DocumentProtocol protocol(_loadTypes, _repo, okConfig); - EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != NULL); - EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", errConfig).get()) != NULL); + EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != nullptr); + EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", errConfig).get()) != nullptr); } { DocumentProtocol protocol(_loadTypes, _repo, errConfig); - EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != NULL); - EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", okConfig).get()) != NULL); + EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != nullptr); + EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", okConfig).get()) != nullptr); } // Test policy with proper config. @@ -826,19 +652,17 @@ Test::testDocumentRouteSelector() .addRecipient("foo") .addRecipient("bar")); - frame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0))); + frame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:"), 0)); EXPECT_TRUE(frame.testSelect(StringList().add("foo").add("bar"))); - mbus::Message::UP put(new PutDocumentMessage(Document::SP( - new Document(*_docType, - DocumentId("doc:scheme:"))))); + mbus::Message::UP put = make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:"))); frame.setMessage(std::move(put)); EXPECT_TRUE(frame.testSelect( StringList().add("foo"))); { vdslib::OperationList opList; - document::DocumentId id("doc:scheme:"); + DocumentId id("doc:scheme:"); Document::UP doc(new Document(*_docType, id)); opList.addPut(std::move(doc)); @@ -849,24 +673,20 @@ Test::testDocumentRouteSelector() { vdslib::OperationList opList; - document::DocumentId id("doc:scheme:"); + DocumentId id("doc:scheme:"); Document::UP doc(new Document(*_repo->getDocumentType("other"), id)); opList.addPut(std::move(doc)); document::BucketIdFactory factory; - put = frame.setMessage(MultiOperationMessage::create(_repo, - factory.getBucketId(id), opList)); + put = frame.setMessage(MultiOperationMessage::create(_repo, factory.getBucketId(id), opList)); EXPECT_TRUE(frame.testSelect(StringList().add("bar"))); } - frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(document::DocumentId("doc:scheme:")))); + frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(DocumentId("doc:scheme:")))); EXPECT_TRUE(frame.testSelect(StringList().add("foo").add("bar"))); - frame.setMessage(mbus::Message::UP(new UpdateDocumentMessage( - document::DocumentUpdate::SP( - new document::DocumentUpdate( - *_docType, - DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<UpdateDocumentMessage>( + make_shared<DocumentUpdate>(*_docType, DocumentId("doc:scheme:")))); EXPECT_TRUE(frame.testSelect(StringList().add("foo"))); frame.setMessage(std::move(put)); @@ -884,22 +704,17 @@ Test::testDocumentRouteSelectorIgnore() "route[0].feed \"myfeed\"\n]") .addRecipient("docproc/cluster.foo")); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage( - document::Document::SP( - new document::Document(*_docType, - DocumentId("id:yarn:testdoc:n=1234:fluff")))))); + frame.setMessage(make_unique<PutDocumentMessage>( + make_shared<Document>(*_docType, DocumentId("id:yarn:testdoc:n=1234:fluff")))); std::vector<mbus::RoutingNode*> leaf; ASSERT_TRUE(frame.select(leaf, 0)); mbus::Reply::UP reply = frame.getReceptor().getReply(600); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(uint32_t(DocumentProtocol::REPLY_DOCUMENTIGNORED), reply->getType()); EXPECT_EQUAL(0u, reply->getNumErrors()); - frame.setMessage(mbus::Message::UP(new UpdateDocumentMessage( - document::DocumentUpdate::SP( - new document::DocumentUpdate( - *_docType, - DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<UpdateDocumentMessage>( + make_shared<DocumentUpdate>(*_docType, DocumentId("doc:scheme:")))); EXPECT_TRUE(frame.testSelect(StringList().add("docproc/cluster.foo"))); } @@ -989,7 +804,7 @@ Test::requireThatStoragePolicyIsRandomWithoutState() mbus::TestServer *srv = new mbus::TestServer( mbus::Identity(vespalib::make_string("storage/cluster.mycluster/distributor/%d", i)), mbus::RoutingSpec(), slobrok, - mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); + make_shared<DocumentProtocol>(_loadTypes, _repo)); servers.push_back(srv); srv->net.registerSession("default"); } @@ -999,7 +814,7 @@ Test::requireThatStoragePolicyIsRandomWithoutState() StoragePolicy &policy = setupStoragePolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); - ASSERT_TRUE(policy.getSystemState() == NULL); + ASSERT_TRUE(policy.getSystemState() == nullptr); std::set<string> lst; for (uint32_t i = 0; i < 666; i++) { @@ -1022,10 +837,8 @@ Test::setupStoragePolicy(TestFrame &frame, const string ¶m, mbus::MessageBus &mbus = frame.getMessageBus(); const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test"); const mbus::PolicyDirective dir = static_cast<mbus::PolicyDirective&>(*hop->getDirective(0)); - StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy( - DocumentProtocol::NAME, - dir.getName(), - dir.getParam())); + StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME, + dir.getName(), dir.getParam())); policy.initSynchronous(); assertMirrorReady(*policy.getMirror()); if (numEntries >= 0) { @@ -1046,7 +859,7 @@ Test::requireThatStoragePolicyIsTargetedWithState() mbus::TestServer *srv = new mbus::TestServer( mbus::Identity(vespalib::make_string("storage/cluster.mycluster/distributor/%d", i)), mbus::RoutingSpec(), slobrok, - mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); + make_shared<DocumentProtocol>(_loadTypes, _repo)); servers.push_back(srv); srv->net.registerSession("default"); } @@ -1056,12 +869,12 @@ Test::requireThatStoragePolicyIsTargetedWithState() StoragePolicy &policy = setupStoragePolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); - ASSERT_TRUE(policy.getSystemState() == NULL); + ASSERT_TRUE(policy.getSystemState() == nullptr); { std::vector<mbus::RoutingNode*> leaf; ASSERT_TRUE(frame.select(leaf, 1)); leaf[0]->handleReply(mbus::Reply::UP(new WrongDistributionReply("distributor:5 storage:5"))); - ASSERT_TRUE(policy.getSystemState() != NULL); + ASSERT_TRUE(policy.getSystemState() != nullptr); EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:5 storage:5"); } std::set<string> lst; @@ -1086,7 +899,7 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() mbus::Slobrok slobrok; mbus::TestServer server(mbus::Identity("storage/cluster.mycluster/distributor/0"), mbus::RoutingSpec(), slobrok, - mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); + make_shared<DocumentProtocol>(_loadTypes, _repo)); server.net.registerSession("default"); string param = vespalib::make_string( @@ -1095,12 +908,12 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() StoragePolicy &policy = setupStoragePolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 1); - ASSERT_TRUE(policy.getSystemState() == NULL); + ASSERT_TRUE(policy.getSystemState() == nullptr); { std::vector<mbus::RoutingNode*> leaf; ASSERT_TRUE(frame.select(leaf, 1)); leaf[0]->handleReply(mbus::Reply::UP(new WrongDistributionReply("distributor:99 storage:99"))); - ASSERT_TRUE(policy.getSystemState() != NULL); + ASSERT_TRUE(policy.getSystemState() != nullptr); EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:99 storage:99"); } for (int i = 0; i < 666; i++) { @@ -1113,9 +926,7 @@ Test::testSubsetService() { // Prepare message. TestFrame frame(_repo, "docproc/cluster.default"); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP( - new Document(*_docType, - DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:")))); // Test requerying for adding nodes. frame.setHop(mbus::HopSpec("test", "docproc/cluster.default/[SubsetService:2]/chain.default")); @@ -1128,7 +939,7 @@ Test::testSubsetService() ASSERT_TRUE(frame.select(leaf, 1)); lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } ASSERT_TRUE(lst.size() > 1); // must have requeried @@ -1147,7 +958,7 @@ Test::testSubsetService() prev = next; leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } // Test requerying for dropping nodes. @@ -1164,7 +975,7 @@ Test::testSubsetService() mbus::Reply::UP reply(new mbus::EmptyReply()); reply->addError(mbus::Error(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, route)); leaf[0]->handleReply(std::move(reply)); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } EXPECT_EQUAL(10u, lst.size()); @@ -1178,12 +989,12 @@ Test::testSubsetServiceCache() { TestFrame fooFrame(_repo, "docproc/cluster.default"); mbus::HopSpec fooHop("foo", "docproc/cluster.default/[SubsetService:2]/chain.foo"); - fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo")))); + fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:foo")))); fooFrame.setHop(fooHop); TestFrame barFrame(fooFrame); mbus::HopSpec barHop("bar", "docproc/cluster.default/[SubsetService:2]/chain.bar"); - barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar")))); + barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar")))); barFrame.setHop(barHop); fooFrame.getMessageBus().setupRouting( diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp index 95115d1c3c1..a1837b6bcd0 100644 --- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp @@ -41,8 +41,6 @@ DocumentProtocol::DocumentProtocol(const LoadTypeSet& loadTypes, putRoutingPolicyFactory("Extern", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::ExternPolicyFactory())); putRoutingPolicyFactory("LocalService", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::LocalServicePolicyFactory())); putRoutingPolicyFactory("RoundRobin", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::RoundRobinPolicyFactory())); - putRoutingPolicyFactory("SearchColumn", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SearchColumnPolicyFactory())); - putRoutingPolicyFactory("SearchRow", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SearchRowPolicyFactory())); putRoutingPolicyFactory("Storage", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::StoragePolicyFactory())); putRoutingPolicyFactory("SubsetService", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SubsetServicePolicyFactory())); putRoutingPolicyFactory("LoadBalancer", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::LoadBalancerPolicyFactory())); @@ -134,8 +132,7 @@ DocumentProtocol::encode(const vespalib::Version &version, const mbus::Routable std::ostringstream message; document::StringUtil::printAsHex( message, blob.data(), blob.size()); - LOG(spam, "Encoded message of protocol %s type %u using version " - "%s serialization:\n%s", + LOG(spam, "Encoded message of protocol %s type %u using version %s serialization:\n%s", routable.getProtocol().c_str(), routable.getType(), version.toString().c_str(), message.str().c_str()); } diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt index f1a691bc46d..26d51e702e9 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt +++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt @@ -11,8 +11,6 @@ vespa_add_library(documentapi_documentapipolicies OBJECT externpolicy.cpp localservicepolicy.cpp roundrobinpolicy.cpp - searchcolumnpolicy.cpp - searchrowpolicy.cpp subsetservicepolicy.cpp loadbalancer.cpp loadbalancerpolicy.cpp diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp deleted file mode 100644 index 38610aca551..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "searchcolumnpolicy.h" -#include <vespa/documentapi/messagebus/documentprotocol.h> -#include <vespa/documentapi/messagebus/messages/getdocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/updatedocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/batchdocumentupdatemessage.h> -#include <vespa/documentapi/messagebus/messages/multioperationmessage.h> -#include <vespa/messagebus/emptyreply.h> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vespalib/util/hashmap.h> -#include <vespa/log/log.h> -LOG_SETUP(".searchcolumnpolicy"); - -namespace documentapi { - -SearchColumnPolicy::SearchColumnPolicy(const string ¶m) : - _lock(), - _factory(), - _distributions(), - _maxOOS(0) -{ - if (param.length() > 0) { - int maxOOS = atoi(param.c_str()); - if (maxOOS >= 0) { - _maxOOS = (uint32_t)maxOOS; - } else { - LOG(warning, - "Ignoring a request to set the maximum number of OOS replies to %d because it makes no " - "sense. This routing policy will not allow any recipient to be out of service.", maxOOS); - } - } -} - -SearchColumnPolicy::~SearchColumnPolicy() -{ - // empty -} - -void -SearchColumnPolicy::select(mbus::RoutingContext &context) -{ - std::vector<mbus::Route> recipients; - context.getMatchedRecipients(recipients); - if (recipients.empty()) { - return; - } - const document::DocumentId *id = NULL; - document::BucketId bucketId; - - const mbus::Message &msg = context.getMessage(); - switch(msg.getType()) { - case DocumentProtocol::MESSAGE_PUTDOCUMENT: - id = &static_cast<const PutDocumentMessage&>(msg).getDocument().getId(); - break; - - case DocumentProtocol::MESSAGE_GETDOCUMENT: - id = &static_cast<const GetDocumentMessage&>(msg).getDocumentId(); - break; - - case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: - id = &static_cast<const RemoveDocumentMessage&>(msg).getDocumentId(); - break; - - case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: - id = &static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId(); - break; - - case DocumentProtocol::MESSAGE_MULTIOPERATION: - bucketId = (static_cast<const MultiOperationMessage&>(msg)).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_BATCHDOCUMENTUPDATE: - bucketId = (static_cast<const BatchDocumentUpdateMessage&>(msg)).getBucketId(); - break; - - default: - LOG(error, "Message type '%d' not supported.", msg.getType()); - return; - } - if (bucketId.getRawId() == 0) { - bucketId = _factory.getBucketId(*id); - } - uint32_t recipient = getRecipient(bucketId, recipients.size()); - context.addChild(recipients[recipient]); - context.setSelectOnRetry(true); - if (_maxOOS > 0) { - context.addConsumableError(mbus::ErrorCode::SERVICE_OOS); - } -} - -void -SearchColumnPolicy::merge(mbus::RoutingContext &context) -{ - if (_maxOOS > 0) { - if (context.getNumChildren() > 1) { - std::set<uint32_t> oosReplies; - uint32_t idx = 0; - for (mbus::RoutingNodeIterator it = context.getChildIterator(); - it.isValid(); it.next()) - { - const mbus::Reply &ref = it.getReplyRef(); - if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) { - oosReplies.insert(idx); - } - ++idx; - } - if (oosReplies.size() <= _maxOOS) { - DocumentProtocol::merge(context, oosReplies); - return; // may the rtx be with you - } - } else { - const mbus::Reply &ref = context.getChildIterator().getReplyRef(); - if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) { - context.setReply(mbus::Reply::UP(new mbus::EmptyReply())); - return; // god help us all - } - } - } - DocumentProtocol::merge(context); -} - -uint32_t -SearchColumnPolicy::getRecipient(const document::BucketId &bucketId, uint32_t numRecipients) -{ - vespalib::LockGuard guard(_lock); - DistributionCache::iterator it = _distributions.find(numRecipients); - if (it == _distributions.end()) { - it = _distributions.insert(DistributionCache::value_type(numRecipients, vdslib::BucketDistribution(1, 16u))).first; - it->second.setNumColumns(numRecipients); - } - return it->second.getColumn(bucketId); -} - -} diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h deleted file mode 100644 index a17824249c4..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/documentapi/common.h> -#include <vespa/messagebus/routing/iroutingpolicy.h> -#include <vespa/vdslib/bucketdistribution.h> -#include <vespa/document/bucket/bucketidfactory.h> -#include <vespa/vespalib/util/sync.h> -#include <map> - -namespace documentapi { - -/** - * This policy implements the logic to select recipients for a single search column. - * - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - * @version $Id$ - */ -class SearchColumnPolicy : public mbus::IRoutingPolicy { -private: - typedef std::map<uint32_t, vdslib::BucketDistribution> DistributionCache; - - vespalib::Lock _lock; - document::BucketIdFactory _factory; - DistributionCache _distributions; - uint32_t _maxOOS; - - /** - * Returns the recipient index for the given bucket id. This updates the shared internal distribution map, so it - * needs to be synchronized. - * - * @param bucketId The bucket whose recipient to return. - * @param numRecipients The number of recipients being distributed to. - * @return The recipient to use. - */ - uint32_t getRecipient(const document::BucketId &bucketId, uint32_t numRecipients); - -public: - /** - * Constructs a new policy object for the given parameter string. The string can be null or empty, which is a - * request to not allow any bad columns. - * - * @param param The maximum number of allowed bad columns. - */ - SearchColumnPolicy(const string ¶m); - ~SearchColumnPolicy(); - - void select(mbus::RoutingContext &context) override; - void merge(mbus::RoutingContext &context) override; -}; - -} diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp deleted file mode 100644 index 20f8398f1e6..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "searchrowpolicy.h" -#include <vespa/documentapi/messagebus/documentprotocol.h> - -#include <vespa/log/log.h> -LOG_SETUP(".searchrowpolicy"); - -namespace documentapi { - -SearchRowPolicy::SearchRowPolicy(const string ¶m) : - _minOk(0) -{ - if (param.length() > 0) { - int minOk = atoi(param.c_str()); - if (minOk > 0) { - _minOk = (uint32_t)minOk; - } else { - LOG(warning, - "Ignoring a request to set the minimum number of OK replies to %d because it makes no sense. " - "This routing policy will not allow any recipient to be out of service.", minOk); - } - } -} - -SearchRowPolicy::~SearchRowPolicy() {} - -void -SearchRowPolicy::select(mbus::RoutingContext &context) -{ - std::vector<mbus::Route> recipients; - context.getMatchedRecipients(recipients); - context.addChildren(recipients); - context.setSelectOnRetry(false); - if (_minOk > 0) { - context.addConsumableError(mbus::ErrorCode::SERVICE_OOS); - } -} - -void -SearchRowPolicy::merge(mbus::RoutingContext &context) -{ - if (_minOk > 0) { - std::set<uint32_t> oosReplies; - uint32_t idx = 0; - for (mbus::RoutingNodeIterator it = context.getChildIterator(); - it.isValid(); it.next()) - { - const mbus::Reply &ref = it.getReplyRef(); - if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) { - oosReplies.insert(idx); - } - ++idx; - } - if (context.getNumChildren() - oosReplies.size() >= _minOk) { - DocumentProtocol::merge(context, oosReplies); - return; - } - } - DocumentProtocol::merge(context); -} - -} diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h deleted file mode 100644 index 2ca987ba4f7..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/documentapi/common.h> -#include <vespa/messagebus/routing/iroutingpolicy.h> - -namespace documentapi { - -class SearchRowPolicy : public mbus::IRoutingPolicy { -private: - SearchRowPolicy(const SearchRowPolicy &); - SearchRowPolicy &operator=(const SearchRowPolicy &); - -public: - /** - * Creates a search row policy that wraps the underlying search group policy in case the parameter is something - * other than an empty string. - * - * @param param The number of minimum non-OOS replies that this policy requires. - */ - SearchRowPolicy(const string ¶m); - ~SearchRowPolicy(); - - void select(mbus::RoutingContext &context) override; - void merge(mbus::RoutingContext &context) override; -private: - uint32_t _minOk; // Hide OUT_OF_SERVICE as long as this number of replies are something else. -}; - -} diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp index 741083009f5..2c244c63046 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp @@ -6,8 +6,6 @@ #include <vespa/documentapi/messagebus/policies/externpolicy.h> #include <vespa/documentapi/messagebus/policies/localservicepolicy.h> #include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h> -#include <vespa/documentapi/messagebus/policies/searchcolumnpolicy.h> -#include <vespa/documentapi/messagebus/policies/searchrowpolicy.h> #include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h> #include <vespa/documentapi/messagebus/policies/storagepolicy.h> #include <vespa/documentapi/messagebus/policies/contentpolicy.h> @@ -107,18 +105,6 @@ RoutingPolicyFactories::RoundRobinPolicyFactory::createPolicy(const string ¶ } mbus::IRoutingPolicy::UP -RoutingPolicyFactories::SearchColumnPolicyFactory::createPolicy(const string ¶m) const -{ - return mbus::IRoutingPolicy::UP(new SearchColumnPolicy(param)); -} - -mbus::IRoutingPolicy::UP -RoutingPolicyFactories::SearchRowPolicyFactory::createPolicy(const string ¶m) const -{ - return mbus::IRoutingPolicy::UP(new SearchRowPolicy(param)); -} - -mbus::IRoutingPolicy::UP RoutingPolicyFactories::SubsetServicePolicyFactory::createPolicy(const string ¶m) const { return mbus::IRoutingPolicy::UP(new SubsetServicePolicy(param)); diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h index 95a65008a9b..003768aedda 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h @@ -52,14 +52,6 @@ public: public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; }; - class SearchColumnPolicyFactory : public IRoutingPolicyFactory { - public: - mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; - }; - class SearchRowPolicyFactory : public IRoutingPolicyFactory { - public: - mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; - }; class SubsetServicePolicyFactory : public IRoutingPolicyFactory { public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; |