diff options
63 files changed, 296 insertions, 234 deletions
diff --git a/metrics/src/vespa/metrics/metricvalueset.h b/metrics/src/vespa/metrics/metricvalueset.h index 82c01343472..bb2a7409ce7 100644 --- a/metrics/src/vespa/metrics/metricvalueset.h +++ b/metrics/src/vespa/metrics/metricvalueset.h @@ -68,7 +68,7 @@ public: ValueClass getValues() const; /** - * Get the current values from the metric. This function should not be + * Set the current values for the metric. This function should not be * called in parallel. Only call it from a single thread or use external * locking. If it returns false, it means the metric have just been reset. * In which case, redo getValues(), apply the update again, and call diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java index 59f5e0f3f40..0a9496be0a6 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java @@ -100,7 +100,9 @@ public class VespaServiceDumperImpl implements VespaServiceDumper { unixPathDirectory.deleteRecursively(); } context.log(log, Level.INFO, "Creating '" + unixPathDirectory +"'."); - unixPathDirectory.createDirectory("rwxrwxrwx"); + unixPathDirectory.createDirectory("rwxr-x---") + .setOwner(context.userNamespace().vespaUser()) + .setGroup(context.userNamespace().vespaGroup()); URI destination = serviceDumpDestination(nodeSpec, createDumpId(request)); ProducerContext producerCtx = new ProducerContext(context, directory, request); List<Artifact> producedArtifacts = new ArrayList<>(); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java index a44f90b164b..909c6c9cbc1 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java @@ -114,7 +114,6 @@ class ContainerFileSystemProvider extends FileSystemProvider { // Only called when both 'source' and 'target' have 'this' as the FS provider Path targetPathOnHost = pathOnHost(target); provider(targetPathOnHost).copy(pathOnHost(source), targetPathOnHost, options); - fixOwnerToContainerRoot(toContainerPath(target)); } @Override @@ -122,7 +121,6 @@ class ContainerFileSystemProvider extends FileSystemProvider { // Only called when both 'source' and 'target' have 'this' as the FS provider Path targetPathOnHost = pathOnHost(target); provider(targetPathOnHost).move(pathOnHost(source), targetPathOnHost, options); - fixOwnerToContainerRoot(toContainerPath(target)); } @Override diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java index a5fc6a1373f..4e85052a176 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java @@ -85,7 +85,7 @@ class ContainerFileSystemTest { new UnixPath(destination).setOwnerId(500).setGroupId(200); ContainerPath destination2 = ContainerPath.fromPathInContainer(containerFs, Path.of("/dest2")); Files.copy(destination, destination2, StandardCopyOption.COPY_ATTRIBUTES, StandardCopyOption.REPLACE_EXISTING); - assertOwnership(destination2, 0, 0, 10000, 11000); + assertOwnership(destination2, 500, 200, 10500, 11200); } @Test @@ -116,7 +116,7 @@ class ContainerFileSystemTest { new UnixPath(destination).setOwnerId(500).setGroupId(200); ContainerPath destination2 = ContainerPath.fromPathInContainer(containerFs, Path.of("/dest2")); Files.move(destination, destination2, StandardCopyOption.COPY_ATTRIBUTES, StandardCopyOption.REPLACE_EXISTING); - assertOwnership(destination2, 0, 0, 10000, 11000); + assertOwnership(destination2, 500, 200, 10500, 11200); } @Test diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index ff9676da7d0..7793467040e 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -745,8 +745,8 @@ DummyPersistence::createBucket(const Bucket& b, Context&) return Result(); } -Result -DummyPersistence::deleteBucket(const Bucket& b, Context&) +void +DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "deleteBucket(%s)", b.toString().c_str()); @@ -756,7 +756,7 @@ DummyPersistence::deleteBucket(const Bucket& b, Context&) assert(!_content[b]->_inUse); } _content.erase(b); - return Result(); + onComplete->onComplete(std::make_unique<Result>()); } Result diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 9d93316d382..a9b611d131c 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -169,7 +169,7 @@ public: Result destroyIterator(IteratorId, Context&) override; Result createBucket(const Bucket&, Context&) override; - Result deleteBucket(const Bucket&, Context&) override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index e423e0aaac5..6be0941d731 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -35,4 +35,9 @@ AbstractPersistenceProvider::setActiveStateAsync(const Bucket &, BucketInfo::Act op->onComplete(std::make_unique<Result>()); } +void +AbstractPersistenceProvider::deleteBucketAsync(const Bucket &, Context &, OperationComplete::UP op) { + op->onComplete(std::make_unique<Result>()); +} + } diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index 5f8cf2fc171..472abeca161 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -14,39 +14,14 @@ namespace storage::spi { class AbstractPersistenceProvider : public PersistenceProvider { public: - /** - * Default impl is empty. - */ Result initialize() override { return Result(); }; - - /** - * Default impl empty. - */ Result createBucket(const Bucket&, Context&) override { return Result(); } - - /** - * Default impl is empty. - */ Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); } - - /** - * Default impl is remove(). - */ RemoveResult removeIfFound(const Bucket&, Timestamp, const DocumentId&, Context&) override; void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override; - - /** - * Default impl empty. - */ Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); } - - /** - * Default impl empty. - */ void setActiveStateAsync(const Bucket &, BucketInfo::ActiveState, OperationComplete::UP ) override; - /** - * Default impl empty. - */ + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; }; diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index bb819fc9e50..7da2ee58aa9 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -17,6 +17,14 @@ PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveStat } Result +PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + deleteBucketAsync(bucket, context, std::move(catcher)); + return *future.get(); +} + +Result PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) { auto catcher = std::make_unique<CatchResult>(); auto future = catcher->future_result(); diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 45ca49435a7..99e80cc197a 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -342,7 +342,8 @@ struct PersistenceProvider * After this operation has succeeded, a restart of the provider should * not yield the bucket in getBucketList(). */ - virtual Result deleteBucket(const Bucket&, Context&) = 0; + Result deleteBucket(const Bucket&, Context&); + virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0; /** * This function is called continuously by the service layer. It allows the diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index e2f2f21f596..2c21a30396d 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -23,6 +23,8 @@ using storage::spi::BucketInfo; using BlockedReason = IBlockableMaintenanceJob::BlockedReason; using MoveOperationVector = std::vector<MoveOperation>; using storage::spi::dummy::DummyBucketExecutor; +using vespalib::MonitoredRefCount; +using vespalib::RetainGuard; using vespalib::ThreadStackExecutor; struct ControllerFixtureBase : public ::testing::Test diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp index c9b009e4a17..13955953eb5 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp @@ -6,6 +6,8 @@ #include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> #include <vespa/vespalib/util/threadstackexecutor.h> +using vespalib::RetainGuard; + using BlockedReason = IBlockableMaintenanceJob::BlockedReason; struct MyDirectJobRunner : public IMaintenanceJobRunner { diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h index 774acd0071a..14f2ff42dbe 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h @@ -2,14 +2,14 @@ #include "lid_space_common.h" #include <vespa/searchcore/proton/server/blockable_maintenance_job.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/searchcorespi/index/i_thread_service.h> +#include <vespa/vespalib/util/monitored_refcount.h> #include <vespa/vespalib/gtest/gtest.h> namespace storage::spi::dummy { class DummyBucketExecutor; } struct JobTestBase : public ::testing::Test { - MonitoredRefCount _refCount; + vespalib::MonitoredRefCount _refCount; test::ClusterStateHandler _clusterStateHandler; test::DiskMemUsageNotifier _diskMemUsageNotifier; std::unique_ptr<storage::spi::dummy::DummyBucketExecutor> _bucketExecutor; diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 1463be8cdbd..71ae26180ad 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -6,7 +6,6 @@ #include <vespa/searchcore/proton/attribute/i_attribute_manager.h> #include <vespa/searchcore/proton/bucketdb/bucket_create_notifier.h> #include <vespa/searchcore/proton/common/doctypename.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/searchcore/proton/common/transient_resource_usage_provider.h> #include <vespa/searchcore/proton/documentmetastore/operation_listener.h> #include <vespa/searchcore/proton/documentmetastore/documentmetastore.h> @@ -42,6 +41,7 @@ #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/monitored_refcount.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fastos/thread.h> @@ -68,6 +68,7 @@ using search::SerialNum; using search::CommitParam; using storage::spi::BucketInfo; using storage::spi::Timestamp; +using vespalib::MonitoredRefCount; using vespalib::Slime; using vespalib::makeLambdaTask; using vespa::config::search::AttributesConfigBuilder; diff --git a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp index a76c375bbfc..061a3fd4c32 100644 --- a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp +++ b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp @@ -6,7 +6,6 @@ #include <vespa/document/datatype/referencedatatype.h> #include <vespa/log/log.h> #include <vespa/searchcore/proton/attribute/imported_attributes_repo.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/searchcore/proton/reference/document_db_reference_resolver.h> #include <vespa/searchcore/proton/reference/gid_to_lid_change_listener.h> #include <vespa/searchcore/proton/reference/i_document_db_reference.h> @@ -19,6 +18,7 @@ #include <vespa/searchlib/attribute/reference_attribute.h> #include <vespa/searchlib/common/i_gid_to_lid_mapper.h> #include <vespa/searchlib/common/i_gid_to_lid_mapper_factory.h> +#include <vespa/vespalib/util/monitored_refcount.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/searchlib/test/mock_attribute_manager.h> #include <vespa/vespalib/test/insertion_operators.h> @@ -33,6 +33,7 @@ using proton::test::MockDocumentDBReference; using search::attribute::test::MockAttributeManager; using vespa::config::search::ImportedFieldsConfig; using vespa::config::search::ImportedFieldsConfigBuilder; +using vespalib::MonitoredRefCount; using vespalib::SequencedTaskExecutor; using vespalib::ISequencedTaskExecutor; diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp index 668be4c2c2d..15281563b93 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp @@ -2,12 +2,12 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/stllike/string.h> #include <vespa/document/base/documentid.h> -#include <vespa/vespalib/util/sequencedtaskexecutor.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/searchcore/proton/reference/gid_to_lid_change_listener.h> #include <vespa/searchlib/common/i_gid_to_lid_mapper_factory.h> #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/gate.h> +#include <vespa/vespalib/util/monitored_refcount.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/searchlib/test/mock_gid_to_lid_mapping.h> #include <map> #include <vespa/log/log.h> @@ -16,12 +16,13 @@ LOG_SETUP("gid_to_lid_change_listener_test"); using document::GlobalId; using document::BucketId; using document::DocumentId; -using vespalib::GenerationHandler; using search::attribute::Config; using search::attribute::BasicType; using search::attribute::Reference; using search::attribute::ReferenceAttribute; using search::attribute::test::MockGidToLidMapperFactory; +using vespalib::MonitoredRefCount; +using vespalib::GenerationHandler; namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt index 07d749d8c4f..219a2ea43a4 100644 --- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt @@ -16,7 +16,6 @@ vespa_add_library(searchcore_pcommon STATIC hw_info_sampler.cpp indexschema_inspector.cpp ipendinglidtracker.cpp - monitored_refcount.cpp operation_rate_tracker.cpp pendinglidtracker.cpp select_utils.cpp diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h index e75c16ddef6..8ccb4863878 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h @@ -50,21 +50,22 @@ private: */ class OwningState : public State { public: - OwningState(std::unique_ptr<ITransport> transport) + OwningState(std::shared_ptr<ITransport> transport) : State(*transport), _owned(std::move(transport)) {} ~OwningState() override; private: - std::unique_ptr<ITransport> _owned; + std::shared_ptr<ITransport> _owned; }; inline std::shared_ptr<State> make(ITransport & latch) { return std::make_shared<State>(latch); } + inline std::shared_ptr<State> -make(std::unique_ptr<ITransport> transport) { +make(std::shared_ptr<ITransport> transport) { return std::make_shared<OwningState>(std::move(transport)); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index cb26e80b3ff..114292d055d 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -364,7 +364,7 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } - auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete)); handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc)); } @@ -384,7 +384,7 @@ PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& d return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } - auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete)); handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, did); } @@ -436,7 +436,7 @@ PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP if (handler == nullptr) { return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } - auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete)); handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd)); } @@ -564,19 +564,23 @@ PersistenceEngine::createBucket(const Bucket &b, Context &) } -Result -PersistenceEngine::deleteBucket(const Bucket& b, Context&) +void +PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) { ReadGuard rguard(_rwMutex); LOG(spam, "deleteBucket(%s)", b.toString().c_str()); HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace()); - TransportLatch latch(snap.size()); - for (; snap.handlers().valid(); snap.handlers().next()) { + + auto transportContext = std::make_shared<AsyncTranportContext>(snap.size(), std::move(onComplete)); + while (snap.handlers().valid()) { IPersistenceHandler *handler = snap.handlers().get(); - handler->handleDeleteBucket(feedtoken::make(latch), b); + snap.handlers().next(); + if (snap.handlers().valid()) { + handler->handleDeleteBucket(feedtoken::make(transportContext), b); + } else { + handler->handleDeleteBucket(feedtoken::make(std::move(transportContext)), b); + } } - latch.await(); - return latch.getResult(); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index e131cb13ae1..94331ac2cd6 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -115,7 +115,7 @@ public: Result destroyIterator(IteratorId, Context&) override; Result createBucket(const Bucket &bucketId, Context &) override ; - Result deleteBucket(const Bucket&, Context&) override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; diff --git a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp index 4cc5fb0b5db..5d1e1318b04 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp @@ -29,6 +29,8 @@ using search::AttributeVector; using search::IAttributeManager; using search::NotImplementedAttribute; using vespalib::ISequencedTaskExecutor; +using vespalib::MonitoredRefCount; +using vespalib::RetainGuard; using vespa::config::search::ImportedFieldsConfig; namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h index 761b9407fe5..522cdf83477 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h +++ b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h @@ -21,15 +21,17 @@ namespace search::attribute { namespace vespa::config::search::internal { class InternalImportedFieldsType; } namespace vespalib { - class ISequencedTaskExecutor; + +class ISequencedTaskExecutor; +class MonitoredRefCount; } + namespace proton { class IDocumentDBReference; class IDocumentDBReferenceRegistry; class ImportedAttributesRepo; class GidToLidChangeRegistrator; -class MonitoredRefCount; /** * Class that for a given document db resolves all references to parent document dbs: @@ -42,7 +44,7 @@ private: const document::DocumentType &_thisDocType; const ImportedFieldsConfig &_importedFieldsCfg; const document::DocumentType &_prevThisDocType; - MonitoredRefCount &_refCount; + vespalib::MonitoredRefCount &_refCount; vespalib::ISequencedTaskExecutor &_attributeFieldWriter; bool _useReferences; std::map<vespalib::string, std::unique_ptr<GidToLidChangeRegistrator>> _registrators; @@ -61,7 +63,7 @@ public: const document::DocumentType &thisDocType, const ImportedFieldsConfig &importedFieldsCfg, const document::DocumentType &prevThisDocType, - MonitoredRefCount &refCount, + vespalib::MonitoredRefCount &refCount, vespalib::ISequencedTaskExecutor &attributeFieldWriter, bool useReferences); ~DocumentDBReferenceResolver() override; diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp index c7e716b4173..427cbab2a14 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp @@ -3,6 +3,8 @@ #include "gid_to_lid_change_listener.h" #include <future> +using vespalib::RetainGuard; + namespace proton { GidToLidChangeListener::GidToLidChangeListener(vespalib::ISequencedTaskExecutor &attributeFieldWriter, diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h index 23bb32af87a..28e9684ed86 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h @@ -4,8 +4,8 @@ #include "i_gid_to_lid_change_listener.h" #include <vespa/searchlib/attribute/reference_attribute.h> +#include <vespa/vespalib/util/retain_guard.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> namespace proton { @@ -18,14 +18,14 @@ class GidToLidChangeListener : public IGidToLidChangeListener vespalib::ISequencedTaskExecutor &_attributeFieldWriter; vespalib::ISequencedTaskExecutor::ExecutorId _executorId; std::shared_ptr<search::attribute::ReferenceAttribute> _attr; - RetainGuard _retainGuard; + vespalib::RetainGuard _retainGuard; vespalib::string _name; vespalib::string _docTypeName; public: GidToLidChangeListener(vespalib::ISequencedTaskExecutor &attributeFieldWriter, std::shared_ptr<search::attribute::ReferenceAttribute> attr, - RetainGuard refCount, + vespalib::RetainGuard refCount, const vespalib::string &name, const vespalib::string &docTypeName); ~GidToLidChangeListener() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index c70928cd5e8..386d8367e52 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -25,6 +25,7 @@ using document::BucketId; using storage::spi::BucketInfo; using storage::spi::Bucket; using proton::bucketdb::BucketMover; +using vespalib::RetainGuard; using vespalib::makeLambdaTask; using vespalib::Trinary; diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h index 66d24efb466..d4438fcd411 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h @@ -10,7 +10,7 @@ #include "maintenancedocumentsubdb.h" #include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h> #include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> namespace storage::spi { struct BucketExecutor; } @@ -59,7 +59,7 @@ private: using Movers = std::vector<BucketMoverSP>; using GuardedMoveOps = BucketMover::GuardedMoveOps; std::shared_ptr<IBucketStateCalculator> _calc; - RetainGuard _dbRetainer; + vespalib::RetainGuard _dbRetainer; IDocumentMoveHandler &_moveHandler; IBucketModifiedHandler &_modifiedHandler; IThreadService &_master; @@ -80,7 +80,7 @@ private: IDiskMemUsageNotifier &_diskMemUsageNotifier; BucketMoveJob(const std::shared_ptr<IBucketStateCalculator> &calc, - RetainGuard dbRetainer, + vespalib::RetainGuard dbRetainer, IDocumentMoveHandler &moveHandler, IBucketModifiedHandler &modifiedHandler, IThreadService & master, @@ -115,7 +115,7 @@ private: public: static std::shared_ptr<BucketMoveJob> create(const std::shared_ptr<IBucketStateCalculator> &calc, - RetainGuard dbRetainer, + vespalib::RetainGuard dbRetainer, IDocumentMoveHandler &moveHandler, IBucketModifiedHandler &modifiedHandler, IThreadService & master, diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index 936ba812a44..014bba11f83 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -19,13 +19,13 @@ #include "threading_service_config.h" #include <vespa/searchcore/proton/attribute/attribute_usage_filter.h> #include <vespa/searchcore/proton/common/doctypename.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/searchcore/proton/metrics/documentdb_job_trackers.h> #include <vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h> #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/index/indexmanager.h> #include <vespa/searchlib/docstore/cachestats.h> #include <vespa/searchlib/transactionlog/syncproxy.h> +#include <vespa/vespalib/util/retain_guard.h> #include <vespa/vespalib/util/varholder.h> #include <mutex> #include <condition_variable> @@ -111,7 +111,7 @@ private: DocumentDBTaggedMetrics _metrics; std::unique_ptr<metrics::UpdateHook> _metricsHook; vespalib::VarHolder<IFeedView::SP> _feedView; - MonitoredRefCount _refCount; + vespalib::MonitoredRefCount _refCount; bool _syncFeedViewEnabled; IDocumentDBOwner &_owner; storage::spi::BucketExecutor &_bucketExecutor; @@ -381,7 +381,7 @@ public: /** * Reference counting */ - RetainGuard retain() { return RetainGuard(_refCount); } + vespalib::RetainGuard retain() { return vespalib::RetainGuard(_refCount); } bool getDelayedConfig() const { return _state.getDelayedConfig(); } void replayConfig(SerialNum serialNum) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h index fe937a35db7..2098466a2c0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h @@ -3,7 +3,7 @@ #pragma once #include <vespa/searchcore/proton/flushengine/iflushhandler.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> namespace proton { @@ -13,7 +13,7 @@ class FlushHandlerProxy : public IFlushHandler { private: std::shared_ptr<DocumentDB> _documentDB; - RetainGuard _retainGuard; + vespalib::RetainGuard _retainGuard; public: FlushHandlerProxy(const std::shared_ptr<DocumentDB> &documentDB); diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index efe92a11f02..a5c1d1fc2c9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -26,6 +26,7 @@ using search::DocumentMetaData; using search::LidUsageStats; using storage::spi::makeBucketTask; using storage::spi::Bucket; +using vespalib::RetainGuard; using vespalib::makeLambdaTask; namespace proton::lidspace { diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h index 717c19f093d..917ff12be4a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h @@ -6,9 +6,9 @@ #include "document_db_maintenance_config.h" #include "i_disk_mem_usage_listener.h" #include "iclusterstatechangedhandler.h" -#include <vespa/searchlib/common/idocumentmetastore.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/document/bucket/bucketspace.h> +#include <vespa/searchlib/common/idocumentmetastore.h> +#include <vespa/vespalib/util/retain_guard.h> #include <atomic> namespace storage::spi { struct BucketExecutor; } @@ -50,7 +50,7 @@ private: bool _shouldCompactLidSpace; IThreadService &_master; BucketExecutor &_bucketExecutor; - RetainGuard _dbRetainer; + vespalib::RetainGuard _dbRetainer; document::BucketSpace _bucketSpace; bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const; @@ -68,7 +68,7 @@ private: class MoveTask; CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, - RetainGuard dbRetainer, + vespalib::RetainGuard dbRetainer, std::shared_ptr<ILidSpaceCompactionHandler> handler, IOperationStorer &opStorer, IThreadService & master, @@ -81,7 +81,7 @@ private: public: static std::shared_ptr<CompactionJob> create(const DocumentDBLidSpaceCompactionConfig &config, - RetainGuard dbRetainer, + vespalib::RetainGuard dbRetainer, std::shared_ptr<ILidSpaceCompactionHandler> handler, IOperationStorer &opStorer, IThreadService & master, diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index 2adcdbab217..c4826bba8ea 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -14,6 +14,7 @@ LOG_SETUP(".proton.server.maintenancecontroller"); using document::BucketId; using vespalib::Executor; +using vespalib::MonitoredRefCount; using vespalib::makeLambdaTask; namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index bddd58a349a..8c8cc3e2d43 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -5,14 +5,17 @@ #include "maintenancedocumentsubdb.h" #include "i_maintenance_job.h" #include <vespa/searchcore/proton/common/doctypename.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> #include <vespa/vespalib/util/scheduledexecutor.h> #include <mutex> namespace vespalib { - class Timer; - class Executor; + +class Executor; +class MonitoredRefCount; +class Timer; + } namespace searchcorespi::index { struct IThreadService; } @@ -20,7 +23,6 @@ namespace proton { class MaintenanceJobRunner; class DocumentDBMaintenanceConfig; -class MonitoredRefCount; /** * Class that controls the bucket moving between ready and notready sub databases @@ -36,7 +38,7 @@ public: using UP = std::unique_ptr<MaintenanceController>; enum class State {INITIALIZING, STARTED, PAUSED, STOPPING}; - MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, MonitoredRefCount & refCount, const DocTypeName &docTypeName); + MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, vespalib::MonitoredRefCount & refCount, const DocTypeName &docTypeName); ~MaintenanceController(); void registerJobInMasterThread(IMaintenanceJob::UP job); @@ -70,14 +72,14 @@ public: const MaintenanceDocumentSubDB & getNotReadySubDB() const { return _notReadySubDB; } IThreadService & masterThread() { return _masterThread; } const DocTypeName & getDocTypeName() const { return _docTypeName; } - RetainGuard retainDB() { return RetainGuard(_refCount); } + vespalib::RetainGuard retainDB() { return vespalib::RetainGuard(_refCount); } private: using Mutex = std::mutex; using Guard = std::lock_guard<Mutex>; IThreadService &_masterThread; vespalib::Executor &_defaultExecutor; - MonitoredRefCount &_refCount; + vespalib::MonitoredRefCount &_refCount; MaintenanceDocumentSubDB _readySubDB; MaintenanceDocumentSubDB _remSubDB; MaintenanceDocumentSubDB _notReadySubDB; diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h index f4d6175391c..5c35434aae8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h @@ -3,7 +3,7 @@ #pragma once #include <vespa/searchcore/proton/persistenceengine/ipersistencehandler.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> namespace proton { @@ -18,7 +18,7 @@ private: FeedHandler &_feedHandler; BucketHandler &_bucketHandler; ClusterStateHandler &_clusterStateHandler; - RetainGuard _retainGuard; + vespalib::RetainGuard _retainGuard; public: explicit PersistenceHandlerProxy(std::shared_ptr<DocumentDB> documentDB); diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp index c266a604e33..6fe1947bd1d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp @@ -16,6 +16,7 @@ using document::BucketId; using storage::spi::Timestamp; using storage::spi::Bucket; using vespalib::IDestructorCallback; +using vespalib::RetainGuard; using vespalib::makeLambdaTask; namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h index 5c52991d0c3..6198f588a32 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h @@ -3,9 +3,9 @@ #include "blockable_maintenance_job.h" #include "document_db_maintenance_config.h" -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <persistence/spi/types.h> #include <vespa/document/bucket/bucketspace.h> +#include <vespa/vespalib/util/retain_guard.h> #include <atomic> namespace storage::spi { struct BucketExecutor; } @@ -36,7 +36,7 @@ private: IThreadService &_master; BucketExecutor &_bucketExecutor; const vespalib::string _docTypeName; - RetainGuard _dbRetainer; + vespalib::RetainGuard _dbRetainer; const vespalib::duration _cfgAgeLimit; const uint32_t _subDbId; const document::BucketSpace _bucketSpace; @@ -45,14 +45,14 @@ private: void remove(uint32_t lid, const RawDocumentMetaData & meta); - PruneRemovedDocumentsJob(const DocumentDBPruneConfig &config, RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, + PruneRemovedDocumentsJob(const DocumentDBPruneConfig &config, vespalib::RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, uint32_t subDbId, document::BucketSpace bucketSpace, const vespalib::string &docTypeName, IPruneRemovedDocumentsHandler &handler, IThreadService & master, BucketExecutor & bucketExecutor); bool run() override; public: static std::shared_ptr<PruneRemovedDocumentsJob> - create(const Config &config, RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, uint32_t subDbId, + create(const Config &config, vespalib::RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, uint32_t subDbId, document::BucketSpace bucketSpace, const vespalib::string &docTypeName, IPruneRemovedDocumentsHandler &handler, IThreadService & master, BucketExecutor & bucketExecutor) { diff --git a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h index 4eaf15b3b1c..f7c52ed4baa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h @@ -3,7 +3,7 @@ #pragma once #include <vespa/searchcore/proton/summaryengine/isearchhandler.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> namespace proton { @@ -13,7 +13,7 @@ class SearchHandlerProxy : public ISearchHandler { private: std::shared_ptr<DocumentDB> _documentDB; - RetainGuard _retainGuard; + vespalib::RetainGuard _retainGuard; public: SearchHandlerProxy(std::shared_ptr<DocumentDB> documentDB); diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index d521975e0cb..a1263c9433b 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -220,8 +220,8 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) pending_message_tracker().insert(msg); } { - RemoveBucketOperation op(dummy_cluster_context, - BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); + // TODO we might not want this particular behavior for merge operations either + MergeOperation op(BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(2, 3))); // Not blocked for exact node match. EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq)); // But blocked for bucket match! diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp index e877f4601b7..971ff36c833 100644 --- a/storage/src/tests/distributor/removebucketoperationtest.cpp +++ b/storage/src/tests/distributor/removebucketoperationtest.cpp @@ -119,4 +119,16 @@ TEST_F(RemoveBucketOperationTest, fail_with_invalid_bucket_info) { EXPECT_EQ("NONEXISTING", dumpBucket(document::BucketId(16, 1))); } +TEST_F(RemoveBucketOperationTest, operation_blocked_when_pending_message_to_target_node) { + RemoveBucketOperation op(dummy_cluster_context, + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), + toVector<uint16_t>(1, 3))); + // In node target set + EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 1, 120)); + EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 3, 120)); + // Not in node target set + EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 0, 120)); + EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 2, 120)); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index 25009156f18..f5531a134d0 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -38,14 +38,15 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil { struct PendingMessage { uint32_t _msgType; + uint16_t _node; uint8_t _pri; - PendingMessage() : _msgType(UINT32_MAX), _pri(0) {} + constexpr PendingMessage() noexcept : _msgType(UINT32_MAX), _node(0), _pri(0) {} - PendingMessage(uint32_t msgType, uint8_t pri) - : _msgType(msgType), _pri(pri) {} + constexpr PendingMessage(uint32_t msgType, uint8_t pri) noexcept + : _msgType(msgType), _node(0), _pri(pri) {} - bool shouldCheck() const { return _msgType != UINT32_MAX; } + bool shouldCheck() const noexcept { return _msgType != UINT32_MAX; } }; void enableClusterState(const lib::ClusterState& systemState) { @@ -97,8 +98,7 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil { IdealStateOperation::UP op(result.createOperation()); if (op.get()) { if (blocker.shouldCheck() - && op->shouldBlockThisOperation(blocker._msgType, - blocker._pri)) + && op->shouldBlockThisOperation(blocker._msgType, blocker._node, blocker._pri)) { return "BLOCKED"; } diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index a174d305c27..5f1acf6e7da 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -22,6 +22,15 @@ } \ } +#define CHECK_ERROR_ASYNC(className, failType, onError) \ + { \ + Guard guard(_lock); \ + if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \ + onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \ + return; \ + } \ + } + namespace storage { namespace { @@ -172,13 +181,13 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId, return _spi.destroyIterator(iterId, context); } -spi::Result -PersistenceProviderWrapper::deleteBucket(const spi::Bucket& bucket, - spi::Context& context) +void +PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, + spi::OperationComplete::UP operationComplete) { LOG_SPI("deleteBucket(" << bucket << ")"); - CHECK_ERROR(spi::Result, FAIL_DELETE_BUCKET); - return _spi.deleteBucket(bucket, context); + CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete); + _spi.deleteBucketAsync(bucket, context, std::move(operationComplete)); } spi::Result diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index d90fa7b2eaa..64828a2a3ee 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -103,7 +103,7 @@ public: spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index c51357cacd1..c3caac7121c 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -66,9 +66,9 @@ public: return PersistenceProviderWrapper::createBucket(bucket, ctx); } - spi::Result deleteBucket(const spi::Bucket& bucket, spi::Context& ctx) override { + void deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override { ++_deleteBucketInvocations; - return PersistenceProviderWrapper::deleteBucket(bucket, ctx); + PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete)); } }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index fbe1c142b09..7f66d1effd5 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -86,7 +86,7 @@ void GarbageCollectionOperation::update_gc_metrics() { } bool -GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const { +GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const { return true; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index 2e010a61bde..f51739242b7 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -21,7 +21,7 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; const char* getName() const override { return "garbagecollection"; }; Type getType() const override { return GARBAGE_COLLECTION; } - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index b1231fafcd9..744b24b593e 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -138,8 +138,7 @@ public: bool check(uint32_t messageType, uint16_t node, uint8_t priority) override { - (void) node; - if (op.shouldBlockThisOperation(messageType, priority)) { + if (op.shouldBlockThisOperation(messageType, node, priority)) { blocked = true; return false; } @@ -232,6 +231,7 @@ IdealStateOperation::toString() const bool IdealStateOperation::shouldBlockThisOperation(uint32_t messageType, + [[maybe_unused]] uint16_t node, uint8_t) const { for (uint32_t i = 0; MAINTENANCE_MESSAGE_TYPES[i] != 0; ++i) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index d4dc4e405df..f8f35afe821 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -217,7 +217,7 @@ public: /** * Should return true if the given message type should block this operation. */ - virtual bool shouldBlockThisOperation(uint32_t messageType, uint8_t priority) const; + virtual bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t priority) const; protected: friend struct IdealStateManagerTest; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 7cfe4172b2c..f951a880e5d 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -329,14 +329,14 @@ constexpr std::array<uint32_t, 7> WRITE_FEED_MESSAGE_TYPES {{ } -bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const { +bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const { for (auto blocking_type : WRITE_FEED_MESSAGE_TYPES) { if (messageType == blocking_type) { return true; } } - return IdealStateOperation::shouldBlockThisOperation(messageType, pri); + return IdealStateOperation::shouldBlockThisOperation(messageType, node, pri); } bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h index 1bca1f7389f..832c0f99681 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h @@ -48,7 +48,7 @@ public: const document::BucketId&, MergeLimiter&, std::vector<MergeMetaData>&); - bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override; + bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const override; bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override; private: static void addIdealNodes( diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp index 9a57722dc7e..25cae5b9979 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp @@ -61,8 +61,7 @@ RemoveBucketOperation::onStart(DistributorStripeMessageSender& sender) bool RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply> &msg) { - api::DeleteBucketReply* rep = - dynamic_cast<api::DeleteBucketReply*>(msg.get()); + auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get()); uint16_t node = _tracker.handleReply(*rep); @@ -112,8 +111,15 @@ RemoveBucketOperation::onReceive(DistributorStripeMessageSender&, const std::sha } bool -RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint8_t) const +RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint16_t target_node, uint8_t) const { - return true; + // Number of nodes is expected to be 1 in the vastly common case (and a highly bounded + // number in the worst case), so a simple linear scan suffices. + for (uint16_t node : getNodes()) { + if (target_node == node) { + return true; + } + } + return false; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h index a0d496f948a..5e0922d5685 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h @@ -30,7 +30,7 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; const char* getName() const override { return "remove"; }; Type getType() const override { return DELETE_BUCKET; } - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 649503cf0f5..6f3924535ef 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -150,6 +150,7 @@ SplitOperation::isBlocked(const DistributorStripeOperationContext& ctx, const Op bool SplitOperation::shouldBlockThisOperation(uint32_t msgType, + [[maybe_unused]] uint16_t node, uint8_t pri) const { if (msgType == api::MessageType::SPLITBUCKET_ID && _priority >= pri) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h index ee957309088..6a268155fc8 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h @@ -21,7 +21,7 @@ public: const char* getName() const override { return "split"; }; Type getType() const override { return SPLIT_BUCKET; } bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override; - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index d2bfabd2950..d150f5600e5 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -91,7 +91,19 @@ private: vespalib::ISequencedTaskExecutor::ExecutorId _executorId; }; +bool +bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { + // Don't check document sizes, as background moving of documents in Proton + // may trigger a change in size without any mutations taking place. This will + // only take place when a document being moved was fed _prior_ to the change + // where Proton starts reporting actual document sizes, and will eventually + // converge to a stable value. But for now, ignore it to prevent false positive + // error logs and non-deleted buckets. + return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); +} + } + AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi, BucketOwnershipNotifier &bucketOwnershipNotifier, vespalib::ISequencedTaskExecutor & executor, @@ -142,6 +154,47 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons } MessageTracker::UP +AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.deleteBuckets); + LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), + api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); + } + spi::Bucket bucket(cmd.getBucket()); + if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { + return tracker; + } + + auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) { + // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric + (void) ignored; + StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace())); + StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket"); + if (entry.exist() && entry->getMetaCount() > 0) { + LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " + "active operation when delete bucket was queued. " + "Updating bucket database to keep it in sync with file. " + "Cannot delete bucket from bucket database at this " + "point, as it can have been intentionally recreated " + "after delete bucket had been sent", + bucket.getBucketId().toString().c_str()); + api::BucketInfo info(0, 0, 0); + // Only set document counts/size; retain ready/active state. + info.setReady(entry->getBucketInfo().isReady()); + info.setActive(entry->getBucketInfo().isActive()); + + entry->setBucketInfo(info); + entry.write(); + } + tracker->sendReply(); + }); + _spi.deleteBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + return tracker; +} + +MessageTracker::UP AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const { trackerUP->setMetric(_env._metrics.setBucketStates); @@ -154,9 +207,8 @@ AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrack auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket, notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable { if (tracker->checkForError(*response)) { - StorBucketDatabase::WrappedEntry - entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(bucket.getBucketId(), - "handleSetBucketState"); + StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace())); + StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(),"handleSetBucketState"); if (entry.exist()) { entry->info.setActive(newState == spi::BucketInfo::ACTIVE); notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); @@ -273,4 +325,31 @@ AsyncHandler::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTra return true; } +bool +AsyncHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const +{ + spi::BucketInfoResult result(_spi.getBucketInfo(bucket)); + if (result.hasError()) { + LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'", + bucket.toString().c_str(), result.getErrorMessage().c_str()); + return false; + } + api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo())); + // Don't check meta fields or active/ready fields since these are not + // that important and ready may change under the hood in a race with + // getModifiedBuckets(). If bucket is empty it means it has already + // been deleted by a racing split/join. + if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) { + LOG(error, + "Service layer bucket database and provider out of sync before " + "deleting bucket %s! Service layer db had %s while provider says " + "bucket has %s. Deletion has been rejected to ensure data is not " + "lost, but bucket may remain out of sync until service has been " + "restarted.", + bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str()); + return false; + } + return true; +} + } diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index bf37becb2c3..4f5c242570c 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -29,8 +29,10 @@ public: MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: + bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; static bool tasConditionExists(const api::TestAndSetCommand & cmd); bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, spi::Context & context, bool missingDocumentImpliesMatch = false) const; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 3d9b359f506..297185ac54c 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -45,7 +45,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::CREATEBUCKET_ID: return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: - return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); case api::MessageType::SPLITBUCKET_ID: diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index ab5066576fd..73033132e5d 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -131,10 +131,11 @@ ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& cont return checkResult(_impl.createBucket(bucket, context)); } -spi::Result -ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context) +void +ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) { - return checkResult(_impl.deleteBucket(bucket, context)); + onComplete->addResultHandler(this); + _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketIdListResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 9361cd1d19d..6e7986ad65c 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -41,7 +41,7 @@ public: spi::Result initialize() override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override; - void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; + spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; @@ -54,7 +54,6 @@ public: spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; spi::Result createBucket(const spi::Bucket&, spi::Context&) override; - spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override; spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; @@ -67,6 +66,8 @@ public: void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; + void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override; private: template <typename ResultType> diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index b4c09f09e63..b4fe207e2e5 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -43,16 +43,6 @@ getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, Messa return document::FieldSet::SP(); } -bool -bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { - // Don't check document sizes, as background moving of documents in Proton - // may trigger a change in size without any mutations taking place. This will - // only take place when a document being moved was fed _prior_ to the change - // where Proton starts reporting actual document sizes, and will eventually - // converge to a stable value. But for now, ignore it to prevent false positive - // error logs and non-deleted buckets. - return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); -} } SimpleMessageHandler::SimpleMessageHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi) : _env(env), @@ -113,70 +103,6 @@ SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageT return tracker; } -bool -SimpleMessageHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const -{ - spi::BucketInfoResult result(_spi.getBucketInfo(bucket)); - if (result.hasError()) { - LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'", - bucket.toString().c_str(), result.getErrorMessage().c_str()); - return false; - } - api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo())); - // Don't check meta fields or active/ready fields since these are not - // that important and ready may change under the hood in a race with - // getModifiedBuckets(). If bucket is empty it means it has already - // been deleted by a racing split/join. - if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) { - LOG(error, - "Service layer bucket database and provider out of sync before " - "deleting bucket %s! Service layer db had %s while provider says " - "bucket has %s. Deletion has been rejected to ensure data is not " - "lost, but bucket may remain out of sync until service has been " - "restarted.", - bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str()); - return false; - } - return true; -} - -MessageTracker::UP -SimpleMessageHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.deleteBuckets); - LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); - if (_env._fileStorHandler.isMerging(cmd.getBucket())) { - _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), - api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); - } - spi::Bucket bucket(cmd.getBucket()); - if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { - return tracker; - } - _spi.deleteBucket(bucket, tracker->context()); - StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); - { - StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); - if (entry.exist() && entry->getMetaCount() > 0) { - LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " - "active operation when delete bucket was queued. " - "Updating bucket database to keep it in sync with file. " - "Cannot delete bucket from bucket database at this " - "point, as it can have been intentionally recreated " - "after delete bucket had been sent", - cmd.getBucketId().toString().c_str()); - api::BucketInfo info(0, 0, 0); - // Only set document counts/size; retain ready/active state. - info.setReady(entry->getBucketInfo().isReady()); - info.setActive(entry->getBucketInfo().isActive()); - - entry->setBucketInfo(info); - entry.write(); - } - } - return tracker; -} - MessageTracker::UP SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const { diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h index 9f00f67684d..2cfbc7016c0 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.h +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h @@ -23,13 +23,11 @@ public: MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const; private: - bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; const PersistenceUtil & _env; spi::PersistenceProvider & _spi; }; diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index 9e2917775f0..6115c21623d 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -38,6 +38,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT memoryusage.cpp mmap_file_allocator.cpp mmap_file_allocator_factory.cpp + monitored_refcount.cpp printable.cpp priority_queue.cpp random.cpp diff --git a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp b/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp index 97615abba31..4376e26bb66 100644 --- a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp +++ b/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp @@ -3,7 +3,7 @@ #include "monitored_refcount.h" #include <cassert> -namespace proton { +namespace vespalib { MonitoredRefCount::MonitoredRefCount() : _lock(), @@ -41,4 +41,4 @@ MonitoredRefCount::waitForZeroRefCount() _cv.wait(guard, [this] { return (_refCount == 0u); }); } -} // namespace proton +} diff --git a/vespalib/src/vespa/vespalib/util/monitored_refcount.h b/vespalib/src/vespa/vespalib/util/monitored_refcount.h new file mode 100644 index 00000000000..465284b6fd3 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/monitored_refcount.h @@ -0,0 +1,29 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <mutex> +#include <condition_variable> + +namespace vespalib { + +class RetainGuard; +/* + * Class containing a reference count that can be waited on to become zero. + * Typically ancestor or member of a class that has to be careful of when + * portions object can be properly torn down before destruction itself. + */ +class MonitoredRefCount +{ + std::mutex _lock; + std::condition_variable _cv; + uint32_t _refCount; + void retain() noexcept; + void release() noexcept; + friend RetainGuard; +public: + MonitoredRefCount(); + virtual ~MonitoredRefCount(); + void waitForZeroRefCount(); +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h b/vespalib/src/vespa/vespalib/util/retain_guard.h index 9eb713b1220..090f3ce75cf 100644 --- a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h +++ b/vespalib/src/vespa/vespalib/util/retain_guard.h @@ -1,31 +1,16 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + #pragma once -#include <mutex> -#include <condition_variable> +#include "monitored_refcount.h" -namespace proton { +namespace vespalib { -class RetainGuard; /* - * Class containing a reference count that can be waited on to become zero. - * Typically ancestor or member of a class that has to be careful of when - * portions object can be properly torn down before destruction itself. + * Class containing a reference to a monitored reference count, + * intended to block teardown of the class owning the monitored + * reference count. */ -class MonitoredRefCount -{ - std::mutex _lock; - std::condition_variable _cv; - uint32_t _refCount; - void retain() noexcept; - void release() noexcept; - friend RetainGuard; -public: - MonitoredRefCount(); - virtual ~MonitoredRefCount(); - void waitForZeroRefCount(); -}; - class RetainGuard { public: RetainGuard(MonitoredRefCount & refCount) noexcept @@ -57,4 +42,4 @@ private: MonitoredRefCount * _refCount; }; -} // namespace proton +} |