diff options
Diffstat (limited to 'storage/src')
14 files changed, 100 insertions, 48 deletions
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp index 65c1ac726b5..776cfc14d84 100644 --- a/storage/src/tests/distributor/garbagecollectiontest.cpp +++ b/storage/src/tests/distributor/garbagecollectiontest.cpp @@ -3,6 +3,7 @@ #include <vespa/storageapi/message/removelocation.h> #include <vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h> #include <vespa/storage/distributor/idealstatemanager.h> +#include <vespa/storage/distributor/idealstatemetricsset.h> #include <tests/distributor/distributortestutil.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/document/test/make_document_bucket.h> @@ -35,11 +36,13 @@ struct GarbageCollectionOperationTest : Test, DistributorTestUtil { } // FIXME fragile to assume that send order == node index, but that's the way it currently works - void reply_to_nth_request(GarbageCollectionOperation& op, size_t n, uint32_t bucket_info_checksum) { + void reply_to_nth_request(GarbageCollectionOperation& op, size_t n, + uint32_t bucket_info_checksum, uint32_t n_docs_removed) { auto msg = _sender.command(n); assert(msg->getType() == api::MessageType::REMOVELOCATION); std::shared_ptr<api::StorageReply> reply(msg->makeReply()); auto& gc_reply = dynamic_cast<api::RemoveLocationReply&>(*reply); + gc_reply.set_documents_removed(n_docs_removed); gc_reply.setBucketInfo(api::BucketInfo(bucket_info_checksum, 90, 500)); op.receive(_sender, reply); @@ -56,6 +59,13 @@ struct GarbageCollectionOperationTest : Test, DistributorTestUtil { << entry->getNode(i)->getBucketInfo(); } } + + uint32_t gc_removed_documents_metric() { + auto metric_base = getIdealStateManager().getMetrics().operations[IdealStateOperation::GARBAGE_COLLECTION]; + auto gc_metrics = std::dynamic_pointer_cast<GcMetricSet>(metric_base); + assert(gc_metrics); + return gc_metrics->documents_removed.getValue(); + } }; TEST_F(GarbageCollectionOperationTest, simple) { @@ -63,29 +73,34 @@ TEST_F(GarbageCollectionOperationTest, simple) { op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); + EXPECT_EQ(0u, gc_removed_documents_metric()); for (uint32_t i = 0; i < 2; ++i) { std::shared_ptr<api::StorageCommand> msg = _sender.command(i); ASSERT_EQ(msg->getType(), api::MessageType::REMOVELOCATION); auto& tmp = dynamic_cast<api::RemoveLocationCommand&>(*msg); EXPECT_EQ("music.date < 34", tmp.getDocumentSelection()); - reply_to_nth_request(*op, i, 777 + i); + reply_to_nth_request(*op, i, 777 + i, 50); } ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(777, 90, 500), api::BucketInfo(778, 90, 500)}, 34)); + EXPECT_EQ(50u, gc_removed_documents_metric()); } TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until_all_replies_received) { auto op = create_op(); op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); + EXPECT_EQ(0u, gc_removed_documents_metric()); // Respond to 1st request. Should _not_ cause bucket info to be merged into the database yet - reply_to_nth_request(*op, 0, 1234); + reply_to_nth_request(*op, 0, 1234, 70); ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0)); // Respond to 2nd request. This _should_ cause bucket info to be merged into the database. - reply_to_nth_request(*op, 1, 4567); + reply_to_nth_request(*op, 1, 4567, 60); ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(1234, 90, 500), api::BucketInfo(4567, 90, 500)}, 34)); + + EXPECT_EQ(70u, gc_removed_documents_metric()); // Use max of received metrics } TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) { @@ -93,10 +108,10 @@ TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_s op->start(_sender, framework::MilliSecTime(0)); ASSERT_EQ(2, _sender.commands().size()); - reply_to_nth_request(*op, 0, 1234); + reply_to_nth_request(*op, 0, 1234, 0); // Change to replica on node 0 happens after GC op, but before GC info is merged into the DB. Must not be lost. insertBucketInfo(op->getBucketId(), 0, 7777, 100, 2000); - reply_to_nth_request(*op, 1, 4567); + reply_to_nth_request(*op, 1, 4567, 0); // Bucket info for node 0 is that of the later sequenced operation, _not_ from the earlier GC op. ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(7777, 100, 2000), api::BucketInfo(4567, 90, 500)}, 34)); } diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index 8c0f8853d2d..83f243ed1b2 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -23,11 +23,15 @@ TEST_F(ProcessAllHandlerTest, remove_location) { api::RemoveLocationCommand removeLocation("id.user == 4", makeDocumentBucket(bucketId)); ProcessAllHandler handler(getEnv(), getPersistenceProvider()); spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - handler.handleRemoveLocation(removeLocation, context); + auto tracker = handler.handleRemoveLocation(removeLocation, context); EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n", dumpBucket(bucketId)); + + auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(tracker->getReply()); + ASSERT_TRUE(reply.get() != nullptr); + EXPECT_EQ(2u, reply->documents_removed()); } TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { @@ -44,7 +48,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { api::RemoveLocationCommand removeLocation("testdoctype1.headerval % 2 == 0", makeDocumentBucket(bucketId)); spi::Context context(documentapi::LoadType::DEFAULT, 0, 0); - handler.handleRemoveLocation(removeLocation, context); + auto tracker = handler.handleRemoveLocation(removeLocation, context); EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n" @@ -57,6 +61,10 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { "DocEntry(108, 1, id:mail:testdoctype1:n=4:42967.html)\n" "DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n", dumpBucket(bucketId)); + + auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(tracker->getReply()); + ASSERT_TRUE(reply.get() != nullptr); + EXPECT_EQ(5u, reply->documents_removed()); } TEST_F(ProcessAllHandlerTest, remove_location_throws_exception_on_unknown_doc_type) { diff --git a/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.cpp b/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.cpp index 61e67b40f44..c211e775326 100644 --- a/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.cpp @@ -4,8 +4,7 @@ #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/idealstatemetricsset.h> -namespace storage { -namespace distributor { +namespace storage::distributor { BucketDBMetricUpdater::Stats::Stats() : _docCount(0), @@ -27,9 +26,7 @@ BucketDBMetricUpdater::BucketDBMetricUpdater() { } -BucketDBMetricUpdater::~BucketDBMetricUpdater() -{ -} +BucketDBMetricUpdater::~BucketDBMetricUpdater() = default; void BucketDBMetricUpdater::resetStats() @@ -148,5 +145,4 @@ BucketDBMetricUpdater::reset() resetStats(); } -} // distributor -} // storage +} // storage::distributor diff --git a/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h b/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h index 6e15ee03d12..7ef8479866f 100644 --- a/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h @@ -7,12 +7,9 @@ #include <unordered_map> -namespace storage { +namespace storage::distributor { class DistributorMetricSet; - -namespace distributor { - class IdealStateMetricSet; class BucketDBMetricUpdater { @@ -107,5 +104,4 @@ private: void resetStats(); }; -} // distributor -} // storage +} // storage::distributor diff --git a/storage/src/vespa/storage/distributor/distributorinterface.h b/storage/src/vespa/storage/distributor/distributorinterface.h index aba58e112dc..b17bcd56d19 100644 --- a/storage/src/vespa/storage/distributor/distributorinterface.h +++ b/storage/src/vespa/storage/distributor/distributorinterface.h @@ -12,10 +12,10 @@ namespace storage::api { class MergeBucketReply; } namespace storage::lib { class ClusterStateBundle; } namespace storage { class DistributorConfiguration; - class DistributorMetricSet; } namespace storage::distributor { +class DistributorMetricSet; class PendingMessageTracker; class DistributorInterface : public DistributorMessageSender diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.cpp b/storage/src/vespa/storage/distributor/distributormetricsset.cpp index 244406ca6fb..98e96f9294f 100644 --- a/storage/src/vespa/storage/distributor/distributormetricsset.cpp +++ b/storage/src/vespa/storage/distributor/distributormetricsset.cpp @@ -3,7 +3,7 @@ #include <vespa/metrics/loadmetric.hpp> #include <vespa/metrics/summetric.hpp> -namespace storage { +namespace storage::distributor { using metrics::MetricSet; diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.h b/storage/src/vespa/storage/distributor/distributormetricsset.h index 1e4730b8de6..b5be72e8c14 100644 --- a/storage/src/vespa/storage/distributor/distributormetricsset.h +++ b/storage/src/vespa/storage/distributor/distributormetricsset.h @@ -7,7 +7,7 @@ #include <vespa/metrics/metrics.h> #include <vespa/documentapi/loadtypes/loadtypeset.h> -namespace storage { +namespace storage::distributor { class DistributorMetricSet : public metrics::MetricSet { diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index 96875a3644a..60cad15a791 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -13,11 +13,11 @@ namespace storage { -class DistributorMetricSet; class PersistenceOperationMetricSet; namespace distributor { +class DistributorMetricSet; class Distributor; class MaintenanceOperationGenerator; class DirectDispatchSender; diff --git a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp index d72f4a80ef4..fd193ad6fd8 100644 --- a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp +++ b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp @@ -6,7 +6,7 @@ namespace storage { namespace distributor { OperationMetricSet::OperationMetricSet(const std::string& name, metrics::Metric::Tags tags, const std::string& description, MetricSet* owner) - : MetricSet(name, tags, description, owner), + : MetricSet(name, std::move(tags), description, owner), pending("pending", {{"logdefault"},{"yamasdefault"}}, "The number of operations pending", this), @@ -16,14 +16,25 @@ OperationMetricSet::OperationMetricSet(const std::string& name, metrics::Metric: failed("done_failed", {{"logdefault"},{"yamasdefault"}}, "The number of operations that failed", this) -{ } +{} -OperationMetricSet::~OperationMetricSet() { } +OperationMetricSet::~OperationMetricSet() = default; + +GcMetricSet::GcMetricSet(const std::string& name, metrics::Metric::Tags tags, const std::string& description, MetricSet* owner) + : OperationMetricSet(name, std::move(tags), description, owner), + documents_removed("documents_removed", + {{"logdefault"},{"yamasdefault"}}, + "Number of documents removed by GC operations", this) +{} + +GcMetricSet::~GcMetricSet() = default; void IdealStateMetricSet::createOperationMetrics() { typedef IdealStateOperation ISO; operations.resize(ISO::OPERATION_COUNT); + // Note: naked new is used instead of make_shared due to the latter not being + // able to properly transitively deduce the types for the tag initializer lists. operations[ISO::DELETE_BUCKET] = std::shared_ptr<OperationMetricSet>( new OperationMetricSet("delete_bucket", {{"logdefault"},{"yamasdefault"}}, @@ -45,9 +56,9 @@ IdealStateMetricSet::createOperationMetrics() { {{"logdefault"},{"yamasdefault"}}, "Operations to set active/ready state for bucket copies", this)); operations[ISO::GARBAGE_COLLECTION] = std::shared_ptr<OperationMetricSet>( - new OperationMetricSet("garbage_collection", - {{"logdefault"},{"yamasdefault"}}, - "Operations to garbage collect data from buckets", this)); + new GcMetricSet("garbage_collection", + {{"logdefault"},{"yamasdefault"}}, + "Operations to garbage collect data from buckets", this)); } IdealStateMetricSet::IdealStateMetricSet() @@ -81,7 +92,7 @@ IdealStateMetricSet::IdealStateMetricSet() createOperationMetrics(); } -IdealStateMetricSet::~IdealStateMetricSet() { } +IdealStateMetricSet::~IdealStateMetricSet() = default; void IdealStateMetricSet::setPendingOperations(const std::vector<uint64_t>& newMetrics) { for (uint32_t i = 0; i < IdealStateOperation::OPERATION_COUNT; i++) { diff --git a/storage/src/vespa/storage/distributor/idealstatemetricsset.h b/storage/src/vespa/storage/distributor/idealstatemetricsset.h index 7bb472b4a2c..2679da17598 100644 --- a/storage/src/vespa/storage/distributor/idealstatemetricsset.h +++ b/storage/src/vespa/storage/distributor/idealstatemetricsset.h @@ -16,13 +16,21 @@ public: metrics::LongCountMetric failed; OperationMetricSet(const std::string& name, metrics::Metric::Tags tags, const std::string& description, MetricSet* owner); - ~OperationMetricSet(); + ~OperationMetricSet() override; +}; + +struct GcMetricSet : OperationMetricSet { + metrics::LongCountMetric documents_removed; + + GcMetricSet(const std::string& name, metrics::Metric::Tags tags, + const std::string& description, MetricSet* owner); + ~GcMetricSet() override; }; class IdealStateMetricSet : public metrics::MetricSet { public: - std::vector<std::shared_ptr<OperationMetricSet> > operations; + std::vector<std::shared_ptr<OperationMetricSet>> operations; metrics::LongValueMetric idealstate_diff; metrics::LongValueMetric buckets_toofewcopies; metrics::LongValueMetric buckets_toomanycopies; @@ -35,7 +43,7 @@ public: void createOperationMetrics(); IdealStateMetricSet(); - ~IdealStateMetricSet(); + ~IdealStateMetricSet() override; void setPendingOperations(const std::vector<uint64_t>& newMetrics); }; diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index c674add80f7..fc127c2e0eb 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -2,6 +2,7 @@ #include "garbagecollectionoperation.h" #include <vespa/storage/distributor/idealstatemanager.h> +#include <vespa/storage/distributor/idealstatemetricsset.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storageapi/message/removelocation.h> @@ -9,19 +10,18 @@ #include <vespa/log/log.h> LOG_SETUP(".distributor.operation.idealstate.remove"); -using namespace storage::distributor; +namespace storage::distributor { GarbageCollectionOperation::GarbageCollectionOperation(const std::string& clusterName, const BucketAndNodes& nodes) : IdealStateOperation(nodes), _tracker(clusterName), - _replica_info() + _replica_info(), + _max_documents_removed(0) {} GarbageCollectionOperation::~GarbageCollectionOperation() = default; -void -GarbageCollectionOperation::onStart(DistributorMessageSender& sender) -{ +void GarbageCollectionOperation::onStart(DistributorMessageSender& sender) { BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId()); std::vector<uint16_t> nodes = entry->getNodes(); @@ -43,7 +43,7 @@ GarbageCollectionOperation::onStart(DistributorMessageSender& sender) void GarbageCollectionOperation::onReceive(DistributorMessageSender&, - const std::shared_ptr<api::StorageReply>& reply) + const std::shared_ptr<api::StorageReply>& reply) { auto* rep = dynamic_cast<api::RemoveLocationReply*>(reply.get()); assert(rep != nullptr); @@ -53,6 +53,7 @@ GarbageCollectionOperation::onReceive(DistributorMessageSender&, if (!rep->getResult().failed()) { _replica_info.emplace_back(_manager->getDistributorComponent().getUniqueTimestamp(), node, rep->getBucketInfo()); + _max_documents_removed = std::max(_max_documents_removed, rep->documents_removed()); } else { _ok = false; } @@ -61,6 +62,7 @@ GarbageCollectionOperation::onReceive(DistributorMessageSender&, if (_ok) { merge_received_bucket_info_into_db(); } + update_gc_metrics(); done(); } } @@ -76,8 +78,16 @@ void GarbageCollectionOperation::merge_received_bucket_info_into_db() { } } +void GarbageCollectionOperation::update_gc_metrics() { + auto metric_base = _manager->getMetrics().operations[IdealStateOperation::GARBAGE_COLLECTION]; + auto gc_metrics = std::dynamic_pointer_cast<GcMetricSet>(metric_base); + assert(gc_metrics); + gc_metrics->documents_removed.inc(_max_documents_removed); +} + bool -GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const -{ +GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const { return true; } + +} diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index 47ea11bb328..28de9592a63 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -26,8 +26,10 @@ protected: MessageTracker _tracker; private: std::vector<BucketCopy> _replica_info; + uint32_t _max_documents_removed; void merge_received_bucket_info_into_db(); + void update_gc_metrics(); }; } diff --git a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h index d951d7ceba2..1299fdad2ad 100644 --- a/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h +++ b/storage/src/vespa/storage/distributor/persistence_operation_metric_set.h @@ -16,7 +16,7 @@ class PersistenceFailuresMetricSet : public metrics::MetricSet { public: explicit PersistenceFailuresMetricSet(metrics::MetricSet* owner); - ~PersistenceFailuresMetricSet(); + ~PersistenceFailuresMetricSet() override; metrics::SumMetric<metrics::LongCountMetric> sum; metrics::LongCountMetric notready; @@ -44,7 +44,7 @@ public: PersistenceFailuresMetricSet failures; PersistenceOperationMetricSet(const std::string& name, metrics::MetricSet* owner = nullptr); - ~PersistenceOperationMetricSet(); + ~PersistenceOperationMetricSet() override; MetricSet * clone(std::vector<Metric::UP>& ownerList, CopyType copyType, metrics::MetricSet* owner, bool includeUnused) const override; diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index 8c951a9f50d..5b94a3da027 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -23,6 +23,7 @@ public: spi::PersistenceProvider& _provider; const spi::Bucket& _bucket; spi::Context& _context; + uint32_t _n_removed; UnrevertableRemoveEntryProcessor( spi::PersistenceProvider& provider, @@ -30,7 +31,9 @@ public: spi::Context& context) : _provider(provider), _bucket(bucket), - _context(context) {} + _context(context), + _n_removed(0) + {} void process(spi::DocEntry& entry) override { spi::RemoveResult removeResult = _provider.remove( @@ -45,13 +48,14 @@ public: << removeResult.getErrorMessage(); throw std::runtime_error(ss.str()); } + ++_n_removed; } }; class StatEntryProcessor : public BucketProcessor::EntryProcessor { public: std::ostream& ost; - StatEntryProcessor(std::ostream& o) + explicit StatEntryProcessor(std::ostream& o) : ost(o) {}; void process(spi::DocEntry& e) override { @@ -97,7 +101,9 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, context); spi::Result result = _spi.flush(bucket, context); uint32_t code = _env.convertErrorCode(result); - if (code != 0) { + if (code == 0) { + tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed)); + } else { tracker->fail(code, result.getErrorMessage()); } |