summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-06-27 14:42:09 +0200
committerGitHub <noreply@github.com>2022-06-27 14:42:09 +0200
commit7f8d32a811a6f33a388a780b5b3284cf12ae8624 (patch)
tree64cfbc97edaba6547ea4ec654f8d3b8f7d4f3340
parent5e6c4ba9f90073bd4f0fbcf0bc5ddb6f1948f0fe (diff)
parent2493482192eca0bd06ba0e33154654146dbc030b (diff)
Merge pull request #23240 from vespa-engine/balder/no-longer-require-sequencing
Remove 'requireSequencing' as an option.
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/content/Distributor.java8
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/content/StorageNode.java7
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/content/DistributorTest.java2
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java4
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.h1
-rw-r--r--messagebus/src/tests/protocolrepository/protocolrepository.cpp41
-rw-r--r--messagebus/src/vespa/messagebus/iprotocol.h3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp48
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h4
-rw-r--r--messagebus/src/vespa/messagebus/testlib/simpleprotocol.h1
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def2
-rw-r--r--storage/src/vespa/storageapi/mbusprot/storageprotocol.h1
12 files changed, 41 insertions, 81 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/Distributor.java b/config-model/src/main/java/com/yahoo/vespa/model/content/Distributor.java
index 2215a0a8df9..b4c48b3204b 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/Distributor.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/Distributor.java
@@ -3,7 +3,6 @@ package com.yahoo.vespa.model.content;
import com.yahoo.config.model.api.ModelContext;
import com.yahoo.config.model.deploy.DeployState;
-import com.yahoo.vespa.config.content.core.StorCommunicationmanagerConfig;
import com.yahoo.vespa.config.content.core.StorDistributormanagerConfig;
import com.yahoo.vespa.config.content.core.StorServerConfig;
import com.yahoo.config.model.producer.AbstractConfigProducer;
@@ -71,13 +70,6 @@ public class Distributor extends ContentNode implements StorDistributormanagerCo
}
@Override
- public void getConfig(StorCommunicationmanagerConfig.Builder builder) {
- super.getConfig(builder);
- // Single distributor needs help to encode the messages.
- builder.mbus.dispatch_on_encode(true);
- }
-
- @Override
public void getConfig(StorDistributormanagerConfig.Builder builder) {
builder.num_distributor_stripes(tuneNumDistributorStripes());
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/StorageNode.java b/config-model/src/main/java/com/yahoo/vespa/model/content/StorageNode.java
index 1bdd281d38e..af103d4d5b9 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/content/StorageNode.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/content/StorageNode.java
@@ -6,7 +6,6 @@ import com.yahoo.config.model.deploy.DeployState;
import com.yahoo.config.model.producer.AbstractConfigProducer;
import com.yahoo.vespa.config.content.StorFilestorConfig;
import com.yahoo.vespa.config.content.core.StorBucketmoverConfig;
-import com.yahoo.vespa.config.content.core.StorCommunicationmanagerConfig;
import com.yahoo.vespa.config.content.core.StorServerConfig;
import com.yahoo.vespa.defaults.Defaults;
import com.yahoo.vespa.model.application.validation.RestartConfigs;
@@ -94,10 +93,4 @@ public class StorageNode extends ContentNode implements StorServerConfig.Produce
cluster.getConfig(builder);
}
- @Override
- public void getConfig(StorCommunicationmanagerConfig.Builder builder) {
- super.getConfig(builder);
- builder.mbus.dispatch_on_encode(false);
- }
-
}
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/DistributorTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/DistributorTest.java
index d23dca65dd1..635f799411b 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/content/DistributorTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/content/DistributorTest.java
@@ -291,7 +291,7 @@ public class DistributorTest {
cluster.getChildren().get("0").getConfig(builder);
StorCommunicationmanagerConfig config = new StorCommunicationmanagerConfig(builder);
assertTrue(config.mbus().dispatch_on_encode());
- assertFalse(config.mbus().dispatch_on_decode());
+ assertTrue(config.mbus().dispatch_on_decode());
assertEquals(4, config.mbus().num_threads());
assertEquals(StorCommunicationmanagerConfig.Mbus.Optimize_for.LATENCY, config.mbus().optimize_for());
assertFalse(config.skip_thread());
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
index 8908ab9f5b9..c03ffba1c34 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java
@@ -101,8 +101,8 @@ public class StorageClusterTest {
StorCommunicationmanagerConfig.Builder builder = new StorCommunicationmanagerConfig.Builder();
storage.getChildren().get("0").getConfig(builder);
StorCommunicationmanagerConfig config = new StorCommunicationmanagerConfig(builder);
- assertFalse(config.mbus().dispatch_on_encode());
- assertFalse(config.mbus().dispatch_on_decode());
+ assertTrue(config.mbus().dispatch_on_encode());
+ assertTrue(config.mbus().dispatch_on_decode());
assertEquals(4, config.mbus().num_threads());
assertEquals(StorCommunicationmanagerConfig.Mbus.Optimize_for.LATENCY, config.mbus().optimize_for());
assertFalse(config.skip_thread());
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
index ffbb63e54f1..f55c05f72cd 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h
@@ -292,7 +292,6 @@ public:
mbus::IRoutingPolicy::UP createPolicy(const mbus::string &name, const mbus::string &param) const override;
mbus::Blob encode(const vespalib::Version &version, const mbus::Routable &routable) const override;
mbus::Routable::UP decode(const vespalib::Version &version, mbus::BlobRef data) const override;
- bool requireSequencing() const override { return false; }
};
}
diff --git a/messagebus/src/tests/protocolrepository/protocolrepository.cpp b/messagebus/src/tests/protocolrepository/protocolrepository.cpp
index a9a268262ee..20649d56d13 100644
--- a/messagebus/src/tests/protocolrepository/protocolrepository.cpp
+++ b/messagebus/src/tests/protocolrepository/protocolrepository.cpp
@@ -13,42 +13,23 @@ private:
public:
- TestProtocol(const string &name)
+ TestProtocol(const string &name) noexcept
: _name(name)
- {
- // empty
- }
+ { }
- const string &
- getName() const override
- {
- return _name;
- }
+ const string & getName() const override { return _name; }
- IRoutingPolicy::UP
- createPolicy(const string &name, const string &param) const override
- {
- (void)name;
- (void)param;
+ IRoutingPolicy::UP createPolicy(const string &, const string &) const override {
throw std::exception();
}
- Blob
- encode(const vespalib::Version &version, const Routable &routable) const override
- {
- (void)version;
- (void)routable;
+ Blob encode(const vespalib::Version &, const Routable &) const override {
throw std::exception();
}
- Routable::UP
- decode(const vespalib::Version &version, BlobRef data) const override
- {
- (void)version;
- (void)data;
+ Routable::UP decode(const vespalib::Version &, BlobRef ) const override {
throw std::exception();
}
- bool requireSequencing() const override { return false; }
};
int
@@ -58,16 +39,16 @@ Test::Main()
ProtocolRepository repo;
IProtocol::SP prev;
- prev = repo.putProtocol(IProtocol::SP(new TestProtocol("foo")));
- ASSERT_TRUE(prev.get() == NULL);
+ prev = repo.putProtocol(std::make_shared<TestProtocol>("foo"));
+ ASSERT_FALSE(prev);
IRoutingPolicy::SP policy = repo.getRoutingPolicy("foo", "bar", "baz");
- prev = repo.putProtocol(IProtocol::SP(new TestProtocol("foo")));
- ASSERT_TRUE(prev.get() != NULL);
+ prev = repo.putProtocol(std::make_shared<TestProtocol>("foo"));
+ ASSERT_TRUE(prev);
ASSERT_NOT_EQUAL(prev.get(), repo.getProtocol("foo"));
policy = repo.getRoutingPolicy("foo", "bar", "baz");
- ASSERT_TRUE(policy.get() == NULL);
+ ASSERT_FALSE(policy);
TEST_DONE();
}
diff --git a/messagebus/src/vespa/messagebus/iprotocol.h b/messagebus/src/vespa/messagebus/iprotocol.h
index 20726ced71b..e46a5600471 100644
--- a/messagebus/src/vespa/messagebus/iprotocol.h
+++ b/messagebus/src/vespa/messagebus/iprotocol.h
@@ -79,9 +79,6 @@ public:
* @return The decoded routable.
*/
virtual Routable::UP decode(const vespalib::Version &version, BlobRef data) const = 0; // throw()
-
-
- virtual bool requireSequencing() const = 0;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 485c8e3e911..7627aa876b3 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -55,10 +55,10 @@ private:
}
-RPCSend::RPCSend() :
- _net(nullptr),
- _clientIdent("client"),
- _serverIdent("server")
+RPCSend::RPCSend()
+ : _net(nullptr),
+ _clientIdent("client"),
+ _serverIdent("server")
{ }
RPCSend::~RPCSend() = default;
@@ -220,19 +220,19 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers
void
RPCSend::handleReply(Reply::UP reply)
{
- const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
- if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) {
- doHandleReply(protocol, std::move(reply));
+ if (!_net->allowDispatchForEncode()) {
+ doHandleReply(std::move(reply));
} else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
- doHandleReply(protocol, std::move(reply));
+ auto rejected = _net->getExecutor().execute(makeLambdaTask([this, reply = std::move(reply)]() mutable {
+ doHandleReply(std::move(reply));
}));
assert (!rejected);
}
}
void
-RPCSend::doHandleReply(const IProtocol * protocol, Reply::UP reply) {
+RPCSend::doHandleReply(Reply::UP reply) {
+ const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR));
FRT_RPCRequest &req = ctx->getRequest();
string version = ctx->getVersion().toString();
@@ -256,29 +256,29 @@ void
RPCSend::invoke(FRT_RPCRequest *req)
{
req->Detach();
- FRT_Values &args = *req->GetParams();
- std::unique_ptr<Params> params = toParams(args);
- IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol());
- if (protocol == nullptr) {
- replyError(req, params->getVersion(), params->getTraceLevel(),
- Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.",
- vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str())));
- return;
- }
- if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) {
- doRequest(req, protocol, std::move(params));
+ if (!_net->allowDispatchForDecode()) {
+ doRequest(req);
} else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
- doRequest(req, protocol, std::move(params));
+ auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req]() {
+ doRequest(req);
}));
assert (!rejected);
}
}
void
-RPCSend::doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params)
+RPCSend::doRequest(FRT_RPCRequest *req)
{
+ FRT_Values &args = *req->GetParams();
+ std::unique_ptr<Params> params = toParams(args);
+ IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol());
+ if (protocol == nullptr) {
+ replyError(req, params->getVersion(), params->getTraceLevel(),
+ Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.",
+ vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str())));
+ return;
+ }
Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload());
req->DiscardBlobs();
if ( ! routable ) {
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
index e548c4adca2..c2bcb7dff2b 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -83,9 +83,9 @@ public:
void invoke(FRT_RPCRequest *req);
private:
- void doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params);
+ void doRequest(FRT_RPCRequest *req);
void doRequestDone(FRT_RPCRequest *req);
- void doHandleReply(const IProtocol * protocol, std::unique_ptr<Reply> reply);
+ void doHandleReply(std::unique_ptr<Reply> reply);
void attach(RPCNetwork &net) final override;
void handleDiscard(Context ctx) final override;
void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
diff --git a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
index 57a6e901b90..0486f058217 100644
--- a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
+++ b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
@@ -72,7 +72,6 @@ public:
IRoutingPolicy::UP createPolicy(const string &name, const string &param) const override;
Blob encode(const vespalib::Version &version, const Routable &routable) const override;
Routable::UP decode(const vespalib::Version &version, BlobRef data) const override;
- virtual bool requireSequencing() const override { return false; }
};
} // namespace mbus
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 65a3cd718ec..6090e6bcd08 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -51,7 +51,7 @@ mbus.dispatch_on_encode bool default=true restart
## Enable to use above thread pool for decoding replies
## False will use network(fnet) thread
## Todo: Change default once verified in large scale deployment.
-mbus.dispatch_on_decode bool default=false restart
+mbus.dispatch_on_decode bool default=true restart
## Skip messenger thread on reply
## Experimental
diff --git a/storage/src/vespa/storageapi/mbusprot/storageprotocol.h b/storage/src/vespa/storageapi/mbusprot/storageprotocol.h
index 3f36ac42117..591009c9832 100644
--- a/storage/src/vespa/storageapi/mbusprot/storageprotocol.h
+++ b/storage/src/vespa/storageapi/mbusprot/storageprotocol.h
@@ -22,7 +22,6 @@ public:
mbus::IRoutingPolicy::UP createPolicy(const mbus::string& name, const mbus::string& param) const override;
mbus::Blob encode(const vespalib::Version&, const mbus::Routable&) const override;
mbus::Routable::UP decode(const vespalib::Version&, mbus::BlobRef) const override;
- bool requireSequencing() const override { return true; }
private:
ProtocolSerialization5_0 _serializer5_0;
ProtocolSerialization5_1 _serializer5_1;