diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-11 08:20:40 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-11 08:20:40 +0100 |
commit | 97d17bbdd0870c6a448a446915ffd908b3096d95 (patch) | |
tree | 6844ab71423421c68f49a1968bd12cf5118b9d07 /storage | |
parent | c2e5bef487b650d7184f8b392c96e33905936623 (diff) | |
parent | 019c73433cc1f72197e3cbb2a9821d6633f4e66c (diff) |
Merge pull request #4585 from vespa-engine/vekterli/wire-configurable-bucket-space-resolver-into-communication-manager-rebased
Wire in bucket spaces config changes to dynamic bucket space resolving (rebased)
Diffstat (limited to 'storage')
8 files changed, 164 insertions, 16 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index cd5dd2a01a4..e7e08f28ce3 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -6,11 +6,14 @@ #include <vespa/messagebus/rpcmessagebus.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> +#include <vespa/persistence/spi/fixed_bucket_spaces.h> #include <tests/common/teststorageapp.h> #include <tests/common/dummystoragelink.h> #include <tests/common/testhelper.h> #include <vespa/document/test/make_document_bucket.h> +#include <vespa/documentapi/messagebus/messages/getdocumentmessage.h> #include <vespa/vdstestlib/cppunit/macros.h> +#include <vespa/vespalib/util/stringfmt.h> using document::test::makeDocumentBucket; @@ -22,6 +25,8 @@ struct CommunicationManagerTest : public CppUnit::TestFixture { void testStorPendingLimitConfigsArePropagatedToMessageBus(); void testCommandsAreDequeuedInPriorityOrder(); void testRepliesAreDequeuedInFifoOrder(); + void bucket_space_config_can_be_updated_live(); + void unmapped_bucket_space_documentapi_request_returns_error_reply(); static constexpr uint32_t MESSAGE_WAIT_TIME_SEC = 60; @@ -44,6 +49,8 @@ struct CommunicationManagerTest : public CppUnit::TestFixture { CPPUNIT_TEST(testStorPendingLimitConfigsArePropagatedToMessageBus); CPPUNIT_TEST(testCommandsAreDequeuedInPriorityOrder); CPPUNIT_TEST(testRepliesAreDequeuedInFifoOrder); + CPPUNIT_TEST(bucket_space_config_can_be_updated_live); + CPPUNIT_TEST(unmapped_bucket_space_documentapi_request_returns_error_reply); CPPUNIT_TEST_SUITE_END(); }; @@ -232,4 +239,103 @@ CommunicationManagerTest::testRepliesAreDequeuedInFifoOrder() } } +struct MockMbusReplyHandler : mbus::IReplyHandler { + std::vector<std::unique_ptr<mbus::Reply>> replies; + + void handleReply(std::unique_ptr<mbus::Reply> msg) override { + replies.emplace_back(std::move(msg)); + } +}; + +struct CommunicationManagerFixture { + MockMbusReplyHandler reply_handler; + mbus::Slobrok slobrok; + std::unique_ptr<TestServiceLayerApp> node; + std::unique_ptr<CommunicationManager> comm_mgr; + DummyStorageLink* bottom_link; + + CommunicationManagerFixture() { + vdstestlib::DirConfig stor_config(getStandardConfig(true)); + stor_config.getConfig("stor-server").set("node_index", "1"); + addSlobrokConfig(stor_config, slobrok); + + node = std::make_unique<TestServiceLayerApp>(stor_config.getConfigId()); + comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(), + stor_config.getConfigId()); + bottom_link = new DummyStorageLink(); + comm_mgr->push_back(std::unique_ptr<StorageLink>(bottom_link)); + comm_mgr->open(); + } + ~CommunicationManagerFixture(); + + std::unique_ptr<documentapi::GetDocumentMessage> documentapi_message_for_space(const char* space) { + auto cmd = std::make_unique<documentapi::GetDocumentMessage>( + document::DocumentId(vespalib::make_string("id::%s::stuff", space))); + // Bind reply handling to our own mock handler + cmd->pushHandler(reply_handler); + return cmd; + } +}; + +CommunicationManagerFixture::~CommunicationManagerFixture() = default; + +using vespa::config::content::core::BucketspacesConfigBuilder; + +namespace { + +BucketspacesConfigBuilder::Documenttype doc_type(vespalib::stringref name, vespalib::stringref space) { + BucketspacesConfigBuilder::Documenttype dt; + dt.name = name; + dt.bucketspace = space; + return dt; +} + +} + +void CommunicationManagerTest::bucket_space_config_can_be_updated_live() { + CommunicationManagerFixture f; + BucketspacesConfigBuilder config; + config.documenttype.emplace_back(doc_type("foo", "default")); + config.documenttype.emplace_back(doc_type("bar", "global")); + f.comm_mgr->updateBucketSpacesConfig(config); + + f.comm_mgr->handleMessage(f.documentapi_message_for_space("bar")); + f.comm_mgr->handleMessage(f.documentapi_message_for_space("foo")); + f.bottom_link->waitForMessages(2, MESSAGE_WAIT_TIME_SEC); + + auto cmd1 = f.bottom_link->getCommand(0); + CPPUNIT_ASSERT_EQUAL(spi::FixedBucketSpaces::global_space(), cmd1->getBucket().getBucketSpace()); + + auto cmd2 = f.bottom_link->getCommand(1); + CPPUNIT_ASSERT_EQUAL(spi::FixedBucketSpaces::default_space(), cmd2->getBucket().getBucketSpace()); + + config.documenttype[1] = doc_type("bar", "default"); + f.comm_mgr->updateBucketSpacesConfig(config); + f.comm_mgr->handleMessage(f.documentapi_message_for_space("bar")); + f.bottom_link->waitForMessages(3, MESSAGE_WAIT_TIME_SEC); + + auto cmd3 = f.bottom_link->getCommand(2); + CPPUNIT_ASSERT_EQUAL(spi::FixedBucketSpaces::default_space(), cmd3->getBucket().getBucketSpace()); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), f.comm_mgr->metrics().bucketSpaceMappingFailures.getValue()); +} + +void CommunicationManagerTest::unmapped_bucket_space_documentapi_request_returns_error_reply() { + CommunicationManagerFixture f; + + BucketspacesConfigBuilder config; + config.documenttype.emplace_back(doc_type("foo", "default")); + f.comm_mgr->updateBucketSpacesConfig(config); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), f.comm_mgr->metrics().bucketSpaceMappingFailures.getValue()); + + f.comm_mgr->handleMessage(f.documentapi_message_for_space("fluff")); + CPPUNIT_ASSERT_EQUAL(size_t(1), f.reply_handler.replies.size()); + auto& reply = *f.reply_handler.replies[0]; + CPPUNIT_ASSERT(reply.hasErrors()); + CPPUNIT_ASSERT_EQUAL(static_cast<uint32_t>(api::ReturnCode::REJECTED), reply.getError(0).getCode()); + + CPPUNIT_ASSERT_EQUAL(uint64_t(1), f.comm_mgr->metrics().bucketSpaceMappingFailures.getValue()); +} + } // storage diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 13d4dfa5e1b..4cf4839319f 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -10,6 +10,7 @@ #include <vespa/storage/common/bucket_resolver.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/config/config-stor-server.h> +#include <vespa/storage/storageserver/configurable_bucket_resolver.h> #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/vespalib/stllike/asciistream.h> @@ -17,6 +18,8 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/log/bufferedlogger.h> +#include <vespa/persistence/spi/fixed_bucket_spaces.h> + LOG_SETUP(".communication.manager"); using vespalib::make_string; @@ -137,8 +140,13 @@ CommunicationManager::handleMessage(std::unique_ptr<mbus::Message> msg) assert(docMsgPtr.get()); - std::unique_ptr<api::StorageCommand> cmd( - _docApiConverter.toStorageAPI(static_cast<documentapi::DocumentMessage&>(*docMsgPtr), _component.getTypeRepo())); + std::unique_ptr<api::StorageCommand> cmd; + try { + cmd = _docApiConverter.toStorageAPI(static_cast<documentapi::DocumentMessage&>(*docMsgPtr), _component.getTypeRepo()); + } catch (spi::UnknownBucketSpaceException& e) { + fail_with_unresolvable_bucket_space(std::move(docMsgPtr), e.getMessage()); + return; + } if (!cmd.get()) { LOGBM(warning, "Unsupported message: StorageApi could not convert message of type %d to a storageapi message", @@ -262,6 +270,19 @@ CommunicationManager::handleReply(std::unique_ptr<mbus::Reply> reply) } } +void CommunicationManager::fail_with_unresolvable_bucket_space( + std::unique_ptr<documentapi::DocumentMessage> msg, + const vespalib::string& error_message) +{ + LOG(debug, "Could not map DocumentAPI message to internal bucket: %s", error_message.c_str()); + MBUS_TRACE(msg->getTrace(), 6, "Communication manager: Failing message as its document type has no known bucket space mapping"); + std::unique_ptr<mbus::Reply> reply(new mbus::EmptyReply()); + reply->addError(mbus::Error(documentapi::DocumentProtocol::ERROR_REJECTED, error_message)); + msg->swapState(*reply); + _metrics.bucketSpaceMappingFailures.inc(); + _messageBusSession->reply(std::move(reply)); +} + namespace { struct PlaceHolderBucketResolver : public BucketResolver { @@ -320,9 +341,9 @@ CommunicationManager::~CommunicationManager() onClose(); } - _sourceSession.reset(0); - _messageBusSession.reset(0); - _mbus.reset(0); + _sourceSession.reset(); + _messageBusSession.reset(); + _mbus.reset(); // Clear map of sent messages _before_ we delete any visitor threads to // avoid any issues where unloading shared libraries causes messages @@ -336,12 +357,12 @@ CommunicationManager::~CommunicationManager() void CommunicationManager::onClose() { // Avoid getting config during shutdown - _configFetcher.reset(0); + _configFetcher.reset(); _closed = true; - if (_mbus.get()) { - if (_messageBusSession.get()) { + if (_mbus) { + if (_messageBusSession) { _messageBusSession->close(); } } @@ -352,11 +373,11 @@ void CommunicationManager::onClose() // Stopping pumper thread should stop all incoming messages from being // processed. - if (_thread.get() != 0) { + if (_thread) { _thread->interrupt(); _eventQueue.signal(); _thread->join(); - _thread.reset(0); + _thread.reset(); } // Emptying remaining queued messages @@ -773,4 +794,8 @@ void CommunicationManager::updateMessagebusProtocol( } } +void CommunicationManager::updateBucketSpacesConfig(const BucketspacesConfig& config) { + _docApiConverter.setBucketResolver(ConfigurableBucketResolver::from_config(config)); +} + } // storage diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index b4508fbc9f9..e2c890bfc9b 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -27,6 +27,7 @@ #include <queue> #include <atomic> #include <mutex> +#include <vespa/config-bucketspaces.h> namespace mbus { class RPCMessageBus; @@ -150,10 +151,13 @@ private: void process(const std::shared_ptr<api::StorageMessage>& msg); using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig; + using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; void configureMessageBusLimits(const CommunicationManagerConfig& cfg); void configure(std::unique_ptr<CommunicationManagerConfig> config) override; void receiveStorageReply(const std::shared_ptr<api::StorageReply>&); + void fail_with_unresolvable_bucket_space(std::unique_ptr<documentapi::DocumentMessage> msg, + const vespalib::string& error_message); void serializeNodeState(const api::GetNodeStateReply& gns, std::ostream& os, bool includeDescription, bool includeDiskDescription, bool useOldFormat) const; @@ -207,6 +211,9 @@ public: void handleReply(std::unique_ptr<mbus::Reply> msg) override; void updateMessagebusProtocol(const document::DocumentTypeRepo::SP &repo); + void updateBucketSpacesConfig(const BucketspacesConfig&); + + const CommunicationManagerMetrics& metrics() const noexcept { return _metrics; } }; } // storage diff --git a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp index 30be5f0fafc..5d2caddb200 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp @@ -21,12 +21,14 @@ CommunicationManagerMetrics::CommunicationManagerMetrics(const LoadTypeSet& load failedDueToTooLittleMemory("toolittlememory", "", "Number of messages failed due to too little memory available", this), convertToStorageAPIFailures("convertfailures", "", "Number of messages that failed to get converted to storage API messages", this), + bucketSpaceMappingFailures("bucket_space_mapping_failures", "", + "Number of messages that could not be resolved to a known bucket space", this), sendCommandLatency("sendcommandlatency", "", "Average ms used to send commands to MBUS", this), sendReplyLatency("sendreplylatency", "", "Average ms used to send replies to MBUS", this) { } -CommunicationManagerMetrics::~CommunicationManagerMetrics() { } +CommunicationManagerMetrics::~CommunicationManagerMetrics() = default; } diff --git a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h index d039755cfaa..2a2b8bdb5c1 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h +++ b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h @@ -18,6 +18,7 @@ struct CommunicationManagerMetrics : public metrics::MetricSet { metrics::LoadMetric<metrics::DoubleAverageMetric> exceptionMessageProcessTime; metrics::LongCountMetric failedDueToTooLittleMemory; metrics::LongCountMetric convertToStorageAPIFailures; + metrics::LongCountMetric bucketSpaceMappingFailures; metrics::DoubleAverageMetric sendCommandLatency; metrics::DoubleAverageMetric sendReplyLatency; diff --git a/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp b/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp index 86c802a65cf..9d1ea2d00b5 100644 --- a/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp +++ b/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp @@ -13,7 +13,7 @@ document::Bucket ConfigurableBucketResolver::bucketFromId(const document::Docume return document::Bucket(iter->second, document::BucketId(0)); } throw spi::UnknownBucketSpaceException("Unknown bucket space mapping for document type '" - + id.getDocType() + "' in id: " + id.toString(), VESPA_STRLOC); + + id.getDocType() + "' in id: '" + id.toString() + "'", VESPA_STRLOC); } document::BucketSpace ConfigurableBucketResolver::bucketSpaceFromName(const vespalib::string& name) const { diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index d60f46e5a07..8dd532b537c 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -76,6 +76,7 @@ StorageNode::StorageNode( std::unique_ptr<HostInfo> hostInfo, RunMode mode) : _singleThreadedDebugMode(mode == SINGLE_THREADED_TEST_MODE), + _has_enabled_global_spaces(false), _configFetcher(), _hostInfo(std::move(hostInfo)), _context(context), @@ -281,7 +282,7 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) DIFFERWARN(clusterName, "Cannot alter cluster name of node live"); DIFFERWARN(nodeIndex, "Cannot alter node index of node live"); DIFFERWARN(isDistributor, "Cannot alter role of node live"); - _serverConfig.reset(new StorServerConfig(oldC)); + _serverConfig = std::make_unique<StorServerConfig>(oldC); _newServerConfig.reset(); (void)updated; } @@ -324,11 +325,11 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) StorDistributionConfig::getDiskDistributionName(newC.diskDistribution).c_str()); ASSIGN(diskDistribution); } - _distributionConfig.reset(new StorDistributionConfig(oldC)); + _distributionConfig = std::make_unique<StorDistributionConfig>(oldC); _newDistributionConfig.reset(); if (updated) { _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(oldC)); - for (StorageLink* link = _chain.get(); link != 0; link = link->getNextLink()) { + for (StorageLink* link = _chain.get(); link != nullptr; link = link->getNextLink()) { link->storageDistributionChanged(); } } @@ -347,7 +348,12 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) if (_newBucketSpacesConfig) { _bucketSpacesConfig = std::move(_newBucketSpacesConfig); _context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig); - // TODO: Add new bucket space resolver to document api converter + // If we've seen global bucket spaces enabled once, we must continue to update + // bucket spaces config or we'll get out of sync with doc types config. + _has_enabled_global_spaces = _has_enabled_global_spaces || _bucketSpacesConfig->enableMultipleBucketSpaces; + if (_has_enabled_global_spaces) { + _communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig); + } } } diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index 38a48ac2eed..e9070f17670 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -104,6 +104,7 @@ protected: using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; private: bool _singleThreadedDebugMode; + bool _has_enabled_global_spaces; // Subscriptions to config std::unique_ptr<config::ConfigFetcher> _configFetcher; |