summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-10-21 16:54:10 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-10-25 11:45:38 +0000
commite59348551c200c15f5dc4158274d96b57b91227f (patch)
treeadcedfef06bcc510547538fe11355da33f26adab
parent78338d5a782bc464d3a8912c12c6839669f1b91b (diff)
Async createBucket
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp6
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h2
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h1
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.cpp8
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h2
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp8
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h2
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp28
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h1
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp265
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h1
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp11
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h2
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.cpp16
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.h1
19 files changed, 206 insertions, 175 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 6e4f38fe564..b8a390ed0ce 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -714,8 +714,8 @@ DummyPersistence::destroyIterator(IteratorId id, Context&)
return Result();
}
-Result
-DummyPersistence::createBucket(const Bucket& b, Context&)
+void
+DummyPersistence::createBucketAsync(const Bucket& b, Context&, OperationComplete::UP onComplete)
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "createBucket(%s)", b.toString().c_str());
@@ -727,7 +727,7 @@ DummyPersistence::createBucket(const Bucket& b, Context&)
assert(!_content[b]->_inUse);
LOG(debug, "%s already existed", b.toString().c_str());
}
- return Result();
+ onComplete->onComplete(std::make_unique<Result>());
}
void
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 99d6ba717b7..2ab97b0b403 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -168,7 +168,7 @@ public:
IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override;
Result destroyIterator(IteratorId, Context&) override;
- Result createBucket(const Bucket&, Context&) override;
+ void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index e287bdc5252..3b59f20ca96 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -15,7 +15,6 @@ class AbstractPersistenceProvider : public PersistenceProvider
{
public:
Result initialize() override { return Result(); };
- Result createBucket(const Bucket&, Context&) override { return Result(); }
Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); }
void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override;
Result setClusterState(BucketSpace, const ClusterState&) override { return Result(); }
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index 3ea476c33fc..31db08a6f4f 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -17,6 +17,14 @@ PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveStat
}
Result
+PersistenceProvider::createBucket(const Bucket& bucket, Context& context) {
+ auto catcher = std::make_unique<CatchResult>();
+ auto future = catcher->future_result();
+ createBucketAsync(bucket, context, std::move(catcher));
+ return *future.get();
+}
+
+Result
PersistenceProvider::deleteBucket(const Bucket& bucket, Context& context) {
auto catcher = std::make_unique<CatchResult>();
auto future = catcher->future_result();
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 83eb042d855..09a752d4ded 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -58,6 +58,7 @@ struct PersistenceProvider
virtual ~PersistenceProvider();
// TODO Move to utility class for use in tests only
+ Result createBucket(const Bucket&, Context&);
Result deleteBucket(const Bucket&, Context&);
Result put(const Bucket&, Timestamp, DocumentSP, Context&);
Result setActiveState(const Bucket&, BucketInfo::ActiveState);
@@ -336,7 +337,7 @@ struct PersistenceProvider
* Tells the provider that the given bucket has been created in the
* service layer. There is no requirement to do anything here.
*/
- virtual Result createBucket(const Bucket&, Context&) = 0;
+ virtual void createBucketAsync(const Bucket&, Context&, OperationComplete::UP) = 0;
/**
* Deletes the given bucket and all entries contained in that bucket.
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 114292d055d..952c2218140 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -548,19 +548,23 @@ PersistenceEngine::destroyIterator(IteratorId id, Context&)
}
-Result
-PersistenceEngine::createBucket(const Bucket &b, Context &)
+void
+PersistenceEngine::createBucketAsync(const Bucket &b, Context &, OperationComplete::UP onComplete)
{
ReadGuard rguard(_rwMutex);
LOG(spam, "createBucket(%s)", b.toString().c_str());
HandlerSnapshot snap = getHandlerSnapshot(rguard, b.getBucketSpace());
- TransportLatch latch(snap.size());
- for (; snap.handlers().valid(); snap.handlers().next()) {
+
+ auto transportContext = std::make_shared<AsyncTranportContext>(snap.size(), std::move(onComplete));
+ while (snap.handlers().valid()) {
IPersistenceHandler *handler = snap.handlers().get();
- handler->handleCreateBucket(feedtoken::make(latch), b);
+ snap.handlers().next();
+ if (snap.handlers().valid()) {
+ handler->handleCreateBucket(feedtoken::make(transportContext), b);
+ } else {
+ handler->handleCreateBucket(feedtoken::make(std::move(transportContext)), b);
+ }
}
- latch.await();
- return latch.getResult();
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 94331ac2cd6..9cabebc8135 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -114,7 +114,7 @@ public:
IterateResult iterate(IteratorId, uint64_t maxByteSize, Context&) const override;
Result destroyIterator(IteratorId, Context&) override;
- Result createBucket(const Bucket &bucketId, Context &) override ;
+ void createBucketAsync(const Bucket &bucketId, Context &, OperationComplete::UP) override;
void deleteBucketAsync(const Bucket&, Context&, OperationComplete::UP) override;
BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index b3bd1c6a253..302e603a9d8 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -80,12 +80,12 @@ PersistenceProviderWrapper::listBuckets(BucketSpace bucketSpace) const
return _spi.listBuckets(bucketSpace);
}
-spi::Result
-PersistenceProviderWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+PersistenceProviderWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
{
LOG_SPI("createBucket(" << bucket << ")");
- CHECK_ERROR(spi::Result, FAIL_CREATE_BUCKET);
- return _spi.createBucket(bucket, context);
+ CHECK_ERROR_ASYNC(spi::Result, FAIL_CREATE_BUCKET, onComplete);
+ return _spi.createBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketInfoResult
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index c6628814dba..3cb7b92356b 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -96,7 +96,7 @@ public:
void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state,
spi::OperationComplete::UP up) override;
- spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
+ void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override;
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 07d2b24d536..75d9b595c4f 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -62,9 +62,9 @@ public:
return PersistenceProviderWrapper::getBucketInfo(bucket);
}
- spi::Result createBucket(const spi::Bucket& bucket, spi::Context& ctx) override {
+ void createBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
++_createBucketInvocations;
- return PersistenceProviderWrapper::createBucket(bucket, ctx);
+ PersistenceProviderWrapper::createBucketAsync(bucket, ctx, std::move(onComplete));
}
void
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 47b5e4f5f27..b3e9d596c76 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -154,6 +154,34 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
}
MessageTracker::UP
+AsyncHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.createBuckets);
+ LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
+ }
+ spi::Bucket bucket(cmd.getBucket());
+ auto task = makeResultTask([this, tracker = std::move(tracker), bucket, active=cmd.getActive()](spi::Result::UP ignored) mutable {
+ // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
+ (void) ignored;
+ if (active) {
+ auto nestedTask = makeResultTask([tracker = std::move(tracker)](spi::Result::UP nestedIgnored) {
+ // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
+ (void) nestedIgnored;
+ tracker->sendReply();
+ });
+ _spi.setActiveStateAsync(bucket, spi::BucketInfo::ACTIVE, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(nestedTask)));
+ } else {
+ tracker->sendReply();
+ }
+ });
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, bucket, std::move(task)));
+
+ return tracker;
+}
+
+MessageTracker::UP
AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.deleteBuckets);
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 4f5c242570c..db5a77bfb59 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -30,6 +30,7 @@ public:
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 77e7762ec9a..2f19b8695a4 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -12,7 +12,6 @@
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/exceptions.h>
#include <algorithm>
-#include <future>
#include <vespa/log/log.h>
LOG_SETUP(".persistence.mergehandler");
@@ -663,24 +662,25 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
}
namespace {
- void findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
- uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
- {
- for (const auto& entry : status.diff) {
- uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
- if ((entry_has_mask == 0u) ||
- (constrictHasMask && (entry_has_mask != hasMask))) {
- continue;
- }
- cmd.getDiff().emplace_back(entry);
- if (constrictHasMask) {
- cmd.getDiff().back()._entry._hasMask = newHasMask;
- } else {
- cmd.getDiff().back()._entry._hasMask = entry_has_mask;
- }
+void
+findCandidates(MergeStatus& status, uint16_t active_nodes_mask, bool constrictHasMask, uint16_t hasMask,
+ uint16_t newHasMask, api::ApplyBucketDiffCommand& cmd)
+{
+ for (const auto& entry : status.diff) {
+ uint16_t entry_has_mask = (entry._hasMask & active_nodes_mask);
+ if ((entry_has_mask == 0u) ||
+ (constrictHasMask && (entry_has_mask != hasMask))) {
+ continue;
+ }
+ cmd.getDiff().emplace_back(entry);
+ if (constrictHasMask) {
+ cmd.getDiff().back()._entry._hasMask = newHasMask;
+ } else {
+ cmd.getDiff().back()._entry._hasMask = entry_has_mask;
}
}
}
+}
api::StorageReply::SP
MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
@@ -938,141 +938,146 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
namespace {
- uint8_t findOwnIndex(
- const std::vector<api::MergeBucketCommand::Node>& nodeList,
- uint16_t us)
- {
- for (uint32_t i=0, n=nodeList.size(); i<n; ++i) {
- if (nodeList[i].index == us) return i;
- }
- throw vespalib::IllegalStateException(
- "Got GetBucketDiff cmd on node not in nodelist in command",
- VESPA_STRLOC);
+uint8_t findOwnIndex(
+ const std::vector<api::MergeBucketCommand::Node>& nodeList,
+ uint16_t us)
+{
+ for (uint32_t i=0, n=nodeList.size(); i<n; ++i) {
+ if (nodeList[i].index == us) return i;
}
+ throw vespalib::IllegalStateException(
+ "Got GetBucketDiff cmd on node not in nodelist in command",
+ VESPA_STRLOC);
+}
- struct DiffEntryTimestampOrder
- : public std::binary_function<api::GetBucketDiffCommand::Entry,
- api::GetBucketDiffCommand::Entry, bool>
- {
- bool operator()(const api::GetBucketDiffCommand::Entry& x,
- const api::GetBucketDiffCommand::Entry& y) const {
- return (x._timestamp < y._timestamp);
- }
- };
-
- /**
- * Merges list A and list B together and puts the result in result.
- * Result is swapped in as last step to keep function exception safe. Thus
- * result can be listA or listB if wanted.
- *
- * listA and listB are assumed to be in the order found in the slotfile, or
- * in the order given by a previous call to this function. (In both cases
- * this will be sorted by timestamp)
- *
- * @return false if any suspect entries was found.
- */
- bool mergeLists(
- const std::vector<api::GetBucketDiffCommand::Entry>& listA,
- const std::vector<api::GetBucketDiffCommand::Entry>& listB,
- std::vector<api::GetBucketDiffCommand::Entry>& finalResult)
- {
- bool suspect = false;
- std::vector<api::GetBucketDiffCommand::Entry> result;
- uint32_t i = 0, j = 0;
- while (i < listA.size() && j < listB.size()) {
- const api::GetBucketDiffCommand::Entry& a(listA[i]);
- const api::GetBucketDiffCommand::Entry& b(listB[j]);
- if (a._timestamp < b._timestamp) {
- result.push_back(a);
- ++i;
- } else if (a._timestamp > b._timestamp) {
- result.push_back(b);
- ++j;
- } else {
- // If we find equal timestamped entries that are not the
- // same.. Flag an error. But there is nothing we can do
- // about it. Note it as if it is the same entry so we
- // dont try to merge them.
- if (!(a == b)) {
- if (a._gid == b._gid && a._flags == b._flags) {
- if ((a._flags & getDeleteFlag()) != 0 &&
- (b._flags & getDeleteFlag()) != 0)
- {
- // Unfortunately this can happen, for instance
- // if a remove comes to a bucket out of sync
- // and reuses different headers in the two
- // versions.
- LOG(debug, "Found entries with equal timestamps of "
- "the same gid who both are remove "
- "entries: %s <-> %s.",
- a.toString(true).c_str(),
- b.toString(true).c_str());
- } else {
- LOG(error, "Found entries with equal timestamps of "
- "the same gid. This is likely same "
- "document where size of document varies:"
- " %s <-> %s.",
- a.toString(true).c_str(),
- b.toString(true).c_str());
- }
- result.push_back(a);
- result.back()._hasMask |= b._hasMask;
- suspect = true;
- } else if ((a._flags & getDeleteFlag())
- != (b._flags & getDeleteFlag()))
+/**
+ * Merges list A and list B together and puts the result in result.
+ * Result is swapped in as last step to keep function exception safe. Thus
+ * result can be listA or listB if wanted.
+ *
+ * listA and listB are assumed to be in the order found in the slotfile, or
+ * in the order given by a previous call to this function. (In both cases
+ * this will be sorted by timestamp)
+ *
+ * @return false if any suspect entries was found.
+ */
+bool mergeLists(
+ const std::vector<api::GetBucketDiffCommand::Entry>& listA,
+ const std::vector<api::GetBucketDiffCommand::Entry>& listB,
+ std::vector<api::GetBucketDiffCommand::Entry>& finalResult)
+{
+ bool suspect = false;
+ std::vector<api::GetBucketDiffCommand::Entry> result;
+ uint32_t i = 0, j = 0;
+ while (i < listA.size() && j < listB.size()) {
+ const api::GetBucketDiffCommand::Entry& a(listA[i]);
+ const api::GetBucketDiffCommand::Entry& b(listB[j]);
+ if (a._timestamp < b._timestamp) {
+ result.push_back(a);
+ ++i;
+ } else if (a._timestamp > b._timestamp) {
+ result.push_back(b);
+ ++j;
+ } else {
+ // If we find equal timestamped entries that are not the
+ // same.. Flag an error. But there is nothing we can do
+ // about it. Note it as if it is the same entry so we
+ // dont try to merge them.
+ if (!(a == b)) {
+ if (a._gid == b._gid && a._flags == b._flags) {
+ if ((a._flags & getDeleteFlag()) != 0 &&
+ (b._flags & getDeleteFlag()) != 0)
{
- // If we find one remove and one put entry on the
- // same timestamp we are going to keep the remove
- // entry to make the copies consistent.
- const api::GetBucketDiffCommand::Entry& deletedEntry(
- (a._flags & getDeleteFlag()) != 0 ? a : b);
- result.push_back(deletedEntry);
- LOG(debug,
- "Found put and remove on same timestamp. Keeping"
- "remove as it is likely caused by remove with "
- "copies unavailable at the time: %s, %s.",
- a.toString().c_str(), b.toString().c_str());
+ // Unfortunately this can happen, for instance
+ // if a remove comes to a bucket out of sync
+ // and reuses different headers in the two
+ // versions.
+ LOG(debug, "Found entries with equal timestamps of "
+ "the same gid who both are remove "
+ "entries: %s <-> %s.",
+ a.toString(true).c_str(),
+ b.toString(true).c_str());
} else {
- LOG(error, "Found entries with equal timestamps that "
- "weren't the same entry: %s, %s.",
- a.toString().c_str(), b.toString().c_str());
- result.push_back(a);
- result.back()._hasMask |= b._hasMask;
- suspect = true;
+ LOG(error, "Found entries with equal timestamps of "
+ "the same gid. This is likely same "
+ "document where size of document varies:"
+ " %s <-> %s.",
+ a.toString(true).c_str(),
+ b.toString(true).c_str());
}
+ result.push_back(a);
+ result.back()._hasMask |= b._hasMask;
+ suspect = true;
+ } else if ((a._flags & getDeleteFlag())
+ != (b._flags & getDeleteFlag()))
+ {
+ // If we find one remove and one put entry on the
+ // same timestamp we are going to keep the remove
+ // entry to make the copies consistent.
+ const api::GetBucketDiffCommand::Entry& deletedEntry(
+ (a._flags & getDeleteFlag()) != 0 ? a : b);
+ result.push_back(deletedEntry);
+ LOG(debug,
+ "Found put and remove on same timestamp. Keeping"
+ "remove as it is likely caused by remove with "
+ "copies unavailable at the time: %s, %s.",
+ a.toString().c_str(), b.toString().c_str());
} else {
+ LOG(error, "Found entries with equal timestamps that "
+ "weren't the same entry: %s, %s.",
+ a.toString().c_str(), b.toString().c_str());
result.push_back(a);
result.back()._hasMask |= b._hasMask;
+ suspect = true;
}
- ++i;
- ++j;
+ } else {
+ result.push_back(a);
+ result.back()._hasMask |= b._hasMask;
}
+ ++i;
+ ++j;
}
- if (i < listA.size()) {
- assert(j >= listB.size());
- for (uint32_t n = listA.size(); i<n; ++i) {
- result.push_back(listA[i]);
- }
- } else if (j < listB.size()) {
- assert(i >= listA.size());
- for (uint32_t n = listB.size(); j<n; ++j) {
- result.push_back(listB[j]);
- }
+ }
+ if (i < listA.size()) {
+ assert(j >= listB.size());
+ for (uint32_t n = listA.size(); i<n; ++i) {
+ result.push_back(listA[i]);
+ }
+ } else if (j < listB.size()) {
+ assert(i >= listA.size());
+ for (uint32_t n = listB.size(); j<n; ++j) {
+ result.push_back(listB[j]);
}
- result.swap(finalResult);
- return !suspect;
}
+ result.swap(finalResult);
+ return !suspect;
+}
+
+struct CheckResult : public spi::OperationComplete {
+ spi::Bucket _bucket;
+ const char *_msg;
+ CheckResult(spi::Bucket bucket, const char * msg) : _bucket(bucket), _msg(msg) { }
+ void onComplete(std::unique_ptr<spi::Result> result) override {
+ checkResult(*result, _bucket, _msg);
+ }
+ void addResultHandler(const spi::ResultHandler *) override { }
+};
}
MessageTracker::UP
-MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const
-{
+MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const {
tracker->setMetric(_env._metrics.getBucketDiff);
spi::Bucket bucket(cmd.getBucket());
LOG(debug, "GetBucketDiff(%s)", bucket.toString().c_str());
- checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket");
+ _spi.createBucketAsync(bucket, tracker->context(), std::make_unique<CheckResult>(bucket, "create bucket"));
+ return handleGetBucketDiffStage2(cmd, std::move(tracker));
+}
+MessageTracker::UP
+MergeHandler::handleGetBucketDiffStage2(api::GetBucketDiffCommand& cmd, MessageTracker::UP tracker) const
+{
+ spi::Bucket bucket(cmd.getBucket());
if (_env._fileStorHandler.isMerging(bucket.getBucket())) {
tracker->fail(api::ReturnCode::BUSY, "A merge is already running on this bucket.");
return tracker;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 17cfb847d2c..e6b4d047209 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -86,6 +86,7 @@ private:
const uint32_t _commonMergeChainOptimalizationMinimumSize;
std::atomic<bool> _async_apply_bucket_diff;
+ MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
/** Returns a reply if merge is complete */
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
MergeStatus& status,
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index aa1a9c136fd..761021a9612 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -44,7 +44,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::REVERT_ID:
return _simpleHandler.handleRevert(static_cast<api::RevertCommand&>(msg), std::move(tracker));
case api::MessageType::CREATEBUCKET_ID:
- return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index ce424f0ce83..5c27d112a7c 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -99,17 +99,18 @@ ProviderErrorWrapper::destroyIterator(spi::IteratorId iteratorId, spi::Context&
return checkResult(_impl.destroyIterator(iteratorId, context));
}
-spi::Result
-ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
{
- return checkResult(_impl.createBucket(bucket, context));
+ onComplete->addResultHandler(this);
+ _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
}
void
-ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
+ProviderErrorWrapper::createBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
{
onComplete->addResultHandler(this);
- _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
+ _impl.createBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketIdListResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index c9d2411e372..e999f33f2bd 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -49,7 +49,6 @@ public:
spi::Context &context) override;
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override;
@@ -63,6 +62,7 @@ public:
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
+ void createBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override;
private:
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
index b4fe207e2e5..9a7a451b906 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
@@ -88,22 +88,6 @@ SimpleMessageHandler::handleRevert(api::RevertCommand& cmd, MessageTracker::UP t
}
MessageTracker::UP
-SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.createBuckets);
- LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- LOG(warning, "Bucket %s was merging at create time. Unexpected.", cmd.getBucketId().toString().c_str());
- }
- spi::Bucket spiBucket(cmd.getBucket());
- _spi.createBucket(spiBucket, tracker->context());
- if (cmd.getActive()) {
- _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE);
- }
- return tracker;
-}
-
-MessageTracker::UP
SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.visit);
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index 2cfbc7016c0..009fd6dff52 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -22,7 +22,6 @@ public:
SimpleMessageHandler(const PersistenceUtil&, spi::PersistenceProvider&);
MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const;