summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--metrics/src/vespa/metrics/metricvalueset.h2
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java4
-rw-r--r--node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java2
-rw-r--r--node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java4
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp6
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h2
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp5
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h27
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.cpp8
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.h3
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp2
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp2
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h4
-rw-r--r--searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp3
-rw-r--r--searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp3
-rw-r--r--searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp24
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h16
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h4
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp4
-rw-r--r--storage/src/tests/distributor/removebucketoperationtest.cpp12
-rw-r--r--storage/src/tests/distributor/statecheckerstest.cpp12
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp19
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h2
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp14
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h2
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp85
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp7
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h5
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.cpp74
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.h2
-rw-r--r--vespalib/src/vespa/vespalib/util/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/util/monitored_refcount.cpp (renamed from searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp)4
-rw-r--r--vespalib/src/vespa/vespalib/util/monitored_refcount.h29
-rw-r--r--vespalib/src/vespa/vespalib/util/retain_guard.h (renamed from searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h)29
63 files changed, 296 insertions, 234 deletions
diff --git a/metrics/src/vespa/metrics/metricvalueset.h b/metrics/src/vespa/metrics/metricvalueset.h
index 82c01343472..bb2a7409ce7 100644
--- a/metrics/src/vespa/metrics/metricvalueset.h
+++ b/metrics/src/vespa/metrics/metricvalueset.h
@@ -68,7 +68,7 @@ public:
ValueClass getValues() const;
/**
- * Get the current values from the metric. This function should not be
+ * Set the current values for the metric. This function should not be
* called in parallel. Only call it from a single thread or use external
* locking. If it returns false, it means the metric have just been reset.
* In which case, redo getValues(), apply the update again, and call
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java
index 59f5e0f3f40..0a9496be0a6 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/maintenance/servicedump/VespaServiceDumperImpl.java
@@ -100,7 +100,9 @@ public class VespaServiceDumperImpl implements VespaServiceDumper {
unixPathDirectory.deleteRecursively();
}
context.log(log, Level.INFO, "Creating '" + unixPathDirectory +"'.");
- unixPathDirectory.createDirectory("rwxrwxrwx");
+ unixPathDirectory.createDirectory("rwxr-x---")
+ .setOwner(context.userNamespace().vespaUser())
+ .setGroup(context.userNamespace().vespaGroup());
URI destination = serviceDumpDestination(nodeSpec, createDumpId(request));
ProducerContext producerCtx = new ProducerContext(context, directory, request);
List<Artifact> producedArtifacts = new ArrayList<>();
diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java
index a44f90b164b..909c6c9cbc1 100644
--- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java
+++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemProvider.java
@@ -114,7 +114,6 @@ class ContainerFileSystemProvider extends FileSystemProvider {
// Only called when both 'source' and 'target' have 'this' as the FS provider
Path targetPathOnHost = pathOnHost(target);
provider(targetPathOnHost).copy(pathOnHost(source), targetPathOnHost, options);
- fixOwnerToContainerRoot(toContainerPath(target));
}
@Override
@@ -122,7 +121,6 @@ class ContainerFileSystemProvider extends FileSystemProvider {
// Only called when both 'source' and 'target' have 'this' as the FS provider
Path targetPathOnHost = pathOnHost(target);
provider(targetPathOnHost).move(pathOnHost(source), targetPathOnHost, options);
- fixOwnerToContainerRoot(toContainerPath(target));
}
@Override
diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java
index a5fc6a1373f..4e85052a176 100644
--- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java
+++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/fs/ContainerFileSystemTest.java
@@ -85,7 +85,7 @@ class ContainerFileSystemTest {
new UnixPath(destination).setOwnerId(500).setGroupId(200);
ContainerPath destination2 = ContainerPath.fromPathInContainer(containerFs, Path.of("/dest2"));
Files.copy(destination, destination2, StandardCopyOption.COPY_ATTRIBUTES, StandardCopyOption.REPLACE_EXISTING);
- assertOwnership(destination2, 0, 0, 10000, 11000);
+ assertOwnership(destination2, 500, 200, 10500, 11200);
}
@Test
@@ -116,7 +116,7 @@ class ContainerFileSystemTest {
new UnixPath(destination).setOwnerId(500).setGroupId(200);
ContainerPath destination2 = ContainerPath.fromPathInContainer(containerFs, Path.of("/dest2"));
Files.move(destination, destination2, StandardCopyOption.COPY_ATTRIBUTES, StandardCopyOption.REPLACE_EXISTING);
- assertOwnership(destination2, 0, 0, 10000, 11000);
+ assertOwnership(destination2, 500, 200, 10500, 11200);
}
@Test
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index ff9676da7d0..7793467040e 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -745,8 +745,8 @@ DummyPersistence::createBucket(const Bucket& b, Context&)
return Result();
}
-Result
-DummyPersistence::deleteBucket(const Bucket& b, Context&)
+void
+DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete)
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "deleteBucket(%s)", b.toString().c_str());
@@ -756,7 +756,7 @@ DummyPersistence::deleteBucket(const Bucket& b, Context&)
assert(!_content[b]->_inUse);
}
_content.erase(b);
- return Result();
+ onComplete->onComplete(std::make_unique<Result>());
}
Result
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 9d93316d382..a9b611d131c 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -169,7 +169,7 @@ public:
Result destroyIterator(IteratorId, Context&) override;
Result createBucket(const Bucket&, Context&) override;
- Result deleteBucket(const Bucket&, Context&) override;
+ void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
index e423e0aaac5..6be0941d731 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
@@ -35,4 +35,9 @@ AbstractPersistenceProvider::setActiveStateAsync(const Bucket &, BucketInfo::Act
op->onComplete(std::make_unique<Result>());
}
+void
+AbstractPersistenceProvider::deleteBucketAsync(const Bucket &, Context &, OperationComplete::UP op) {
+ op->onComplete(std::make_unique<Result>());
+}
+
}
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index 5f8cf2fc171..472abeca161 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -14,39 +14,14 @@ namespace storage::spi {
class AbstractPersistenceProvider : public PersistenceProvider
{
public:
- /**
- * Default impl is empty.
- */
Result initialize() override { return Result(); };
-
- /**
- * Default impl empty.
- */
Result createBucket(const Bucket&, Context&) override { return Result(); }
-
- /**
- * Default impl is empty.
- */
Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); }
-
- /**
- * Default impl is remove().
- */
RemoveResult removeIfFound(const Bucket&, Timestamp, const DocumentId&, Context&) override;
void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override;
-
- /**
- * Default impl empty.
- */
Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); }
-
- /**
- * Default impl empty.
- */
void setActiveStateAsync(const Bucket &, BucketInfo::ActiveState, OperationComplete::UP ) override;
- /**
- * Default impl empty.
- */
+ void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
};
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index bb819fc9e50..7da2ee58aa9 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -17,6 +17,14 @@ PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveStat
}
Result
+PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) {
+ auto catcher = std::make_unique<CatchResult>();
+ auto future = catcher->future_result();
+ deleteBucketAsync(bucket, context, std::move(catcher));
+ return *future.get();
+}
+
+Result
PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) {
auto catcher = std::make_unique<CatchResult>();
auto future = catcher->future_result();
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 45ca49435a7..99e80cc197a 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -342,7 +342,8 @@ struct PersistenceProvider
* After this operation has succeeded, a restart of the provider should
* not yield the bucket in getBucketList().
*/
- virtual Result deleteBucket(const Bucket&, Context&) = 0;
+ Result deleteBucket(const Bucket&, Context&);
+ virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0;
/**
* This function is called continuously by the service layer. It allows the
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
index e2f2f21f596..2c21a30396d 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
@@ -23,6 +23,8 @@ using storage::spi::BucketInfo;
using BlockedReason = IBlockableMaintenanceJob::BlockedReason;
using MoveOperationVector = std::vector<MoveOperation>;
using storage::spi::dummy::DummyBucketExecutor;
+using vespalib::MonitoredRefCount;
+using vespalib::RetainGuard;
using vespalib::ThreadStackExecutor;
struct ControllerFixtureBase : public ::testing::Test
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
index c9b009e4a17..13955953eb5 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
@@ -6,6 +6,8 @@
#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
+using vespalib::RetainGuard;
+
using BlockedReason = IBlockableMaintenanceJob::BlockedReason;
struct MyDirectJobRunner : public IMaintenanceJobRunner {
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
index 774acd0071a..14f2ff42dbe 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
@@ -2,14 +2,14 @@
#include "lid_space_common.h"
#include <vespa/searchcore/proton/server/blockable_maintenance_job.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/searchcorespi/index/i_thread_service.h>
+#include <vespa/vespalib/util/monitored_refcount.h>
#include <vespa/vespalib/gtest/gtest.h>
namespace storage::spi::dummy { class DummyBucketExecutor; }
struct JobTestBase : public ::testing::Test {
- MonitoredRefCount _refCount;
+ vespalib::MonitoredRefCount _refCount;
test::ClusterStateHandler _clusterStateHandler;
test::DiskMemUsageNotifier _diskMemUsageNotifier;
std::unique_ptr<storage::spi::dummy::DummyBucketExecutor> _bucketExecutor;
diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
index 1463be8cdbd..71ae26180ad 100644
--- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
@@ -6,7 +6,6 @@
#include <vespa/searchcore/proton/attribute/i_attribute_manager.h>
#include <vespa/searchcore/proton/bucketdb/bucket_create_notifier.h>
#include <vespa/searchcore/proton/common/doctypename.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <vespa/searchcore/proton/common/transient_resource_usage_provider.h>
#include <vespa/searchcore/proton/documentmetastore/operation_listener.h>
#include <vespa/searchcore/proton/documentmetastore/documentmetastore.h>
@@ -42,6 +41,7 @@
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/monitored_refcount.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fastos/thread.h>
@@ -68,6 +68,7 @@ using search::SerialNum;
using search::CommitParam;
using storage::spi::BucketInfo;
using storage::spi::Timestamp;
+using vespalib::MonitoredRefCount;
using vespalib::Slime;
using vespalib::makeLambdaTask;
using vespa::config::search::AttributesConfigBuilder;
diff --git a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp
index a76c375bbfc..061a3fd4c32 100644
--- a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp
+++ b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp
@@ -6,7 +6,6 @@
#include <vespa/document/datatype/referencedatatype.h>
#include <vespa/log/log.h>
#include <vespa/searchcore/proton/attribute/imported_attributes_repo.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <vespa/searchcore/proton/reference/document_db_reference_resolver.h>
#include <vespa/searchcore/proton/reference/gid_to_lid_change_listener.h>
#include <vespa/searchcore/proton/reference/i_document_db_reference.h>
@@ -19,6 +18,7 @@
#include <vespa/searchlib/attribute/reference_attribute.h>
#include <vespa/searchlib/common/i_gid_to_lid_mapper.h>
#include <vespa/searchlib/common/i_gid_to_lid_mapper_factory.h>
+#include <vespa/vespalib/util/monitored_refcount.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/searchlib/test/mock_attribute_manager.h>
#include <vespa/vespalib/test/insertion_operators.h>
@@ -33,6 +33,7 @@ using proton::test::MockDocumentDBReference;
using search::attribute::test::MockAttributeManager;
using vespa::config::search::ImportedFieldsConfig;
using vespa::config::search::ImportedFieldsConfigBuilder;
+using vespalib::MonitoredRefCount;
using vespalib::SequencedTaskExecutor;
using vespalib::ISequencedTaskExecutor;
diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp
index 668be4c2c2d..15281563b93 100644
--- a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp
+++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp
@@ -2,12 +2,12 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/stllike/string.h>
#include <vespa/document/base/documentid.h>
-#include <vespa/vespalib/util/sequencedtaskexecutor.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <vespa/searchcore/proton/reference/gid_to_lid_change_listener.h>
#include <vespa/searchlib/common/i_gid_to_lid_mapper_factory.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
#include <vespa/vespalib/util/gate.h>
+#include <vespa/vespalib/util/monitored_refcount.h>
+#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/searchlib/test/mock_gid_to_lid_mapping.h>
#include <map>
#include <vespa/log/log.h>
@@ -16,12 +16,13 @@ LOG_SETUP("gid_to_lid_change_listener_test");
using document::GlobalId;
using document::BucketId;
using document::DocumentId;
-using vespalib::GenerationHandler;
using search::attribute::Config;
using search::attribute::BasicType;
using search::attribute::Reference;
using search::attribute::ReferenceAttribute;
using search::attribute::test::MockGidToLidMapperFactory;
+using vespalib::MonitoredRefCount;
+using vespalib::GenerationHandler;
namespace proton {
diff --git a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
index 07d749d8c4f..219a2ea43a4 100644
--- a/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/common/CMakeLists.txt
@@ -16,7 +16,6 @@ vespa_add_library(searchcore_pcommon STATIC
hw_info_sampler.cpp
indexschema_inspector.cpp
ipendinglidtracker.cpp
- monitored_refcount.cpp
operation_rate_tracker.cpp
pendinglidtracker.cpp
select_utils.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
index e75c16ddef6..8ccb4863878 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
@@ -50,21 +50,22 @@ private:
*/
class OwningState : public State {
public:
- OwningState(std::unique_ptr<ITransport> transport)
+ OwningState(std::shared_ptr<ITransport> transport)
: State(*transport),
_owned(std::move(transport))
{}
~OwningState() override;
private:
- std::unique_ptr<ITransport> _owned;
+ std::shared_ptr<ITransport> _owned;
};
inline std::shared_ptr<State>
make(ITransport & latch) {
return std::make_shared<State>(latch);
}
+
inline std::shared_ptr<State>
-make(std::unique_ptr<ITransport> transport) {
+make(std::shared_ptr<ITransport> transport) {
return std::make_shared<OwningState>(std::move(transport));
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index cb26e80b3ff..114292d055d 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -364,7 +364,7 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do
return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR,
make_string("No handler for document type '%s'", docType.toString().c_str())));
}
- auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete));
+ auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete));
handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc));
}
@@ -384,7 +384,7 @@ PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& d
return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR,
make_string("No handler for document type '%s'", docType.toString().c_str())));
}
- auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete));
+ auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete));
handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, did);
}
@@ -436,7 +436,7 @@ PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP
if (handler == nullptr) {
return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str())));
}
- auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete));
+ auto transportContext = std::make_shared<AsyncTranportContext>(1, std::move(onComplete));
handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd));
}
@@ -564,19 +564,23 @@ PersistenceEngine::createBucket(const Bucket &b, Context &)
}
-Result
-PersistenceEngine::deleteBucket(const Bucket& b, Context&)
+void
+PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete)
{
ReadGuard rguard(_rwMutex);
LOG(spam, "deleteBucket(%s)", b.toString().c_str());
HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace());
- TransportLatch latch(snap.size());
- for (; snap.handlers().valid(); snap.handlers().next()) {
+
+ auto transportContext = std::make_shared<AsyncTranportContext>(snap.size(), std::move(onComplete));
+ while (snap.handlers().valid()) {
IPersistenceHandler *handler = snap.handlers().get();
- handler->handleDeleteBucket(feedtoken::make(latch), b);
+ snap.handlers().next();
+ if (snap.handlers().valid()) {
+ handler->handleDeleteBucket(feedtoken::make(transportContext), b);
+ } else {
+ handler->handleDeleteBucket(feedtoken::make(std::move(transportContext)), b);
+ }
}
- latch.await();
- return latch.getResult();
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index e131cb13ae1..94331ac2cd6 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -115,7 +115,7 @@ public:
Result destroyIterator(IteratorId, Context&) override;
Result createBucket(const Bucket &bucketId, Context &) override ;
- Result deleteBucket(const Bucket&, Context&) override;
+ void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override;
diff --git a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp
index 4cc5fb0b5db..5d1e1318b04 100644
--- a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp
+++ b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp
@@ -29,6 +29,8 @@ using search::AttributeVector;
using search::IAttributeManager;
using search::NotImplementedAttribute;
using vespalib::ISequencedTaskExecutor;
+using vespalib::MonitoredRefCount;
+using vespalib::RetainGuard;
using vespa::config::search::ImportedFieldsConfig;
namespace proton {
diff --git a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h
index 761b9407fe5..522cdf83477 100644
--- a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h
+++ b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.h
@@ -21,15 +21,17 @@ namespace search::attribute {
namespace vespa::config::search::internal { class InternalImportedFieldsType; }
namespace vespalib {
- class ISequencedTaskExecutor;
+
+class ISequencedTaskExecutor;
+class MonitoredRefCount;
}
+
namespace proton {
class IDocumentDBReference;
class IDocumentDBReferenceRegistry;
class ImportedAttributesRepo;
class GidToLidChangeRegistrator;
-class MonitoredRefCount;
/**
* Class that for a given document db resolves all references to parent document dbs:
@@ -42,7 +44,7 @@ private:
const document::DocumentType &_thisDocType;
const ImportedFieldsConfig &_importedFieldsCfg;
const document::DocumentType &_prevThisDocType;
- MonitoredRefCount &_refCount;
+ vespalib::MonitoredRefCount &_refCount;
vespalib::ISequencedTaskExecutor &_attributeFieldWriter;
bool _useReferences;
std::map<vespalib::string, std::unique_ptr<GidToLidChangeRegistrator>> _registrators;
@@ -61,7 +63,7 @@ public:
const document::DocumentType &thisDocType,
const ImportedFieldsConfig &importedFieldsCfg,
const document::DocumentType &prevThisDocType,
- MonitoredRefCount &refCount,
+ vespalib::MonitoredRefCount &refCount,
vespalib::ISequencedTaskExecutor &attributeFieldWriter,
bool useReferences);
~DocumentDBReferenceResolver() override;
diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp
index c7e716b4173..427cbab2a14 100644
--- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp
+++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp
@@ -3,6 +3,8 @@
#include "gid_to_lid_change_listener.h"
#include <future>
+using vespalib::RetainGuard;
+
namespace proton {
GidToLidChangeListener::GidToLidChangeListener(vespalib::ISequencedTaskExecutor &attributeFieldWriter,
diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h
index 23bb32af87a..28e9684ed86 100644
--- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h
+++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h
@@ -4,8 +4,8 @@
#include "i_gid_to_lid_change_listener.h"
#include <vespa/searchlib/attribute/reference_attribute.h>
+#include <vespa/vespalib/util/retain_guard.h>
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
namespace proton {
@@ -18,14 +18,14 @@ class GidToLidChangeListener : public IGidToLidChangeListener
vespalib::ISequencedTaskExecutor &_attributeFieldWriter;
vespalib::ISequencedTaskExecutor::ExecutorId _executorId;
std::shared_ptr<search::attribute::ReferenceAttribute> _attr;
- RetainGuard _retainGuard;
+ vespalib::RetainGuard _retainGuard;
vespalib::string _name;
vespalib::string _docTypeName;
public:
GidToLidChangeListener(vespalib::ISequencedTaskExecutor &attributeFieldWriter,
std::shared_ptr<search::attribute::ReferenceAttribute> attr,
- RetainGuard refCount,
+ vespalib::RetainGuard refCount,
const vespalib::string &name,
const vespalib::string &docTypeName);
~GidToLidChangeListener() override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
index c70928cd5e8..386d8367e52 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
@@ -25,6 +25,7 @@ using document::BucketId;
using storage::spi::BucketInfo;
using storage::spi::Bucket;
using proton::bucketdb::BucketMover;
+using vespalib::RetainGuard;
using vespalib::makeLambdaTask;
using vespalib::Trinary;
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
index 66d24efb466..d4438fcd411 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
@@ -10,7 +10,7 @@
#include "maintenancedocumentsubdb.h"
#include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h>
#include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
+#include <vespa/vespalib/util/retain_guard.h>
namespace storage::spi { struct BucketExecutor; }
@@ -59,7 +59,7 @@ private:
using Movers = std::vector<BucketMoverSP>;
using GuardedMoveOps = BucketMover::GuardedMoveOps;
std::shared_ptr<IBucketStateCalculator> _calc;
- RetainGuard _dbRetainer;
+ vespalib::RetainGuard _dbRetainer;
IDocumentMoveHandler &_moveHandler;
IBucketModifiedHandler &_modifiedHandler;
IThreadService &_master;
@@ -80,7 +80,7 @@ private:
IDiskMemUsageNotifier &_diskMemUsageNotifier;
BucketMoveJob(const std::shared_ptr<IBucketStateCalculator> &calc,
- RetainGuard dbRetainer,
+ vespalib::RetainGuard dbRetainer,
IDocumentMoveHandler &moveHandler,
IBucketModifiedHandler &modifiedHandler,
IThreadService & master,
@@ -115,7 +115,7 @@ private:
public:
static std::shared_ptr<BucketMoveJob>
create(const std::shared_ptr<IBucketStateCalculator> &calc,
- RetainGuard dbRetainer,
+ vespalib::RetainGuard dbRetainer,
IDocumentMoveHandler &moveHandler,
IBucketModifiedHandler &modifiedHandler,
IThreadService & master,
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index 936ba812a44..014bba11f83 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -19,13 +19,13 @@
#include "threading_service_config.h"
#include <vespa/searchcore/proton/attribute/attribute_usage_filter.h>
#include <vespa/searchcore/proton/common/doctypename.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <vespa/searchcore/proton/metrics/documentdb_job_trackers.h>
#include <vespa/searchcore/proton/metrics/documentdb_tagged_metrics.h>
#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h>
#include <vespa/searchcore/proton/index/indexmanager.h>
#include <vespa/searchlib/docstore/cachestats.h>
#include <vespa/searchlib/transactionlog/syncproxy.h>
+#include <vespa/vespalib/util/retain_guard.h>
#include <vespa/vespalib/util/varholder.h>
#include <mutex>
#include <condition_variable>
@@ -111,7 +111,7 @@ private:
DocumentDBTaggedMetrics _metrics;
std::unique_ptr<metrics::UpdateHook> _metricsHook;
vespalib::VarHolder<IFeedView::SP> _feedView;
- MonitoredRefCount _refCount;
+ vespalib::MonitoredRefCount _refCount;
bool _syncFeedViewEnabled;
IDocumentDBOwner &_owner;
storage::spi::BucketExecutor &_bucketExecutor;
@@ -381,7 +381,7 @@ public:
/**
* Reference counting
*/
- RetainGuard retain() { return RetainGuard(_refCount); }
+ vespalib::RetainGuard retain() { return vespalib::RetainGuard(_refCount); }
bool getDelayedConfig() const { return _state.getDelayedConfig(); }
void replayConfig(SerialNum serialNum) override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h
index fe937a35db7..2098466a2c0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h
@@ -3,7 +3,7 @@
#pragma once
#include <vespa/searchcore/proton/flushengine/iflushhandler.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
+#include <vespa/vespalib/util/retain_guard.h>
namespace proton {
@@ -13,7 +13,7 @@ class FlushHandlerProxy : public IFlushHandler
{
private:
std::shared_ptr<DocumentDB> _documentDB;
- RetainGuard _retainGuard;
+ vespalib::RetainGuard _retainGuard;
public:
FlushHandlerProxy(const std::shared_ptr<DocumentDB> &documentDB);
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
index efe92a11f02..a5c1d1fc2c9 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
@@ -26,6 +26,7 @@ using search::DocumentMetaData;
using search::LidUsageStats;
using storage::spi::makeBucketTask;
using storage::spi::Bucket;
+using vespalib::RetainGuard;
using vespalib::makeLambdaTask;
namespace proton::lidspace {
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
index 717c19f093d..917ff12be4a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
@@ -6,9 +6,9 @@
#include "document_db_maintenance_config.h"
#include "i_disk_mem_usage_listener.h"
#include "iclusterstatechangedhandler.h"
-#include <vespa/searchlib/common/idocumentmetastore.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <vespa/document/bucket/bucketspace.h>
+#include <vespa/searchlib/common/idocumentmetastore.h>
+#include <vespa/vespalib/util/retain_guard.h>
#include <atomic>
namespace storage::spi { struct BucketExecutor; }
@@ -50,7 +50,7 @@ private:
bool _shouldCompactLidSpace;
IThreadService &_master;
BucketExecutor &_bucketExecutor;
- RetainGuard _dbRetainer;
+ vespalib::RetainGuard _dbRetainer;
document::BucketSpace _bucketSpace;
bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const;
@@ -68,7 +68,7 @@ private:
class MoveTask;
CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
- RetainGuard dbRetainer,
+ vespalib::RetainGuard dbRetainer,
std::shared_ptr<ILidSpaceCompactionHandler> handler,
IOperationStorer &opStorer,
IThreadService & master,
@@ -81,7 +81,7 @@ private:
public:
static std::shared_ptr<CompactionJob>
create(const DocumentDBLidSpaceCompactionConfig &config,
- RetainGuard dbRetainer,
+ vespalib::RetainGuard dbRetainer,
std::shared_ptr<ILidSpaceCompactionHandler> handler,
IOperationStorer &opStorer,
IThreadService & master,
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
index 2adcdbab217..c4826bba8ea 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
@@ -14,6 +14,7 @@ LOG_SETUP(".proton.server.maintenancecontroller");
using document::BucketId;
using vespalib::Executor;
+using vespalib::MonitoredRefCount;
using vespalib::makeLambdaTask;
namespace proton {
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
index bddd58a349a..8c8cc3e2d43 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
@@ -5,14 +5,17 @@
#include "maintenancedocumentsubdb.h"
#include "i_maintenance_job.h"
#include <vespa/searchcore/proton/common/doctypename.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
+#include <vespa/vespalib/util/retain_guard.h>
#include <vespa/vespalib/util/scheduledexecutor.h>
#include <mutex>
namespace vespalib {
- class Timer;
- class Executor;
+
+class Executor;
+class MonitoredRefCount;
+class Timer;
+
}
namespace searchcorespi::index { struct IThreadService; }
@@ -20,7 +23,6 @@ namespace proton {
class MaintenanceJobRunner;
class DocumentDBMaintenanceConfig;
-class MonitoredRefCount;
/**
* Class that controls the bucket moving between ready and notready sub databases
@@ -36,7 +38,7 @@ public:
using UP = std::unique_ptr<MaintenanceController>;
enum class State {INITIALIZING, STARTED, PAUSED, STOPPING};
- MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, MonitoredRefCount & refCount, const DocTypeName &docTypeName);
+ MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, vespalib::MonitoredRefCount & refCount, const DocTypeName &docTypeName);
~MaintenanceController();
void registerJobInMasterThread(IMaintenanceJob::UP job);
@@ -70,14 +72,14 @@ public:
const MaintenanceDocumentSubDB & getNotReadySubDB() const { return _notReadySubDB; }
IThreadService & masterThread() { return _masterThread; }
const DocTypeName & getDocTypeName() const { return _docTypeName; }
- RetainGuard retainDB() { return RetainGuard(_refCount); }
+ vespalib::RetainGuard retainDB() { return vespalib::RetainGuard(_refCount); }
private:
using Mutex = std::mutex;
using Guard = std::lock_guard<Mutex>;
IThreadService &_masterThread;
vespalib::Executor &_defaultExecutor;
- MonitoredRefCount &_refCount;
+ vespalib::MonitoredRefCount &_refCount;
MaintenanceDocumentSubDB _readySubDB;
MaintenanceDocumentSubDB _remSubDB;
MaintenanceDocumentSubDB _notReadySubDB;
diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
index f4d6175391c..5c35434aae8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
@@ -3,7 +3,7 @@
#pragma once
#include <vespa/searchcore/proton/persistenceengine/ipersistencehandler.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
+#include <vespa/vespalib/util/retain_guard.h>
namespace proton {
@@ -18,7 +18,7 @@ private:
FeedHandler &_feedHandler;
BucketHandler &_bucketHandler;
ClusterStateHandler &_clusterStateHandler;
- RetainGuard _retainGuard;
+ vespalib::RetainGuard _retainGuard;
public:
explicit PersistenceHandlerProxy(std::shared_ptr<DocumentDB> documentDB);
diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp
index c266a604e33..6fe1947bd1d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp
@@ -16,6 +16,7 @@ using document::BucketId;
using storage::spi::Timestamp;
using storage::spi::Bucket;
using vespalib::IDestructorCallback;
+using vespalib::RetainGuard;
using vespalib::makeLambdaTask;
namespace proton {
diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h
index 5c52991d0c3..6198f588a32 100644
--- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h
+++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h
@@ -3,9 +3,9 @@
#include "blockable_maintenance_job.h"
#include "document_db_maintenance_config.h"
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <persistence/spi/types.h>
#include <vespa/document/bucket/bucketspace.h>
+#include <vespa/vespalib/util/retain_guard.h>
#include <atomic>
namespace storage::spi { struct BucketExecutor; }
@@ -36,7 +36,7 @@ private:
IThreadService &_master;
BucketExecutor &_bucketExecutor;
const vespalib::string _docTypeName;
- RetainGuard _dbRetainer;
+ vespalib::RetainGuard _dbRetainer;
const vespalib::duration _cfgAgeLimit;
const uint32_t _subDbId;
const document::BucketSpace _bucketSpace;
@@ -45,14 +45,14 @@ private:
void remove(uint32_t lid, const RawDocumentMetaData & meta);
- PruneRemovedDocumentsJob(const DocumentDBPruneConfig &config, RetainGuard dbRetainer, const IDocumentMetaStore &metaStore,
+ PruneRemovedDocumentsJob(const DocumentDBPruneConfig &config, vespalib::RetainGuard dbRetainer, const IDocumentMetaStore &metaStore,
uint32_t subDbId, document::BucketSpace bucketSpace, const vespalib::string &docTypeName,
IPruneRemovedDocumentsHandler &handler, IThreadService & master,
BucketExecutor & bucketExecutor);
bool run() override;
public:
static std::shared_ptr<PruneRemovedDocumentsJob>
- create(const Config &config, RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, uint32_t subDbId,
+ create(const Config &config, vespalib::RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, uint32_t subDbId,
document::BucketSpace bucketSpace, const vespalib::string &docTypeName,
IPruneRemovedDocumentsHandler &handler, IThreadService & master, BucketExecutor & bucketExecutor)
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h
index 4eaf15b3b1c..f7c52ed4baa 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h
@@ -3,7 +3,7 @@
#pragma once
#include <vespa/searchcore/proton/summaryengine/isearchhandler.h>
-#include <vespa/searchcore/proton/common/monitored_refcount.h>
+#include <vespa/vespalib/util/retain_guard.h>
namespace proton {
@@ -13,7 +13,7 @@ class SearchHandlerProxy : public ISearchHandler
{
private:
std::shared_ptr<DocumentDB> _documentDB;
- RetainGuard _retainGuard;
+ vespalib::RetainGuard _retainGuard;
public:
SearchHandlerProxy(std::shared_ptr<DocumentDB> documentDB);
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index d521975e0cb..a1263c9433b 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -220,8 +220,8 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket)
pending_message_tracker().insert(msg);
}
{
- RemoveBucketOperation op(dummy_cluster_context,
- BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
+ // TODO we might not want this particular behavior for merge operations either
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(2, 3)));
// Not blocked for exact node match.
EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq));
// But blocked for bucket match!
diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp
index e877f4601b7..971ff36c833 100644
--- a/storage/src/tests/distributor/removebucketoperationtest.cpp
+++ b/storage/src/tests/distributor/removebucketoperationtest.cpp
@@ -119,4 +119,16 @@ TEST_F(RemoveBucketOperationTest, fail_with_invalid_bucket_info) {
EXPECT_EQ("NONEXISTING", dumpBucket(document::BucketId(16, 1)));
}
+TEST_F(RemoveBucketOperationTest, operation_blocked_when_pending_message_to_target_node) {
+ RemoveBucketOperation op(dummy_cluster_context,
+ BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
+ toVector<uint16_t>(1, 3)));
+ // In node target set
+ EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 1, 120));
+ EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 3, 120));
+ // Not in node target set
+ EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 0, 120));
+ EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 2, 120));
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index 25009156f18..f5531a134d0 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -38,14 +38,15 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil {
struct PendingMessage
{
uint32_t _msgType;
+ uint16_t _node;
uint8_t _pri;
- PendingMessage() : _msgType(UINT32_MAX), _pri(0) {}
+ constexpr PendingMessage() noexcept : _msgType(UINT32_MAX), _node(0), _pri(0) {}
- PendingMessage(uint32_t msgType, uint8_t pri)
- : _msgType(msgType), _pri(pri) {}
+ constexpr PendingMessage(uint32_t msgType, uint8_t pri) noexcept
+ : _msgType(msgType), _node(0), _pri(pri) {}
- bool shouldCheck() const { return _msgType != UINT32_MAX; }
+ bool shouldCheck() const noexcept { return _msgType != UINT32_MAX; }
};
void enableClusterState(const lib::ClusterState& systemState) {
@@ -97,8 +98,7 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil {
IdealStateOperation::UP op(result.createOperation());
if (op.get()) {
if (blocker.shouldCheck()
- && op->shouldBlockThisOperation(blocker._msgType,
- blocker._pri))
+ && op->shouldBlockThisOperation(blocker._msgType, blocker._node, blocker._pri))
{
return "BLOCKED";
}
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index a174d305c27..5f1acf6e7da 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -22,6 +22,15 @@
} \
}
+#define CHECK_ERROR_ASYNC(className, failType, onError) \
+ { \
+ Guard guard(_lock); \
+ if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \
+ onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \
+ return; \
+ } \
+ }
+
namespace storage {
namespace {
@@ -172,13 +181,13 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId,
return _spi.destroyIterator(iterId, context);
}
-spi::Result
-PersistenceProviderWrapper::deleteBucket(const spi::Bucket& bucket,
- spi::Context& context)
+void
+PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context,
+ spi::OperationComplete::UP operationComplete)
{
LOG_SPI("deleteBucket(" << bucket << ")");
- CHECK_ERROR(spi::Result, FAIL_DELETE_BUCKET);
- return _spi.deleteBucket(bucket, context);
+ CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete);
+ _spi.deleteBucketAsync(bucket, context, std::move(operationComplete));
}
spi::Result
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index d90fa7b2eaa..64828a2a3ee 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -103,7 +103,7 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1,
const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2,
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index c51357cacd1..c3caac7121c 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -66,9 +66,9 @@ public:
return PersistenceProviderWrapper::createBucket(bucket, ctx);
}
- spi::Result deleteBucket(const spi::Bucket& bucket, spi::Context& ctx) override {
+ void deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
++_deleteBucketInvocations;
- return PersistenceProviderWrapper::deleteBucket(bucket, ctx);
+ PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete));
}
};
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index fbe1c142b09..7f66d1effd5 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -86,7 +86,7 @@ void GarbageCollectionOperation::update_gc_metrics() {
}
bool
-GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const {
+GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const {
return true;
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index 2e010a61bde..f51739242b7 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -21,7 +21,7 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
const char* getName() const override { return "garbagecollection"; };
Type getType() const override { return GARBAGE_COLLECTION; }
- bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
+ bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
protected:
MessageTracker _tracker;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index b1231fafcd9..744b24b593e 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -138,8 +138,7 @@ public:
bool check(uint32_t messageType, uint16_t node, uint8_t priority) override
{
- (void) node;
- if (op.shouldBlockThisOperation(messageType, priority)) {
+ if (op.shouldBlockThisOperation(messageType, node, priority)) {
blocked = true;
return false;
}
@@ -232,6 +231,7 @@ IdealStateOperation::toString() const
bool
IdealStateOperation::shouldBlockThisOperation(uint32_t messageType,
+ [[maybe_unused]] uint16_t node,
uint8_t) const
{
for (uint32_t i = 0; MAINTENANCE_MESSAGE_TYPES[i] != 0; ++i) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index d4dc4e405df..f8f35afe821 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -217,7 +217,7 @@ public:
/**
* Should return true if the given message type should block this operation.
*/
- virtual bool shouldBlockThisOperation(uint32_t messageType, uint8_t priority) const;
+ virtual bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t priority) const;
protected:
friend struct IdealStateManagerTest;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 7cfe4172b2c..f951a880e5d 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -329,14 +329,14 @@ constexpr std::array<uint32_t, 7> WRITE_FEED_MESSAGE_TYPES {{
}
-bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const {
+bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const {
for (auto blocking_type : WRITE_FEED_MESSAGE_TYPES) {
if (messageType == blocking_type) {
return true;
}
}
- return IdealStateOperation::shouldBlockThisOperation(messageType, pri);
+ return IdealStateOperation::shouldBlockThisOperation(messageType, node, pri);
}
bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index 1bca1f7389f..832c0f99681 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -48,7 +48,7 @@ public:
const document::BucketId&, MergeLimiter&,
std::vector<MergeMetaData>&);
- bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override;
+ bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const override;
bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override;
private:
static void addIdealNodes(
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
index 9a57722dc7e..25cae5b9979 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
@@ -61,8 +61,7 @@ RemoveBucketOperation::onStart(DistributorStripeMessageSender& sender)
bool
RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply> &msg)
{
- api::DeleteBucketReply* rep =
- dynamic_cast<api::DeleteBucketReply*>(msg.get());
+ auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get());
uint16_t node = _tracker.handleReply(*rep);
@@ -112,8 +111,15 @@ RemoveBucketOperation::onReceive(DistributorStripeMessageSender&, const std::sha
}
bool
-RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint8_t) const
+RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint16_t target_node, uint8_t) const
{
- return true;
+ // Number of nodes is expected to be 1 in the vastly common case (and a highly bounded
+ // number in the worst case), so a simple linear scan suffices.
+ for (uint16_t node : getNodes()) {
+ if (target_node == node) {
+ return true;
+ }
+ }
+ return false;
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
index a0d496f948a..5e0922d5685 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
@@ -30,7 +30,7 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
const char* getName() const override { return "remove"; };
Type getType() const override { return DELETE_BUCKET; }
- bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
+ bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
protected:
MessageTracker _tracker;
};
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 649503cf0f5..6f3924535ef 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -150,6 +150,7 @@ SplitOperation::isBlocked(const DistributorStripeOperationContext& ctx, const Op
bool
SplitOperation::shouldBlockThisOperation(uint32_t msgType,
+ [[maybe_unused]] uint16_t node,
uint8_t pri) const
{
if (msgType == api::MessageType::SPLITBUCKET_ID && _priority >= pri) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
index ee957309088..6a268155fc8 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
@@ -21,7 +21,7 @@ public:
const char* getName() const override { return "split"; };
Type getType() const override { return SPLIT_BUCKET; }
bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override;
- bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
+ bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
protected:
MessageTracker _tracker;
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index d2bfabd2950..d150f5600e5 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -91,7 +91,19 @@ private:
vespalib::ISequencedTaskExecutor::ExecutorId _executorId;
};
+bool
+bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
+ // Don't check document sizes, as background moving of documents in Proton
+ // may trigger a change in size without any mutations taking place. This will
+ // only take place when a document being moved was fed _prior_ to the change
+ // where Proton starts reporting actual document sizes, and will eventually
+ // converge to a stable value. But for now, ignore it to prevent false positive
+ // error logs and non-deleted buckets.
+ return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
+}
+
}
+
AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi,
BucketOwnershipNotifier &bucketOwnershipNotifier,
vespalib::ISequencedTaskExecutor & executor,
@@ -142,6 +154,47 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
}
MessageTracker::UP
+AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.deleteBuckets);
+ LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
+ }
+ spi::Bucket bucket(cmd.getBucket());
+ if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
+ return tracker;
+ }
+
+ auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) {
+ // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
+ (void) ignored;
+ StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace()));
+ StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket");
+ if (entry.exist() && entry->getMetaCount() > 0) {
+ LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
+ "active operation when delete bucket was queued. "
+ "Updating bucket database to keep it in sync with file. "
+ "Cannot delete bucket from bucket database at this "
+ "point, as it can have been intentionally recreated "
+ "after delete bucket had been sent",
+ bucket.getBucketId().toString().c_str());
+ api::BucketInfo info(0, 0, 0);
+ // Only set document counts/size; retain ready/active state.
+ info.setReady(entry->getBucketInfo().isReady());
+ info.setActive(entry->getBucketInfo().isActive());
+
+ entry->setBucketInfo(info);
+ entry.write();
+ }
+ tracker->sendReply();
+ });
+ _spi.deleteBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ return tracker;
+}
+
+MessageTracker::UP
AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const
{
trackerUP->setMetric(_env._metrics.setBucketStates);
@@ -154,9 +207,8 @@ AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrack
auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket,
notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable {
if (tracker->checkForError(*response)) {
- StorBucketDatabase::WrappedEntry
- entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(bucket.getBucketId(),
- "handleSetBucketState");
+ StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace()));
+ StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(),"handleSetBucketState");
if (entry.exist()) {
entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
@@ -273,4 +325,31 @@ AsyncHandler::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTra
return true;
}
+bool
+AsyncHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const
+{
+ spi::BucketInfoResult result(_spi.getBucketInfo(bucket));
+ if (result.hasError()) {
+ LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
+ bucket.toString().c_str(), result.getErrorMessage().c_str());
+ return false;
+ }
+ api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo()));
+ // Don't check meta fields or active/ready fields since these are not
+ // that important and ready may change under the hood in a race with
+ // getModifiedBuckets(). If bucket is empty it means it has already
+ // been deleted by a racing split/join.
+ if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) {
+ LOG(error,
+ "Service layer bucket database and provider out of sync before "
+ "deleting bucket %s! Service layer db had %s while provider says "
+ "bucket has %s. Deletion has been rejected to ensure data is not "
+ "lost, but bucket may remain out of sync until service has been "
+ "restarted.",
+ bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str());
+ return false;
+ }
+ return true;
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index bf37becb2c3..4f5c242570c 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -29,8 +29,10 @@ public:
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
+ bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
static bool tasConditionExists(const api::TestAndSetCommand & cmd);
bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
spi::Context & context, bool missingDocumentImpliesMatch = false) const;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 3d9b359f506..297185ac54c 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -45,7 +45,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::CREATEBUCKET_ID:
return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
- return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker));
case api::MessageType::SPLITBUCKET_ID:
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index ab5066576fd..73033132e5d 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -131,10 +131,11 @@ ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& cont
return checkResult(_impl.createBucket(bucket, context));
}
-spi::Result
-ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
{
- return checkResult(_impl.deleteBucket(bucket, context));
+ onComplete->addResultHandler(this);
+ _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketIdListResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 9361cd1d19d..6e7986ad65c 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -41,7 +41,7 @@ public:
spi::Result initialize() override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override;
- void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
+
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
@@ -54,7 +54,6 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
- spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override;
spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override;
@@ -67,6 +66,8 @@ public:
void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
+ void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override;
private:
template <typename ResultType>
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
index b4c09f09e63..b4fe207e2e5 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
@@ -43,16 +43,6 @@ getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, Messa
return document::FieldSet::SP();
}
-bool
-bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
- // Don't check document sizes, as background moving of documents in Proton
- // may trigger a change in size without any mutations taking place. This will
- // only take place when a document being moved was fed _prior_ to the change
- // where Proton starts reporting actual document sizes, and will eventually
- // converge to a stable value. But for now, ignore it to prevent false positive
- // error logs and non-deleted buckets.
- return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
-}
}
SimpleMessageHandler::SimpleMessageHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi)
: _env(env),
@@ -113,70 +103,6 @@ SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageT
return tracker;
}
-bool
-SimpleMessageHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const
-{
- spi::BucketInfoResult result(_spi.getBucketInfo(bucket));
- if (result.hasError()) {
- LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
- bucket.toString().c_str(), result.getErrorMessage().c_str());
- return false;
- }
- api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo()));
- // Don't check meta fields or active/ready fields since these are not
- // that important and ready may change under the hood in a race with
- // getModifiedBuckets(). If bucket is empty it means it has already
- // been deleted by a racing split/join.
- if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) {
- LOG(error,
- "Service layer bucket database and provider out of sync before "
- "deleting bucket %s! Service layer db had %s while provider says "
- "bucket has %s. Deletion has been rejected to ensure data is not "
- "lost, but bucket may remain out of sync until service has been "
- "restarted.",
- bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str());
- return false;
- }
- return true;
-}
-
-MessageTracker::UP
-SimpleMessageHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.deleteBuckets);
- LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
- api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
- }
- spi::Bucket bucket(cmd.getBucket());
- if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
- return tracker;
- }
- _spi.deleteBucket(bucket, tracker->context());
- StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
- {
- StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
- if (entry.exist() && entry->getMetaCount() > 0) {
- LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
- "active operation when delete bucket was queued. "
- "Updating bucket database to keep it in sync with file. "
- "Cannot delete bucket from bucket database at this "
- "point, as it can have been intentionally recreated "
- "after delete bucket had been sent",
- cmd.getBucketId().toString().c_str());
- api::BucketInfo info(0, 0, 0);
- // Only set document counts/size; retain ready/active state.
- info.setReady(entry->getBucketInfo().isReady());
- info.setActive(entry->getBucketInfo().isActive());
-
- entry->setBucketInfo(info);
- entry.write();
- }
- }
- return tracker;
-}
-
MessageTracker::UP
SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const
{
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index 9f00f67684d..2cfbc7016c0 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -23,13 +23,11 @@ public:
MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const;
private:
- bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
};
diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
index 9e2917775f0..6115c21623d 100644
--- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -38,6 +38,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT
memoryusage.cpp
mmap_file_allocator.cpp
mmap_file_allocator_factory.cpp
+ monitored_refcount.cpp
printable.cpp
priority_queue.cpp
random.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp b/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp
index 97615abba31..4376e26bb66 100644
--- a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp
+++ b/vespalib/src/vespa/vespalib/util/monitored_refcount.cpp
@@ -3,7 +3,7 @@
#include "monitored_refcount.h"
#include <cassert>
-namespace proton {
+namespace vespalib {
MonitoredRefCount::MonitoredRefCount()
: _lock(),
@@ -41,4 +41,4 @@ MonitoredRefCount::waitForZeroRefCount()
_cv.wait(guard, [this] { return (_refCount == 0u); });
}
-} // namespace proton
+}
diff --git a/vespalib/src/vespa/vespalib/util/monitored_refcount.h b/vespalib/src/vespa/vespalib/util/monitored_refcount.h
new file mode 100644
index 00000000000..465284b6fd3
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/monitored_refcount.h
@@ -0,0 +1,29 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+
+namespace vespalib {
+
+class RetainGuard;
+/*
+ * Class containing a reference count that can be waited on to become zero.
+ * Typically ancestor or member of a class that has to be careful of when
+ * portions object can be properly torn down before destruction itself.
+ */
+class MonitoredRefCount
+{
+ std::mutex _lock;
+ std::condition_variable _cv;
+ uint32_t _refCount;
+ void retain() noexcept;
+ void release() noexcept;
+ friend RetainGuard;
+public:
+ MonitoredRefCount();
+ virtual ~MonitoredRefCount();
+ void waitForZeroRefCount();
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h b/vespalib/src/vespa/vespalib/util/retain_guard.h
index 9eb713b1220..090f3ce75cf 100644
--- a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h
+++ b/vespalib/src/vespa/vespalib/util/retain_guard.h
@@ -1,31 +1,16 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
#pragma once
-#include <mutex>
-#include <condition_variable>
+#include "monitored_refcount.h"
-namespace proton {
+namespace vespalib {
-class RetainGuard;
/*
- * Class containing a reference count that can be waited on to become zero.
- * Typically ancestor or member of a class that has to be careful of when
- * portions object can be properly torn down before destruction itself.
+ * Class containing a reference to a monitored reference count,
+ * intended to block teardown of the class owning the monitored
+ * reference count.
*/
-class MonitoredRefCount
-{
- std::mutex _lock;
- std::condition_variable _cv;
- uint32_t _refCount;
- void retain() noexcept;
- void release() noexcept;
- friend RetainGuard;
-public:
- MonitoredRefCount();
- virtual ~MonitoredRefCount();
- void waitForZeroRefCount();
-};
-
class RetainGuard {
public:
RetainGuard(MonitoredRefCount & refCount) noexcept
@@ -57,4 +42,4 @@ private:
MonitoredRefCount * _refCount;
};
-} // namespace proton
+}