diff options
74 files changed, 406 insertions, 311 deletions
diff --git a/configserver/src/main/resources/configserver-app/services.xml b/configserver/src/main/resources/configserver-app/services.xml index faec1c650f6..ca6ed4cff28 100644 --- a/configserver/src/main/resources/configserver-app/services.xml +++ b/configserver/src/main/resources/configserver-app/services.xml @@ -39,6 +39,7 @@ <component id="com.yahoo.vespa.config.server.rpc.RpcRequestHandlerProvider" bundle="configserver" /> <component id="com.yahoo.vespa.config.server.rpc.security.DummyNodeIdentifierProvider" bundle="configserver" /> <component id="com.yahoo.vespa.config.server.rpc.security.DefaultRpcAuthorizerProvider" bundle="configserver" /> + <component id="com.yahoo.vespa.config.server.http.TesterClient" bundle="configserver" /> <components> <include dir="config-models" /> diff --git a/container-core/src/main/java/com/yahoo/container/di/componentgraph/core/ComponentGraph.java b/container-core/src/main/java/com/yahoo/container/di/componentgraph/core/ComponentGraph.java index 049aad84cfe..86e6bc4fa4a 100644 --- a/container-core/src/main/java/com/yahoo/container/di/componentgraph/core/ComponentGraph.java +++ b/container-core/src/main/java/com/yahoo/container/di/componentgraph/core/ComponentGraph.java @@ -258,10 +258,12 @@ public class ComponentGraph { if (component.isEmpty()) { Object instance; try { - // This is an indication that you have not set up your components correctly in the model - // And tit will cause unnecessary reconstruction of your components. - // TODO: this should perhaps bee a warning. - log.log(Level.INFO, () -> "Trying the fallback injector to create" + messageForNoGlobalComponent(clazz, node)); + Level level = hasExplicitBinding(fallbackInjector, key) ? Level.FINE : Level.WARNING; + log.log(level, () -> "Trying the fallback injector to create" + messageForNoGlobalComponent(clazz, node)); + if (level.intValue() > Level.INFO.intValue()) { + log.log(level, "A component of type " + key.getTypeLiteral() + " should probably be declared in services.xml. " + + "Not doing so may cause resource leaks and unnecessary reconstruction of components."); + } instance = fallbackInjector.getInstance(key); } catch (ConfigurationException e) { throw removeStackTrace(new IllegalStateException( @@ -277,6 +279,11 @@ public class ComponentGraph { return component.get(); } + private boolean hasExplicitBinding(Injector injector, Key<?> key) { + log.log(Level.FINE, () -> "Injector binding for " + key + ": " + injector.getExistingBinding(key)); + return injector.getExistingBinding(key) != null; + } + private Node handleComponentParameter(Node node, Injector fallbackInjector, Class<?> clazz, Collection<Annotation> annotations) { List<Annotation> bindingAnnotations = annotations.stream().filter(ComponentGraph::isBindingAnnotation).collect(Collectors.toList()); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java index 1774694853b..59d18fcf17b 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java @@ -470,10 +470,10 @@ public class ApplicationApiHandler extends AuditLoggingRequestHandler { TenantInfo mergedInfo = TenantInfo.EMPTY .withName(getString(insp.field("name"), oldInfo.name())) .withEmail(getString(insp.field("email"), oldInfo.email())) - .withWebsite(getString(insp.field("website"), oldInfo.email())) + .withWebsite(getString(insp.field("website"), oldInfo.website())) .withInvoiceEmail(getString(insp.field("invoiceEmail"), oldInfo.invoiceEmail())) .withContactName(getString(insp.field("contactName"), oldInfo.contactName())) - .withContactEmail(getString(insp.field("contactEmail"), oldInfo.contactName())) + .withContactEmail(getString(insp.field("contactEmail"), oldInfo.contactEmail())) .withAddress(updateTenantInfoAddress(insp.field("address"), oldInfo.address())) .withBillingContact(updateTenantInfoBillingContact(insp.field("billingContact"), oldInfo.billingContact())); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java index 59f5e0f3f40..0a9496be0a6 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java @@ -100,7 +100,9 @@ public class VespaServiceDumperImpl implements VespaServiceDumper { unixPathDirectory.deleteRecursively(); } context.log(log, Level.INFO, "Creating '" + unixPathDirectory +"'."); - unixPathDirectory.createDirectory("rwxrwxrwx"); + unixPathDirectory.createDirectory("rwxr-x---") + .setOwner(context.userNamespace().vespaUser()) + .setGroup(context.userNamespace().vespaGroup()); URI destination = serviceDumpDestination(nodeSpec, createDumpId(request)); ProducerContext producerCtx = new ProducerContext(context, directory, request); List<Artifact> producedArtifacts = new ArrayList<>(); diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java index a44f90b164b..909c6c9cbc1 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java @@ -114,7 +114,6 @@ class ContainerFileSystemProvider extends FileSystemProvider { // Only called when both 'source' and 'target' have 'this' as the FS provider Path targetPathOnHost = pathOnHost(target); provider(targetPathOnHost).copy(pathOnHost(source), targetPathOnHost, options); - fixOwnerToContainerRoot(toContainerPath(target)); } @Override @@ -122,7 +121,6 @@ class ContainerFileSystemProvider extends FileSystemProvider { // Only called when both 'source' and 'target' have 'this' as the FS provider Path targetPathOnHost = pathOnHost(target); provider(targetPathOnHost).move(pathOnHost(source), targetPathOnHost, options); - fixOwnerToContainerRoot(toContainerPath(target)); } @Override diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java index a5fc6a1373f..4e85052a176 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java @@ -85,7 +85,7 @@ class ContainerFileSystemTest { new UnixPath(destination).setOwnerId(500).setGroupId(200); ContainerPath destination2 = ContainerPath.fromPathInContainer(containerFs, Path.of("/dest2")); Files.copy(destination, destination2, StandardCopyOption.COPY_ATTRIBUTES, StandardCopyOption.REPLACE_EXISTING); - assertOwnership(destination2, 0, 0, 10000, 11000); + assertOwnership(destination2, 500, 200, 10500, 11200); } @Test @@ -116,7 +116,7 @@ class ContainerFileSystemTest { new UnixPath(destination).setOwnerId(500).setGroupId(200); ContainerPath destination2 = ContainerPath.fromPathInContainer(containerFs, Path.of("/dest2")); Files.move(destination, destination2, StandardCopyOption.COPY_ATTRIBUTES, StandardCopyOption.REPLACE_EXISTING); - assertOwnership(destination2, 0, 0, 10000, 11000); + assertOwnership(destination2, 500, 200, 10500, 11200); } @Test diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 66f03edafa2..7793467040e 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -405,9 +405,8 @@ DummyPersistence::setClusterState(BucketSpace bucketSpace, const ClusterState& c return Result(); } -Result -DummyPersistence::setActiveState(const Bucket& b, - BucketInfo::ActiveState newState) +void +DummyPersistence::setActiveStateAsync(const Bucket& b, BucketInfo::ActiveState newState, OperationComplete::UP onComplete) { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "setCurrentState(%s, %s)", @@ -416,11 +415,12 @@ DummyPersistence::setActiveState(const Bucket& b, assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); BucketContentGuard::UP bc(acquireBucketWithLock(b)); - if (!bc.get()) { - return BucketInfoResult(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found"); + if ( ! bc ) { + onComplete->onComplete(std::make_unique<BucketInfoResult>(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found")); + } else { + (*bc)->setActive(newState == BucketInfo::ACTIVE); + onComplete->onComplete(std::make_unique<Result>()); } - (*bc)->setActive(newState == BucketInfo::ACTIVE); - return Result(); } BucketInfoResult @@ -745,8 +745,8 @@ DummyPersistence::createBucket(const Bucket& b, Context&) return Result(); } -Result -DummyPersistence::deleteBucket(const Bucket& b, Context&) +void +DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "deleteBucket(%s)", b.toString().c_str()); @@ -756,7 +756,7 @@ DummyPersistence::deleteBucket(const Bucket& b, Context&) assert(!_content[b]->_inUse); } _content.erase(b); - return Result(); + onComplete->onComplete(std::make_unique<Result>()); } Result diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 486f4cec2f2..a9b611d131c 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -155,7 +155,7 @@ public: BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; Result setClusterState(BucketSpace bucketSpace, const ClusterState& newState) override; - Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override; + void setActiveStateAsync(const Bucket&, BucketInfo::ActiveState, OperationComplete::UP) override; BucketInfoResult getBucketInfo(const Bucket&) const override; Result put(const Bucket&, Timestamp, DocumentSP, Context&) override; GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override; @@ -169,7 +169,7 @@ public: Result destroyIterator(IteratorId, Context&) override; Result createBucket(const Bucket&, Context&) override; - Result deleteBucket(const Bucket&, Context&) override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index 23a8f600024..6be0941d731 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -30,4 +30,14 @@ AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const return BucketIdListResult(list); } +void +AbstractPersistenceProvider::setActiveStateAsync(const Bucket &, BucketInfo::ActiveState, OperationComplete::UP op) { + op->onComplete(std::make_unique<Result>()); +} + +void +AbstractPersistenceProvider::deleteBucketAsync(const Bucket &, Context &, OperationComplete::UP op) { + op->onComplete(std::make_unique<Result>()); +} + } diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index 2332c05b57f..472abeca161 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -14,39 +14,14 @@ namespace storage::spi { class AbstractPersistenceProvider : public PersistenceProvider { public: - /** - * Default impl is empty. - */ Result initialize() override { return Result(); }; - - /** - * Default impl empty. - */ Result createBucket(const Bucket&, Context&) override { return Result(); } - - /** - * Default impl is empty. - */ Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); } - - /** - * Default impl is remove(). - */ RemoveResult removeIfFound(const Bucket&, Timestamp, const DocumentId&, Context&) override; void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override; - - /** - * Default impl empty. - */ Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); } - - /** - * Default impl empty. - */ - Result setActiveState(const Bucket&, BucketInfo::ActiveState) override { return Result(); } - /** - * Default impl empty. - */ + void setActiveStateAsync(const Bucket &, BucketInfo::ActiveState, OperationComplete::UP ) override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; }; diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index 575a95269c5..7da2ee58aa9 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -9,6 +9,22 @@ namespace storage::spi { PersistenceProvider::~PersistenceProvider() = default; Result +PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveState activeState) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + setActiveStateAsync(bucket, activeState, std::move(catcher)); + return *future.get(); +} + +Result +PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + deleteBucketAsync(bucket, context, std::move(catcher)); + return *future.get(); +} + +Result PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) { auto catcher = std::make_unique<CatchResult>(); auto future = catcher->future_result(); diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 56ef21b5c77..99e80cc197a 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -86,7 +86,8 @@ struct PersistenceProvider * other buckets may be deactivated, so the node must be able to serve * the data from its secondary index or get reduced coverage. */ - virtual Result setActiveState(const Bucket&, BucketInfo::ActiveState) = 0; + Result setActiveState(const Bucket&, BucketInfo::ActiveState); + virtual void setActiveStateAsync(const Bucket &, BucketInfo::ActiveState, OperationComplete::UP ) = 0; /** * Retrieve metadata for a bucket, previously returned in listBuckets(), @@ -341,7 +342,8 @@ struct PersistenceProvider * After this operation has succeeded, a restart of the provider should * not yield the bucket in getBucketList(). */ - virtual Result deleteBucket(const Bucket&, Context&) = 0; + Result deleteBucket(const Bucket&, Context&); + virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0; /** * This function is called continuously by the service layer. It allows the diff --git a/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp b/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp index 487c8741a65..29748a2010c 100644 --- a/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp @@ -96,7 +96,7 @@ struct Fixture BucketStateCalculator::SP _calc; test::BucketIdListResultHandler _bucketList; test::BucketInfoResultHandler _bucketInfo; - test::GenericResultHandler _genResult; + std::shared_ptr<test::GenericResultHandler> _genResult; Fixture() : _builder(), _bucketDB(std::make_shared<bucketdb::BucketDBOwner>()), @@ -107,7 +107,8 @@ struct Fixture _handler(_exec), _changedHandler(), _calc(new BucketStateCalculator()), - _bucketList(), _bucketInfo(), _genResult() + _bucketList(), _bucketInfo(), + _genResult(std::make_shared<test::GenericResultHandler>()) { // bucket 2 & 3 & 4 & 7 in ready _ready.insertDocs(_builder.createDocs(2, 1, 3). // 2 docs diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index e2f2f21f596..2c21a30396d 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -23,6 +23,8 @@ using storage::spi::BucketInfo; using BlockedReason = IBlockableMaintenanceJob::BlockedReason; using MoveOperationVector = std::vector<MoveOperation>; using storage::spi::dummy::DummyBucketExecutor; +using vespalib::MonitoredRefCount; +using vespalib::RetainGuard; using vespalib::ThreadStackExecutor; struct ControllerFixtureBase : public ::testing::Test diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp index c9b009e4a17..13955953eb5 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp @@ -6,6 +6,8 @@ #include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> #include <vespa/vespalib/util/threadstackexecutor.h> +using vespalib::RetainGuard; + using BlockedReason = IBlockableMaintenanceJob::BlockedReason; struct MyDirectJobRunner : public IMaintenanceJobRunner { diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h index 774acd0071a..14f2ff42dbe 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h @@ -2,14 +2,14 @@ #include "lid_space_common.h" #include <vespa/searchcore/proton/server/blockable_maintenance_job.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/searchcorespi/index/i_thread_service.h> +#include <vespa/vespalib/util/monitored_refcount.h> #include <vespa/vespalib/gtest/gtest.h> namespace storage::spi::dummy { class DummyBucketExecutor; } struct JobTestBase : public ::testing::Test { - MonitoredRefCount _refCount; + vespalib::MonitoredRefCount _refCount; test::ClusterStateHandler _clusterStateHandler; test::DiskMemUsageNotifier _diskMemUsageNotifier; std::unique_ptr<storage::spi::dummy::DummyBucketExecutor> _bucketExecutor; diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 1463be8cdbd..71ae26180ad 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -6,7 +6,6 @@ #include <vespa/searchcore/proton/attribute/i_attribute_manager.h> #include <vespa/searchcore/proton/bucketdb/bucket_create_notifier.h> #include <vespa/searchcore/proton/common/doctypename.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/searchcore/proton/common/transient_resource_usage_provider.h> #include <vespa/searchcore/proton/documentmetastore/operation_listener.h> #include <vespa/searchcore/proton/documentmetastore/documentmetastore.h> @@ -42,6 +41,7 @@ #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/monitored_refcount.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fastos/thread.h> @@ -68,6 +68,7 @@ using search::SerialNum; using search::CommitParam; using storage::spi::BucketInfo; using storage::spi::Timestamp; +using vespalib::MonitoredRefCount; using vespalib::Slime; using vespalib::makeLambdaTask; using vespa::config::search::AttributesConfigBuilder; diff --git a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp index 2bb1eb44e25..e8cc1b54235 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp @@ -20,7 +20,7 @@ struct DummyPersistenceHandler : public IPersistenceHandler { void handleRemove(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::DocumentId &) override {} void handleListBuckets(IBucketIdListResultHandler &) override {} void handleSetClusterState(const storage::spi::ClusterState &, IGenericResultHandler &) override {} - void handleSetActiveState(const storage::spi::Bucket &, storage::spi::BucketInfo::ActiveState, IGenericResultHandler &) override {} + void handleSetActiveState(const storage::spi::Bucket &, storage::spi::BucketInfo::ActiveState, std::shared_ptr<IGenericResultHandler>) override {} void handleGetBucketInfo(const storage::spi::Bucket &, IBucketInfoResultHandler &) override {} void handleCreateBucket(FeedToken, const storage::spi::Bucket &) override {} void handleDeleteBucket(FeedToken, const storage::spi::Bucket &) override {} @@ -44,8 +44,6 @@ DummyPersistenceHandler::SP handler_b(std::make_shared<DummyPersistenceHandler>( DummyPersistenceHandler::SP handler_c(std::make_shared<DummyPersistenceHandler>()); DummyPersistenceHandler::SP handler_a_new(std::make_shared<DummyPersistenceHandler>()); - - void assertHandler(const IPersistenceHandler::SP & lhs, const IPersistenceHandler * rhs) { diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index 9613c505f77..c252c89a2f8 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -227,10 +227,10 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer { } void handleSetActiveState(const Bucket &bucket, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) override { + std::shared_ptr<IGenericResultHandler> resultHandler) override { lastBucket = bucket; lastBucketState = newState; - resultHandler.handle(bucketStateResult); + resultHandler->handle(bucketStateResult); } void handleGetBucketInfo(const Bucket &, IBucketInfoResultHandler &resultHandler) override { diff --git a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp index a76c375bbfc..061a3fd4c32 100644 --- a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp +++ b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp @@ -6,7 +6,6 @@ #include <vespa/document/datatype/referencedatatype.h> #include <vespa/log/log.h> #include <vespa/searchcore/proton/attribute/imported_attributes_repo.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/searchcore/proton/reference/document_db_reference_resolver.h> #include <vespa/searchcore/proton/reference/gid_to_lid_change_listener.h> #include <vespa/searchcore/proton/reference/i_document_db_reference.h> @@ -19,6 +18,7 @@ #include <vespa/searchlib/attribute/reference_attribute.h> #include <vespa/searchlib/common/i_gid_to_lid_mapper.h> #include <vespa/searchlib/common/i_gid_to_lid_mapper_factory.h> +#include <vespa/vespalib/util/monitored_refcount.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/searchlib/test/mock_attribute_manager.h> #include <vespa/vespalib/test/insertion_operators.h> @@ -33,6 +33,7 @@ using proton::test::MockDocumentDBReference; using search::attribute::test::MockAttributeManager; using vespa::config::search::ImportedFieldsConfig; using vespa::config::search::ImportedFieldsConfigBuilder; +using vespalib::MonitoredRefCount; using vespalib::SequencedTaskExecutor; using vespalib::ISequencedTaskExecutor; diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp index 668be4c2c2d..15281563b93 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp @@ -2,12 +2,12 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/stllike/string.h> #include <vespa/document/base/documentid.h> -#include <vespa/vespalib/util/sequencedtaskexecutor.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/searchcore/proton/reference/gid_to_lid_change_listener.h> #include <vespa/searchlib/common/i_gid_to_lid_mapper_factory.h> #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/util/gate.h> +#include <vespa/vespalib/util/monitored_refcount.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/searchlib/test/mock_gid_to_lid_mapping.h> #include <map> #include <vespa/log/log.h> @@ -16,12 +16,13 @@ LOG_SETUP("gid_to_lid_change_listener_test"); using document::GlobalId; using document::BucketId; using document::DocumentId; -using vespalib::GenerationHandler; using search::attribute::Config; using search::attribute::BasicType; using search::attribute::Reference; using search::attribute::ReferenceAttribute; using search::attribute::test::MockGidToLidMapperFactory; +using vespalib::MonitoredRefCount; +using vespalib::GenerationHandler; namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt index 07d749d8c4f..219a2ea43a4 100644 --- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt @@ -16,7 +16,6 @@ vespa_add_library(searchcore_pcommon STATIC hw_info_sampler.cpp indexschema_inspector.cpp ipendinglidtracker.cpp - monitored_refcount.cpp operation_rate_tracker.cpp pendinglidtracker.cpp select_utils.cpp diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h index e75c16ddef6..8ccb4863878 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h @@ -50,21 +50,22 @@ private: */ class OwningState : public State { public: - OwningState(std::unique_ptr<ITransport> transport) + OwningState(std::shared_ptr<ITransport> transport) : State(*transport), _owned(std::move(transport)) {} ~OwningState() override; private: - std::unique_ptr<ITransport> _owned; + std::shared_ptr<ITransport> _owned; }; inline std::shared_ptr<State> make(ITransport & latch) { return std::make_shared<State>(latch); } + inline std::shared_ptr<State> -make(std::unique_ptr<ITransport> transport) { +make(std::shared_ptr<ITransport> transport) { return std::make_shared<OwningState>(std::move(transport)); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h index b4544868bbe..b393a85f632 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h @@ -53,7 +53,7 @@ public: virtual void handleSetActiveState(const storage::spi::Bucket &bucket, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) = 0; + std::shared_ptr<IGenericResultHandler> resultHandler) = 0; virtual void handleGetBucketInfo(const storage::spi::Bucket &bucket, IBucketInfoResultHandler &resultHandler) = 0; virtual void handleCreateBucket(FeedToken token, const storage::spi::Bucket &bucket) = 0; diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 136d95a068b..114292d055d 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -306,20 +306,21 @@ PersistenceEngine::setClusterState(BucketSpace bucketSpace, const ClusterState & } -Result -PersistenceEngine::setActiveState(const Bucket& bucket, - storage::spi::BucketInfo::ActiveState newState) +void +PersistenceEngine::setActiveStateAsync(const Bucket & bucket, BucketInfo::ActiveState newState, OperationComplete::UP onComplete) { ReadGuard rguard(_rwMutex); HandlerSnapshot snap = getHandlerSnapshot(rguard, bucket.getBucketSpace()); - auto catchResult = std::make_unique<storage::spi::CatchResult>(); - auto futureResult = catchResult->future_result(); - GenericResultHandler resultHandler(snap.size(), std::move(catchResult)); - for (; snap.handlers().valid(); snap.handlers().next()) { + auto resultHandler = std::make_shared<GenericResultHandler>(snap.size(), std::move(onComplete)); + while (snap.handlers().valid()) { IPersistenceHandler *handler = snap.handlers().get(); - handler->handleSetActiveState(bucket, newState, resultHandler); + snap.handlers().next(); + if (snap.handlers().valid()) { + handler->handleSetActiveState(bucket, newState, resultHandler); + } else { + handler->handleSetActiveState(bucket, newState, std::move(resultHandler)); + } } - return *futureResult.get(); } @@ -363,7 +364,7 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } - auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete)); handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc)); } @@ -383,7 +384,7 @@ PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& d return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } - auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete)); handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, did); } @@ -435,7 +436,7 @@ PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP if (handler == nullptr) { return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } - auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete)); handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd)); } @@ -563,19 +564,23 @@ PersistenceEngine::createBucket(const Bucket &b, Context &) } -Result -PersistenceEngine::deleteBucket(const Bucket& b, Context&) +void +PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) { ReadGuard rguard(_rwMutex); LOG(spam, "deleteBucket(%s)", b.toString().c_str()); HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace()); - TransportLatch latch(snap.size()); - for (; snap.handlers().valid(); snap.handlers().next()) { + + auto transportContext = std::make_shared<AsyncTranportContext>(snap.size(), std::move(onComplete)); + while (snap.handlers().valid()) { IPersistenceHandler *handler = snap.handlers().get(); - handler->handleDeleteBucket(feedtoken::make(latch), b); + snap.handlers().next(); + if (snap.handlers().valid()) { + handler->handleDeleteBucket(feedtoken::make(transportContext), b); + } else { + handler->handleDeleteBucket(feedtoken::make(std::move(transportContext)), b); + } } - latch.await(); - return latch.getResult(); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 0aeb3e16351..94331ac2cd6 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -103,7 +103,7 @@ public: Result initialize() override; BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; Result setClusterState(BucketSpace bucketSpace, const ClusterState& calc) override; - Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override; + void setActiveStateAsync(const Bucket&, BucketInfo::ActiveState, OperationComplete::UP) override; BucketInfoResult getBucketInfo(const Bucket&) const override; void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override; void removeAsync(const Bucket&, Timestamp, const document::DocumentId&, Context&, OperationComplete::UP) override; @@ -115,7 +115,7 @@ public: Result destroyIterator(IteratorId, Context&) override; Result createBucket(const Bucket &bucketId, Context &) override ; - Result deleteBucket(const Bucket&, Context&) override; + void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override; BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; diff --git a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp index 4cc5fb0b5db..5d1e1318b04 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp @@ -29,6 +29,8 @@ using search::AttributeVector; using search::IAttributeManager; using search::NotImplementedAttribute; using vespalib::ISequencedTaskExecutor; +using vespalib::MonitoredRefCount; +using vespalib::RetainGuard; using vespa::config::search::ImportedFieldsConfig; namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h index 761b9407fe5..522cdf83477 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h +++ b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h @@ -21,15 +21,17 @@ namespace search::attribute { namespace vespa::config::search::internal { class InternalImportedFieldsType; } namespace vespalib { - class ISequencedTaskExecutor; + +class ISequencedTaskExecutor; +class MonitoredRefCount; } + namespace proton { class IDocumentDBReference; class IDocumentDBReferenceRegistry; class ImportedAttributesRepo; class GidToLidChangeRegistrator; -class MonitoredRefCount; /** * Class that for a given document db resolves all references to parent document dbs: @@ -42,7 +44,7 @@ private: const document::DocumentType &_thisDocType; const ImportedFieldsConfig &_importedFieldsCfg; const document::DocumentType &_prevThisDocType; - MonitoredRefCount &_refCount; + vespalib::MonitoredRefCount &_refCount; vespalib::ISequencedTaskExecutor &_attributeFieldWriter; bool _useReferences; std::map<vespalib::string, std::unique_ptr<GidToLidChangeRegistrator>> _registrators; @@ -61,7 +63,7 @@ public: const document::DocumentType &thisDocType, const ImportedFieldsConfig &importedFieldsCfg, const document::DocumentType &prevThisDocType, - MonitoredRefCount &refCount, + vespalib::MonitoredRefCount &refCount, vespalib::ISequencedTaskExecutor &attributeFieldWriter, bool useReferences); ~DocumentDBReferenceResolver() override; diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp index c7e716b4173..427cbab2a14 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp @@ -3,6 +3,8 @@ #include "gid_to_lid_change_listener.h" #include <future> +using vespalib::RetainGuard; + namespace proton { GidToLidChangeListener::GidToLidChangeListener(vespalib::ISequencedTaskExecutor &attributeFieldWriter, diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h index 23bb32af87a..28e9684ed86 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h @@ -4,8 +4,8 @@ #include "i_gid_to_lid_change_listener.h" #include <vespa/searchlib/attribute/reference_attribute.h> +#include <vespa/vespalib/util/retain_guard.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> namespace proton { @@ -18,14 +18,14 @@ class GidToLidChangeListener : public IGidToLidChangeListener vespalib::ISequencedTaskExecutor &_attributeFieldWriter; vespalib::ISequencedTaskExecutor::ExecutorId _executorId; std::shared_ptr<search::attribute::ReferenceAttribute> _attr; - RetainGuard _retainGuard; + vespalib::RetainGuard _retainGuard; vespalib::string _name; vespalib::string _docTypeName; public: GidToLidChangeListener(vespalib::ISequencedTaskExecutor &attributeFieldWriter, std::shared_ptr<search::attribute::ReferenceAttribute> attr, - RetainGuard refCount, + vespalib::RetainGuard refCount, const vespalib::string &name, const vespalib::string &docTypeName); ~GidToLidChangeListener() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp b/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp index d6602e18c81..c15be9336fe 100644 --- a/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp @@ -98,10 +98,10 @@ BucketHandler::handleListBuckets(IBucketIdListResultHandler &resultHandler) void BucketHandler::handleSetCurrentState(const BucketId &bucketId, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) + std::shared_ptr<IGenericResultHandler> resultHandlerSP) { - _executor.execute(makeLambdaTask([this, bucketId, newState, resultHandlerP = &resultHandler]() { - performSetCurrentState(bucketId, newState, resultHandlerP); + _executor.execute(makeLambdaTask([this, bucketId, newState, resultHandler = std::move(resultHandlerSP)]() { + performSetCurrentState(bucketId, newState, resultHandler.get()); })); } diff --git a/searchcore/src/vespa/searchcore/proton/server/buckethandler.h b/searchcore/src/vespa/searchcore/proton/server/buckethandler.h index 927744e1b8e..7f44d2ebd71 100644 --- a/searchcore/src/vespa/searchcore/proton/server/buckethandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/buckethandler.h @@ -55,7 +55,7 @@ public: void handleListBuckets(IBucketIdListResultHandler &resultHandler); void handleSetCurrentState(const document::BucketId &bucketId, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler); + std::shared_ptr<IGenericResultHandler> resultHandler); void handleGetBucketInfo(const storage::spi::Bucket &bucket, IBucketInfoResultHandler &resultHandler); void handleListActiveBuckets(IBucketIdListResultHandler &resultHandler); diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index c70928cd5e8..386d8367e52 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -25,6 +25,7 @@ using document::BucketId; using storage::spi::BucketInfo; using storage::spi::Bucket; using proton::bucketdb::BucketMover; +using vespalib::RetainGuard; using vespalib::makeLambdaTask; using vespalib::Trinary; diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h index 66d24efb466..d4438fcd411 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h @@ -10,7 +10,7 @@ #include "maintenancedocumentsubdb.h" #include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h> #include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> namespace storage::spi { struct BucketExecutor; } @@ -59,7 +59,7 @@ private: using Movers = std::vector<BucketMoverSP>; using GuardedMoveOps = BucketMover::GuardedMoveOps; std::shared_ptr<IBucketStateCalculator> _calc; - RetainGuard _dbRetainer; + vespalib::RetainGuard _dbRetainer; IDocumentMoveHandler &_moveHandler; IBucketModifiedHandler &_modifiedHandler; IThreadService &_master; @@ -80,7 +80,7 @@ private: IDiskMemUsageNotifier &_diskMemUsageNotifier; BucketMoveJob(const std::shared_ptr<IBucketStateCalculator> &calc, - RetainGuard dbRetainer, + vespalib::RetainGuard dbRetainer, IDocumentMoveHandler &moveHandler, IBucketModifiedHandler &modifiedHandler, IThreadService & master, @@ -115,7 +115,7 @@ private: public: static std::shared_ptr<BucketMoveJob> create(const std::shared_ptr<IBucketStateCalculator> &calc, - RetainGuard dbRetainer, + vespalib::RetainGuard dbRetainer, IDocumentMoveHandler &moveHandler, IBucketModifiedHandler &modifiedHandler, IThreadService & master, diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index 936ba812a44..014bba11f83 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -19,13 +19,13 @@ #include "threading_service_config.h" #include <vespa/searchcore/proton/attribute/attribute_usage_filter.h> #include <vespa/searchcore/proton/common/doctypename.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/searchcore/proton/metrics/documentdb_job_trackers.h> #include <vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h> #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/index/indexmanager.h> #include <vespa/searchlib/docstore/cachestats.h> #include <vespa/searchlib/transactionlog/syncproxy.h> +#include <vespa/vespalib/util/retain_guard.h> #include <vespa/vespalib/util/varholder.h> #include <mutex> #include <condition_variable> @@ -111,7 +111,7 @@ private: DocumentDBTaggedMetrics _metrics; std::unique_ptr<metrics::UpdateHook> _metricsHook; vespalib::VarHolder<IFeedView::SP> _feedView; - MonitoredRefCount _refCount; + vespalib::MonitoredRefCount _refCount; bool _syncFeedViewEnabled; IDocumentDBOwner &_owner; storage::spi::BucketExecutor &_bucketExecutor; @@ -381,7 +381,7 @@ public: /** * Reference counting */ - RetainGuard retain() { return RetainGuard(_refCount); } + vespalib::RetainGuard retain() { return vespalib::RetainGuard(_refCount); } bool getDelayedConfig() const { return _state.getDelayedConfig(); } void replayConfig(SerialNum serialNum) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h index fe937a35db7..2098466a2c0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h @@ -3,7 +3,7 @@ #pragma once #include <vespa/searchcore/proton/flushengine/iflushhandler.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> namespace proton { @@ -13,7 +13,7 @@ class FlushHandlerProxy : public IFlushHandler { private: std::shared_ptr<DocumentDB> _documentDB; - RetainGuard _retainGuard; + vespalib::RetainGuard _retainGuard; public: FlushHandlerProxy(const std::shared_ptr<DocumentDB> &documentDB); diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index efe92a11f02..a5c1d1fc2c9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -26,6 +26,7 @@ using search::DocumentMetaData; using search::LidUsageStats; using storage::spi::makeBucketTask; using storage::spi::Bucket; +using vespalib::RetainGuard; using vespalib::makeLambdaTask; namespace proton::lidspace { diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h index 717c19f093d..917ff12be4a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h @@ -6,9 +6,9 @@ #include "document_db_maintenance_config.h" #include "i_disk_mem_usage_listener.h" #include "iclusterstatechangedhandler.h" -#include <vespa/searchlib/common/idocumentmetastore.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/document/bucket/bucketspace.h> +#include <vespa/searchlib/common/idocumentmetastore.h> +#include <vespa/vespalib/util/retain_guard.h> #include <atomic> namespace storage::spi { struct BucketExecutor; } @@ -50,7 +50,7 @@ private: bool _shouldCompactLidSpace; IThreadService &_master; BucketExecutor &_bucketExecutor; - RetainGuard _dbRetainer; + vespalib::RetainGuard _dbRetainer; document::BucketSpace _bucketSpace; bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const; @@ -68,7 +68,7 @@ private: class MoveTask; CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, - RetainGuard dbRetainer, + vespalib::RetainGuard dbRetainer, std::shared_ptr<ILidSpaceCompactionHandler> handler, IOperationStorer &opStorer, IThreadService & master, @@ -81,7 +81,7 @@ private: public: static std::shared_ptr<CompactionJob> create(const DocumentDBLidSpaceCompactionConfig &config, - RetainGuard dbRetainer, + vespalib::RetainGuard dbRetainer, std::shared_ptr<ILidSpaceCompactionHandler> handler, IOperationStorer &opStorer, IThreadService & master, diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index 2adcdbab217..c4826bba8ea 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -14,6 +14,7 @@ LOG_SETUP(".proton.server.maintenancecontroller"); using document::BucketId; using vespalib::Executor; +using vespalib::MonitoredRefCount; using vespalib::makeLambdaTask; namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index bddd58a349a..8c8cc3e2d43 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -5,14 +5,17 @@ #include "maintenancedocumentsubdb.h" #include "i_maintenance_job.h" #include <vespa/searchcore/proton/common/doctypename.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> #include <vespa/vespalib/util/scheduledexecutor.h> #include <mutex> namespace vespalib { - class Timer; - class Executor; + +class Executor; +class MonitoredRefCount; +class Timer; + } namespace searchcorespi::index { struct IThreadService; } @@ -20,7 +23,6 @@ namespace proton { class MaintenanceJobRunner; class DocumentDBMaintenanceConfig; -class MonitoredRefCount; /** * Class that controls the bucket moving between ready and notready sub databases @@ -36,7 +38,7 @@ public: using UP = std::unique_ptr<MaintenanceController>; enum class State {INITIALIZING, STARTED, PAUSED, STOPPING}; - MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, MonitoredRefCount & refCount, const DocTypeName &docTypeName); + MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, vespalib::MonitoredRefCount & refCount, const DocTypeName &docTypeName); ~MaintenanceController(); void registerJobInMasterThread(IMaintenanceJob::UP job); @@ -70,14 +72,14 @@ public: const MaintenanceDocumentSubDB & getNotReadySubDB() const { return _notReadySubDB; } IThreadService & masterThread() { return _masterThread; } const DocTypeName & getDocTypeName() const { return _docTypeName; } - RetainGuard retainDB() { return RetainGuard(_refCount); } + vespalib::RetainGuard retainDB() { return vespalib::RetainGuard(_refCount); } private: using Mutex = std::mutex; using Guard = std::lock_guard<Mutex>; IThreadService &_masterThread; vespalib::Executor &_defaultExecutor; - MonitoredRefCount &_refCount; + vespalib::MonitoredRefCount &_refCount; MaintenanceDocumentSubDB _readySubDB; MaintenanceDocumentSubDB _remSubDB; MaintenanceDocumentSubDB _notReadySubDB; diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp index 3d464cced5b..bec9197501b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp @@ -69,9 +69,9 @@ PersistenceHandlerProxy::handleSetClusterState(const storage::spi::ClusterState void PersistenceHandlerProxy::handleSetActiveState(const storage::spi::Bucket &bucket, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) + std::shared_ptr<IGenericResultHandler> resultHandler) { - _bucketHandler.handleSetCurrentState(bucket.getBucketId().stripUnused(), newState, resultHandler); + _bucketHandler.handleSetCurrentState(bucket.getBucketId().stripUnused(), newState, std::move(resultHandler)); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h index 96bfbe18423..5c35434aae8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h @@ -3,7 +3,7 @@ #pragma once #include <vespa/searchcore/proton/persistenceengine/ipersistencehandler.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> namespace proton { @@ -18,7 +18,7 @@ private: FeedHandler &_feedHandler; BucketHandler &_bucketHandler; ClusterStateHandler &_clusterStateHandler; - RetainGuard _retainGuard; + vespalib::RetainGuard _retainGuard; public: explicit PersistenceHandlerProxy(std::shared_ptr<DocumentDB> documentDB); @@ -40,7 +40,7 @@ public: void handleSetClusterState(const storage::spi::ClusterState &calc, IGenericResultHandler &resultHandler) override; void handleSetActiveState(const storage::spi::Bucket &bucket, storage::spi::BucketInfo::ActiveState newState, - IGenericResultHandler &resultHandler) override; + std::shared_ptr<IGenericResultHandler> resultHandler) override; void handleGetBucketInfo(const storage::spi::Bucket &bucket, IBucketInfoResultHandler &resultHandler) override; void handleCreateBucket(FeedToken token, const storage::spi::Bucket &bucket) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp index c266a604e33..6fe1947bd1d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp @@ -16,6 +16,7 @@ using document::BucketId; using storage::spi::Timestamp; using storage::spi::Bucket; using vespalib::IDestructorCallback; +using vespalib::RetainGuard; using vespalib::makeLambdaTask; namespace proton { diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h index 5c52991d0c3..6198f588a32 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h @@ -3,9 +3,9 @@ #include "blockable_maintenance_job.h" #include "document_db_maintenance_config.h" -#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <persistence/spi/types.h> #include <vespa/document/bucket/bucketspace.h> +#include <vespa/vespalib/util/retain_guard.h> #include <atomic> namespace storage::spi { struct BucketExecutor; } @@ -36,7 +36,7 @@ private: IThreadService &_master; BucketExecutor &_bucketExecutor; const vespalib::string _docTypeName; - RetainGuard _dbRetainer; + vespalib::RetainGuard _dbRetainer; const vespalib::duration _cfgAgeLimit; const uint32_t _subDbId; const document::BucketSpace _bucketSpace; @@ -45,14 +45,14 @@ private: void remove(uint32_t lid, const RawDocumentMetaData & meta); - PruneRemovedDocumentsJob(const DocumentDBPruneConfig &config, RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, + PruneRemovedDocumentsJob(const DocumentDBPruneConfig &config, vespalib::RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, uint32_t subDbId, document::BucketSpace bucketSpace, const vespalib::string &docTypeName, IPruneRemovedDocumentsHandler &handler, IThreadService & master, BucketExecutor & bucketExecutor); bool run() override; public: static std::shared_ptr<PruneRemovedDocumentsJob> - create(const Config &config, RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, uint32_t subDbId, + create(const Config &config, vespalib::RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, uint32_t subDbId, document::BucketSpace bucketSpace, const vespalib::string &docTypeName, IPruneRemovedDocumentsHandler &handler, IThreadService & master, BucketExecutor & bucketExecutor) { diff --git a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h index 4eaf15b3b1c..f7c52ed4baa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h @@ -3,7 +3,7 @@ #pragma once #include <vespa/searchcore/proton/summaryengine/isearchhandler.h> -#include <vespa/searchcore/proton/common/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> namespace proton { @@ -13,7 +13,7 @@ class SearchHandlerProxy : public ISearchHandler { private: std::shared_ptr<DocumentDB> _documentDB; - RetainGuard _retainGuard; + vespalib::RetainGuard _retainGuard; public: SearchHandlerProxy(std::shared_ptr<DocumentDB> documentDB); diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index d521975e0cb..a1263c9433b 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -220,8 +220,8 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) pending_message_tracker().insert(msg); } { - RemoveBucketOperation op(dummy_cluster_context, - BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); + // TODO we might not want this particular behavior for merge operations either + MergeOperation op(BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(2, 3))); // Not blocked for exact node match. EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq)); // But blocked for bucket match! diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp index e877f4601b7..971ff36c833 100644 --- a/storage/src/tests/distributor/removebucketoperationtest.cpp +++ b/storage/src/tests/distributor/removebucketoperationtest.cpp @@ -119,4 +119,16 @@ TEST_F(RemoveBucketOperationTest, fail_with_invalid_bucket_info) { EXPECT_EQ("NONEXISTING", dumpBucket(document::BucketId(16, 1))); } +TEST_F(RemoveBucketOperationTest, operation_blocked_when_pending_message_to_target_node) { + RemoveBucketOperation op(dummy_cluster_context, + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), + toVector<uint16_t>(1, 3))); + // In node target set + EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 1, 120)); + EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 3, 120)); + // Not in node target set + EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 0, 120)); + EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 2, 120)); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index 25009156f18..f5531a134d0 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -38,14 +38,15 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil { struct PendingMessage { uint32_t _msgType; + uint16_t _node; uint8_t _pri; - PendingMessage() : _msgType(UINT32_MAX), _pri(0) {} + constexpr PendingMessage() noexcept : _msgType(UINT32_MAX), _node(0), _pri(0) {} - PendingMessage(uint32_t msgType, uint8_t pri) - : _msgType(msgType), _pri(pri) {} + constexpr PendingMessage(uint32_t msgType, uint8_t pri) noexcept + : _msgType(msgType), _node(0), _pri(pri) {} - bool shouldCheck() const { return _msgType != UINT32_MAX; } + bool shouldCheck() const noexcept { return _msgType != UINT32_MAX; } }; void enableClusterState(const lib::ClusterState& systemState) { @@ -97,8 +98,7 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil { IdealStateOperation::UP op(result.createOperation()); if (op.get()) { if (blocker.shouldCheck() - && op->shouldBlockThisOperation(blocker._msgType, - blocker._pri)) + && op->shouldBlockThisOperation(blocker._msgType, blocker._node, blocker._pri)) { return "BLOCKED"; } diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index a174d305c27..5f1acf6e7da 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -22,6 +22,15 @@ } \ } +#define CHECK_ERROR_ASYNC(className, failType, onError) \ + { \ + Guard guard(_lock); \ + if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \ + onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \ + return; \ + } \ + } + namespace storage { namespace { @@ -172,13 +181,13 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId, return _spi.destroyIterator(iterId, context); } -spi::Result -PersistenceProviderWrapper::deleteBucket(const spi::Bucket& bucket, - spi::Context& context) +void +PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, + spi::OperationComplete::UP operationComplete) { LOG_SPI("deleteBucket(" << bucket << ")"); - CHECK_ERROR(spi::Result, FAIL_DELETE_BUCKET); - return _spi.deleteBucket(bucket, context); + CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete); + _spi.deleteBucketAsync(bucket, context, std::move(operationComplete)); } spi::Result diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index d90fa7b2eaa..64828a2a3ee 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -103,7 +103,7 @@ public: spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index c51357cacd1..c3caac7121c 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -66,9 +66,9 @@ public: return PersistenceProviderWrapper::createBucket(bucket, ctx); } - spi::Result deleteBucket(const spi::Bucket& bucket, spi::Context& ctx) override { + void deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override { ++_deleteBucketInvocations; - return PersistenceProviderWrapper::deleteBucket(bucket, ctx); + PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete)); } }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index fbe1c142b09..7f66d1effd5 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -86,7 +86,7 @@ void GarbageCollectionOperation::update_gc_metrics() { } bool -GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const { +GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const { return true; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index 2e010a61bde..f51739242b7 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -21,7 +21,7 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; const char* getName() const override { return "garbagecollection"; }; Type getType() const override { return GARBAGE_COLLECTION; } - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp index b1231fafcd9..744b24b593e 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp @@ -138,8 +138,7 @@ public: bool check(uint32_t messageType, uint16_t node, uint8_t priority) override { - (void) node; - if (op.shouldBlockThisOperation(messageType, priority)) { + if (op.shouldBlockThisOperation(messageType, node, priority)) { blocked = true; return false; } @@ -232,6 +231,7 @@ IdealStateOperation::toString() const bool IdealStateOperation::shouldBlockThisOperation(uint32_t messageType, + [[maybe_unused]] uint16_t node, uint8_t) const { for (uint32_t i = 0; MAINTENANCE_MESSAGE_TYPES[i] != 0; ++i) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h index d4dc4e405df..f8f35afe821 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h @@ -217,7 +217,7 @@ public: /** * Should return true if the given message type should block this operation. */ - virtual bool shouldBlockThisOperation(uint32_t messageType, uint8_t priority) const; + virtual bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t priority) const; protected: friend struct IdealStateManagerTest; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 7cfe4172b2c..f951a880e5d 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -329,14 +329,14 @@ constexpr std::array<uint32_t, 7> WRITE_FEED_MESSAGE_TYPES {{ } -bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const { +bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const { for (auto blocking_type : WRITE_FEED_MESSAGE_TYPES) { if (messageType == blocking_type) { return true; } } - return IdealStateOperation::shouldBlockThisOperation(messageType, pri); + return IdealStateOperation::shouldBlockThisOperation(messageType, node, pri); } bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx, diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h index 1bca1f7389f..832c0f99681 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h @@ -48,7 +48,7 @@ public: const document::BucketId&, MergeLimiter&, std::vector<MergeMetaData>&); - bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override; + bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const override; bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override; private: static void addIdealNodes( diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp index 9a57722dc7e..25cae5b9979 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp @@ -61,8 +61,7 @@ RemoveBucketOperation::onStart(DistributorStripeMessageSender& sender) bool RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply> &msg) { - api::DeleteBucketReply* rep = - dynamic_cast<api::DeleteBucketReply*>(msg.get()); + auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get()); uint16_t node = _tracker.handleReply(*rep); @@ -112,8 +111,15 @@ RemoveBucketOperation::onReceive(DistributorStripeMessageSender&, const std::sha } bool -RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint8_t) const +RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint16_t target_node, uint8_t) const { - return true; + // Number of nodes is expected to be 1 in the vastly common case (and a highly bounded + // number in the worst case), so a simple linear scan suffices. + for (uint16_t node : getNodes()) { + if (target_node == node) { + return true; + } + } + return false; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h index a0d496f948a..5e0922d5685 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h @@ -30,7 +30,7 @@ public: void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; const char* getName() const override { return "remove"; }; Type getType() const override { return DELETE_BUCKET; } - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 649503cf0f5..6f3924535ef 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -150,6 +150,7 @@ SplitOperation::isBlocked(const DistributorStripeOperationContext& ctx, const Op bool SplitOperation::shouldBlockThisOperation(uint32_t msgType, + [[maybe_unused]] uint16_t node, uint8_t pri) const { if (msgType == api::MessageType::SPLITBUCKET_ID && _priority >= pri) { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h index ee957309088..6a268155fc8 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h @@ -21,7 +21,7 @@ public: const char* getName() const override { return "split"; }; Type getType() const override { return SPLIT_BUCKET; } bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override; - bool shouldBlockThisOperation(uint32_t, uint8_t) const override; + bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override; protected: MessageTracker _tracker; diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 4a28e650fac..d150f5600e5 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -3,11 +3,16 @@ #include "asynchandler.h" #include "persistenceutil.h" #include "testandsethelper.h" +#include "bucketownershipnotifier.h" #include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/document/update/documentupdate.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/log/log.h> +LOG_SETUP(".storage.persistence.asynchandler"); + namespace storage { namespace { @@ -86,12 +91,26 @@ private: vespalib::ISequencedTaskExecutor::ExecutorId _executorId; }; +bool +bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { + // Don't check document sizes, as background moving of documents in Proton + // may trigger a change in size without any mutations taking place. This will + // only take place when a document being moved was fed _prior_ to the change + // where Proton starts reporting actual document sizes, and will eventually + // converge to a stable value. But for now, ignore it to prevent false positive + // error logs and non-deleted buckets. + return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); } + +} + AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi, + BucketOwnershipNotifier &bucketOwnershipNotifier, vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory) : _env(env), _spi(spi), + _bucketOwnershipNotifier(bucketOwnershipNotifier), _sequencedExecutor(executor), _bucketIdFactory(bucketIdFactory) {} @@ -135,6 +154,79 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons } MessageTracker::UP +AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const +{ + tracker->setMetric(_env._metrics.deleteBuckets); + LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); + if (_env._fileStorHandler.isMerging(cmd.getBucket())) { + _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), + api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); + } + spi::Bucket bucket(cmd.getBucket()); + if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { + return tracker; + } + + auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) { + // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric + (void) ignored; + StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace())); + StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket"); + if (entry.exist() && entry->getMetaCount() > 0) { + LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " + "active operation when delete bucket was queued. " + "Updating bucket database to keep it in sync with file. " + "Cannot delete bucket from bucket database at this " + "point, as it can have been intentionally recreated " + "after delete bucket had been sent", + bucket.getBucketId().toString().c_str()); + api::BucketInfo info(0, 0, 0); + // Only set document counts/size; retain ready/active state. + info.setReady(entry->getBucketInfo().isReady()); + info.setActive(entry->getBucketInfo().isActive()); + + entry->setBucketInfo(info); + entry.write(); + } + tracker->sendReply(); + }); + _spi.deleteBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + return tracker; +} + +MessageTracker::UP +AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const +{ + trackerUP->setMetric(_env._metrics.setBucketStates); + + //LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); + spi::Bucket bucket(cmd.getBucket()); + bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); + spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); + + auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket, + notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable { + if (tracker->checkForError(*response)) { + StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace())); + StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(),"handleSetBucketState"); + if (entry.exist()) { + entry->info.setActive(newState == spi::BucketInfo::ACTIVE); + notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); + entry.write(); + } else { + LOG(warning, "Got OK setCurrentState result from provider for %s, " + "but bucket has disappeared from service layer database", + cmd.getBucketId().toString().c_str()); + } + tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd)); + } + tracker->sendReply(); + }); + _spi.setActiveStateAsync(bucket, newState, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); + return trackerUP; +} + +MessageTracker::UP AsyncHandler::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) const { MessageTracker & tracker = *trackerUP; @@ -233,4 +325,31 @@ AsyncHandler::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTra return true; } +bool +AsyncHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const +{ + spi::BucketInfoResult result(_spi.getBucketInfo(bucket)); + if (result.hasError()) { + LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'", + bucket.toString().c_str(), result.getErrorMessage().c_str()); + return false; + } + api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo())); + // Don't check meta fields or active/ready fields since these are not + // that important and ready may change under the hood in a race with + // getModifiedBuckets(). If bucket is empty it means it has already + // been deleted by a racing split/join. + if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) { + LOG(error, + "Service layer bucket database and provider out of sync before " + "deleting bucket %s! Service layer db had %s while provider says " + "bucket has %s. Deletion has been rejected to ensure data is not " + "lost, but bucket may remain out of sync until service has been " + "restarted.", + bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str()); + return false; + } + return true; +} + } diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index 23f3605dca1..4f5c242570c 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -14,6 +14,7 @@ namespace spi { class Context; } class PersistenceUtil; +class BucketOwnershipNotifier; /** * Handle async operations that uses a sequenced executor. @@ -21,19 +22,23 @@ class PersistenceUtil; */ class AsyncHandler : public Types { public: - AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor, - const document::BucketIdFactory & bucketIdFactory); + AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &, + vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory); MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; + MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; static bool is_async_message(api::MessageType::Id type_id) noexcept; private: + bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; static bool tasConditionExists(const api::TestAndSetCommand & cmd); bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, spi::Context & context, bool missingDocumentImpliesMatch = false) const; const PersistenceUtil & _env; spi::PersistenceProvider & _spi; + BucketOwnershipNotifier & _bucketOwnershipNotifier; vespalib::ISequencedTaskExecutor & _sequencedExecutor; const document::BucketIdFactory & _bucketIdFactory; }; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 5315d3ec0bc..297185ac54c 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -20,7 +20,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen _mergeHandler(_env, provider, component.cluster_context(), _clock, cfg.bucketMergeChunkSize, cfg.commonMergeChainOptimalizationMinimumSize), - _asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()), + _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()), _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), _simpleHandler(_env, provider) { @@ -45,7 +45,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::CREATEBUCKET_ID: return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker)); case api::MessageType::DELETEBUCKET_ID: - return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker)); case api::MessageType::JOINBUCKETS_ID: return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker)); case api::MessageType::SPLITBUCKET_ID: @@ -62,7 +62,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr case api::MessageType::APPLYBUCKETDIFF_ID: return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker)); case api::MessageType::SETBUCKETSTATE_ID: - return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); + return _asyncHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker)); case api::MessageType::INTERNAL_ID: switch(static_cast<api::InternalCommand&>(msg).getType()) { case GetIterCommand::ID: diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 471d3d62a35..73033132e5d 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -60,10 +60,11 @@ ProviderErrorWrapper::setClusterState(BucketSpace bucketSpace, const spi::Cluste return checkResult(_impl.setClusterState(bucketSpace, state)); } -spi::Result -ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) +void +ProviderErrorWrapper::setActiveStateAsync(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) { - return checkResult(_impl.setActiveState(bucket, newState)); + onComplete->addResultHandler(this); + _impl.setActiveStateAsync(bucket, newState, std::move(onComplete)); } spi::BucketInfoResult @@ -130,10 +131,11 @@ ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& cont return checkResult(_impl.createBucket(bucket, context)); } -spi::Result -ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context) +void +ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) { - return checkResult(_impl.deleteBucket(bucket, context)); + onComplete->addResultHandler(this); + _impl.deleteBucketAsync(bucket, context, std::move(onComplete)); } spi::BucketIdListResult diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index d23cce9172a..6e7986ad65c 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -41,7 +41,7 @@ public: spi::Result initialize() override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override; - spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override; + spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; @@ -54,7 +54,6 @@ public: spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; spi::Result createBucket(const spi::Bucket&, spi::Context&) override; - spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override; spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override; @@ -67,6 +66,8 @@ public: void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; + void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override; private: template <typename ResultType> diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index b4c09f09e63..b4fe207e2e5 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -43,16 +43,6 @@ getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, Messa return document::FieldSet::SP(); } -bool -bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) { - // Don't check document sizes, as background moving of documents in Proton - // may trigger a change in size without any mutations taking place. This will - // only take place when a document being moved was fed _prior_ to the change - // where Proton starts reporting actual document sizes, and will eventually - // converge to a stable value. But for now, ignore it to prevent false positive - // error logs and non-deleted buckets. - return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount())); -} } SimpleMessageHandler::SimpleMessageHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi) : _env(env), @@ -113,70 +103,6 @@ SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageT return tracker; } -bool -SimpleMessageHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const -{ - spi::BucketInfoResult result(_spi.getBucketInfo(bucket)); - if (result.hasError()) { - LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'", - bucket.toString().c_str(), result.getErrorMessage().c_str()); - return false; - } - api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo())); - // Don't check meta fields or active/ready fields since these are not - // that important and ready may change under the hood in a race with - // getModifiedBuckets(). If bucket is empty it means it has already - // been deleted by a racing split/join. - if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) { - LOG(error, - "Service layer bucket database and provider out of sync before " - "deleting bucket %s! Service layer db had %s while provider says " - "bucket has %s. Deletion has been rejected to ensure data is not " - "lost, but bucket may remain out of sync until service has been " - "restarted.", - bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str()); - return false; - } - return true; -} - -MessageTracker::UP -SimpleMessageHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.deleteBuckets); - LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); - if (_env._fileStorHandler.isMerging(cmd.getBucket())) { - _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), - api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); - } - spi::Bucket bucket(cmd.getBucket()); - if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { - return tracker; - } - _spi.deleteBucket(bucket, tracker->context()); - StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); - { - StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); - if (entry.exist() && entry->getMetaCount() > 0) { - LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " - "active operation when delete bucket was queued. " - "Updating bucket database to keep it in sync with file. " - "Cannot delete bucket from bucket database at this " - "point, as it can have been intentionally recreated " - "after delete bucket had been sent", - cmd.getBucketId().toString().c_str()); - api::BucketInfo info(0, 0, 0); - // Only set document counts/size; retain ready/active state. - info.setReady(entry->getBucketInfo().isReady()); - info.setActive(entry->getBucketInfo().isActive()); - - entry->setBucketInfo(info); - entry.write(); - } - } - return tracker; -} - MessageTracker::UP SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const { diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h index 9f00f67684d..2cfbc7016c0 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.h +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h @@ -23,13 +23,11 @@ public: MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const; private: - bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const; const PersistenceUtil & _env; spi::PersistenceProvider & _spi; }; diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp index 0856f45c3ff..d5b44cc1911 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp @@ -5,7 +5,6 @@ #include "bucketownershipnotifier.h" #include "splitbitdetector.h" #include "messages.h" -#include <vespa/storage/common/bucketmessages.h> #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/storageapi/message/bucket.h> @@ -145,37 +144,6 @@ SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker } MessageTracker::UP -SplitJoinHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) const -{ - tracker->setMetric(_env._metrics.setBucketStates); - NotificationGuard notifyGuard(_bucketOwnershipNotifier); - - LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str()); - spi::Bucket bucket(cmd.getBucket()); - bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE); - spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE); - - spi::Result result(_spi.setActiveState(bucket, newState)); - if (tracker->checkForError(result)) { - StorBucketDatabase::WrappedEntry - entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState"); - if (entry.exist()) { - entry->info.setActive(newState == spi::BucketInfo::ACTIVE); - notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info); - entry.write(); - } else { - LOG(warning, "Got OK setCurrentState result from provider for %s, " - "but bucket has disappeared from service layer database", - cmd.getBucketId().toString().c_str()); - } - - tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd)); - } - - return tracker; -} - -MessageTracker::UP SplitJoinHandler::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) const { tracker->setMetric(_env._metrics.recheckBucketInfo); diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.h b/storage/src/vespa/storage/persistence/splitjoinhandler.h index ddfa22b154c..4521e520ee9 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.h +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.h @@ -21,7 +21,6 @@ public: SplitJoinHandler(PersistenceUtil &, spi::PersistenceProvider &, BucketOwnershipNotifier &, bool enableMultibitSplitOptimalization); MessageTrackerUP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTrackerUP tracker) const; - MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTrackerUP tracker) const; private: diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index 9e2917775f0..6115c21623d 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -38,6 +38,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT memoryusage.cpp mmap_file_allocator.cpp mmap_file_allocator_factory.cpp + monitored_refcount.cpp printable.cpp priority_queue.cpp random.cpp diff --git a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp b/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp index 97615abba31..4376e26bb66 100644 --- a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp +++ b/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp @@ -3,7 +3,7 @@ #include "monitored_refcount.h" #include <cassert> -namespace proton { +namespace vespalib { MonitoredRefCount::MonitoredRefCount() : _lock(), @@ -41,4 +41,4 @@ MonitoredRefCount::waitForZeroRefCount() _cv.wait(guard, [this] { return (_refCount == 0u); }); } -} // namespace proton +} diff --git a/vespalib/src/vespa/vespalib/util/monitored_refcount.h b/vespalib/src/vespa/vespalib/util/monitored_refcount.h new file mode 100644 index 00000000000..465284b6fd3 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/monitored_refcount.h @@ -0,0 +1,29 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <mutex> +#include <condition_variable> + +namespace vespalib { + +class RetainGuard; +/* + * Class containing a reference count that can be waited on to become zero. + * Typically ancestor or member of a class that has to be careful of when + * portions object can be properly torn down before destruction itself. + */ +class MonitoredRefCount +{ + std::mutex _lock; + std::condition_variable _cv; + uint32_t _refCount; + void retain() noexcept; + void release() noexcept; + friend RetainGuard; +public: + MonitoredRefCount(); + virtual ~MonitoredRefCount(); + void waitForZeroRefCount(); +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h b/vespalib/src/vespa/vespalib/util/retain_guard.h index 9eb713b1220..090f3ce75cf 100644 --- a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h +++ b/vespalib/src/vespa/vespalib/util/retain_guard.h @@ -1,31 +1,16 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + #pragma once -#include <mutex> -#include <condition_variable> +#include "monitored_refcount.h" -namespace proton { +namespace vespalib { -class RetainGuard; /* - * Class containing a reference count that can be waited on to become zero. - * Typically ancestor or member of a class that has to be careful of when - * portions object can be properly torn down before destruction itself. + * Class containing a reference to a monitored reference count, + * intended to block teardown of the class owning the monitored + * reference count. */ -class MonitoredRefCount -{ - std::mutex _lock; - std::condition_variable _cv; - uint32_t _refCount; - void retain() noexcept; - void release() noexcept; - friend RetainGuard; -public: - MonitoredRefCount(); - virtual ~MonitoredRefCount(); - void waitForZeroRefCount(); -}; - class RetainGuard { public: RetainGuard(MonitoredRefCount & refCount) noexcept @@ -57,4 +42,4 @@ private: MonitoredRefCount * _refCount; }; -} // namespace proton +} |