aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-26 23:09:09 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-26 23:09:09 +0000
commit4663fd3fab5ee4d7a00c7549a96628d44f27ca6d (patch)
tree7073b0b6fbad92d4ca13a623ab62751bda3fdeae /storage
parent6a17198c6d2564836c64a2c5fb701a4a09af16c5 (diff)
Prepare for making persistence layer async.
Avoid state in the thread.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/persistencethread_splittest.cpp2
-rw-r--r--storage/src/tests/persistence/testandsettest.cpp36
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp122
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h29
-rw-r--r--storage/src/vespa/storage/persistence/testandsethelper.cpp12
-rw-r--r--storage/src/vespa/storage/persistence/testandsethelper.h4
6 files changed, 103 insertions, 102 deletions
diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp
index ea7dce96e0c..c8318bef211 100644
--- a/storage/src/tests/persistence/persistencethread_splittest.cpp
+++ b/storage/src/tests/persistence/persistencethread_splittest.cpp
@@ -213,7 +213,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase)
cmd.setMinByteSize(maxSize);
cmd.setMinDocCount(maxCount);
cmd.setSourceIndex(0);
- MessageTracker::UP result(thread->handleSplitBucket(cmd));
+ MessageTracker::UP result(thread->handleSplitBucket(cmd, context));
api::ReturnCode code(result->getResult());
EXPECT_EQ(error, code);
if (!code.success()) {
diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp
index 1507f0e8f0d..1e67e90b540 100644
--- a/storage/src/tests/persistence/testandsettest.cpp
+++ b/storage/src/tests/persistence/testandsettest.cpp
@@ -33,15 +33,15 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils {
unique_ptr<PersistenceThread> thread;
shared_ptr<document::Document> testDoc;
document::DocumentId testDocId;
+ spi::Context context;
+
+ TestAndSetTest()
+ : context(spi::LoadType(0, "default"), 0, 0)
+ {}
void SetUp() override {
SingleDiskPersistenceTestUtils::SetUp();
- spi::Context context(
- spi::LoadType(0, "default"),
- spi::Priority(0),
- spi::Trace::TraceLevel(0));
-
createBucket(BUCKET_ID);
getPersistenceProvider().createBucket(
makeSpiBucket(BUCKET_ID),
@@ -85,7 +85,7 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) {
api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo);
setTestCondition(putTwo);
- ASSERT_EQ(thread->handlePut(putTwo)->getResult().getResult(),
+ ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
}
@@ -105,7 +105,7 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) {
api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo);
setTestCondition(putTwo);
- ASSERT_EQ(thread->handlePut(putTwo)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -125,7 +125,7 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) {
api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo);
setTestCondition(remove);
- ASSERT_EQ(thread->handleRemove(remove)->getResult().getResult(),
+ ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -145,7 +145,7 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) {
api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo);
setTestCondition(remove);
- ASSERT_EQ(thread->handleRemove(remove)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY),
dumpBucket(BUCKET_ID));
@@ -172,7 +172,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) {
putTestDocument(false, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp)->getResult().getResult(),
+ ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -185,7 +185,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) {
putTestDocument(true, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -197,7 +197,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(false, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp)->getResult().getResult(),
+ ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -206,7 +206,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(true, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp)->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID));
assertTestDocumentFoundAndMatchesContent(NEW_CONTENT);
}
@@ -218,7 +218,7 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) {
api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp);
put.setCondition(documentapi::TestAndSetCondition("bjarne"));
- ASSERT_EQ(thread->handlePut(put)->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
+ ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -228,9 +228,9 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) {
api::Timestamp timestamp = 0;
api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp);
setTestCondition(put);
- thread->handlePut(put);
+ thread->handlePut(put, context);
- ASSERT_EQ(thread->handlePut(put)->getResult().getResult(),
+ ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -254,7 +254,7 @@ TestAndSetTest::createTestDocument()
document::Document::SP TestAndSetTest::retrieveTestDocument()
{
api::GetCommand get(makeDocumentBucket(BUCKET_ID), testDocId, "[all]");
- auto tracker = thread->handleGet(get);
+ auto tracker = thread->handleGet(get, context);
assert(tracker->getResult() == api::ReturnCode::Result::OK);
auto & reply = static_cast<api::GetReply &>(*tracker->getReply());
@@ -274,7 +274,7 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta
}
api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp);
- thread->handlePut(put);
+ thread->handlePut(put, context);
}
void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value)
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 4bcd92293d3..60cab4d1216 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -29,7 +29,6 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
_processAllHandler(_env, provider),
_mergeHandler(_spi, _env),
_diskMoveHandler(_env, _spi),
- _context(documentapi::LoadType::DEFAULT, 0, 0),
_bucketOwnershipNotifier(),
_flushMonitor(),
_closed(false)
@@ -86,11 +85,11 @@ bool PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) {
}
bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
- bool missingDocumentImpliesMatch) {
+ spi::Context & context, bool missingDocumentImpliesMatch) {
try {
TestAndSetHelper helper(*this, cmd, missingDocumentImpliesMatch);
- auto code = helper.retrieveAndMatch();
+ auto code = helper.retrieveAndMatch(context);
if (code.failed()) {
tracker.fail(code.getResult(), code.getMessage());
return false;
@@ -105,35 +104,35 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd,
}
MessageTracker::UP
-PersistenceThread::handlePut(api::PutCommand& cmd)
+PersistenceThread::handlePut(api::PutCommand& cmd, spi::Context & context)
{
auto& metrics = _env._metrics.put[cmd.getLoadType()];
auto tracker = std::make_unique<MessageTracker>(metrics, _env._component.getClock());
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) {
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) {
return tracker;
}
spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), _context);
+ spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), context);
checkForError(response, *tracker);
return tracker;
}
MessageTracker::UP
-PersistenceThread::handleRemove(api::RemoveCommand& cmd)
+PersistenceThread::handleRemove(api::RemoveCommand& cmd, spi::Context & context)
{
auto& metrics = _env._metrics.remove[cmd.getLoadType()];
auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock());
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) {
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) {
return tracker;
}
spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), _context);
+ spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), context);
if (checkForError(response, *tracker)) {
tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
}
@@ -144,18 +143,18 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleUpdate(api::UpdateCommand& cmd)
+PersistenceThread::handleUpdate(api::UpdateCommand& cmd, spi::Context & context)
{
auto& metrics = _env._metrics.update[cmd.getLoadType()];
auto tracker = std::make_unique<MessageTracker>(metrics, _env._component.getClock());
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, cmd.getUpdate()->getCreateIfNonExistent())) {
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context, cmd.getUpdate()->getCreateIfNonExistent())) {
return tracker;
}
spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), _context);
+ spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), context);
if (checkForError(response, *tracker)) {
auto reply = std::make_shared<api::UpdateReply>(cmd);
reply->setOldTimestamp(response.getExistingTimestamp());
@@ -177,7 +176,7 @@ spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency co
}
MessageTracker::UP
-PersistenceThread::handleGet(api::GetCommand& cmd)
+PersistenceThread::handleGet(api::GetCommand& cmd, spi::Context & context)
{
auto& metrics = _env._metrics.get[cmd.getLoadType()];
auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock());
@@ -186,9 +185,9 @@ PersistenceThread::handleGet(api::GetCommand& cmd)
document::FieldSetRepo repo;
document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet());
// _context is reset per command, so it's safe to modify it like this.
- _context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency()));
+ context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency()));
spi::GetResult result =
- _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context);
+ _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), context);
if (checkForError(result, *tracker)) {
if (!result.hasDocument()) {
@@ -227,19 +226,19 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleRevert(api::RevertCommand& cmd)
+PersistenceThread::handleRevert(api::RevertCommand& cmd, spi::Context & context)
{
auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock());
spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition));
const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens();
for (const api::Timestamp & token : tokens) {
- spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), _context);
+ spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), context);
}
return tracker;
}
MessageTracker::UP
-PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd)
+PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context)
{
auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock());
LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str());
@@ -248,7 +247,7 @@ PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd)
DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId());
}
spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition));
- _spi.createBucket(spiBucket, _context);
+ _spi.createBucket(spiBucket, context);
if (cmd.getActive()) {
_spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE);
}
@@ -297,7 +296,7 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, con
}
MessageTracker::UP
-PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
+PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context)
{
auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock());
LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
@@ -310,7 +309,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
return tracker;
}
- _spi.deleteBucket(bucket, _context);
+ _spi.deleteBucket(bucket, context);
StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
{
StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
@@ -335,10 +334,10 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleGetIter(GetIterCommand& cmd)
+PersistenceThread::handleGetIter(GetIterCommand& cmd, spi::Context & context)
{
auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock());
- spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), _context));
+ spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), context));
if (checkForError(result, *tracker)) {
GetIterReply::SP reply(new GetIterReply(cmd));
reply->getEntries() = result.steal_entries();
@@ -376,16 +375,16 @@ PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd)
}
MessageTracker::UP
-PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd)
+PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context)
{
auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock());
document::FieldSetRepo repo;
document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields());
// _context is reset per command, so it's safe to modify it like this.
- _context.setReadConsistency(cmd.getReadConsistency());
+ context.setReadConsistency(cmd.getReadConsistency());
spi::CreateIteratorResult result(_spi.createIterator(
spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)),
- *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), _context));
+ *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), context));
if (checkForError(result, *tracker)) {
tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId())));
}
@@ -393,7 +392,7 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
+PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context)
{
auto tracker = std::make_unique<MessageTracker>(_env._metrics.splitBuckets,_env._component.getClock());
NotificationGuard notifyGuard(*_bucketOwnershipNotifier);
@@ -414,7 +413,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
SplitBitDetector::Result targetInfo;
if (_env._config.enableMultibitSplitOptimalization) {
targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(),
- _context, cmd.getMinDocCount(), cmd.getMinByteSize());
+ context, cmd.getMinDocCount(), cmd.getMinByteSize());
}
if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) {
document::BucketId src(cmd.getBucketId());
@@ -454,7 +453,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
}
#endif
spi::Result result = _spi.split(spiBucket, spi::Bucket(target1, spi::PartitionId(lock1.disk)),
- spi::Bucket(target2, spi::PartitionId(lock2.disk)), _context);
+ spi::Bucket(target2, spi::PartitionId(lock2.disk)), context);
if (result.hasError()) {
tracker->fail(_env.convertErrorCode(result), result.getErrorMessage());
return tracker;
@@ -512,7 +511,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
spi::PartitionId(targets[i].second.diskIndex)));
LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it",
createTarget.toString().c_str());
- _spi.createBucket(createTarget, _context);
+ _spi.createBucket(createTarget, context);
}
splitReply.getSplitInfo().emplace_back(targets[i].second.bucket.getBucketId(),
targets[i].first->getBucketInfo());
@@ -557,7 +556,7 @@ PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, Messa
}
MessageTracker::UP
-PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd)
+PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context)
{
auto tracker = std::make_unique<MessageTracker>(_env._metrics.joinBuckets,_env._component.getClock());
if (!validateJoinCommand(cmd, *tracker)) {
@@ -606,11 +605,11 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd)
_spi.join(spi::Bucket(firstBucket, spi::PartitionId(lock1.disk)),
spi::Bucket(secondBucket, spi::PartitionId(lock2.disk)),
spi::Bucket(destBucket, spi::PartitionId(_env._partition)),
- _context);
+ context);
if (!checkForError(result, *tracker)) {
return tracker;
}
- result = _spi.flush(spi::Bucket(destBucket, spi::PartitionId(_env._partition)), _context);
+ result = _spi.flush(spi::Bucket(destBucket, spi::PartitionId(_env._partition)), context);
if (!checkForError(result, *tracker)) {
return tracker;
}
@@ -672,7 +671,7 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd)
+PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context)
{
auto tracker = std::make_unique<MessageTracker>(_env._metrics.internalJoin,_env._component.getClock());
document::Bucket destBucket = cmd.getBucket();
@@ -689,7 +688,7 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd)
_spi.join(spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())),
spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())),
spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToKeep())),
- _context);
+ context);
if (checkForError(result, *tracker)) {
tracker->setReply(std::make_shared<InternalBucketJoinReply>(cmd, _env.getBucketInfo(cmd.getBucket())));
}
@@ -727,46 +726,46 @@ PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd)
}
MessageTracker::UP
-PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg)
+PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, spi::Context & context)
{
switch (msg.getType().getId()) {
case api::MessageType::GET_ID:
- return handleGet(static_cast<api::GetCommand&>(msg));
+ return handleGet(static_cast<api::GetCommand&>(msg), context);
case api::MessageType::PUT_ID:
- return handlePut(static_cast<api::PutCommand&>(msg));
+ return handlePut(static_cast<api::PutCommand&>(msg), context);
case api::MessageType::REMOVE_ID:
- return handleRemove(static_cast<api::RemoveCommand&>(msg));
+ return handleRemove(static_cast<api::RemoveCommand&>(msg), context);
case api::MessageType::UPDATE_ID:
- return handleUpdate(static_cast<api::UpdateCommand&>(msg));
+ return handleUpdate(static_cast<api::UpdateCommand&>(msg), context);
case api::MessageType::REVERT_ID:
- return handleRevert(static_cast<api::RevertCommand&>(msg));
+ return handleRevert(static_cast<api::RevertCommand&>(msg), context);
case api::MessageType::CREATEBUCKET_ID:
- return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg));
+ return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), context);
case api::MessageType::DELETEBUCKET_ID:
- return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg));
+ return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), context);
case api::MessageType::JOINBUCKETS_ID:
- return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg));
+ return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), context);
case api::MessageType::SPLITBUCKET_ID:
- return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg));
+ return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), context);
// Depends on iterators
case api::MessageType::STATBUCKET_ID:
- return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), _context);
+ return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), context);
case api::MessageType::REMOVELOCATION_ID:
- return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), _context);
+ return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), context);
case api::MessageType::MERGEBUCKET_ID:
- return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), _context);
+ return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), context);
case api::MessageType::GETBUCKETDIFF_ID:
- return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), _context);
+ return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), context);
case api::MessageType::APPLYBUCKETDIFF_ID:
- return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), _context);
+ return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), context);
case api::MessageType::SETBUCKETSTATE_ID:
return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg));
case api::MessageType::INTERNAL_ID:
switch(static_cast<api::InternalCommand&>(msg).getType()) {
case GetIterCommand::ID:
- return handleGetIter(static_cast<GetIterCommand&>(msg));
+ return handleGetIter(static_cast<GetIterCommand&>(msg), context);
case CreateIteratorCommand::ID:
- return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg));
+ return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), context);
case ReadBucketList::ID:
return handleReadBucketList(static_cast<ReadBucketList&>(msg));
case ReadBucketInfo::ID:
@@ -774,9 +773,9 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg)
case RepairBucketCommand::ID:
return handleRepairBucket(static_cast<RepairBucketCommand&>(msg));
case BucketDiskMoveCommand::ID:
- return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), _context);
+ return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), context);
case InternalBucketJoinCommand::ID:
- return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg));
+ return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), context);
case RecheckBucketInfoCommand::ID:
return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg));
default:
@@ -792,13 +791,13 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg)
MessageTracker::UP
PersistenceThread::handleCommand(api::StorageCommand& msg)
{
- _context = spi::Context(msg.getLoadType(), msg.getPriority(), msg.getTrace().getLevel());
- MessageTracker::UP mtracker(handleCommandSplitByType(msg));
- if (mtracker && ! _context.getTrace().getRoot().isEmpty()) {
+ spi::Context context(msg.getLoadType(), msg.getPriority(), msg.getTrace().getLevel());
+ MessageTracker::UP mtracker(handleCommandSplitByType(msg, context));
+ if (mtracker && ! context.getTrace().getRoot().isEmpty()) {
if (mtracker->getReply()) {
- mtracker->getReply()->getTrace().getRoot().addChild(_context.getTrace().getRoot());
+ mtracker->getReply()->getTrace().getRoot().addChild(context.getTrace().getRoot());
} else {
- msg.getTrace().getRoot().addChild(_context.getTrace().getRoot());
+ msg.getTrace().getRoot().addChild(context.getTrace().getRoot());
}
}
return mtracker;
@@ -934,7 +933,10 @@ PersistenceThread::flushAllReplies(
}
#endif
spi::Bucket b(bucket, spi::PartitionId(_env._partition));
- spi::Result result = _spi.flush(b, _context);
+ // Flush is not used for anything currentlu, and the context is not correct either when batching is done
+ //So just faking it here.
+ spi::Context dummyContext(documentapi::LoadType::DEFAULT, 0, 0);
+ spi::Result result = _spi.flush(b, dummyContext);
uint32_t errorCode = _env.convertErrorCode(result);
if (errorCode != 0) {
for (uint32_t i = 0; i < replies.size(); ++i) {
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index ed27a759e8b..e410843c1be 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -28,21 +28,21 @@ public:
void flush() override;
framework::Thread& getThread() override { return *_thread; }
- MessageTracker::UP handlePut(api::PutCommand& cmd);
- MessageTracker::UP handleRemove(api::RemoveCommand& cmd);
- MessageTracker::UP handleUpdate(api::UpdateCommand& cmd);
- MessageTracker::UP handleGet(api::GetCommand& cmd);
- MessageTracker::UP handleRevert(api::RevertCommand& cmd);
- MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd);
- MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd);
- MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd);
- MessageTracker::UP handleGetIter(GetIterCommand& cmd);
+ MessageTracker::UP handlePut(api::PutCommand& cmd, spi::Context & context);
+ MessageTracker::UP handleRemove(api::RemoveCommand& cmd, spi::Context & context);
+ MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, spi::Context & context);
+ MessageTracker::UP handleGet(api::GetCommand& cmd, spi::Context & context);
+ MessageTracker::UP handleRevert(api::RevertCommand& cmd, spi::Context & context);
+ MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context);
+ MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context);
+ MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context);
+ MessageTracker::UP handleGetIter(GetIterCommand& cmd, spi::Context & context);
MessageTracker::UP handleReadBucketList(ReadBucketList& cmd);
MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd);
- MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd);
+ MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context);
MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd);
- MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd);
- MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd);
+ MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context);
+ MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context);
MessageTracker::UP handleRepairBucket(RepairBucketCommand& cmd);
MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd);
@@ -56,7 +56,6 @@ private:
DiskMoveOperationHandler _diskMoveHandler;
ServiceLayerComponent::UP _component;
framework::Thread::UP _thread;
- spi::Context _context;
std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier;
vespalib::Monitor _flushMonitor;
bool _closed;
@@ -72,7 +71,7 @@ private:
// Message handling functions
MessageTracker::UP handleCommand(api::StorageCommand&);
- MessageTracker::UP handleCommandSplitByType(api::StorageCommand&);
+ MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, spi::Context & context);
void handleReply(api::StorageReply&);
MessageTracker::UP processMessage(api::StorageMessage& msg);
@@ -88,7 +87,7 @@ private:
friend class TestAndSetHelper;
bool tasConditionExists(const api::TestAndSetCommand & cmd);
bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
- bool missingDocumentImpliesMatch = false);
+ spi::Context & context, bool missingDocumentImpliesMatch = false);
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/testandsethelper.cpp b/storage/src/vespa/storage/persistence/testandsethelper.cpp
index 511d44ad331..e1909252c8f 100644
--- a/storage/src/vespa/storage/persistence/testandsethelper.cpp
+++ b/storage/src/vespa/storage/persistence/testandsethelper.cpp
@@ -31,12 +31,12 @@ void TestAndSetHelper::parseDocumentSelection() {
}
}
-spi::GetResult TestAndSetHelper::retrieveDocument(const document::FieldSet & fieldSet) {
+spi::GetResult TestAndSetHelper::retrieveDocument(const document::FieldSet & fieldSet, spi::Context & context) {
return _thread._spi.get(
_thread.getBucket(_docId, _cmd.getBucket()),
fieldSet,
_cmd.getDocumentId(),
- _thread._context);
+ context);
}
TestAndSetHelper::TestAndSetHelper(PersistenceThread & thread, const api::TestAndSetCommand & cmd,
@@ -51,16 +51,16 @@ TestAndSetHelper::TestAndSetHelper(PersistenceThread & thread, const api::TestAn
parseDocumentSelection();
}
-TestAndSetHelper::~TestAndSetHelper() {
-}
+TestAndSetHelper::~TestAndSetHelper() = default;
-api::ReturnCode TestAndSetHelper::retrieveAndMatch() {
+api::ReturnCode
+TestAndSetHelper::retrieveAndMatch(spi::Context & context) {
// Walk document selection tree to build a minimal field set
FieldVisitor fieldVisitor(*_docTypePtr);
_docSelectionUp->visit(fieldVisitor);
// Retrieve document
- auto result = retrieveDocument(fieldVisitor.getFieldSet());
+ auto result = retrieveDocument(fieldVisitor.getFieldSet(), context);
// If document exists, match it with selection
if (result.hasDocument()) {
diff --git a/storage/src/vespa/storage/persistence/testandsethelper.h b/storage/src/vespa/storage/persistence/testandsethelper.h
index 21c111c712f..b5fa29d0106 100644
--- a/storage/src/vespa/storage/persistence/testandsethelper.h
+++ b/storage/src/vespa/storage/persistence/testandsethelper.h
@@ -30,13 +30,13 @@ class TestAndSetHelper {
void getDocumentType();
void parseDocumentSelection();
- spi::GetResult retrieveDocument(const document::FieldSet & fieldSet);
+ spi::GetResult retrieveDocument(const document::FieldSet & fieldSet, spi::Context & context);
public:
TestAndSetHelper(PersistenceThread & thread, const api::TestAndSetCommand & cmd,
bool missingDocumentImpliesMatch = false);
~TestAndSetHelper();
- api::ReturnCode retrieveAndMatch();
+ api::ReturnCode retrieveAndMatch(spi::Context & context);
};
} // storage