diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-20 21:02:13 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-20 21:02:13 +0000 |
commit | 061f32baaa4e5800ce94052c671bdbd155c455f8 (patch) | |
tree | f7d95eb5f3e65d2b7ac5c2b48c42d6cc85c3ad60 /storage | |
parent | ae98a177c07c1e265a40f9964d6556c58542207d (diff) |
Add generation counter to StorageComponent to detect if repos have changed and need reload.
Happy path where you do not need any reload is a single atomic relaxed load as opposed to at least 3 atomic locked instructions.
Diffstat (limited to 'storage')
9 files changed, 52 insertions, 25 deletions
diff --git a/storage/src/vespa/storage/common/storagecomponent.cpp b/storage/src/vespa/storage/common/storagecomponent.cpp index 3846fe3a9c0..c5194f909c0 100644 --- a/storage/src/vespa/storage/common/storagecomponent.cpp +++ b/storage/src/vespa/storage/common/storagecomponent.cpp @@ -36,6 +36,7 @@ StorageComponent::setDocumentTypeRepo(std::shared_ptr<const document::DocumentTy auto repo = std::make_shared<Repos>(std::move(docTypeRepo)); std::lock_guard guard(_lock); _repos = std::move(repo); + _generation++; } void @@ -43,6 +44,7 @@ StorageComponent::setLoadTypes(LoadTypeSetSP loadTypes) { std::lock_guard guard(_lock); _loadTypes = loadTypes; + _generation++; } @@ -65,6 +67,7 @@ StorageComponent::setDistribution(DistributionSP distribution) { std::lock_guard guard(_lock); _distribution = distribution; + _generation++; } void @@ -90,7 +93,8 @@ StorageComponent::StorageComponent(StorageComponentRegister& compReg, _bucketIdFactory(), _distribution(), _nodeStateUpdater(nullptr), - _lock() + _lock(), + _generation(0) { compReg.registerStorageComponent(*this); } diff --git a/storage/src/vespa/storage/common/storagecomponent.h b/storage/src/vespa/storage/common/storagecomponent.h index f5796ebb49b..b33501e9dd1 100644 --- a/storage/src/vespa/storage/common/storagecomponent.h +++ b/storage/src/vespa/storage/common/storagecomponent.h @@ -104,7 +104,7 @@ public: uint8_t getPriority(const documentapi::LoadType&) const; DistributionSP getDistribution() const; NodeStateUpdater& getStateUpdater() const; - + uint64_t getGeneration() const { return _generation.load(std::memory_order_relaxed); } private: vespalib::string _clusterName; const lib::NodeType* _nodeType; @@ -117,6 +117,7 @@ private: DistributionSP _distribution; NodeStateUpdater* _nodeStateUpdater; mutable std::mutex _lock; + std::atomic<uint64_t> _generation; }; struct StorageComponentRegister : public virtual framework::ComponentRegister diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 3269a05b3c2..188523af38d 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -109,11 +109,6 @@ selectSequencer(vespa::config::content::StorFilestorConfig::ResponseSequencerTyp } } -vespalib::string -createThreadName(size_t stripeId) { - return fmt("PersistenceThread-%zu", stripeId); -} - #ifdef __PIC__ #define TLS_LINKAGE __attribute__((visibility("hidden"), tls_model("initial-exec"))) #else @@ -170,8 +165,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC assert(_sequencedExecutor); LOG(spam, "Setting up the disk"); for (uint32_t i = 0; i < numThreads; i++) { - _persistenceComponents.push_back(std::make_unique<ServiceLayerComponent>(_compReg, createThreadName(i))); - _threads.push_back(std::make_unique<PersistenceThread>(createRegisteredHandler(*_persistenceComponents.back()), + _threads.push_back(std::make_unique<PersistenceThread>(createRegisteredHandler(_component), *_filestorHandler, i % numStripes, _component)); } } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 7c952d7f82f..ee66bc7d77c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -57,7 +57,6 @@ class FileStorManager : public StorageLinkQueued, DoneInitializeHandler& _init_handler; const document::BucketIdFactory & _bucketIdFactory; - std::vector<std::unique_ptr<ServiceLayerComponent>> _persistenceComponents; std::vector<std::unique_ptr<PersistenceHandler>> _persistenceHandlers; std::vector<std::unique_ptr<DiskThread>> _threads; std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 16a23b5f5a7..4fe7333fb5f 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -423,7 +423,7 @@ MergeHandler::fetchLocalData( } document::BucketIdFactory idFactory; - const auto repo = _env._component.getTypeRepo()->documentTypeRepo; + const document::DocumentTypeRepo & repo = _env.getDocumentTypeRepo(); for (const auto& entry_ptr : entries) { const auto& docEntry = *entry_ptr; @@ -458,7 +458,7 @@ MergeHandler::fetchLocalData( e.toString().c_str(), docEntry.toString().c_str()); } } - e._repo = repo.get(); + e._repo = &repo; } for (auto& e : diff) { @@ -540,8 +540,7 @@ MergeHandler::applyDiffLocally( std::vector<spi::DocEntry::UP> entries; populateMetaData(bucket, MAX_TIMESTAMP, entries, context); - std::shared_ptr<const document::DocumentTypeRepo> repo(_env._component.getTypeRepo()->documentTypeRepo); - assert(repo); + const document::DocumentTypeRepo & repo = _env.getDocumentTypeRepo(); uint32_t existingCount = entries.size(); uint32_t i = 0, j = 0; @@ -575,7 +574,7 @@ MergeHandler::applyDiffLocally( ++i; LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - applyDiffEntry(bucket, e, context, *repo); + applyDiffEntry(bucket, e, context, repo); } else { assert(spi::Timestamp(e._entry._timestamp) == existing.getTimestamp()); // Diffing for existing timestamp; should either both be put @@ -588,7 +587,7 @@ MergeHandler::applyDiffLocally( "timestamp in %s. Diff slot: %s. Existing slot: %s", bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); - applyDiffEntry(bucket, e, context, *repo); + applyDiffEntry(bucket, e, context, repo); } else { // Duplicate put, just ignore it. LOG(debug, "During diff apply, attempting to add slot " @@ -620,7 +619,7 @@ MergeHandler::applyDiffLocally( LOG(spam, "ApplyBucketDiff(%s): Adding slot %s", bucket.toString().c_str(), e.toString().c_str()); - applyDiffEntry(bucket, e, context, *repo); + applyDiffEntry(bucket, e, context, repo); byteCount += e._headerBlob.size() + e._bodyBlob.size(); } diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index a8b8cfa1f8c..42d67573763 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -165,7 +165,9 @@ PersistenceUtil::PersistenceUtil(const ServiceLayerComponent& component, FileSto _metrics(metrics), _nodeIndex(component.getIndex()), _bucketIdFactory(component.getBucketIdFactory()), - _spi(provider) + _spi(provider), + _lastGeneration(0), + _repos() { } @@ -284,4 +286,13 @@ PersistenceUtil::getBucket(const document::DocumentId& id, const document::Bucke return spi::Bucket(bucket); } +void +PersistenceUtil::reloadComponent() const { + // Thread safe as it is only called from the same thread + while (componentHasChanged()) { + _lastGeneration = _component.getGeneration(); + _repos = _component.getTypeRepo(); + } +} + } // storage diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 2dbd7b2a263..8eeea6dddd2 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -114,6 +114,18 @@ public: void updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& info) const; LockResult lockAndGetDisk(const document::Bucket &bucket, StorBucketDatabase::Flag flags = StorBucketDatabase::NONE); api::BucketInfo getBucketInfo(const document::Bucket &bucket) const; + const document::DocumentTypeRepo & getDocumentTypeRepo() const { + if (componentHasChanged()) { + reloadComponent(); + } + return *_repos->documentTypeRepo; + } + const document::FieldSetRepo & getFieldSetRepo() const { + if (componentHasChanged()) { + reloadComponent(); + } + return *_repos->fieldSetRepo; + } static api::BucketInfo convertBucketInfo(const spi::BucketInfo&); static uint32_t convertErrorCode(const spi::Result& response); @@ -123,8 +135,15 @@ public: FileStorThreadMetrics &_metrics; // Needs a better solution for speed and thread safety uint16_t _nodeIndex; private: - const document::BucketIdFactory &_bucketIdFactory; - spi::PersistenceProvider &_spi; + bool componentHasChanged() const { + return _lastGeneration != _component.getGeneration(); + } + void reloadComponent() const; + + const document::BucketIdFactory &_bucketIdFactory; + spi::PersistenceProvider &_spi; + mutable uint64_t _lastGeneration; + mutable std::shared_ptr<StorageComponent::Repos> _repos; }; } // storage diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index d3acf3b2c81..3d5a4efc1af 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -67,7 +67,7 @@ SimpleMessageHandler::handleGet(api::GetCommand& cmd, MessageTracker::UP tracker tracker->setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - auto fieldSet = getFieldSet(*_env._component.getTypeRepo()->fieldSetRepo, cmd.getFieldSet(), *tracker); + auto fieldSet = getFieldSet(_env.getFieldSetRepo(), cmd.getFieldSet(), *tracker); if ( ! fieldSet) { return tracker; } tracker->context().setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); @@ -224,7 +224,7 @@ MessageTracker::UP SimpleMessageHandler::handleCreateIterator(CreateIteratorCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.createIterator); - auto fieldSet = getFieldSet(*_env._component.getTypeRepo()->fieldSetRepo, cmd.getFields(), *tracker); + auto fieldSet = getFieldSet(_env.getFieldSetRepo(), cmd.getFields(), *tracker); if ( ! fieldSet) { return tracker; } tracker->context().setReadConsistency(cmd.getReadConsistency()); diff --git a/storage/src/vespa/storage/persistence/testandsethelper.cpp b/storage/src/vespa/storage/persistence/testandsethelper.cpp index 77eddd3bd0d..66b8dbd83fc 100644 --- a/storage/src/vespa/storage/persistence/testandsethelper.cpp +++ b/storage/src/vespa/storage/persistence/testandsethelper.cpp @@ -49,9 +49,9 @@ TestAndSetHelper::TestAndSetHelper(const PersistenceUtil & env, const spi::Persi _docTypePtr(_cmd.getDocumentType()), _missingDocumentImpliesMatch(missingDocumentImpliesMatch) { - const auto _repo = _env._component.getTypeRepo()->documentTypeRepo; - resolveDocumentType(*_repo); - parseDocumentSelection(*_repo, bucketFactory); + const auto & repo = _env.getDocumentTypeRepo(); + resolveDocumentType(repo); + parseDocumentSelection(repo, bucketFactory); } TestAndSetHelper::~TestAndSetHelper() = default; |