diff options
Diffstat (limited to 'storage/src')
12 files changed, 74 insertions, 69 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index d12e6b90b2a..829a080ad01 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -723,6 +723,7 @@ public: auto createUpdateCommand(const document::BucketId& bucket) const { auto update = std::make_shared<document::DocumentUpdate>( + _self._node->getTestDocMan().getTypeRepo(), *_self._node->getTestDocMan().getTypeRepo() .getDocumentType("testdoctype1"), document::DocumentId("id:foo:testdoctype1::bar2")); diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index 81b0293b0c0..54aca78d13d 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -200,6 +200,7 @@ std::shared_ptr<api::UpdateCommand> ExternalOperationHandlerTest::makeUpdateComm const vespalib::string& doc_type, const vespalib::string& id) const { auto update = std::make_shared<document::DocumentUpdate>( + _testDocMan.getTypeRepo(), *_testDocMan.getTypeRepo().getDocumentType(doc_type), document::DocumentId(id)); return std::make_shared<api::UpdateCommand>( diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index 28602124045..ea2cc00c642 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -294,7 +294,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, document::DocumentUpdate::SP update; if (!options._withError) { update = std::make_shared<document::DocumentUpdate>( - *_doc_type, + *_repo, *_doc_type, document::DocumentId(document::DocIdString("test", "test"))); document::FieldUpdate fup(_doc_type->getField("headerval")); fup.addUpdate(ArithmeticValueUpdate(ArithmeticValueUpdate::Add, 10)); @@ -304,7 +304,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, // part of the Get. Just a sneaky way to force an eval error. auto* badDocType = _repo->getDocumentType("testdoctype2"); update = std::make_shared<document::DocumentUpdate>( - *badDocType, + *_repo, *badDocType, document::DocumentId(document::DocIdString("test", "test"))); document::FieldUpdate fup(badDocType->getField("onlyinchild")); fup.addUpdate(ArithmeticValueUpdate(ArithmeticValueUpdate::Add, 10)); diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index 92474728957..9ce862f5db8 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -75,8 +75,7 @@ std::shared_ptr<UpdateOperation> UpdateOperation_Test::sendUpdate(const std::string& bucketState) { document::DocumentUpdate::SP update( - new document::DocumentUpdate( - *_html_type, + new document::DocumentUpdate(*_repo, *_html_type, document::DocumentId(document::DocIdString("test", "test")))); _bId = getExternalOperationHandler().getBucketId(update->getId()); diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 838df87662f..369e820f987 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -2725,6 +2725,7 @@ void FileStorManagerTest::update_command_size_is_added_to_metric() { document::BucketId bucket(16, 4000); createBucket(bucket, 0); auto update = std::make_shared<document::DocumentUpdate>( + _node->getTestDocMan().getTypeRepo(), _node->getTestDocMan().createRandomDocument()->getType(), document::DocumentId("id:foo:testdoctype1::bar")); auto cmd = std::make_shared<api::UpdateCommand>( diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index f15921e447d..368eae104e8 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -223,16 +223,11 @@ PersistenceTestUtils::doGetOnDisk( } document::DocumentUpdate::SP -PersistenceTestUtils::createBodyUpdate( - const document::DocumentId& docId, - const document::FieldValue& updateValue) +PersistenceTestUtils::createBodyUpdate(const document::DocumentId& docId, const document::FieldValue& updateValue) { - const DocumentType* docType(_env->_component.getTypeRepo() - ->getDocumentType("testdoctype1")); - document::DocumentUpdate::SP update( - new document::DocumentUpdate(*docType, docId)); - std::shared_ptr<document::AssignValueUpdate> assignUpdate( - new document::AssignValueUpdate(updateValue)); + const DocumentType* docType(_env->_component.getTypeRepo()->getDocumentType("testdoctype1")); + document::DocumentUpdate::SP update(new document::DocumentUpdate(*_env->_component.getTypeRepo(), *docType, docId)); + std::shared_ptr<document::AssignValueUpdate> assignUpdate(new document::AssignValueUpdate(updateValue)); document::FieldUpdate fieldUpdate(docType->getField("content")); fieldUpdate.addUpdate(*assignUpdate); update->addUpdate(fieldUpdate); @@ -240,16 +235,11 @@ PersistenceTestUtils::createBodyUpdate( } document::DocumentUpdate::SP -PersistenceTestUtils::createHeaderUpdate( - const document::DocumentId& docId, - const document::FieldValue& updateValue) +PersistenceTestUtils::createHeaderUpdate(const document::DocumentId& docId, const document::FieldValue& updateValue) { - const DocumentType* docType(_env->_component.getTypeRepo() - ->getDocumentType("testdoctype1")); - document::DocumentUpdate::SP update( - new document::DocumentUpdate(*docType, docId)); - std::shared_ptr<document::AssignValueUpdate> assignUpdate( - new document::AssignValueUpdate(updateValue)); + const DocumentType* docType(_env->_component.getTypeRepo()->getDocumentType("testdoctype1")); + document::DocumentUpdate::SP update(new document::DocumentUpdate(*_env->_component.getTypeRepo(), *docType, docId)); + std::shared_ptr<document::AssignValueUpdate> assignUpdate(new document::AssignValueUpdate(updateValue)); document::FieldUpdate fieldUpdate(docType->getField("headerval")); fieldUpdate.addUpdate(*assignUpdate); update->addUpdate(fieldUpdate); diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index c729df1e7eb..686e10ba5ef 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -189,7 +189,7 @@ std::unique_ptr<api::UpdateCommand> TestAndSetTest::conditional_update_test( { putTestDocument(matchingHeader, timestampOne); - auto docUpdate = std::make_shared<document::DocumentUpdate>(testDoc->getType(), testDocId); + auto docUpdate = std::make_shared<document::DocumentUpdate>(_env->_testDocMan.getTypeRepo(), testDoc->getType(), testDocId); auto fieldUpdate = document::FieldUpdate(testDoc->getField("content")); fieldUpdate.addUpdate(document::AssignValueUpdate(NEW_CONTENT)); docUpdate->addUpdate(fieldUpdate); diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp index 7df598bed97..98ad8761736 100644 --- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp +++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp @@ -597,25 +597,19 @@ ChangedBucketOwnershipHandlerTest::testAbortOutdatedPutOperation() void ChangedBucketOwnershipHandlerTest::testAbortOutdatedUpdateCommand() { - const document::DocumentType* docType(_testDocRepo.getTypeRepo() - .getDocumentType("testdoctype1")); + const document::DocumentType* docType(_testDocRepo.getTypeRepo().getDocumentType("testdoctype1")); document::DocumentId docId("id:foo:testdoctype1::bar"); - document::DocumentUpdate::SP update( - std::make_shared<document::DocumentUpdate>(*docType, docId)); - CPPUNIT_ASSERT(changeAbortsMessage<api::UpdateCommand>( - getBucketToAbort(), update, api::Timestamp(1234))); - CPPUNIT_ASSERT(!changeAbortsMessage<api::UpdateCommand>( - getBucketToAllow(), update, api::Timestamp(1234))); + auto update(std::make_shared<document::DocumentUpdate>(_testDocRepo.getTypeRepo(), *docType, docId)); + CPPUNIT_ASSERT(changeAbortsMessage<api::UpdateCommand>(getBucketToAbort(), update, api::Timestamp(1234))); + CPPUNIT_ASSERT(!changeAbortsMessage<api::UpdateCommand>(getBucketToAllow(), update, api::Timestamp(1234))); } void ChangedBucketOwnershipHandlerTest::testAbortOutdatedRemoveCommand() { document::DocumentId docId("id:foo:testdoctype1::bar"); - CPPUNIT_ASSERT(changeAbortsMessage<api::RemoveCommand>( - getBucketToAbort(), docId, api::Timestamp(1234))); - CPPUNIT_ASSERT(!changeAbortsMessage<api::RemoveCommand>( - getBucketToAllow(), docId, api::Timestamp(1234))); + CPPUNIT_ASSERT(changeAbortsMessage<api::RemoveCommand>(getBucketToAbort(), docId, api::Timestamp(1234))); + CPPUNIT_ASSERT(!changeAbortsMessage<api::RemoveCommand>(getBucketToAllow(), docId, api::Timestamp(1234))); } void diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp index 695ae17c5d4..40d561bd589 100644 --- a/storage/src/tests/storageserver/documentapiconvertertest.cpp +++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp @@ -181,7 +181,7 @@ void DocumentApiConverterTest::testForwardedPut() void DocumentApiConverterTest::testUpdate() { - auto update = std::make_shared<document::DocumentUpdate>(_html_type, defaultDocId); + auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, defaultDocId); documentapi::UpdateDocumentMessage updateMsg(update); updateMsg.setOldTimestamp(1234); updateMsg.setNewTimestamp(5678); @@ -327,19 +327,19 @@ DocumentApiConverterTest::testBatchDocumentUpdate() { document::DocumentId docId(document::UserDocIdString("userdoc:test:1234:test1")); - auto update = std::make_shared<document::DocumentUpdate>(_html_type, docId); + auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, docId); updates.push_back(update); } { document::DocumentId docId(document::UserDocIdString("userdoc:test:1234:test2")); - auto update = std::make_shared<document::DocumentUpdate>(_html_type, docId); + auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, docId); updates.push_back(update); } { document::DocumentId docId(document::UserDocIdString("userdoc:test:1234:test3")); - auto update = std::make_shared<document::DocumentUpdate>(_html_type, docId); + auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, docId); updates.push_back(update); } diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index e29540de064..2a2a840dd4e 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -28,3 +28,16 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 ## TTL for rpc target cache mbus.rpctargetcache.ttl double default = 600 + +## Number of threads for mbus threadpool +## Any value below 1 will be 1. +mbus.num_threads int default=4 + +## Enable to use above thread pool for encoding replies +## False will use network(fnet) thread +mbus.dispatch_on_encode bool default=true + +## Enable to use above thread pool for decoding replies +## False will use network(fnet) thread +## Todo: Change default once verified in large scale deployment. +mbus.dispatch_on_decode bool default=false diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 94a151bcdc1..65523b62c59 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -79,7 +79,7 @@ StorageTransportContext::StorageTransportContext(std::unique_ptr<RPCRequestWrapp : _request(std::move(request)) { } -StorageTransportContext::~StorageTransportContext() { } +StorageTransportContext::~StorageTransportContext() = default; void CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageReply>& reply) @@ -278,13 +278,13 @@ void CommunicationManager::fail_with_unresolvable_bucket_space( namespace { struct PlaceHolderBucketResolver : public BucketResolver { - virtual document::Bucket bucketFromId(const document::DocumentId &) const override { + document::Bucket bucketFromId(const document::DocumentId &) const override { return document::Bucket(FixedBucketSpaces::default_space(), document::BucketId(0)); } - virtual document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { + document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { return FixedBucketSpaces::default_space(); } - virtual vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { + vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { assert(bucketSpace == FixedBucketSpaces::default_space()); return FixedBucketSpaces::to_string(bucketSpace); } @@ -423,6 +423,9 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> params.setSlobrokConfig(_configUri); params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl); + params.setNumThreads(std::max(1, config->mbus.numThreads)); + params.setDispatchOnDecode(config->mbus.dispatchOnDecode); + params.setDispatchOnEncode(config->mbus.dispatchOnEncode); params.setIdentity(mbus::Identity(_component.getIdentity())); if (config->mbusport != -1) { diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 6bb2ca31ec1..0602d14e088 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -16,6 +16,8 @@ #include <fcntl.h> #include <vespa/log/log.h> +#include <vespa/config/print/configdatabuffer.h> + LOG_SETUP(".node.server"); using vespa::config::content::StorDistributionConfigBuilder; @@ -466,12 +468,24 @@ StorageNode::shutdown() LOG(debug, "Done shutting down node"); } -void StorageNode::configure(std::unique_ptr<StorServerConfig> config) -{ - // When we get config, we try to grab the config lock to ensure noone - // else is doing configuration work, and then we write the new config - // to a variable where we can find it later when processing config - // updates +namespace { + +void log_config_received(const config::ConfigInstance& cfg) { + if (LOG_WOULD_LOG(debug)) { + config::ConfigDataBuffer buf; + cfg.serialize(buf); + LOG(debug, "Received new %s config: %s", cfg.defName().c_str(), buf.getEncodedString().c_str()); + } +} + +} + +void StorageNode::configure(std::unique_ptr<StorServerConfig> config) { + log_config_received(*config); + // When we get config, we try to grab the config lock to ensure noone + // else is doing configuration work, and then we write the new config + // to a variable where we can find it later when processing config + // updates { vespalib::LockGuard configLockGuard(_configLock); _newServerConfig = std::move(config); @@ -482,13 +496,8 @@ void StorageNode::configure(std::unique_ptr<StorServerConfig> config) } } -void -StorageNode::configure(std::unique_ptr<UpgradingConfig> config) -{ - // When we get config, we try to grab the config lock to ensure noone - // else is doing configuration work, and then we write the new config - // to a variable where we can find it later when processing config - // updates +void StorageNode::configure(std::unique_ptr<UpgradingConfig> config) { + log_config_received(*config); { vespalib::LockGuard configLockGuard(_configLock); _newClusterConfig = std::move(config); @@ -499,13 +508,8 @@ StorageNode::configure(std::unique_ptr<UpgradingConfig> config) } } -void -StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) -{ - // When we get config, we try to grab the config lock to ensure noone - // else is doing configuration work, and then we write the new config - // to a variable where we can find it later when processing config - // updates +void StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) { + log_config_received(*config); { vespalib::LockGuard configLockGuard(_configLock); _newDistributionConfig = std::move(config); @@ -516,9 +520,8 @@ StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) } } -void -StorageNode::configure(std::unique_ptr<StorPrioritymappingConfig> config) -{ +void StorageNode::configure(std::unique_ptr<StorPrioritymappingConfig> config) { + log_config_received(*config); { vespalib::LockGuard configLockGuard(_configLock); _newPriorityConfig = std::move(config); @@ -533,6 +536,7 @@ void StorageNode::configure(std::unique_ptr<document::DocumenttypesConfig> config, bool hasChanged, int64_t generation) { + log_config_received(*config); (void) generation; if (!hasChanged) return; @@ -546,9 +550,8 @@ StorageNode::configure(std::unique_ptr<document::DocumenttypesConfig> config, } } -void -StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) -{ +void StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) { + log_config_received(*config); { vespalib::LockGuard configLockGuard(_configLock); _newBucketSpacesConfig = std::move(config); |