diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-11-24 11:17:01 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-11-24 13:05:22 +0000 |
commit | 23940966c79d9977e3d7ba4939037f90058a5b50 (patch) | |
tree | 82a1928fef6a5076eb7a8a861bb4ff0cc646bf28 /storage | |
parent | 1545c0208a32e96db1bf17237b465757e7eab617 (diff) |
Remove load type from MergeStatus and Mergehandler.
Diffstat (limited to 'storage')
17 files changed, 54 insertions, 52 deletions
diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp index 646e9df7258..079abd6df06 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.cpp +++ b/storage/src/tests/persistence/common/filestortestfixture.cpp @@ -2,6 +2,7 @@ #include <vespa/storage/persistence/messages.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <tests/persistence/common/filestortestfixture.h> #include <vespa/document/repo/documenttyperepo.h> diff --git a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp index 35aee60d30f..8b38083b33d 100644 --- a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp +++ b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp @@ -2,7 +2,7 @@ #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/persistence/messages.h> -#include <tests/persistence/common/persistenceproviderwrapper.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <tests/persistence/common/filestortestfixture.h> #include <vector> diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 0582aa5c6b3..95e2d8e2c43 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/storage/persistence/messages.h> +#include <vespa/storageapi/message/bucket.h> #include <tests/persistence/common/persistenceproviderwrapper.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <tests/persistence/common/filestortestfixture.h> @@ -9,7 +10,6 @@ #include <vespa/vespalib/util/thread.h> #include <vespa/vespalib/stllike/hash_set_insert.hpp> #include <vespa/vespalib/gtest/gtest.h> -#include <vespa/vespalib/util/time.h> #include <thread> #include <vespa/log/log.h> diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index fd01d1683de..c1da8e9ec06 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -2,6 +2,7 @@ #include <vespa/document/base/testdocman.h> #include <vespa/storage/persistence/mergehandler.h> +#include <vespa/storage/persistence/filestorage/mergestatus.h> #include <tests/persistence/persistencetestutils.h> #include <tests/persistence/common/persistenceproviderwrapper.h> #include <tests/common/message_sender_stub.h> @@ -1066,7 +1067,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { TEST_F(MergeHandlerTest, remove_from_diff) { framework::defaultimplementation::FakeClock clock; - MergeStatus status(clock, documentapi::LoadType::DEFAULT, 0, 0); + MergeStatus status(clock, 0, 0); std::vector<api::GetBucketDiffCommand::Entry> diff(2); diff[0]._timestamp = 1234; diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index ed54565c3c7..7943790f13d 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -3,9 +3,9 @@ #include <vespa/document/datatype/datatype.h> #include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/fieldvalue/stringfieldvalue.h> -#include <vespa/document/fieldvalue/rawfieldvalue.h> #include <vespa/storageapi/message/datagram.h> #include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/visiting/visitormanager.h> #include <vespa/storageframework/defaultimplementation/clock/realclock.h> diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index aafc87aa84f..8b031af4b69 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -13,10 +13,10 @@ #pragma once -#include "mergestatus.h" #include <vespa/document/bucket/bucket.h> #include <vespa/storage/storageutil/resumeguard.h> #include <vespa/storage/common/messagesender.h> +#include <vespa/storageapi/messageapi/storagemessage.h> namespace storage { namespace api { @@ -34,6 +34,7 @@ struct FileStorMetrics; struct MessageSender; struct ServiceLayerComponentRegister; class AbortBucketOperationsCommand; +class MergeStatus; class FileStorHandler : public MessageSender { public: @@ -220,7 +221,7 @@ public: /** * Add a new merge state to the registry. */ - virtual void addMergeStatus(const document::Bucket&, MergeStatus::SP) = 0; + virtual void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) = 0; /** * Returns the reference to the current merge status for the given bucket. diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 01f44d2df42..980bd09a133 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -2,6 +2,7 @@ #include "filestorhandlerimpl.h" #include "filestormetrics.h" +#include "mergestatus.h" #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> @@ -77,7 +78,7 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe FileStorHandlerImpl::~FileStorHandlerImpl() = default; void -FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP status) +FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, std::shared_ptr<MergeStatus> status) { std::lock_guard mlock(_mergeStatesLock); if (_mergeStates.find(bucket) != _mergeStates.end()) { @@ -90,7 +91,7 @@ MergeStatus& FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) { std::lock_guard mlock(_mergeStatesLock); - MergeStatus::SP status = _mergeStates[bucket]; + std::shared_ptr<MergeStatus> status = _mergeStates[bucket]; if ( ! status ) { throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC); } @@ -140,7 +141,7 @@ FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api: return; } if (code != 0) { - MergeStatus::SP statusPtr(it->second); + std::shared_ptr<MergeStatus> statusPtr(it->second); assert(statusPtr.get()); MergeStatus& status(*statusPtr); if (status.reply.get()) { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 549de164229..b3d4ff0c730 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -16,11 +16,11 @@ #pragma once #include "filestorhandler.h" -#include "mergestatus.h" #include <vespa/document/bucket/bucketid.h> #include <vespa/metrics/metrics.h> #include <vespa/storage/common/servicelayercomponent.h> #include <vespa/storageframework/generic/metric/metricupdatehook.h> +#include <vespa/storageapi/messageapi/storagereply.h> #include <boost/multi_index_container.hpp> #include <boost/multi_index/identity.hpp> #include <boost/multi_index/member.hpp> @@ -218,7 +218,7 @@ public: return stripe(bucket).lock(bucket, lockReq); } - void addMergeStatus(const document::Bucket&, MergeStatus::SP) override; + void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) override; MergeStatus& editMergeStatus(const document::Bucket&) override; bool isMerging(const document::Bucket&) const override; uint32_t getNumActiveMerges() const override; @@ -242,7 +242,7 @@ private: MessageSender& _messageSender; const document::BucketIdFactory& _bucketIdFactory; mutable std::mutex _mergeStatesLock; - std::map<document::Bucket, MergeStatus::SP> _mergeStates; + std::map<document::Bucket, std::shared_ptr<MergeStatus>> _mergeStates; vespalib::duration _getNextMessageTimeout; const uint32_t _max_active_merges_per_stripe; // Read concurrently by stripes. mutable std::mutex _pauseMonitor; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 2953462dd1e..7abb41f4741 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -34,6 +34,7 @@ namespace storage { namespace api { class ReturnCode; class StorageReply; + class BucketCommand; } struct FileStorManagerTest; diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp index a42a57f648c..d75f09dfb3c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp @@ -9,7 +9,7 @@ LOG_SETUP(".mergestatus"); namespace storage { -MergeStatus::MergeStatus(const framework::Clock& clock, const metrics::LoadType&, +MergeStatus::MergeStatus(const framework::Clock& clock, api::StorageMessage::Priority priority, uint32_t traceLevel) : reply(), full_node_list(), nodeList(), maxTimestamp(0), diff(), pendingId(0), @@ -30,8 +30,7 @@ MergeStatus::removeFromDiff( const std::vector<api::MergeBucketCommand::Node> &nodes) { std::deque<api::GetBucketDiffCommand::Entry>::iterator it(diff.begin()); - std::vector<api::ApplyBucketDiffCommand::Entry>::const_iterator it2( - part.begin()); + std::vector<api::ApplyBucketDiffCommand::Entry>::const_iterator it2(part.begin()); bool altered = false; HasMaskRemapper remap_mask(nodeList, nodes); // We expect part array to be sorted in the same order as in the diff, diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h index 51930f337c6..97a66ef56c1 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h @@ -15,8 +15,6 @@ namespace storage { class MergeStatus : public document::Printable { public: - using SP = std::shared_ptr<MergeStatus>; - std::shared_ptr<api::StorageReply> reply; std::vector<api::MergeBucketCommand::Node> full_node_list; std::vector<api::MergeBucketCommand::Node> nodeList; @@ -29,8 +27,8 @@ public: framework::MilliSecTimer startTime; spi::Context context; - MergeStatus(const framework::Clock&, const metrics::LoadType&, api::StorageMessage::Priority, uint32_t traceLevel); - ~MergeStatus(); + MergeStatus(const framework::Clock&, api::StorageMessage::Priority, uint32_t traceLevel); + ~MergeStatus() override; /** * Note: hasMask parameter and _entry._hasMask in part vector are per-reply masks, @@ -41,7 +39,7 @@ public: */ bool removeFromDiff(const std::vector<api::ApplyBucketDiffCommand::Entry>& part, uint16_t hasMask, const std::vector<api::MergeBucketCommand::Node> &nodes); void print(std::ostream& out, bool verbose, const std::string& indent) const override; - bool isFirstNode() const { return (reply.get() != 0); } + bool isFirstNode() const { return static_cast<bool>(reply); } }; } // storage diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index cab35e77bac..148763f8d30 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -4,6 +4,7 @@ #include "persistenceutil.h" #include "apply_bucket_diff_entry_complete.h" #include "apply_bucket_diff_entry_result.h" +#include <vespa/storage/persistence/filestorage/mergestatus.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/document/fieldset/fieldsets.h> @@ -145,7 +146,6 @@ MergeHandler::populateMetaData( bool MergeHandler::buildBucketInfoList( const spi::Bucket& bucket, - const documentapi::LoadType& /*loadType*/, Timestamp maxTimestamp, uint8_t myNodeIndex, std::vector<api::GetBucketDiffCommand::Entry>& output, @@ -320,7 +320,6 @@ namespace { void MergeHandler::fetchLocalData( const spi::Bucket& bucket, - const documentapi::LoadType& /*loadType*/, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, spi::Context& context) const @@ -510,7 +509,6 @@ MergeHandler::applyDiffEntry(const spi::Bucket& bucket, api::BucketInfo MergeHandler::applyDiffLocally( const spi::Bucket& bucket, - const documentapi::LoadType& /*loadType*/, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, spi::Context& context) const @@ -804,7 +802,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, cmd->setTimeout(status.timeout); if (applyDiffNeedLocalData(cmd->getDiff(), 0, true)) { framework::MilliSecTimer startTime(_clock); - fetchLocalData(bucket, cmd->getLoadType(), cmd->getDiff(), 0, context); + fetchLocalData(bucket, cmd->getDiff(), 0, context); _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); } status.pendingId = cmd->getMsgId(); @@ -882,9 +880,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP checkResult(_spi.createBucket(bucket, tracker->context()), bucket, "create bucket"); MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); - auto s = std::make_shared<MergeStatus>( - _clock, cmd.getLoadType(), - cmd.getPriority(), cmd.getTrace().getLevel()); + auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel()); _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); s->full_node_list = cmd.getNodes(); s->nodeList = cmd.getNodes(); @@ -893,7 +889,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP s->startTime = framework::MilliSecTimer(_clock); auto cmd2 = std::make_shared<api::GetBucketDiffCommand>(bucket.getBucket(), s->nodeList, s->maxTimestamp.getTime()); - if (!buildBucketInfoList(bucket, cmd.getLoadType(), s->maxTimestamp, 0, cmd2->getDiff(), tracker->context())) { + if (!buildBucketInfoList(bucket, s->maxTimestamp, 0, cmd2->getDiff(), tracker->context())) { LOG(debug, "Bucket non-existing in db. Failing merge."); tracker->fail(ReturnCode::BUCKET_DELETED, "Bucket not found in buildBucketInfo step"); return tracker; @@ -1065,8 +1061,7 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker std::vector<api::GetBucketDiffCommand::Entry>& remote(cmd.getDiff()); std::vector<api::GetBucketDiffCommand::Entry> local; framework::MilliSecTimer startTime(_clock); - if (!buildBucketInfoList(bucket, cmd.getLoadType(), - Timestamp(cmd.getMaxTimestamp()), + if (!buildBucketInfoList(bucket, Timestamp(cmd.getMaxTimestamp()), index, local, tracker->context())) { LOG(debug, "Bucket non-existing in db. Failing merge."); @@ -1106,9 +1101,7 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker // When not the last node in merge chain, we must save reply, and // send command on. MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); - auto s = std::make_shared<MergeStatus>(_clock, - cmd.getLoadType(), cmd.getPriority(), - cmd.getTrace().getLevel()); + auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel()); _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); s->pendingGetDiff = std::make_shared<api::GetBucketDiffReply>(cmd); @@ -1250,7 +1243,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra bool lastInChain = index + 1u >= cmd.getNodes().size(); if (applyDiffNeedLocalData(cmd.getDiff(), index, !lastInChain)) { framework::MilliSecTimer startTime(_clock); - fetchLocalData(bucket, cmd.getLoadType(), cmd.getDiff(), index, tracker->context()); + fetchLocalData(bucket, cmd.getDiff(), index, tracker->context()); _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); } else { LOG(spam, "Merge(%s): Moving %zu entries, didn't need " @@ -1262,7 +1255,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { framework::MilliSecTimer startTime(_clock); - (void) applyDiffLocally(bucket, cmd.getLoadType(), cmd.getDiff(), index, tracker->context()); + (void) applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context()); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); } else { @@ -1296,9 +1289,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra // When not the last node in merge chain, we must save reply, and // send command on. MergeStateDeleter stateGuard(_env._fileStorHandler, bucket.getBucket()); - auto s = std::make_shared<MergeStatus>(_clock, - cmd.getLoadType(), cmd.getPriority(), - cmd.getTrace().getLevel()); + auto s = std::make_shared<MergeStatus>(_clock, cmd.getPriority(), cmd.getTrace().getLevel()); _env._fileStorHandler.addMergeStatus(bucket.getBucket(), s); s->pendingApplyDiff = std::make_shared<api::ApplyBucketDiffReply>(cmd); @@ -1352,12 +1343,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag uint8_t index = findOwnIndex(reply.getNodes(), _env._nodeIndex); if (applyDiffNeedLocalData(diff, index, false)) { framework::MilliSecTimer startTime(_clock); - fetchLocalData(bucket, reply.getLoadType(), diff, index, s.context); + fetchLocalData(bucket, diff, index, s.context); _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); - (void) applyDiffLocally(bucket, reply.getLoadType(), diff, index, s.context); + (void) applyDiffLocally(bucket, diff, index, s.context); _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)", diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 64b1448577a..25b7f281ef0 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -16,15 +16,18 @@ #include "types.h" #include <vespa/persistence/spi/bucket.h> #include <vespa/persistence/spi/docentry.h> -#include <vespa/storage/persistence/filestorage/mergestatus.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storage/common/messagesender.h> namespace storage { -namespace spi { struct PersistenceProvider; } +namespace spi { + struct PersistenceProvider; + class Context; +} class PersistenceUtil; class ApplyBucketDiffEntryResult; +class MergeStatus; class MergeHandler : public Types { @@ -42,19 +45,16 @@ public: bool buildBucketInfoList( const spi::Bucket& bucket, - const documentapi::LoadType&, Timestamp maxTimestamp, uint8_t myNodeIndex, std::vector<api::GetBucketDiffCommand::Entry>& output, spi::Context& context) const; void fetchLocalData(const spi::Bucket& bucket, - const documentapi::LoadType&, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, spi::Context& context) const; api::BucketInfo applyDiffLocally( const spi::Bucket& bucket, - const documentapi::LoadType&, std::vector<api::ApplyBucketDiffCommand::Entry>& diff, uint8_t nodeIndex, spi::Context& context) const; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 90845e17b6f..0449b21fc81 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "persistenceutil.h" +#include <vespa/storageapi/messageapi/bucketinforeply.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/log/bufferedlogger.h> diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 8eeea6dddd2..ddd5619b964 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -6,11 +6,17 @@ #include <vespa/storage/common/servicelayercomponent.h> #include <vespa/storage/persistence/filestorage/filestorhandler.h> #include <vespa/storage/persistence/filestorage/filestormetrics.h> +#include <vespa/storageframework/generic/clock/timer.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/storage/storageutil/utils.h> +namespace storage::api { + class StorageMessage; + class StorageReply; +} namespace storage { + class PersistenceUtil; class MessageTracker : protected Types { @@ -18,7 +24,7 @@ public: typedef std::unique_ptr<MessageTracker> UP; MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, - FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); + FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg); ~MessageTracker(); @@ -29,7 +35,7 @@ public: * non-default reply. They should call this function as soon as they create * a reply, to ensure it is stored in case of failure after reply creation. */ - void setReply(api::StorageReply::SP reply) { + void setReply(std::shared_ptr<api::StorageReply> reply) { assert( ! _reply ); _reply = std::move(reply); } @@ -52,7 +58,7 @@ public: api::StorageReply & getReply() { return *_reply; } - api::StorageReply::SP && stealReplySP() && { + std::shared_ptr<api::StorageReply> && stealReplySP() && { return std::move(_reply); } @@ -71,23 +77,23 @@ public: static MessageTracker::UP createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil & env, MessageSender & replySender, - FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); + FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg); private: MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, - FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg); + FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg); [[nodiscard]] bool count_result_as_failure() const noexcept; bool _sendReply; bool _updateBucketInfo; FileStorHandler::BucketLockInterface::SP _bucketLock; - api::StorageMessage::SP _msg; + std::shared_ptr<api::StorageMessage> _msg; spi::Context _context; const PersistenceUtil &_env; MessageSender &_replySender; FileStorThreadMetrics::Op *_metric; // needs a better and thread safe solution - api::StorageReply::SP _reply; + std::shared_ptr<api::StorageReply> _reply; api::ReturnCode _result; framework::MilliSecTimer _timer; }; diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp index 3d5a4efc1af..6ed928245db 100644 --- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp +++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp @@ -4,6 +4,7 @@ #include "persistenceutil.h" #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/storage/common/bucketoperationlogger.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/document/base/exceptions.h> #include <vespa/document/fieldset/fieldsetrepo.h> diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp index 6049dabc2fa..a0fabca4890 100644 --- a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp +++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp @@ -7,6 +7,7 @@ #include "messages.h" #include <vespa/storage/common/bucketmessages.h> #include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/log/log.h> LOG_SETUP(".persistence.splitjoinhandler"); |