author     Henning Baldersheim <balder@yahoo-inc.com>    2021-10-25 14:43:14 +0200
committer  GitHub <noreply@github.com>                   2021-10-25 14:43:14 +0200
commit     1711c0cff4b3f43857bf054af7dcb63e13b8d2af (patch)
tree       bced3356fa5db157e42c94391123ba4ac0e16e64
parent     78338d5a782bc464d3a8912c12c6839669f1b91b (diff)
parent     90676b68765027c1df6c4a3f22530a45118b4f3c (diff)
Merge pull request #19719 from vespa-engine/balder/async-create-bucket-2
Balder/async create bucket 2
-rw-r--r--  persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp | 113
-rw-r--r--  persistence/src/vespa/persistence/dummyimpl/dummypersistence.h | 4
-rw-r--r--  persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h | 1
-rw-r--r--  persistence/src/vespa/persistence/spi/catchresult.h | 5
-rw-r--r--  persistence/src/vespa/persistence/spi/persistenceprovider.cpp | 8
-rw-r--r--  persistence/src/vespa/persistence/spi/persistenceprovider.h | 5
-rw-r--r--  searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp | 20
-rw-r--r--  searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h | 4
-rw-r--r--  storage/src/tests/persistence/common/persistenceproviderwrapper.cpp | 12
-rw-r--r--  storage/src/tests/persistence/common/persistenceproviderwrapper.h | 4
-rw-r--r--  storage/src/tests/persistence/filestorage/operationabortingtest.cpp | 6
-rw-r--r--  storage/src/tests/persistence/mergehandlertest.cpp | 2
-rw-r--r--  storage/src/vespa/storage/persistence/asynchandler.cpp | 26
-rw-r--r--  storage/src/vespa/storage/persistence/asynchandler.h | 1
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.cpp | 276
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.h | 1
-rw-r--r--  storage/src/vespa/storage/persistence/persistencehandler.cpp | 2
-rw-r--r--  storage/src/vespa/storage/persistence/provider_error_wrapper.cpp | 11
-rw-r--r--  storage/src/vespa/storage/persistence/provider_error_wrapper.h | 4
-rw-r--r--  storage/src/vespa/storage/persistence/simplemessagehandler.cpp | 16
-rw-r--r--  storage/src/vespa/storage/persistence/simplemessagehandler.h | 1
21 files changed, 256 insertions, 266 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 6e4f38fe564..74fef13f141 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -34,12 +34,12 @@ BucketContent::BucketContent()
_inUse(false),
_outdatedInfo(true),
_active(false)
-{ }
+{}
+
BucketContent::~BucketContent() = default;
uint32_t
-BucketContent::computeEntryChecksum(const BucketEntry& e) const
-{
+BucketContent::computeEntryChecksum(const BucketEntry &e) const {
vespalib::crc_32_type checksummer;
uint64_t ts(e.entry->getTimestamp());
@@ -49,8 +49,7 @@ BucketContent::computeEntryChecksum(const BucketEntry& e) const
}
BucketChecksum
-BucketContent::updateRollingChecksum(uint32_t entryChecksum)
-{
+BucketContent::updateRollingChecksum(uint32_t entryChecksum) {
uint32_t checksum = _info.getChecksum();
checksum ^= entryChecksum;
if (checksum == 0) {
@@ -59,9 +58,8 @@ BucketContent::updateRollingChecksum(uint32_t entryChecksum)
return BucketChecksum(checksum);
}
-const BucketInfo&
-BucketContent::getBucketInfo() const
-{
+const BucketInfo &
+BucketContent::getBucketInfo() const {
if (!_outdatedInfo) {
return _info;
}
@@ -73,9 +71,9 @@ BucketContent::getBucketInfo() const
uint32_t totalSize = 0;
uint32_t checksum = 0;
- for (const BucketEntry & bucketEntry : _entries) {
- const DocEntry& entry(*bucketEntry.entry);
- const GlobalId& gid(bucketEntry.gid);
+ for (const BucketEntry &bucketEntry: _entries) {
+ const DocEntry &entry(*bucketEntry.entry);
+ const GlobalId &gid(bucketEntry.gid);
GidMapType::const_iterator gidIt(_gidMap.find(gid));
assert(gidIt != _gidMap.end());
@@ -114,17 +112,19 @@ BucketContent::getBucketInfo() const
namespace {
struct TimestampLess {
- bool operator()(const BucketEntry &bucketEntry, Timestamp t)
- { return bucketEntry.entry->getTimestamp() < t; }
- bool operator()(Timestamp t, const BucketEntry &bucketEntry)
- { return t < bucketEntry.entry->getTimestamp(); }
+ bool operator()(const BucketEntry &bucketEntry, Timestamp t) {
+ return bucketEntry.entry->getTimestamp() < t;
+ }
+
+ bool operator()(Timestamp t, const BucketEntry &bucketEntry) {
+ return t < bucketEntry.entry->getTimestamp();
+ }
};
} // namespace
bool
-BucketContent::hasTimestamp(Timestamp t) const
-{
+BucketContent::hasTimestamp(Timestamp t) const {
if (!_entries.empty() && _entries.back().entry->getTimestamp() < t) {
return false;
}
@@ -148,10 +148,9 @@ BucketContent::hasTimestamp(Timestamp t) const
*/
void
-BucketContent::insert(DocEntry::SP e)
-{
+BucketContent::insert(DocEntry::SP e) {
LOG(spam, "insert(%s)", e->toString().c_str());
- const DocumentId* docId(e->getDocumentId());
+ const DocumentId *docId(e->getDocumentId());
assert(docId != 0);
GlobalId gid(docId->getGlobalId());
GidMapType::iterator gidIt(_gidMap.find(gid));
@@ -160,22 +159,15 @@ BucketContent::insert(DocEntry::SP e)
_entries.back().entry->getTimestamp() < e->getTimestamp()) {
_entries.push_back(BucketEntry(e, gid));
} else {
- std::vector<BucketEntry>::iterator it =
- lower_bound(_entries.begin(),
- _entries.end(),
- e->getTimestamp(),
- TimestampLess());
+ auto it = lower_bound(_entries.begin(), _entries.end(), e->getTimestamp(), TimestampLess());
if (it != _entries.end()) {
if (it->entry->getTimestamp() == e->getTimestamp()) {
if (*it->entry.get() == *e) {
- LOG(debug, "Ignoring duplicate put entry %s",
- e->toString().c_str());
+ LOG(debug, "Ignoring duplicate put entry %s", e->toString().c_str());
return;
} else {
- LOG(error, "Entry %s was already present."
- "Was trying to insert %s.",
- it->entry->toString().c_str(),
- e->toString().c_str());
+ LOG(error, "Entry %s was already present. Was trying to insert %s.",
+ it->entry->toString().c_str(), e->toString().c_str());
LOG_ABORT("should not reach here");
}
}
@@ -190,11 +182,8 @@ BucketContent::insert(DocEntry::SP e)
// newer versions of a document etc. by XORing away old checksum.
gidIt->second = e;
} else {
- LOG(spam,
- "Newly inserted entry %s was older than existing entry %s; "
- "not updating GID mapping",
- e->toString().c_str(),
- gidIt->second->toString().c_str());
+ LOG(spam, "Newly inserted entry %s was older than existing entry %s; not updating GID mapping",
+ e->toString().c_str(), gidIt->second->toString().c_str());
}
_outdatedInfo = true;
} else {
@@ -226,10 +215,8 @@ BucketContent::insert(DocEntry::SP e)
_info.getActive());
}
- LOG(spam,
- "After cheap bucketinfo update, state is %s (inserted %s)",
- _info.toString().c_str(),
- e->toString().c_str());
+ LOG(spam, "After cheap bucketinfo update, state is %s (inserted %s)",
+ _info.toString().c_str(), e->toString().c_str());
}
}
@@ -237,9 +224,8 @@ BucketContent::insert(DocEntry::SP e)
}
DocEntry::SP
-BucketContent::getEntry(const DocumentId& did) const
-{
- GidMapType::const_iterator it(_gidMap.find(did.getGlobalId()));
+BucketContent::getEntry(const DocumentId &did) const {
+ auto it(_gidMap.find(did.getGlobalId()));
if (it != _gidMap.end()) {
return it->second;
}
@@ -247,10 +233,8 @@ BucketContent::getEntry(const DocumentId& did) const
}
DocEntry::SP
-BucketContent::getEntry(Timestamp t) const
-{
- std::vector<BucketEntry>::const_iterator iter =
- lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
+BucketContent::getEntry(Timestamp t) const {
+ auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
if (iter == _entries.end() || iter->entry->getTimestamp() != t) {
return DocEntry::SP();
@@ -260,15 +244,12 @@ BucketContent::getEntry(Timestamp t) const
}
void
-BucketContent::eraseEntry(Timestamp t)
-{
- std::vector<BucketEntry>::iterator iter =
- lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
+BucketContent::eraseEntry(Timestamp t) {
+ auto iter = lower_bound(_entries.begin(), _entries.end(), t, TimestampLess());
if (iter != _entries.end() && iter->entry->getTimestamp() == t) {
assert(iter->entry->getDocumentId() != 0);
- GidMapType::iterator gidIt(
- _gidMap.find(iter->entry->getDocumentId()->getGlobalId()));
+ GidMapType::iterator gidIt = _gidMap.find(iter->entry->getDocumentId()->getGlobalId());
assert(gidIt != _gidMap.end());
_entries.erase(iter);
if (gidIt->second->getTimestamp() == t) {
@@ -281,7 +262,7 @@ BucketContent::eraseEntry(Timestamp t)
}
}
-DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo>& repo)
+DummyPersistence::DummyPersistence(const std::shared_ptr<const document::DocumentTypeRepo> &repo)
: _initialized(false),
_repo(repo),
_content(),
@@ -294,13 +275,12 @@ DummyPersistence::DummyPersistence(const std::shared_ptr<const document::Documen
DummyPersistence::~DummyPersistence() = default;
document::select::Node::UP
-DummyPersistence::parseDocumentSelection(const string& documentSelection, bool allowLeaf)
-{
+DummyPersistence::parseDocumentSelection(const string &documentSelection, bool allowLeaf) {
document::select::Node::UP ret;
try {
document::select::Parser parser(*_repo, document::BucketIdFactory());
ret = parser.parse(documentSelection);
- } catch (document::select::ParsingFailedException& e) {
+ } catch (document::select::ParsingFailedException &e) {
return document::select::Node::UP();
}
if (ret->isLeafNode() && !allowLeaf) {
@@ -310,18 +290,17 @@ DummyPersistence::parseDocumentSelection(const string& documentSelection, bool a
}
Result
-DummyPersistence::initialize()
-{
+DummyPersistence::initialize() {
assert(!_initialized);
_initialized = true;
return Result();
}
#define DUMMYPERSISTENCE_VERIFY_INITIALIZED \
- if (!_initialized) throw vespalib::IllegalStateException( \
- "initialize() must always be called first in order to " \
- "trigger lazy initialization.", VESPA_STRLOC)
-
+ if (!_initialized) { \
+ LOG(error, "initialize() must always be called first in order to trigger lazy initialization."); \
+ abort(); \
+ }
BucketIdListResult
DummyPersistence::listBuckets(BucketSpace bucketSpace) const
@@ -714,8 +693,8 @@ DummyPersistence::destroyIterator(IteratorId id, Context&)
return Result();
}
-Result
-DummyPersistence::createBucket(const Bucket& b, Context&)
+void
+DummyPersistence::createBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "createBucket(%s)", b.toString().c_str());
@@ -727,11 +706,11 @@ DummyPersistence::createBucket(const Bucket& b, Context&)
assert(!_content[b]->_inUse);
LOG(debug, "%s already existed", b.toString().c_str());
}
- return Result();
+ onComplete->onComplete(std::make_unique<Result>());
}
void
-DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete)
+DummyPersistence::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "deleteBucket(%s)", b.toString().c_str());
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 99d6ba717b7..a25bf6b8a8e 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -168,8 +168,8 @@ public:
IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override;
Result destroyIterator(IteratorId, Context&) override;
- Result createBucket(const Bucket&, Context&) override;
- void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
+ void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override;
+ void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override;
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index e287bdc5252..3b59f20ca96 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -15,7 +15,6 @@ class AbstractPersistenceProvider : public PersistenceProvider
{
public:
Result initialize() override { return Result(); };
- Result createBucket(const Bucket&, Context&) override { return Result(); }
Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); }
void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override;
Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); }
diff --git a/persistence/src/vespa/persistence/spi/catchresult.h b/persistence/src/vespa/persistence/spi/catchresult.h
index 02c626ea23e..7b04498205d 100644
--- a/persistence/src/vespa/persistence/spi/catchresult.h
+++ b/persistence/src/vespa/persistence/spi/catchresult.h
@@ -19,4 +19,9 @@ private:
const ResultHandler *_resulthandler;
};
+class NoopOperationComplete : public OperationComplete {
+ void onComplete(std::unique_ptr<spi::Result>) noexcept override { }
+ void addResultHandler(const spi::ResultHandler *) override { }
+};
+
}
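NoopOperationComplete gives call sites that previously ignored the synchronous createBucket result a fire-and-forget completion object (it is used below in AsyncHandler and MergeHandler). A minimal usage sketch, assuming a spi::PersistenceProvider& named spi plus a bucket and context already in scope:

// Requires <vespa/persistence/spi/catchresult.h>; the Result is silently discarded.
spi.createBucketAsync(bucket, context, std::make_unique<spi::NoopOperationComplete>());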
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index 3ea476c33fc..31db08a6f4f 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -17,6 +17,14 @@ PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveStat
}
Result
+PersistenceProvider::createBucket(const Bucket& bucket, Context& context) {
+ auto catcher = std::make_unique<CatchResult>();
+ auto future = catcher->future_result();
+ createBucketAsync(bucket, context, std::move(catcher));
+ return *future.get();
+}
+
+Result
PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) {
auto catcher = std::make_unique<CatchResult>();
auto future = catcher->future_result();
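The blocking createBucket thus becomes a thin wrapper kept for tests (see the TODO in persistenceprovider.h below): it hands a CatchResult to the async call and waits on its future. A sketch of how existing test code keeps working, assuming a concrete spi::PersistenceProvider& provider and bucket/context objects:

spi::Result result = provider.createBucket(bucket, context);  // blocks until createBucketAsync completes
assert(!result.hasError());                                   // <cassert>; same check the old sync path allowed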
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 83eb042d855..269175f7d26 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -58,6 +58,7 @@ struct PersistenceProvider
virtual ~PersistenceProvider();
// TODO Move to utility class for use in tests only
+ Result createBucket(const Bucket&, Context&);
Result deleteBucket(const Bucket&, Context&);
Result put(const Bucket&, Timestamp, DocumentSP, Context&);
Result setActiveState(const Bucket&, BucketInfo::ActiveState);
@@ -336,14 +337,14 @@ struct PersistenceProvider
* Tells the provider that the given bucket has been created in the
* service layer. There is no requirement to do anything here.
*/
- virtual Result createBucket(const Bucket&, Context&) = 0;
+ virtual void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0;
/**
* Deletes the given bucket and all entries contained in that bucket.
* After this operation has succeeded, a restart of the provider should
* not yield the bucket in getBucketList().
*/
- virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0;
+ virtual void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept = 0;
/**
* This function is called continuously by the service layer. It allows the
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 114292d055d..2e1fc74037c 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -548,24 +548,28 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&)
}
-Result
-PersistenceEngine::createBucket(const Bucket &b, Context &)
+void
+PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComplete::UP onComplete) noexcept
{
ReadGuard rguard(_rwMutex);
LOG(spam, "createBucket(%s)", b.toString().c_str());
HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace());
- TransportLatch latch(snap.size());
- for (; snap.handlers().valid(); snap.handlers().next()) {
+
+ auto transportContext = std::make_shared<AsyncTranportContext>(snap.size(), std::move(onComplete));
+ while (snap.handlers().valid()) {
IPersistenceHandler *handler = snap.handlers().get();
- handler->handleCreateBucket(feedtoken::make(latch), b);
+ snap.handlers().next();
+ if (snap.handlers().valid()) {
+ handler->handleCreateBucket(feedtoken::make(transportContext), b);
+ } else {
+ handler->handleCreateBucket(feedtoken::make(std::move(transportContext)), b);
+ }
}
- latch.await();
- return latch.getResult();
}
void
-PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete)
+PersistenceEngine::deleteBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete) noexcept
{
ReadGuard rguard(_rwMutex);
LOG(spam, "deleteBucket(%s)", b.toString().c_str());
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 94331ac2cd6..fe564d01459 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -114,8 +114,8 @@ public:
IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override;
Result destroyIterator(IteratorId, Context&) override;
- Result createBucket(const Bucket &bucketId, Context &) override ;
- void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
+ void createBucketAsync(const Bucket &bucketId, Context &, OperationComplete::UP) noexcept override;
+ void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) noexcept override;
BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override;
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index b3bd1c6a253..02b43a32df3 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -24,7 +24,7 @@
#define CHECK_ERROR_ASYNC(className, failType, onError) \
{ \
- Guard guard(_lock); \
+ Guard guard(_lock); \
if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \
onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \
return; \
@@ -80,12 +80,12 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const
return _spi.listBuckets(bucketSpace);
}
-spi::Result
-PersistenceProviderWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
LOG_SPI("createBucket(" << bucket << ")");
- CHECK_ERROR(spi::Result, FAIL_CREATE_BUCKET);
- return _spi.createBucket(bucket, context);
+ CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete);
+ return _spi.createBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketInfoResult
@@ -177,7 +177,7 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId,
void
PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context,
- spi::OperationComplete::UP operationComplete)
+ spi::OperationComplete::UP operationComplete) noexcept
{
LOG_SPI("deleteBucket(" << bucket << ")");
CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete);
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index c6628814dba..cfc7002a643 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -96,7 +96,7 @@ public:
void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state,
spi::OperationComplete::UP up) override;
- spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
+ void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override;
@@ -111,7 +111,7 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1,
const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2,
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 07d2b24d536..a3f0182ba30 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -62,13 +62,13 @@ public:
return PersistenceProviderWrapper::getBucketInfo(bucket);
}
- spi::Result createBucket(const spi::Bucket& bucket, spi::Context& ctx) override {
+ void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override {
++_createBucketInvocations;
- return PersistenceProviderWrapper::createBucket(bucket, ctx);
+ PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete));
}
void
- deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
+ deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) noexcept override {
++_deleteBucketInvocations;
PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete));
}
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 017b8ce2b92..75e85fb4b6f 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -872,7 +872,6 @@ TEST_P(MergeHandlerTest, merge_bucket_spi_failures) {
setUpChain(MIDDLE);
ExpectedExceptionSpec exceptions[] = {
- { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" },
{ PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" },
{ PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
{ PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
@@ -903,7 +902,6 @@ TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) {
setUpChain(MIDDLE);
ExpectedExceptionSpec exceptions[] = {
- { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" },
{ PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" },
{ PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
{ PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 47b5e4f5f27..bc6e67578c0 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -5,6 +5,7 @@
#include "testandsethelper.h"
#include "bucketownershipnotifier.h"
#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/persistence/spi/catchresult.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
@@ -154,6 +155,31 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
}
MessageTracker::UP
+AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.createBuckets);
+ LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
+ }
+ spi::Bucket bucket(cmd.getBucket());
+ auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP ignored) mutable {
+ // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
+ (void) ignored;
+ tracker->sendReply();
+ });
+
+ if (cmd.getActive()) {
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+ _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+ } else {
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+ }
+
+ return tracker;
+}
+
+MessageTracker::UP
AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.deleteBuckets);
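In handleCreateBucket above, when the bucket must also be activated the create itself gets a NoopOperationComplete and the client reply is chained onto the activation, so the tracker answers only after both steps finish. Condensed into a sketch (same calls as above, with the context reference taken before the tracker is moved into the reply task):

spi::Bucket bucket(cmd.getBucket());
spi::Context& context = tracker->context();              // assumed to return a long-lived reference
auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP) mutable {
    tracker->sendReply();                                 // reply is sent regardless of the (ignored) result
});
_spi.createBucketAsync(bucket, context, std::make_unique<spi::NoopOperationComplete>());
_spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE,
                         std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));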
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 4f5c242570c..db5a77bfb59 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -30,6 +30,7 @@ public:
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 77e7762ec9a..4cd0b181155 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -6,13 +6,12 @@
#include "apply_bucket_diff_state.h"
#include <vespa/storage/persistence/filestorage/mergestatus.h>
#include <vespa/persistence/spi/persistenceprovider.h>
-#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/persistence/spi/catchresult.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/exceptions.h>
#include <algorithm>
-#include <future>
#include <vespa/log/log.h>
LOG_SETUP(".persistence.mergehandler");
@@ -51,20 +50,6 @@ constexpr int getDeleteFlag() {
return 2;
}
-/**
- * Throws std::runtime_error if result has an error.
- */
-void
-checkResult(const spi::Result& result, const spi::Bucket& bucket, const char* op)
-{
- if (result.hasError()) {
- vespalib::asciistream ss;
- ss << "Failed " << op << " in " << bucket << ": " << result.toString();
- throw std::runtime_error(ss.str());
- }
-}
-
-
class IteratorGuard {
spi::PersistenceProvider& _spi;
spi::IteratorId _iteratorId;
@@ -663,25 +648,28 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
}
namespace {
- void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
- uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
- {
- for (const auto& entry : status.diff) {
- uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
- if ((entry_has_mask == 0u) ||
- (constrictHasMask && (entry_has_mask != hasMask))) {
- continue;
- }
- cmd.getDiff().emplace_back(entry);
- if (constrictHasMask) {
- cmd.getDiff().back()._entry._hasMask = newHasMask;
- } else {
- cmd.getDiff().back()._entry._hasMask = entry_has_mask;
- }
+
+void
+findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
+ uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
+{
+ for (const auto& entry : status.diff) {
+ uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
+ if ((entry_has_mask == 0u) ||
+ (constrictHasMask && (entry_has_mask != hasMask))) {
+ continue;
+ }
+ cmd.getDiff().emplace_back(entry);
+ if (constrictHasMask) {
+ cmd.getDiff().back()._entry._hasMask = newHasMask;
+ } else {
+ cmd.getDiff().back()._entry._hasMask = entry_has_mask;
}
}
}
+}
+
api::StorageReply::SP
MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
MessageSender& sender, spi::Context& context,
@@ -898,7 +886,8 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
tracker->fail(api::ReturnCode::BUSY, err);
return tracker;
}
- checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+
MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket());
auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel());
@@ -938,141 +927,136 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
namespace {
- uint8_t findOwnIndex(
- const std::vector<api::MergeBucketCommand::Node>& nodeList,
- uint16_t us)
- {
- for (uint32_t i=0, n=nodeList.size(); i<n; ++i) {
- if (nodeList[i].index == us) return i;
- }
- throw vespalib::IllegalStateException(
- "Got GetBucketDiff cmd on node not in nodelist in command",
- VESPA_STRLOC);
+uint8_t findOwnIndex(
+ const std::vector<api::MergeBucketCommand::Node>& nodeList,
+ uint16_t us)
+{
+ for (uint32_t i=0, n=nodeList.size(); i<n; ++i) {
+ if (nodeList[i].index == us) return i;
}
+ throw vespalib::IllegalStateException(
+ "Got GetBucketDiff cmd on node not in nodelist in command",
+ VESPA_STRLOC);
+}
- struct DiffEntryTimestampOrder
- : public std::binary_function<api::GetBucketDiffCommand::Entry,
- api::GetBucketDiffCommand::Entry, bool>
- {
- bool operator()(const api::GetBucketDiffCommand::Entry& x,
- const api::GetBucketDiffCommand::Entry& y) const {
- return (x._timestamp < y._timestamp);
- }
- };
-
- /**
- * Merges list A and list B together and puts the result in result.
- * Result is swapped in as last step to keep function exception safe. Thus
- * result can be listA or listB if wanted.
- *
- * listA and listB are assumed to be in the order found in the slotfile, or
- * in the order given by a previous call to this function. (In both cases
- * this will be sorted by timestamp)
- *
- * @return false if any suspect entries was found.
- */
- bool mergeLists(
- const std::vector<api::GetBucketDiffCommand::Entry>& listA,
- const std::vector<api::GetBucketDiffCommand::Entry>& listB,
- std::vector<api::GetBucketDiffCommand::Entry>& finalResult)
- {
- bool suspect = false;
- std::vector<api::GetBucketDiffCommand::Entry> result;
- uint32_t i = 0, j = 0;
- while (i < listA.size() && j < listB.size()) {
- const api::GetBucketDiffCommand::Entry& a(listA[i]);
- const api::GetBucketDiffCommand::Entry& b(listB[j]);
- if (a._timestamp < b._timestamp) {
- result.push_back(a);
- ++i;
- } else if (a._timestamp > b._timestamp) {
- result.push_back(b);
- ++j;
- } else {
- // If we find equal timestamped entries that are not the
- // same.. Flag an error. But there is nothing we can do
- // about it. Note it as if it is the same entry so we
- // dont try to merge them.
- if (!(a == b)) {
- if (a._gid == b._gid && a._flags == b._flags) {
- if ((a._flags & getDeleteFlag()) != 0 &&
- (b._flags & getDeleteFlag()) != 0)
- {
- // Unfortunately this can happen, for instance
- // if a remove comes to a bucket out of sync
- // and reuses different headers in the two
- // versions.
- LOG(debug, "Found entries with equal timestamps of "
- "the same gid who both are remove "
- "entries: %s <-> %s.",
- a.toString(true).c_str(),
- b.toString(true).c_str());
- } else {
- LOG(error, "Found entries with equal timestamps of "
- "the same gid. This is likely same "
- "document where size of document varies:"
- " %s <-> %s.",
- a.toString(true).c_str(),
- b.toString(true).c_str());
- }
- result.push_back(a);
- result.back()._hasMask |= b._hasMask;
- suspect = true;
- } else if ((a._flags & getDeleteFlag())
- != (b._flags & getDeleteFlag()))
+/**
+ * Merges list A and list B together and puts the result in result.
+ * Result is swapped in as last step to keep function exception safe. Thus
+ * result can be listA or listB if wanted.
+ *
+ * listA and listB are assumed to be in the order found in the slotfile, or
+ * in the order given by a previous call to this function. (In both cases
+ * this will be sorted by timestamp)
+ *
+ * @return false if any suspect entries was found.
+ */
+bool mergeLists(
+ const std::vector<api::GetBucketDiffCommand::Entry>& listA,
+ const std::vector<api::GetBucketDiffCommand::Entry>& listB,
+ std::vector<api::GetBucketDiffCommand::Entry>& finalResult)
+{
+ bool suspect = false;
+ std::vector<api::GetBucketDiffCommand::Entry> result;
+ uint32_t i = 0, j = 0;
+ while (i < listA.size() && j < listB.size()) {
+ const api::GetBucketDiffCommand::Entry& a(listA[i]);
+ const api::GetBucketDiffCommand::Entry& b(listB[j]);
+ if (a._timestamp < b._timestamp) {
+ result.push_back(a);
+ ++i;
+ } else if (a._timestamp > b._timestamp) {
+ result.push_back(b);
+ ++j;
+ } else {
+ // If we find equal timestamped entries that are not the
+ // same.. Flag an error. But there is nothing we can do
+ // about it. Note it as if it is the same entry so we
+ // dont try to merge them.
+ if (!(a == b)) {
+ if (a._gid == b._gid && a._flags == b._flags) {
+ if ((a._flags & getDeleteFlag()) != 0 &&
+ (b._flags & getDeleteFlag()) != 0)
{
- // If we find one remove and one put entry on the
- // same timestamp we are going to keep the remove
- // entry to make the copies consistent.
- const api::GetBucketDiffCommand::Entry& deletedEntry(
- (a._flags & getDeleteFlag()) != 0 ? a : b);
- result.push_back(deletedEntry);
- LOG(debug,
- "Found put and remove on same timestamp. Keeping"
- "remove as it is likely caused by remove with "
- "copies unavailable at the time: %s, %s.",
- a.toString().c_str(), b.toString().c_str());
+ // Unfortunately this can happen, for instance
+ // if a remove comes to a bucket out of sync
+ // and reuses different headers in the two
+ // versions.
+ LOG(debug, "Found entries with equal timestamps of "
+ "the same gid who both are remove "
+ "entries: %s <-> %s.",
+ a.toString(true).c_str(),
+ b.toString(true).c_str());
} else {
- LOG(error, "Found entries with equal timestamps that "
- "weren't the same entry: %s, %s.",
- a.toString().c_str(), b.toString().c_str());
- result.push_back(a);
- result.back()._hasMask |= b._hasMask;
- suspect = true;
+ LOG(error, "Found entries with equal timestamps of "
+ "the same gid. This is likely same "
+ "document where size of document varies:"
+ " %s <-> %s.",
+ a.toString(true).c_str(),
+ b.toString(true).c_str());
}
+ result.push_back(a);
+ result.back()._hasMask |= b._hasMask;
+ suspect = true;
+ } else if ((a._flags & getDeleteFlag())
+ != (b._flags & getDeleteFlag()))
+ {
+ // If we find one remove and one put entry on the
+ // same timestamp we are going to keep the remove
+ // entry to make the copies consistent.
+ const api::GetBucketDiffCommand::Entry& deletedEntry(
+ (a._flags & getDeleteFlag()) != 0 ? a : b);
+ result.push_back(deletedEntry);
+ LOG(debug,
+ "Found put and remove on same timestamp. Keeping"
+ "remove as it is likely caused by remove with "
+ "copies unavailable at the time: %s, %s.",
+ a.toString().c_str(), b.toString().c_str());
} else {
+ LOG(error, "Found entries with equal timestamps that "
+ "weren't the same entry: %s, %s.",
+ a.toString().c_str(), b.toString().c_str());
result.push_back(a);
result.back()._hasMask |= b._hasMask;
+ suspect = true;
}
- ++i;
- ++j;
+ } else {
+ result.push_back(a);
+ result.back()._hasMask |= b._hasMask;
}
+ ++i;
+ ++j;
}
- if (i < listA.size()) {
- assert(j >= listB.size());
- for (uint32_t n = listA.size(); i<n; ++i) {
- result.push_back(listA[i]);
- }
- } else if (j < listB.size()) {
- assert(i >= listA.size());
- for (uint32_t n = listB.size(); j<n; ++j) {
- result.push_back(listB[j]);
- }
+ }
+ if (i < listA.size()) {
+ assert(j >= listB.size());
+ for (uint32_t n = listA.size(); i<n; ++i) {
+ result.push_back(listA[i]);
+ }
+ } else if (j < listB.size()) {
+ assert(i >= listA.size());
+ for (uint32_t n = listB.size(); j<n; ++j) {
+ result.push_back(listB[j]);
}
- result.swap(finalResult);
- return !suspect;
}
+ result.swap(finalResult);
+ return !suspect;
+}
}
MessageTracker::UP
-MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const
-{
+MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const {
tracker->setMetric(_env._metrics.getBucketDiff);
spi::Bucket bucket(cmd.getBucket());
LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str());
- checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<spi::NoopOperationComplete>());
+ return handleGetBucketDiffStage2(cmd, std::move(tracker));
+}
+MessageTracker::UP
+MergeHandler::handleGetBucketDiffStage2(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const
+{
+ spi::Bucket bucket(cmd.getBucket());
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
tracker->fail(api::ReturnCode::BUSY, "A merge is already running on this bucket.");
return tracker;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 17cfb847d2c..e6b4d047209 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -86,6 +86,7 @@ private:
const uint32_t _commonMergeChainOptimalizationMinimumSize;
std::atomic<bool> _async_apply_bucket_diff;
+ MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
/** Returns a reply if merge is complete */
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
MergeStatus& status,
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index aa1a9c136fd..761021a9612 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -44,7 +44,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::REVERT_ID:
return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
case api::MessageType::CREATEBUCKET_ID:
- return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index ce424f0ce83..9ccd901744b 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -99,17 +99,18 @@ ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context&
return checkResult(_impl.destroyIterator(iteratorId, context));
}
-spi::Result
-ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
- return checkResult(_impl.createBucket(bucket, context));
+ onComplete->addResultHandler(this);
+ _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
}
void
-ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
+ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete) noexcept
{
onComplete->addResultHandler(this);
- _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
+ _impl.createBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketIdListResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index c9d2411e372..14d20cf8a52 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -49,7 +49,6 @@ public:
spi::Context &context) override;
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override;
@@ -63,7 +62,8 @@ public:
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
- void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
+ void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) noexcept override;
std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override;
private:
template <typename ResultType>
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
index b4fe207e2e5..9a7a451b906 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
@@ -88,22 +88,6 @@ SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP t
}
MessageTracker::UP
-SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.createBuckets);
- LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
- }
- spi::Bucket spiBucket(cmd.getBucket());
- _spi.createBucket(spiBucket, tracker->context());
- if (cmd.getActive()) {
- _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE);
- }
- return tracker;
-}
-
-MessageTracker::UP
SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.visit);
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index 2cfbc7016c0..009fd6dff52 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -22,7 +22,6 @@ public:
SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&);
MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const;