diff options
author | Tor Egge <Tor.Egge@yahoo-inc.com> | 2016-06-23 17:41:52 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@yahoo-inc.com> | 2016-06-27 09:30:48 +0000 |
commit | 272d6b7e0d7fa16f49c43c63cf514026528d5a07 (patch) | |
tree | ca3ab5d8f004b0c1cb022febda5d370e551422eb /searchcore | |
parent | a7a363120c4f5ab96937a5f7a176ae5c62ede7bd (diff) |
Stop bucket move maintenance job when disk is full.
Diffstat (limited to 'searchcore')
19 files changed, 353 insertions, 12 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index 9062067132c..ad9ffdfbe3e 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -13,6 +13,7 @@ LOG_SETUP("documentbucketmover_test"); #include <vespa/searchcore/proton/server/idocumentmovehandler.h> #include <vespa/searchcore/proton/test/clusterstatehandler.h> #include <vespa/searchcore/proton/test/buckethandler.h> +#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> #include <vespa/searchcore/proton/server/maintenancedocumentsubdb.h> #include <vespa/searchcore/proton/bucketdb/bucketdbhandler.h> @@ -536,6 +537,7 @@ struct ControllerFixtureBase MySubDb _ready; MySubDb _notReady; MyFrozenBucketHandler _fbh; + test::DiskMemUsageNotifier _diskMemUsageNotifier; BucketMoveJob _bmj; ControllerFixtureBase() : _builder(), @@ -547,8 +549,10 @@ struct ControllerFixtureBase _ready(_builder.getRepo(), _bucketDB, 1, SubDbType::READY), _notReady(_builder.getRepo(), _bucketDB, 2, SubDbType::NOTREADY), _fbh(), + _diskMemUsageNotifier(), _bmj(_calc, _moveHandler, _modifiedHandler, _ready._subDb, _notReady._subDb, _fbh, _clusterStateHandler, _bucketHandler, + _diskMemUsageNotifier, "test") { } @@ -1175,6 +1179,27 @@ TEST_F("require that thawed bucket is not moved if active as well", ControllerFi } +TEST_F("require that bucket move stops when disk limit is reached", ControllerFixture) +{ + // Bucket 1 shold be moved + f.addReady(f._ready.bucket(2)); + // Note: This depends on f._bmj.run() moving max 1 documents + EXPECT_TRUE(!f._bmj.run()); + EXPECT_EQUAL(1u, f.docsMoved().size()); + EXPECT_EQUAL(0u, f.bucketsModified().size()); + // Notify that we've over disk limit + f._diskMemUsageNotifier.notify(DiskMemUsageState(true, false)); + EXPECT_TRUE(f._bmj.run()); + EXPECT_EQUAL(1u, f.docsMoved().size()); + EXPECT_EQUAL(0u, f.bucketsModified().size()); + // Notify that we've under disk limit + f._diskMemUsageNotifier.notify(DiskMemUsageState(false, false)); + EXPECT_TRUE(!f._bmj.run()); + EXPECT_EQUAL(2u, f.docsMoved().size()); + EXPECT_EQUAL(0u, f.bucketsModified().size()); +} + + TEST_MAIN() { TEST_RUN_ALL(); diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 0513d8f45d9..711e1391c05 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -26,6 +26,7 @@ LOG_SETUP("maintenancecontroller_test"); #include <vespa/searchcore/proton/feedoperation/moveoperation.h> #include <vespa/searchcore/proton/test/clusterstatehandler.h> #include <vespa/searchcore/proton/test/buckethandler.h> +#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> #include <vespa/vespalib/util/threadstackexecutor.h> using namespace proton; @@ -479,6 +480,7 @@ public: std::shared_ptr<proton::IAttributeManager> _readyAttributeManager; std::shared_ptr<proton::IAttributeManager> _notReadyAttributeManager; AttributeUsageFilter _attributeUsageFilter; + test::DiskMemUsageNotifier _diskMemUsageNotifier; MaintenanceController _mc; MaintenanceControllerFixture(void); @@ -1017,7 +1019,9 @@ MaintenanceControllerFixture::injectMaintenanceJobs() MaintenanceJobsInjector::injectJobs(_mc, *_mcCfg, _fh, _gsp, _fh, lscHandlers, _fh, _mc, _docTypeName.getName(), _fh, _fh, _bmc, _clusterStateHandler, _bucketHandler, - _calc, _jobTrackers, *this, + _calc, + _diskMemUsageNotifier, + _jobTrackers, *this, _readyAttributeManager, _notReadyAttributeManager, _attributeUsageFilter); diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index e0f885a8c6f..cc7ce7e8138 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -13,6 +13,7 @@ vespa_add_library(searchcore_server STATIC ddbstate.cpp disk_mem_usage_filter.cpp disk_mem_usage_sampler.cpp + disk_mem_usage_forwarder.cpp docstorevalidator.cpp document_db_explorer.cpp document_db_maintenance_config.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index b000e8b3543..46f8bc21a7e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -8,6 +8,7 @@ LOG_SETUP(".proton.server.bucketmovejob"); #include "ibucketstatechangednotifier.h" #include "iclusterstatechangednotifier.h" #include "maintenancedocumentsubdb.h" +#include "i_disk_mem_usage_notifier.h" #include <vespa/searchcore/proton/documentmetastore/i_document_meta_store.h> using document::BucketId; @@ -143,6 +144,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, IFrozenBucketHandler &frozenBuckets, IClusterStateChangedNotifier &clusterStateChangedNotifier, IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, const vespalib::string &docTypeName) : IMaintenanceJob("move_buckets." + docTypeName, 0.0, 0.0), IClusterStateChangedHandler(), @@ -165,15 +167,18 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, _clusterUp(false), _nodeUp(false), _nodeInitializing(false), + _resourcesOK(false), _runnable(false), _clusterStateChangedNotifier(clusterStateChangedNotifier), - _bucketStateChangedNotifier(bucketStateChangedNotifier) + _bucketStateChangedNotifier(bucketStateChangedNotifier), + _diskMemUsageNotifier(diskMemUsageNotifier) { refreshDerivedClusterState(); _frozenBuckets.addListener(this); _clusterStateChangedNotifier.addClusterStateChangedHandler(this); _bucketStateChangedNotifier.addBucketStateChangedHandler(this); + _diskMemUsageNotifier.addDiskMemUsageListener(this); } @@ -182,6 +187,7 @@ BucketMoveJob::~BucketMoveJob() _frozenBuckets.removeListener(this); _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); _bucketStateChangedNotifier.removeBucketStateChangedHandler(this); + _diskMemUsageNotifier.removeDiskMemUsageListener(this); } @@ -321,6 +327,11 @@ BucketMoveJob::run() return done(); } +void +BucketMoveJob::refreshRunnable() +{ + _runnable = _clusterUp && _nodeUp && !_nodeInitializing && _resourcesOK; +} void BucketMoveJob::refreshDerivedClusterState() @@ -328,7 +339,7 @@ BucketMoveJob::refreshDerivedClusterState() _clusterUp = _calc.get() != NULL && _calc->clusterUp(); _nodeUp = _calc.get() != NULL && _calc->nodeUp(); _nodeInitializing = _calc.get() != NULL && _calc->nodeInitializing(); - _runnable = _clusterUp && _nodeUp && !_nodeInitializing; + refreshRunnable(); } void @@ -359,6 +370,15 @@ BucketMoveJob::notifyBucketStateChanged(const BucketId &bucketId, } } - +void BucketMoveJob::notifyDiskMemUsage(DiskMemUsageState state) +{ + // Called by master write thread + bool resourcesOK = !state.aboveDiskLimit(); + _resourcesOK = resourcesOK; + refreshRunnable(); + if (_runner && _runnable) { + _runner->run(); + } +} } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h index d0adc4cefb0..086bbd33c48 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h @@ -10,6 +10,7 @@ #include "iclusterstatechangedhandler.h" #include "ibucketfreezelistener.h" #include "ibucketstatechangedhandler.h" +#include "i_disk_mem_usage_listener.h" #include <vespa/searchcore/proton/bucketdb/bucket_db_owner.h> namespace proton @@ -18,6 +19,7 @@ namespace proton class IBucketStateChangedNotifier; class IClusterStateChangedNotifier; +class IDiskMemUsageNotifier; /** * Class used to control the moving of buckets between the ready and @@ -26,8 +28,8 @@ class IClusterStateChangedNotifier; class BucketMoveJob : public IMaintenanceJob, public IClusterStateChangedHandler, public IBucketFreezeListener, - public IBucketStateChangedHandler - + public IBucketStateChangedHandler, + public IDiskMemUsageListener { public: struct ScanPosition @@ -98,9 +100,11 @@ private: bool _clusterUp; bool _nodeUp; bool _nodeInitializing; + bool _resourcesOK; bool _runnable; // can try to perform work IClusterStateChangedNotifier &_clusterStateChangedNotifier; IBucketStateChangedNotifier &_bucketStateChangedNotifier; + IDiskMemUsageNotifier &_diskMemUsageNotifier; ScanResult scanBuckets(size_t maxBucketsToScan, @@ -120,6 +124,7 @@ private: DocumentBucketMover &mover, IFrozenBucketHandler::ExclusiveBucketGuard::UP & bucketGuard); + void refreshRunnable(); void refreshDerivedClusterState(); /** @@ -143,6 +148,7 @@ public: IFrozenBucketHandler &frozenBuckets, IClusterStateChangedNotifier &clusterStateChangedNotifier, IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, const vespalib::string &docTypeName); virtual ~BucketMoveJob(); @@ -180,6 +186,7 @@ public: void notifyBucketStateChanged(const document::BucketId &bucketId, storage::spi::BucketInfo::ActiveState newState) override; + virtual void notifyDiskMemUsage(DiskMemUsageState state) override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.cpp index ddcd038077e..6baa99cacdf 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.cpp @@ -2,6 +2,7 @@ #include <vespa/fastos/fastos.h> #include "disk_mem_usage_filter.h" +#include "i_disk_mem_usage_listener.h" #include <sstream> namespace proton { @@ -52,12 +53,14 @@ makeDiskLimitMessage(std::ostream &os, void DiskMemUsageFilter::recalcState(const Guard &guard) { - (void) guard; + bool diskBlocked = false; + bool memoryBlocked = false; bool hasMessage = false; std::ostringstream message; double memoryUsed = getMemoryUsedRatio(guard); if (memoryUsed > _config._memoryLimit) { hasMessage = true; + memoryBlocked = true; makeMemoryLimitMessage(message, memoryUsed, _config._memoryLimit, _memoryStats, _physicalMemory); } @@ -67,6 +70,7 @@ DiskMemUsageFilter::recalcState(const Guard &guard) message << ", "; } hasMessage = true; + diskBlocked = true; makeDiskLimitMessage(message, diskUsed, _config._diskLimit, _diskStats); } if (hasMessage) { @@ -76,6 +80,8 @@ DiskMemUsageFilter::recalcState(const Guard &guard) _state = State(); _acceptWrite = true; } + DiskMemUsageState dmstate(diskBlocked, memoryBlocked); + notifyDiskMemUsage(guard, dmstate); } double @@ -102,7 +108,9 @@ DiskMemUsageFilter::DiskMemUsageFilter(uint64_t physicalMemory_in) _diskStats(), _config(), _state(), - _acceptWrite(true) + _acceptWrite(true), + _dmstate(), + _listeners() { } @@ -179,4 +187,40 @@ DiskMemUsageFilter::getAcceptState() const return _state; } + +void +DiskMemUsageFilter::addDiskMemUsageListener(IDiskMemUsageListener *listener) +{ + Guard guard(_lock); + _listeners.push_back(listener); + listener->notifyDiskMemUsage(_dmstate); +} + +void +DiskMemUsageFilter::removeDiskMemUsageListener(IDiskMemUsageListener *listener) +{ + Guard guard(_lock); + for (auto itr = _listeners.begin(); itr != _listeners.end(); ++itr) { + if (*itr == listener) { + _listeners.erase(itr); + break; + } + } +} + +void +DiskMemUsageFilter::notifyDiskMemUsage(const Guard &guard, + DiskMemUsageState state) +{ + (void) guard; + if (_dmstate == state) { + return; + } + _dmstate = state; + for (const auto &listener : _listeners) { + listener->notifyDiskMemUsage(_dmstate); + } +} + + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.h index 1879e7b9385..20cd436cc17 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.h @@ -7,6 +7,8 @@ #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <mutex> #include <atomic> +#include "i_disk_mem_usage_notifier.h" +#include "disk_mem_usage_state.h" namespace proton { @@ -15,7 +17,8 @@ namespace proton { * usage. If resource limit is reached then further writes are denied * in order to prevent entering an unrecoverable state. */ -class DiskMemUsageFilter : public IResourceWriteFilter { +class DiskMemUsageFilter : public IResourceWriteFilter, + public IDiskMemUsageNotifier { public: using space_info = boost::filesystem::space_info; using Mutex = std::mutex; @@ -47,10 +50,13 @@ private: Config _config; State _state; std::atomic<bool> _acceptWrite; + DiskMemUsageState _dmstate; + std::vector<IDiskMemUsageListener *> _listeners; void recalcState(const Guard &guard); // called with _lock held double getMemoryUsedRatio(const Guard &guard) const; double getDiskUsedRatio(const Guard &guard) const; + void notifyDiskMemUsage(const Guard &guard, DiskMemUsageState state); public: DiskMemUsageFilter(uint64_t physicalMememory_in); @@ -65,6 +71,8 @@ public: double getDiskUsedRatio() const; virtual bool acceptWriteOperation() const override; virtual State getAcceptState() const override; + virtual void addDiskMemUsageListener(IDiskMemUsageListener *listener) override; + virtual void removeDiskMemUsageListener(IDiskMemUsageListener *listener) override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.cpp new file mode 100644 index 00000000000..a6c339076a5 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.cpp @@ -0,0 +1,63 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include "disk_mem_usage_forwarder.h" +#include <vespa/searchlib/common/lambdatask.h> + +using search::makeLambdaTask; + +namespace proton +{ + +DiskMemUsageForwarder::DiskMemUsageForwarder(searchcorespi::index::IThreadService &executor) + : IDiskMemUsageNotifier(), + IDiskMemUsageListener(), + _executor(executor), + _listeners(), + _state() +{ +} + +DiskMemUsageForwarder::~DiskMemUsageForwarder() +{ +} + +void +DiskMemUsageForwarder::addDiskMemUsageListener(IDiskMemUsageListener *listener) +{ + assert(_executor.isCurrentThread()); + _listeners.push_back(listener); + listener->notifyDiskMemUsage(_state); +} + +void +DiskMemUsageForwarder::removeDiskMemUsageListener(IDiskMemUsageListener *listener) +{ + assert(_executor.isCurrentThread()); + for (auto itr = _listeners.begin(); itr != _listeners.end(); ++itr) { + if (*itr == listener) { + _listeners.erase(itr); + break; + } + } +} + +void +DiskMemUsageForwarder::notifyDiskMemUsage(DiskMemUsageState state) +{ + _executor.execute(makeLambdaTask([=]() { forward(state); })); +} + + +void +DiskMemUsageForwarder::forward(DiskMemUsageState state) +{ + if (_state != state) { + _state = state; + for (const auto &listener : _listeners) { + listener->notifyDiskMemUsage(state); + } + } +} + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.h new file mode 100644 index 00000000000..8c5b28854a1 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.h @@ -0,0 +1,31 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchcorespi/index/i_thread_service.h> +#include "i_disk_mem_usage_notifier.h" +#include "i_disk_mem_usage_listener.h" +//#include "disk_mem_usage_state.h" + +namespace proton +{ + +/** + * Forwarder for disk/memory usage state changes. + */ +class DiskMemUsageForwarder : public IDiskMemUsageNotifier, + public IDiskMemUsageListener +{ + searchcorespi::index::IThreadService &_executor; + std::vector<IDiskMemUsageListener *> _listeners; + DiskMemUsageState _state; + void forward(DiskMemUsageState state); +public: + DiskMemUsageForwarder(searchcorespi::index::IThreadService &executor); + virtual ~DiskMemUsageForwarder(); + virtual void addDiskMemUsageListener(IDiskMemUsageListener *listener) override; + virtual void removeDiskMemUsageListener(IDiskMemUsageListener *listener) override; + virtual void notifyDiskMemUsage(DiskMemUsageState state) override; +}; + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h index 89dab8c4ff8..0e95a52ba46 100644 --- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h @@ -48,6 +48,7 @@ public: void setConfig(const Config &config); const DiskMemUsageFilter &writeFilter() const { return _filter; } + IDiskMemUsageNotifier ¬ifier() { return _filter; } }; diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h new file mode 100644 index 00000000000..51e977ea380 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h @@ -0,0 +1,37 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace proton { + +class DiskMemUsageState +{ + bool _aboveDiskLimit; + bool _aboveMemoryLimit; + +public: + DiskMemUsageState() + : _aboveDiskLimit(false), + _aboveMemoryLimit(false) + { + } + + DiskMemUsageState(bool aboveDiskLimit_in, bool aboveMemoryLimit_in) + : _aboveDiskLimit(aboveDiskLimit_in), + _aboveMemoryLimit(aboveMemoryLimit_in) + { + } + + bool operator==(const DiskMemUsageState &rhs) const { + return ((_aboveDiskLimit == rhs._aboveDiskLimit) && + (_aboveMemoryLimit == rhs._aboveMemoryLimit)); + } + bool operator!=(const DiskMemUsageState &rhs) const { + return ((_aboveDiskLimit != rhs._aboveDiskLimit) || + (_aboveMemoryLimit != rhs._aboveMemoryLimit)); + } + bool aboveDiskLimit() const { return _aboveDiskLimit; } + bool aboveMemoryLimit() const { return _aboveMemoryLimit; } +}; + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index f2d54d8fbf2..2f105fc8315 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -121,6 +121,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _syncFeedViewEnabled(false), _owner(owner), _state(), + _dmUsageForwarder(_writeService.master()), _writeFilter(), _feedHandler(_writeService, tlsSpec, @@ -1075,6 +1076,7 @@ DocumentDB::injectMaintenanceJobs(const DocumentDBMaintenanceConfig &config) _clusterStateHandler, // IClusterStateChangedNotifier _bucketHandler, // IBucketStateChangedNotifier _calc, // IBucketStateCalculator::SP + _dmUsageForwarder, _jobTrackers, _visibility, // ICommitable _subDBs.getReadySubDB()->getAttributeManager(), diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index cabb4434591..9f592a292fd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -38,6 +38,7 @@ #include <vespa/searchlib/transactionlog/syncproxy.h> #include <vespa/vespalib/util/varholder.h> #include <vespa/searchcore/proton/attribute/attribute_usage_filter.h> +#include "disk_mem_usage_forwarder.h" using vespa::config::search::core::ProtonConfig; @@ -126,6 +127,7 @@ private: bool _syncFeedViewEnabled; IDocumentDBOwner &_owner; DDBState _state; + DiskMemUsageForwarder _dmUsageForwarder; AttributeUsageFilter _writeFilter; FeedHandler _feedHandler; @@ -469,7 +471,7 @@ public: void enterReprocessState(); void enterOnlineState(); void waitForOnlineState(); - + IDiskMemUsageListener *diskMemUsageListener() { return &_dmUsageForwarder; } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_listener.h b/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_listener.h new file mode 100644 index 00000000000..3482d046289 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_listener.h @@ -0,0 +1,20 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "disk_mem_usage_state.h" + +namespace proton { + +/** + * Interface used to receive notification when disk/memory usage state + * has changed. + */ +class IDiskMemUsageListener +{ +public: + virtual ~IDiskMemUsageListener() {} + virtual void notifyDiskMemUsage(DiskMemUsageState state) = 0; +}; + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_notifier.h b/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_notifier.h new file mode 100644 index 00000000000..d62027ad30a --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_notifier.h @@ -0,0 +1,22 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace proton +{ + +class IDiskMemUsageListener; + +/** + * Interface used to request notification when disk/memory usage state + * has changed. + */ +class IDiskMemUsageNotifier +{ +public: + virtual ~IDiskMemUsageNotifier() {} + virtual void addDiskMemUsageListener(IDiskMemUsageListener *listener) = 0; + virtual void removeDiskMemUsageListener(IDiskMemUsageListener *listener) = 0; +}; + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index d56fff31bee..5cb1dd5d170 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -58,7 +58,8 @@ injectBucketMoveJob(MaintenanceController &controller, IClusterStateChangedNotifier &clusterStateChangedNotifier, IBucketStateChangedNotifier &bucketStateChangedNotifier, const std::shared_ptr<IBucketStateCalculator> &calc, - DocumentDBJobTrackers &jobTrackers) + DocumentDBJobTrackers &jobTrackers, + IDiskMemUsageNotifier &diskMemUsageNotifier) { IMaintenanceJob::UP bmj; bmj.reset(new BucketMoveJob(calc, @@ -69,6 +70,7 @@ injectBucketMoveJob(MaintenanceController &controller, fbHandler, clusterStateChangedNotifier, bucketStateChangedNotifier, + diskMemUsageNotifier, docTypeName)); controller.registerJob(std::move(trackJob(jobTrackers.getBucketMove(), std::move(bmj)))); @@ -95,6 +97,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, IBucketStateChangedNotifier & bucketStateChangedNotifier, const std::shared_ptr<IBucketStateCalculator> & calc, + IDiskMemUsageNotifier &diskMemUsageNotifier, DocumentDBJobTrackers &jobTrackers, ICommitable & commit, IAttributeManagerSP readyAttributeManager, @@ -116,7 +119,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, injectLidSpaceCompactionJobs(controller, config, lscHandlers, opStorer, fbHandler, jobTrackers.getLidSpaceCompact()); injectBucketMoveJob(controller, fbHandler, docTypeName, moveHandler, bucketModifiedHandler, - clusterStateChangedNotifier, bucketStateChangedNotifier, calc, jobTrackers); + clusterStateChangedNotifier, bucketStateChangedNotifier, calc, jobTrackers, diskMemUsageNotifier); controller.registerJob(std::make_unique<SampleAttributeUsageJob> (readyAttributeManager, notReadyAttributeManager, diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h index 423459af162..dfad0947c8c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h @@ -21,6 +21,7 @@ class IBucketStateChangedNotifier; class IBucketStateCalculator; class IAttributeManager; class AttributeUsageFilter; +class IDiskMemUsageNotifier; /** * Class that injects all concrete maintenance jobs used in document db @@ -46,6 +47,7 @@ struct MaintenanceJobsInjector IBucketStateChangedNotifier & bucketStateChangedNotifier, const std::shared_ptr<IBucketStateCalculator> &calc, + IDiskMemUsageNotifier &diskMemUsageNotifier, DocumentDBJobTrackers &jobTrackers, ICommitable & commit, IAttributeManagerSP readyAttributeManager, diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 2750515661f..424029ba8a4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -797,6 +797,7 @@ Proton::addDocumentDB(const document::DocumentType &docType, _matchEngine->putSearchHandler(docTypeName, searchHandler); FlushHandlerProxy::SP flushHandler(new FlushHandlerProxy(ret)); _flushEngine->putFlushHandler(docTypeName, flushHandler); + _diskMemUsageSampler->notifier().addDiskMemUsageListener(ret->diskMemUsageListener()); return ret; } @@ -834,6 +835,7 @@ Proton::removeDocumentDB(const DocTypeName &docTypeName) _flushEngine->removeFlushHandler(docTypeName); _metricsEngine->removeMetricsHook(old->getMetricsUpdateHook()); _metricsEngine->removeDocumentDBMetrics(old->getMetricsCollection()); + _diskMemUsageSampler->notifier().removeDiskMemUsageListener(old->diskMemUsageListener()); // Caller should have removed & drained relevant timer tasks old->close(); } diff --git a/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h b/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h new file mode 100644 index 00000000000..4e3e455fa4d --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h @@ -0,0 +1,47 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchcore/proton/server/i_disk_mem_usage_notifier.h> +#include <vespa/searchcore/proton/server/i_disk_mem_usage_listener.h> + +namespace proton +{ + +namespace test +{ + +/** + * Test notifier for disk/mem usage. + */ +class DiskMemUsageNotifier : public IDiskMemUsageNotifier +{ + std::vector<IDiskMemUsageListener *> _listeners; + DiskMemUsageState _state; +public: + DiskMemUsageNotifier() : IDiskMemUsageNotifier(), _listeners(), _state() { } + virtual ~DiskMemUsageNotifier() { } + virtual void addDiskMemUsageListener(IDiskMemUsageListener *listener) override { + _listeners.push_back(listener); + listener->notifyDiskMemUsage(_state); + } + virtual void removeDiskMemUsageListener(IDiskMemUsageListener *listener) override { + for (auto itr = _listeners.begin(); itr != _listeners.end(); ++itr) { + if (*itr == listener) { + _listeners.erase(itr); + break; + } + } + } + void notify(DiskMemUsageState state) { + if (_state != state) { + _state = state; + for (const auto &listener : _listeners) { + listener->notifyDiskMemUsage(state); + } + } + } +}; + +} // namespace proton::test +} // namespace proton |