summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-01-11 08:20:40 +0100
committerGitHub <noreply@github.com>2018-01-11 08:20:40 +0100
commit97d17bbdd0870c6a448a446915ffd908b3096d95 (patch)
tree6844ab71423421c68f49a1968bd12cf5118b9d07 /storage
parentc2e5bef487b650d7184f8b392c96e33905936623 (diff)
parent019c73433cc1f72197e3cbb2a9821d6633f4e66c (diff)
Merge pull request #4585 from vespa-engine/vekterli/wire-configurable-bucket-space-resolver-into-communication-manager-rebased
Wire in bucket spaces config changes to dynamic bucket space resolving (rebased)
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/communicationmanagertest.cpp106
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp45
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h7
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp4
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanagermetrics.h1
-rw-r--r--storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h1
8 files changed, 164 insertions, 16 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
index cd5dd2a01a4..e7e08f28ce3 100644
--- a/storage/src/tests/storageserver/communicationmanagertest.cpp
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -6,11 +6,14 @@
#include <vespa/messagebus/rpcmessagebus.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
+#include <vespa/persistence/spi/fixed_bucket_spaces.h>
#include <tests/common/teststorageapp.h>
#include <tests/common/dummystoragelink.h>
#include <tests/common/testhelper.h>
#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/documentapi/messagebus/messages/getdocumentmessage.h>
#include <vespa/vdstestlib/cppunit/macros.h>
+#include <vespa/vespalib/util/stringfmt.h>
using document::test::makeDocumentBucket;
@@ -22,6 +25,8 @@ struct CommunicationManagerTest : public CppUnit::TestFixture {
void testStorPendingLimitConfigsArePropagatedToMessageBus();
void testCommandsAreDequeuedInPriorityOrder();
void testRepliesAreDequeuedInFifoOrder();
+ void bucket_space_config_can_be_updated_live();
+ void unmapped_bucket_space_documentapi_request_returns_error_reply();
static constexpr uint32_t MESSAGE_WAIT_TIME_SEC = 60;
@@ -44,6 +49,8 @@ struct CommunicationManagerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testStorPendingLimitConfigsArePropagatedToMessageBus);
CPPUNIT_TEST(testCommandsAreDequeuedInPriorityOrder);
CPPUNIT_TEST(testRepliesAreDequeuedInFifoOrder);
+ CPPUNIT_TEST(bucket_space_config_can_be_updated_live);
+ CPPUNIT_TEST(unmapped_bucket_space_documentapi_request_returns_error_reply);
CPPUNIT_TEST_SUITE_END();
};
@@ -232,4 +239,103 @@ CommunicationManagerTest::testRepliesAreDequeuedInFifoOrder()
}
}
+struct MockMbusReplyHandler : mbus::IReplyHandler {
+ std::vector<std::unique_ptr<mbus::Reply>> replies;
+
+ void handleReply(std::unique_ptr<mbus::Reply> msg) override {
+ replies.emplace_back(std::move(msg));
+ }
+};
+
+struct CommunicationManagerFixture {
+ MockMbusReplyHandler reply_handler;
+ mbus::Slobrok slobrok;
+ std::unique_ptr<TestServiceLayerApp> node;
+ std::unique_ptr<CommunicationManager> comm_mgr;
+ DummyStorageLink* bottom_link;
+
+ CommunicationManagerFixture() {
+ vdstestlib::DirConfig stor_config(getStandardConfig(true));
+ stor_config.getConfig("stor-server").set("node_index", "1");
+ addSlobrokConfig(stor_config, slobrok);
+
+ node = std::make_unique<TestServiceLayerApp>(stor_config.getConfigId());
+ comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(),
+ stor_config.getConfigId());
+ bottom_link = new DummyStorageLink();
+ comm_mgr->push_back(std::unique_ptr<StorageLink>(bottom_link));
+ comm_mgr->open();
+ }
+ ~CommunicationManagerFixture();
+
+ std::unique_ptr<documentapi::GetDocumentMessage> documentapi_message_for_space(const char* space) {
+ auto cmd = std::make_unique<documentapi::GetDocumentMessage>(
+ document::DocumentId(vespalib::make_string("id::%s::stuff", space)));
+ // Bind reply handling to our own mock handler
+ cmd->pushHandler(reply_handler);
+ return cmd;
+ }
+};
+
+CommunicationManagerFixture::~CommunicationManagerFixture() = default;
+
+using vespa::config::content::core::BucketspacesConfigBuilder;
+
+namespace {
+
+BucketspacesConfigBuilder::Documenttype doc_type(vespalib::stringref name, vespalib::stringref space) {
+ BucketspacesConfigBuilder::Documenttype dt;
+ dt.name = name;
+ dt.bucketspace = space;
+ return dt;
+}
+
+}
+
+void CommunicationManagerTest::bucket_space_config_can_be_updated_live() {
+ CommunicationManagerFixture f;
+ BucketspacesConfigBuilder config;
+ config.documenttype.emplace_back(doc_type("foo", "default"));
+ config.documenttype.emplace_back(doc_type("bar", "global"));
+ f.comm_mgr->updateBucketSpacesConfig(config);
+
+ f.comm_mgr->handleMessage(f.documentapi_message_for_space("bar"));
+ f.comm_mgr->handleMessage(f.documentapi_message_for_space("foo"));
+ f.bottom_link->waitForMessages(2, MESSAGE_WAIT_TIME_SEC);
+
+ auto cmd1 = f.bottom_link->getCommand(0);
+ CPPUNIT_ASSERT_EQUAL(spi::FixedBucketSpaces::global_space(), cmd1->getBucket().getBucketSpace());
+
+ auto cmd2 = f.bottom_link->getCommand(1);
+ CPPUNIT_ASSERT_EQUAL(spi::FixedBucketSpaces::default_space(), cmd2->getBucket().getBucketSpace());
+
+ config.documenttype[1] = doc_type("bar", "default");
+ f.comm_mgr->updateBucketSpacesConfig(config);
+ f.comm_mgr->handleMessage(f.documentapi_message_for_space("bar"));
+ f.bottom_link->waitForMessages(3, MESSAGE_WAIT_TIME_SEC);
+
+ auto cmd3 = f.bottom_link->getCommand(2);
+ CPPUNIT_ASSERT_EQUAL(spi::FixedBucketSpaces::default_space(), cmd3->getBucket().getBucketSpace());
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), f.comm_mgr->metrics().bucketSpaceMappingFailures.getValue());
+}
+
+void CommunicationManagerTest::unmapped_bucket_space_documentapi_request_returns_error_reply() {
+ CommunicationManagerFixture f;
+
+ BucketspacesConfigBuilder config;
+ config.documenttype.emplace_back(doc_type("foo", "default"));
+ f.comm_mgr->updateBucketSpacesConfig(config);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), f.comm_mgr->metrics().bucketSpaceMappingFailures.getValue());
+
+ f.comm_mgr->handleMessage(f.documentapi_message_for_space("fluff"));
+ CPPUNIT_ASSERT_EQUAL(size_t(1), f.reply_handler.replies.size());
+ auto& reply = *f.reply_handler.replies[0];
+ CPPUNIT_ASSERT(reply.hasErrors());
+ CPPUNIT_ASSERT_EQUAL(static_cast<uint32_t>(api::ReturnCode::REJECTED), reply.getError(0).getCode());
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(1), f.comm_mgr->metrics().bucketSpaceMappingFailures.getValue());
+}
+
} // storage
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 13d4dfa5e1b..4cf4839319f 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -10,6 +10,7 @@
#include <vespa/storage/common/bucket_resolver.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/config/config-stor-server.h>
+#include <vespa/storage/storageserver/configurable_bucket_resolver.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/vespalib/stllike/asciistream.h>
@@ -17,6 +18,8 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/log/bufferedlogger.h>
+#include <vespa/persistence/spi/fixed_bucket_spaces.h>
+
LOG_SETUP(".communication.manager");
using vespalib::make_string;
@@ -137,8 +140,13 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg)
assert(docMsgPtr.get());
- std::unique_ptr<api::StorageCommand> cmd(
- _docApiConverter.toStorageAPI(static_cast<documentapi::DocumentMessage&>(*docMsgPtr), _component.getTypeRepo()));
+ std::unique_ptr<api::StorageCommand> cmd;
+ try {
+ cmd = _docApiConverter.toStorageAPI(static_cast<documentapi::DocumentMessage&>(*docMsgPtr), _component.getTypeRepo());
+ } catch (spi::UnknownBucketSpaceException& e) {
+ fail_with_unresolvable_bucket_space(std::move(docMsgPtr), e.getMessage());
+ return;
+ }
if (!cmd.get()) {
LOGBM(warning, "Unsupported message: StorageApi could not convert message of type %d to a storageapi message",
@@ -262,6 +270,19 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply)
}
}
+void CommunicationManager::fail_with_unresolvable_bucket_space(
+ std::unique_ptr<documentapi::DocumentMessage> msg,
+ const vespalib::string& error_message)
+{
+ LOG(debug, "Could not map DocumentAPI message to internal bucket: %s", error_message.c_str());
+ MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Failing message as its document type has no known bucket space mapping");
+ std::unique_ptr<mbus::Reply> reply(new mbus::EmptyReply());
+ reply->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_REJECTED, error_message));
+ msg->swapState(*reply);
+ _metrics.bucketSpaceMappingFailures.inc();
+ _messageBusSession->reply(std::move(reply));
+}
+
namespace {
struct PlaceHolderBucketResolver : public BucketResolver {
@@ -320,9 +341,9 @@ CommunicationManager::~CommunicationManager()
onClose();
}
- _sourceSession.reset(0);
- _messageBusSession.reset(0);
- _mbus.reset(0);
+ _sourceSession.reset();
+ _messageBusSession.reset();
+ _mbus.reset();
// Clear map of sent messages _before_ we delete any visitor threads to
// avoid any issues where unloading shared libraries causes messages
@@ -336,12 +357,12 @@ CommunicationManager::~CommunicationManager()
void CommunicationManager::onClose()
{
// Avoid getting config during shutdown
- _configFetcher.reset(0);
+ _configFetcher.reset();
_closed = true;
- if (_mbus.get()) {
- if (_messageBusSession.get()) {
+ if (_mbus) {
+ if (_messageBusSession) {
_messageBusSession->close();
}
}
@@ -352,11 +373,11 @@ void CommunicationManager::onClose()
// Stopping pumper thread should stop all incoming messages from being
// processed.
- if (_thread.get() != 0) {
+ if (_thread) {
_thread->interrupt();
_eventQueue.signal();
_thread->join();
- _thread.reset(0);
+ _thread.reset();
}
// Emptying remaining queued messages
@@ -773,4 +794,8 @@ void CommunicationManager::updateMessagebusProtocol(
}
}
+void CommunicationManager::updateBucketSpacesConfig(const BucketspacesConfig& config) {
+ _docApiConverter.setBucketResolver(ConfigurableBucketResolver::from_config(config));
+}
+
} // storage
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index b4508fbc9f9..e2c890bfc9b 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -27,6 +27,7 @@
#include <queue>
#include <atomic>
#include <mutex>
+#include <vespa/config-bucketspaces.h>
namespace mbus {
class RPCMessageBus;
@@ -150,10 +151,13 @@ private:
void process(const std::shared_ptr<api::StorageMessage>& msg);
using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig;
+ using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;
void configureMessageBusLimits(const CommunicationManagerConfig& cfg);
void configure(std::unique_ptr<CommunicationManagerConfig> config) override;
void receiveStorageReply(const std::shared_ptr<api::StorageReply>&);
+ void fail_with_unresolvable_bucket_space(std::unique_ptr<documentapi::DocumentMessage> msg,
+ const vespalib::string& error_message);
void serializeNodeState(const api::GetNodeStateReply& gns, std::ostream& os, bool includeDescription,
bool includeDiskDescription, bool useOldFormat) const;
@@ -207,6 +211,9 @@ public:
void handleReply(std::unique_ptr<mbus::Reply> msg) override;
void updateMessagebusProtocol(const document::DocumentTypeRepo::SP &repo);
+ void updateBucketSpacesConfig(const BucketspacesConfig&);
+
+ const CommunicationManagerMetrics& metrics() const noexcept { return _metrics; }
};
} // storage
diff --git a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp
index 30be5f0fafc..5d2caddb200 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp
@@ -21,12 +21,14 @@ CommunicationManagerMetrics::CommunicationManagerMetrics(const LoadTypeSet& load
failedDueToTooLittleMemory("toolittlememory", "", "Number of messages failed due to too little memory available", this),
convertToStorageAPIFailures("convertfailures", "",
"Number of messages that failed to get converted to storage API messages", this),
+ bucketSpaceMappingFailures("bucket_space_mapping_failures", "",
+ "Number of messages that could not be resolved to a known bucket space", this),
sendCommandLatency("sendcommandlatency", "", "Average ms used to send commands to MBUS", this),
sendReplyLatency("sendreplylatency", "", "Average ms used to send replies to MBUS", this)
{
}
-CommunicationManagerMetrics::~CommunicationManagerMetrics() { }
+CommunicationManagerMetrics::~CommunicationManagerMetrics() = default;
}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h
index d039755cfaa..2a2b8bdb5c1 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h
@@ -18,6 +18,7 @@ struct CommunicationManagerMetrics : public metrics::MetricSet {
metrics::LoadMetric<metrics::DoubleAverageMetric> exceptionMessageProcessTime;
metrics::LongCountMetric failedDueToTooLittleMemory;
metrics::LongCountMetric convertToStorageAPIFailures;
+ metrics::LongCountMetric bucketSpaceMappingFailures;
metrics::DoubleAverageMetric sendCommandLatency;
metrics::DoubleAverageMetric sendReplyLatency;
diff --git a/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp b/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp
index 86c802a65cf..9d1ea2d00b5 100644
--- a/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp
+++ b/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp
@@ -13,7 +13,7 @@ document::Bucket ConfigurableBucketResolver::bucketFromId(const document::Docume
return document::Bucket(iter->second, document::BucketId(0));
}
throw spi::UnknownBucketSpaceException("Unknown bucket space mapping for document type '"
- + id.getDocType() + "' in id: " + id.toString(), VESPA_STRLOC);
+ + id.getDocType() + "' in id: '" + id.toString() + "'", VESPA_STRLOC);
}
document::BucketSpace ConfigurableBucketResolver::bucketSpaceFromName(const vespalib::string& name) const {
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index d60f46e5a07..8dd532b537c 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -76,6 +76,7 @@ StorageNode::StorageNode(
std::unique_ptr<HostInfo> hostInfo,
RunMode mode)
: _singleThreadedDebugMode(mode == SINGLE_THREADED_TEST_MODE),
+ _has_enabled_global_spaces(false),
_configFetcher(),
_hostInfo(std::move(hostInfo)),
_context(context),
@@ -281,7 +282,7 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
DIFFERWARN(clusterName, "Cannot alter cluster name of node live");
DIFFERWARN(nodeIndex, "Cannot alter node index of node live");
DIFFERWARN(isDistributor, "Cannot alter role of node live");
- _serverConfig.reset(new StorServerConfig(oldC));
+ _serverConfig = std::make_unique<StorServerConfig>(oldC);
_newServerConfig.reset();
(void)updated;
}
@@ -324,11 +325,11 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
StorDistributionConfig::getDiskDistributionName(newC.diskDistribution).c_str());
ASSIGN(diskDistribution);
}
- _distributionConfig.reset(new StorDistributionConfig(oldC));
+ _distributionConfig = std::make_unique<StorDistributionConfig>(oldC);
_newDistributionConfig.reset();
if (updated) {
_context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(oldC));
- for (StorageLink* link = _chain.get(); link != 0; link = link->getNextLink()) {
+ for (StorageLink* link = _chain.get(); link != nullptr; link = link->getNextLink()) {
link->storageDistributionChanged();
}
}
@@ -347,7 +348,12 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
if (_newBucketSpacesConfig) {
_bucketSpacesConfig = std::move(_newBucketSpacesConfig);
_context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig);
- // TODO: Add new bucket space resolver to document api converter
+ // If we've seen global bucket spaces enabled once, we must continue to update
+ // bucket spaces config or we'll get out of sync with doc types config.
+ _has_enabled_global_spaces = _has_enabled_global_spaces || _bucketSpacesConfig->enableMultipleBucketSpaces;
+ if (_has_enabled_global_spaces) {
+ _communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig);
+ }
}
}
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index 38a48ac2eed..e9070f17670 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -104,6 +104,7 @@ protected:
using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;
private:
bool _singleThreadedDebugMode;
+ bool _has_enabled_global_spaces;
// Subscriptions to config
std::unique_ptr<config::ConfigFetcher> _configFetcher;