diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-26 23:09:09 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-26 23:09:09 +0000 |
commit | 4663fd3fab5ee4d7a00c7549a96628d44f27ca6d (patch) | |
tree | 7073b0b6fbad92d4ca13a623ab62751bda3fdeae /storage | |
parent | 6a17198c6d2564836c64a2c5fb701a4a09af16c5 (diff) |
Prepare for making persistence layer async.
Avoid state in the thread.
Diffstat (limited to 'storage')
6 files changed, 103 insertions, 102 deletions
diff --git a/storage/src/tests/persistence/persistencethread_splittest.cpp b/storage/src/tests/persistence/persistencethread_splittest.cpp index ea7dce96e0c..c8318bef211 100644 --- a/storage/src/tests/persistence/persistencethread_splittest.cpp +++ b/storage/src/tests/persistence/persistencethread_splittest.cpp @@ -213,7 +213,7 @@ PersistenceThreadSplitTest::doTest(SplitCase splitCase) cmd.setMinByteSize(maxSize); cmd.setMinDocCount(maxCount); cmd.setSourceIndex(0); - MessageTracker::UP result(thread->handleSplitBucket(cmd)); + MessageTracker::UP result(thread->handleSplitBucket(cmd, context)); api::ReturnCode code(result->getResult()); EXPECT_EQ(error, code); if (!code.success()) { diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index 1507f0e8f0d..1e67e90b540 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -33,15 +33,15 @@ struct TestAndSetTest : SingleDiskPersistenceTestUtils { unique_ptr<PersistenceThread> thread; shared_ptr<document::Document> testDoc; document::DocumentId testDocId; + spi::Context context; + + TestAndSetTest() + : context(spi::LoadType(0, "default"), 0, 0) + {} void SetUp() override { SingleDiskPersistenceTestUtils::SetUp(); - spi::Context context( - spi::LoadType(0, "default"), - spi::Priority(0), - spi::Trace::TraceLevel(0)); - createBucket(BUCKET_ID); getPersistenceProvider().createBucket( makeSpiBucket(BUCKET_ID), @@ -85,7 +85,7 @@ TEST_F(TestAndSetTest, conditional_put_not_executed_on_condition_mismatch) { api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo); setTestCondition(putTwo); - ASSERT_EQ(thread->handlePut(putTwo)->getResult().getResult(), + ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); } @@ -105,7 +105,7 @@ TEST_F(TestAndSetTest, conditional_put_executed_on_condition_match) { api::PutCommand putTwo(makeDocumentBucket(BUCKET_ID), testDoc, timestampTwo); setTestCondition(putTwo); - ASSERT_EQ(thread->handlePut(putTwo)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handlePut(putTwo, context)->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -125,7 +125,7 @@ TEST_F(TestAndSetTest, conditional_remove_not_executed_on_condition_mismatch) { api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo); setTestCondition(remove); - ASSERT_EQ(thread->handleRemove(remove)->getResult().getResult(), + ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -145,7 +145,7 @@ TEST_F(TestAndSetTest, conditional_remove_executed_on_condition_match) { api::RemoveCommand remove(makeDocumentBucket(BUCKET_ID), testDocId, timestampTwo); setTestCondition(remove); - ASSERT_EQ(thread->handleRemove(remove)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handleRemove(remove, context)->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId, spi::REMOVE_ENTRY), dumpBucket(BUCKET_ID)); @@ -172,7 +172,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) { putTestDocument(false, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp)->getResult().getResult(), + ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -185,7 +185,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) { putTestDocument(true, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -197,7 +197,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(false, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp)->getResult().getResult(), + ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -206,7 +206,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(true, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp)->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(thread->handleUpdate(*updateUp, context)->getResult().getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID)); assertTestDocumentFoundAndMatchesContent(NEW_CONTENT); } @@ -218,7 +218,7 @@ TEST_F(TestAndSetTest, invalid_document_selection_should_fail) { api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp); put.setCondition(documentapi::TestAndSetCondition("bjarne")); - ASSERT_EQ(thread->handlePut(put)->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); + ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(), api::ReturnCode::Result::ILLEGAL_PARAMETERS); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -228,9 +228,9 @@ TEST_F(TestAndSetTest, conditional_put_to_non_existing_document_should_fail) { api::Timestamp timestamp = 0; api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp); setTestCondition(put); - thread->handlePut(put); + thread->handlePut(put, context); - ASSERT_EQ(thread->handlePut(put)->getResult().getResult(), + ASSERT_EQ(thread->handlePut(put, context)->getResult().getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -254,7 +254,7 @@ TestAndSetTest::createTestDocument() document::Document::SP TestAndSetTest::retrieveTestDocument() { api::GetCommand get(makeDocumentBucket(BUCKET_ID), testDocId, "[all]"); - auto tracker = thread->handleGet(get); + auto tracker = thread->handleGet(get, context); assert(tracker->getResult() == api::ReturnCode::Result::OK); auto & reply = static_cast<api::GetReply &>(*tracker->getReply()); @@ -274,7 +274,7 @@ void TestAndSetTest::putTestDocument(bool matchingHeader, api::Timestamp timesta } api::PutCommand put(makeDocumentBucket(BUCKET_ID), testDoc, timestamp); - thread->handlePut(put); + thread->handlePut(put, context); } void TestAndSetTest::assertTestDocumentFoundAndMatchesContent(const document::FieldValue & value) diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 4bcd92293d3..60cab4d1216 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -29,7 +29,6 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg, _processAllHandler(_env, provider), _mergeHandler(_spi, _env), _diskMoveHandler(_env, _spi), - _context(documentapi::LoadType::DEFAULT, 0, 0), _bucketOwnershipNotifier(), _flushMonitor(), _closed(false) @@ -86,11 +85,11 @@ bool PersistenceThread::tasConditionExists(const api::TestAndSetCommand & cmd) { } bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, - bool missingDocumentImpliesMatch) { + spi::Context & context, bool missingDocumentImpliesMatch) { try { TestAndSetHelper helper(*this, cmd, missingDocumentImpliesMatch); - auto code = helper.retrieveAndMatch(); + auto code = helper.retrieveAndMatch(context); if (code.failed()) { tracker.fail(code.getResult(), code.getMessage()); return false; @@ -105,35 +104,35 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, } MessageTracker::UP -PersistenceThread::handlePut(api::PutCommand& cmd) +PersistenceThread::handlePut(api::PutCommand& cmd, spi::Context & context) { auto& metrics = _env._metrics.put[cmd.getLoadType()]; auto tracker = std::make_unique<MessageTracker>(metrics, _env._component.getClock()); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) { + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) { return tracker; } spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), _context); + spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), context); checkForError(response, *tracker); return tracker; } MessageTracker::UP -PersistenceThread::handleRemove(api::RemoveCommand& cmd) +PersistenceThread::handleRemove(api::RemoveCommand& cmd, spi::Context & context) { auto& metrics = _env._metrics.remove[cmd.getLoadType()]; auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock()); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) { + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context)) { return tracker; } spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), _context); + spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), context); if (checkForError(response, *tracker)) { tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); } @@ -144,18 +143,18 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd) } MessageTracker::UP -PersistenceThread::handleUpdate(api::UpdateCommand& cmd) +PersistenceThread::handleUpdate(api::UpdateCommand& cmd, spi::Context & context) { auto& metrics = _env._metrics.update[cmd.getLoadType()]; auto tracker = std::make_unique<MessageTracker>(metrics, _env._component.getClock()); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, cmd.getUpdate()->getCreateIfNonExistent())) { + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, context, cmd.getUpdate()->getCreateIfNonExistent())) { return tracker; } spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), _context); + spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), context); if (checkForError(response, *tracker)) { auto reply = std::make_shared<api::UpdateReply>(cmd); reply->setOldTimestamp(response.getExistingTimestamp()); @@ -177,7 +176,7 @@ spi::ReadConsistency api_read_consistency_to_spi(api::InternalReadConsistency co } MessageTracker::UP -PersistenceThread::handleGet(api::GetCommand& cmd) +PersistenceThread::handleGet(api::GetCommand& cmd, spi::Context & context) { auto& metrics = _env._metrics.get[cmd.getLoadType()]; auto tracker = std::make_unique<MessageTracker>(metrics,_env._component.getClock()); @@ -186,9 +185,9 @@ PersistenceThread::handleGet(api::GetCommand& cmd) document::FieldSetRepo repo; document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet()); // _context is reset per command, so it's safe to modify it like this. - _context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); + context.setReadConsistency(api_read_consistency_to_spi(cmd.internal_read_consistency())); spi::GetResult result = - _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context); + _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), context); if (checkForError(result, *tracker)) { if (!result.hasDocument()) { @@ -227,19 +226,19 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) } MessageTracker::UP -PersistenceThread::handleRevert(api::RevertCommand& cmd) +PersistenceThread::handleRevert(api::RevertCommand& cmd, spi::Context & context) { auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock()); spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens(); for (const api::Timestamp & token : tokens) { - spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), _context); + spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), context); } return tracker; } MessageTracker::UP -PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd) +PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context) { auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock()); LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); @@ -248,7 +247,7 @@ PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd) DUMP_LOGGED_BUCKET_OPERATIONS(cmd.getBucketId()); } spi::Bucket spiBucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - _spi.createBucket(spiBucket, _context); + _spi.createBucket(spiBucket, context); if (cmd.getActive()) { _spi.setActiveState(spiBucket, spi::BucketInfo::ACTIVE); } @@ -297,7 +296,7 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, con } MessageTracker::UP -PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) +PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context) { auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock()); LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); @@ -310,7 +309,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { return tracker; } - _spi.deleteBucket(bucket, _context); + _spi.deleteBucket(bucket, context); StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); { StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); @@ -335,10 +334,10 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) } MessageTracker::UP -PersistenceThread::handleGetIter(GetIterCommand& cmd) +PersistenceThread::handleGetIter(GetIterCommand& cmd, spi::Context & context) { auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock()); - spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), _context)); + spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), context)); if (checkForError(result, *tracker)) { GetIterReply::SP reply(new GetIterReply(cmd)); reply->getEntries() = result.steal_entries(); @@ -376,16 +375,16 @@ PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd) } MessageTracker::UP -PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd) +PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context) { auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock()); document::FieldSetRepo repo; document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields()); // _context is reset per command, so it's safe to modify it like this. - _context.setReadConsistency(cmd.getReadConsistency()); + context.setReadConsistency(cmd.getReadConsistency()); spi::CreateIteratorResult result(_spi.createIterator( spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), - *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), _context)); + *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), context)); if (checkForError(result, *tracker)) { tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId()))); } @@ -393,7 +392,7 @@ PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd) } MessageTracker::UP -PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) +PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context) { auto tracker = std::make_unique<MessageTracker>(_env._metrics.splitBuckets,_env._component.getClock()); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); @@ -414,7 +413,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) SplitBitDetector::Result targetInfo; if (_env._config.enableMultibitSplitOptimalization) { targetInfo = SplitBitDetector::detectSplit(_spi, spiBucket, cmd.getMaxSplitBits(), - _context, cmd.getMinDocCount(), cmd.getMinByteSize()); + context, cmd.getMinDocCount(), cmd.getMinByteSize()); } if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) { document::BucketId src(cmd.getBucketId()); @@ -454,7 +453,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) } #endif spi::Result result = _spi.split(spiBucket, spi::Bucket(target1, spi::PartitionId(lock1.disk)), - spi::Bucket(target2, spi::PartitionId(lock2.disk)), _context); + spi::Bucket(target2, spi::PartitionId(lock2.disk)), context); if (result.hasError()) { tracker->fail(_env.convertErrorCode(result), result.getErrorMessage()); return tracker; @@ -512,7 +511,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) spi::PartitionId(targets[i].second.diskIndex))); LOG(debug, "Split target %s was empty, but re-creating it since there are remapped operations queued to it", createTarget.toString().c_str()); - _spi.createBucket(createTarget, _context); + _spi.createBucket(createTarget, context); } splitReply.getSplitInfo().emplace_back(targets[i].second.bucket.getBucketId(), targets[i].first->getBucketInfo()); @@ -557,7 +556,7 @@ PersistenceThread::validateJoinCommand(const api::JoinBucketsCommand& cmd, Messa } MessageTracker::UP -PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd) +PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context) { auto tracker = std::make_unique<MessageTracker>(_env._metrics.joinBuckets,_env._component.getClock()); if (!validateJoinCommand(cmd, *tracker)) { @@ -606,11 +605,11 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd) _spi.join(spi::Bucket(firstBucket, spi::PartitionId(lock1.disk)), spi::Bucket(secondBucket, spi::PartitionId(lock2.disk)), spi::Bucket(destBucket, spi::PartitionId(_env._partition)), - _context); + context); if (!checkForError(result, *tracker)) { return tracker; } - result = _spi.flush(spi::Bucket(destBucket, spi::PartitionId(_env._partition)), _context); + result = _spi.flush(spi::Bucket(destBucket, spi::PartitionId(_env._partition)), context); if (!checkForError(result, *tracker)) { return tracker; } @@ -672,7 +671,7 @@ PersistenceThread::handleSetBucketState(api::SetBucketStateCommand& cmd) } MessageTracker::UP -PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd) +PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context) { auto tracker = std::make_unique<MessageTracker>(_env._metrics.internalJoin,_env._component.getClock()); document::Bucket destBucket = cmd.getBucket(); @@ -689,7 +688,7 @@ PersistenceThread::handleInternalBucketJoin(InternalBucketJoinCommand& cmd) _spi.join(spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())), spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToJoin())), spi::Bucket(destBucket, spi::PartitionId(cmd.getDiskOfInstanceToKeep())), - _context); + context); if (checkForError(result, *tracker)) { tracker->setReply(std::make_shared<InternalBucketJoinReply>(cmd, _env.getBucketInfo(cmd.getBucket()))); } @@ -727,46 +726,46 @@ PersistenceThread::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd) } MessageTracker::UP -PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg) +PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg, spi::Context & context) { switch (msg.getType().getId()) { case api::MessageType::GET_ID: - return handleGet(static_cast<api::GetCommand&>(msg)); + return handleGet(static_cast<api::GetCommand&>(msg), context); case api::MessageType::PUT_ID: - return handlePut(static_cast<api::PutCommand&>(msg)); + return handlePut(static_cast<api::PutCommand&>(msg), context); case api::MessageType::REMOVE_ID: - return handleRemove(static_cast<api::RemoveCommand&>(msg)); + return handleRemove(static_cast<api::RemoveCommand&>(msg), context); case api::MessageType::UPDATE_ID: - return handleUpdate(static_cast<api::UpdateCommand&>(msg)); + return handleUpdate(static_cast<api::UpdateCommand&>(msg), context); case api::MessageType::REVERT_ID: - return handleRevert(static_cast<api::RevertCommand&>(msg)); + return handleRevert(static_cast<api::RevertCommand&>(msg), context); case api::MessageType::CREATEBUCKET_ID: - return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg)); + return handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), context); case api::MessageType::DELETEBUCKET_ID: - return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg)); + return handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), context); case api::MessageType::JOINBUCKETS_ID: - return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg)); + return handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), context); case api::MessageType::SPLITBUCKET_ID: - return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg)); + return handleSplitBucket(static_cast<api::SplitBucketCommand&>(msg), context); // Depends on iterators case api::MessageType::STATBUCKET_ID: - return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), _context); + return _processAllHandler.handleStatBucket(static_cast<api::StatBucketCommand&>(msg), context); case api::MessageType::REMOVELOCATION_ID: - return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), _context); + return _processAllHandler.handleRemoveLocation(static_cast<api::RemoveLocationCommand&>(msg), context); case api::MessageType::MERGEBUCKET_ID: - return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), _context); + return _mergeHandler.handleMergeBucket(static_cast<api::MergeBucketCommand&>(msg), context); case api::MessageType::GETBUCKETDIFF_ID: - return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), _context); + return _mergeHandler.handleGetBucketDiff(static_cast<api::GetBucketDiffCommand&>(msg), context); case api::MessageType::APPLYBUCKETDIFF_ID: - return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), _context); + return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), context); case api::MessageType::SETBUCKETSTATE_ID: return handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg)); case api::MessageType::INTERNAL_ID: switch(static_cast<api::InternalCommand&>(msg).getType()) { case GetIterCommand::ID: - return handleGetIter(static_cast<GetIterCommand&>(msg)); + return handleGetIter(static_cast<GetIterCommand&>(msg), context); case CreateIteratorCommand::ID: - return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg)); + return handleCreateIterator(static_cast<CreateIteratorCommand&>(msg), context); case ReadBucketList::ID: return handleReadBucketList(static_cast<ReadBucketList&>(msg)); case ReadBucketInfo::ID: @@ -774,9 +773,9 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg) case RepairBucketCommand::ID: return handleRepairBucket(static_cast<RepairBucketCommand&>(msg)); case BucketDiskMoveCommand::ID: - return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), _context); + return _diskMoveHandler.handleBucketDiskMove(static_cast<BucketDiskMoveCommand&>(msg), context); case InternalBucketJoinCommand::ID: - return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg)); + return handleInternalBucketJoin(static_cast<InternalBucketJoinCommand&>(msg), context); case RecheckBucketInfoCommand::ID: return handleRecheckBucketInfo(static_cast<RecheckBucketInfoCommand&>(msg)); default: @@ -792,13 +791,13 @@ PersistenceThread::handleCommandSplitByType(api::StorageCommand& msg) MessageTracker::UP PersistenceThread::handleCommand(api::StorageCommand& msg) { - _context = spi::Context(msg.getLoadType(), msg.getPriority(), msg.getTrace().getLevel()); - MessageTracker::UP mtracker(handleCommandSplitByType(msg)); - if (mtracker && ! _context.getTrace().getRoot().isEmpty()) { + spi::Context context(msg.getLoadType(), msg.getPriority(), msg.getTrace().getLevel()); + MessageTracker::UP mtracker(handleCommandSplitByType(msg, context)); + if (mtracker && ! context.getTrace().getRoot().isEmpty()) { if (mtracker->getReply()) { - mtracker->getReply()->getTrace().getRoot().addChild(_context.getTrace().getRoot()); + mtracker->getReply()->getTrace().getRoot().addChild(context.getTrace().getRoot()); } else { - msg.getTrace().getRoot().addChild(_context.getTrace().getRoot()); + msg.getTrace().getRoot().addChild(context.getTrace().getRoot()); } } return mtracker; @@ -934,7 +933,10 @@ PersistenceThread::flushAllReplies( } #endif spi::Bucket b(bucket, spi::PartitionId(_env._partition)); - spi::Result result = _spi.flush(b, _context); + // Flush is not used for anything currentlu, and the context is not correct either when batching is done + //So just faking it here. + spi::Context dummyContext(documentapi::LoadType::DEFAULT, 0, 0); + spi::Result result = _spi.flush(b, dummyContext); uint32_t errorCode = _env.convertErrorCode(result); if (errorCode != 0) { for (uint32_t i = 0; i < replies.size(); ++i) { diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index ed27a759e8b..e410843c1be 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -28,21 +28,21 @@ public: void flush() override; framework::Thread& getThread() override { return *_thread; } - MessageTracker::UP handlePut(api::PutCommand& cmd); - MessageTracker::UP handleRemove(api::RemoveCommand& cmd); - MessageTracker::UP handleUpdate(api::UpdateCommand& cmd); - MessageTracker::UP handleGet(api::GetCommand& cmd); - MessageTracker::UP handleRevert(api::RevertCommand& cmd); - MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd); - MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd); - MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd); - MessageTracker::UP handleGetIter(GetIterCommand& cmd); + MessageTracker::UP handlePut(api::PutCommand& cmd, spi::Context & context); + MessageTracker::UP handleRemove(api::RemoveCommand& cmd, spi::Context & context); + MessageTracker::UP handleUpdate(api::UpdateCommand& cmd, spi::Context & context); + MessageTracker::UP handleGet(api::GetCommand& cmd, spi::Context & context); + MessageTracker::UP handleRevert(api::RevertCommand& cmd, spi::Context & context); + MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd, spi::Context & context); + MessageTracker::UP handleDeleteBucket(api::DeleteBucketCommand& cmd, spi::Context & context); + MessageTracker::UP handleCreateIterator(CreateIteratorCommand& cmd, spi::Context & context); + MessageTracker::UP handleGetIter(GetIterCommand& cmd, spi::Context & context); MessageTracker::UP handleReadBucketList(ReadBucketList& cmd); MessageTracker::UP handleReadBucketInfo(ReadBucketInfo& cmd); - MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd); + MessageTracker::UP handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context & context); MessageTracker::UP handleSetBucketState(api::SetBucketStateCommand& cmd); - MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd); - MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd); + MessageTracker::UP handleInternalBucketJoin(InternalBucketJoinCommand& cmd, spi::Context & context); + MessageTracker::UP handleSplitBucket(api::SplitBucketCommand& cmd, spi::Context & context); MessageTracker::UP handleRepairBucket(RepairBucketCommand& cmd); MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd); @@ -56,7 +56,6 @@ private: DiskMoveOperationHandler _diskMoveHandler; ServiceLayerComponent::UP _component; framework::Thread::UP _thread; - spi::Context _context; std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier; vespalib::Monitor _flushMonitor; bool _closed; @@ -72,7 +71,7 @@ private: // Message handling functions MessageTracker::UP handleCommand(api::StorageCommand&); - MessageTracker::UP handleCommandSplitByType(api::StorageCommand&); + MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, spi::Context & context); void handleReply(api::StorageReply&); MessageTracker::UP processMessage(api::StorageMessage& msg); @@ -88,7 +87,7 @@ private: friend class TestAndSetHelper; bool tasConditionExists(const api::TestAndSetCommand & cmd); bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, - bool missingDocumentImpliesMatch = false); + spi::Context & context, bool missingDocumentImpliesMatch = false); }; } // storage diff --git a/storage/src/vespa/storage/persistence/testandsethelper.cpp b/storage/src/vespa/storage/persistence/testandsethelper.cpp index 511d44ad331..e1909252c8f 100644 --- a/storage/src/vespa/storage/persistence/testandsethelper.cpp +++ b/storage/src/vespa/storage/persistence/testandsethelper.cpp @@ -31,12 +31,12 @@ void TestAndSetHelper::parseDocumentSelection() { } } -spi::GetResult TestAndSetHelper::retrieveDocument(const document::FieldSet & fieldSet) { +spi::GetResult TestAndSetHelper::retrieveDocument(const document::FieldSet & fieldSet, spi::Context & context) { return _thread._spi.get( _thread.getBucket(_docId, _cmd.getBucket()), fieldSet, _cmd.getDocumentId(), - _thread._context); + context); } TestAndSetHelper::TestAndSetHelper(PersistenceThread & thread, const api::TestAndSetCommand & cmd, @@ -51,16 +51,16 @@ TestAndSetHelper::TestAndSetHelper(PersistenceThread & thread, const api::TestAn parseDocumentSelection(); } -TestAndSetHelper::~TestAndSetHelper() { -} +TestAndSetHelper::~TestAndSetHelper() = default; -api::ReturnCode TestAndSetHelper::retrieveAndMatch() { +api::ReturnCode +TestAndSetHelper::retrieveAndMatch(spi::Context & context) { // Walk document selection tree to build a minimal field set FieldVisitor fieldVisitor(*_docTypePtr); _docSelectionUp->visit(fieldVisitor); // Retrieve document - auto result = retrieveDocument(fieldVisitor.getFieldSet()); + auto result = retrieveDocument(fieldVisitor.getFieldSet(), context); // If document exists, match it with selection if (result.hasDocument()) { diff --git a/storage/src/vespa/storage/persistence/testandsethelper.h b/storage/src/vespa/storage/persistence/testandsethelper.h index 21c111c712f..b5fa29d0106 100644 --- a/storage/src/vespa/storage/persistence/testandsethelper.h +++ b/storage/src/vespa/storage/persistence/testandsethelper.h @@ -30,13 +30,13 @@ class TestAndSetHelper { void getDocumentType(); void parseDocumentSelection(); - spi::GetResult retrieveDocument(const document::FieldSet & fieldSet); + spi::GetResult retrieveDocument(const document::FieldSet & fieldSet, spi::Context & context); public: TestAndSetHelper(PersistenceThread & thread, const api::TestAndSetCommand & cmd, bool missingDocumentImpliesMatch = false); ~TestAndSetHelper(); - api::ReturnCode retrieveAndMatch(); + api::ReturnCode retrieveAndMatch(spi::Context & context); }; } // storage |