summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@yahoo-inc.com>2016-06-23 17:41:52 +0000
committerTor Egge <Tor.Egge@yahoo-inc.com>2016-06-27 09:30:48 +0000
commit272d6b7e0d7fa16f49c43c63cf514026528d5a07 (patch)
treeca3ab5d8f004b0c1cb022febda5d370e551422eb /searchcore
parenta7a363120c4f5ab96937a5f7a176ae5c62ede7bd (diff)
Stop bucket move maintenance job when disk is full.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp25
-rw-r--r--searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp26
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h11
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.cpp48
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.cpp63
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.h31
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h37
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_listener.h20
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_notifier.h22
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h47
19 files changed, 353 insertions, 12 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
index 9062067132c..ad9ffdfbe3e 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
@@ -13,6 +13,7 @@ LOG_SETUP("documentbucketmover_test");
#include <vespa/searchcore/proton/server/idocumentmovehandler.h>
#include <vespa/searchcore/proton/test/clusterstatehandler.h>
#include <vespa/searchcore/proton/test/buckethandler.h>
+#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h>
#include <vespa/searchcore/proton/server/maintenancedocumentsubdb.h>
#include <vespa/searchcore/proton/bucketdb/bucketdbhandler.h>
@@ -536,6 +537,7 @@ struct ControllerFixtureBase
MySubDb _ready;
MySubDb _notReady;
MyFrozenBucketHandler _fbh;
+ test::DiskMemUsageNotifier _diskMemUsageNotifier;
BucketMoveJob _bmj;
ControllerFixtureBase()
: _builder(),
@@ -547,8 +549,10 @@ struct ControllerFixtureBase
_ready(_builder.getRepo(), _bucketDB, 1, SubDbType::READY),
_notReady(_builder.getRepo(), _bucketDB, 2, SubDbType::NOTREADY),
_fbh(),
+ _diskMemUsageNotifier(),
_bmj(_calc, _moveHandler, _modifiedHandler, _ready._subDb,
_notReady._subDb, _fbh, _clusterStateHandler, _bucketHandler,
+ _diskMemUsageNotifier,
"test")
{
}
@@ -1175,6 +1179,27 @@ TEST_F("require that thawed bucket is not moved if active as well", ControllerFi
}
+TEST_F("require that bucket move stops when disk limit is reached", ControllerFixture)
+{
+ // Bucket 1 shold be moved
+ f.addReady(f._ready.bucket(2));
+ // Note: This depends on f._bmj.run() moving max 1 documents
+ EXPECT_TRUE(!f._bmj.run());
+ EXPECT_EQUAL(1u, f.docsMoved().size());
+ EXPECT_EQUAL(0u, f.bucketsModified().size());
+ // Notify that we've over disk limit
+ f._diskMemUsageNotifier.notify(DiskMemUsageState(true, false));
+ EXPECT_TRUE(f._bmj.run());
+ EXPECT_EQUAL(1u, f.docsMoved().size());
+ EXPECT_EQUAL(0u, f.bucketsModified().size());
+ // Notify that we've under disk limit
+ f._diskMemUsageNotifier.notify(DiskMemUsageState(false, false));
+ EXPECT_TRUE(!f._bmj.run());
+ EXPECT_EQUAL(2u, f.docsMoved().size());
+ EXPECT_EQUAL(0u, f.bucketsModified().size());
+}
+
+
TEST_MAIN()
{
TEST_RUN_ALL();
diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
index 0513d8f45d9..711e1391c05 100644
--- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
@@ -26,6 +26,7 @@ LOG_SETUP("maintenancecontroller_test");
#include <vespa/searchcore/proton/feedoperation/moveoperation.h>
#include <vespa/searchcore/proton/test/clusterstatehandler.h>
#include <vespa/searchcore/proton/test/buckethandler.h>
+#include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
using namespace proton;
@@ -479,6 +480,7 @@ public:
std::shared_ptr<proton::IAttributeManager> _readyAttributeManager;
std::shared_ptr<proton::IAttributeManager> _notReadyAttributeManager;
AttributeUsageFilter _attributeUsageFilter;
+ test::DiskMemUsageNotifier _diskMemUsageNotifier;
MaintenanceController _mc;
MaintenanceControllerFixture(void);
@@ -1017,7 +1019,9 @@ MaintenanceControllerFixture::injectMaintenanceJobs()
MaintenanceJobsInjector::injectJobs(_mc, *_mcCfg, _fh, _gsp, _fh,
lscHandlers, _fh, _mc, _docTypeName.getName(),
_fh, _fh, _bmc, _clusterStateHandler, _bucketHandler,
- _calc, _jobTrackers, *this,
+ _calc,
+ _diskMemUsageNotifier,
+ _jobTrackers, *this,
_readyAttributeManager,
_notReadyAttributeManager,
_attributeUsageFilter);
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index e0f885a8c6f..cc7ce7e8138 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -13,6 +13,7 @@ vespa_add_library(searchcore_server STATIC
ddbstate.cpp
disk_mem_usage_filter.cpp
disk_mem_usage_sampler.cpp
+ disk_mem_usage_forwarder.cpp
docstorevalidator.cpp
document_db_explorer.cpp
document_db_maintenance_config.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
index b000e8b3543..46f8bc21a7e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
@@ -8,6 +8,7 @@ LOG_SETUP(".proton.server.bucketmovejob");
#include "ibucketstatechangednotifier.h"
#include "iclusterstatechangednotifier.h"
#include "maintenancedocumentsubdb.h"
+#include "i_disk_mem_usage_notifier.h"
#include <vespa/searchcore/proton/documentmetastore/i_document_meta_store.h>
using document::BucketId;
@@ -143,6 +144,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc,
IFrozenBucketHandler &frozenBuckets,
IClusterStateChangedNotifier &clusterStateChangedNotifier,
IBucketStateChangedNotifier &bucketStateChangedNotifier,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
const vespalib::string &docTypeName)
: IMaintenanceJob("move_buckets." + docTypeName, 0.0, 0.0),
IClusterStateChangedHandler(),
@@ -165,15 +167,18 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc,
_clusterUp(false),
_nodeUp(false),
_nodeInitializing(false),
+ _resourcesOK(false),
_runnable(false),
_clusterStateChangedNotifier(clusterStateChangedNotifier),
- _bucketStateChangedNotifier(bucketStateChangedNotifier)
+ _bucketStateChangedNotifier(bucketStateChangedNotifier),
+ _diskMemUsageNotifier(diskMemUsageNotifier)
{
refreshDerivedClusterState();
_frozenBuckets.addListener(this);
_clusterStateChangedNotifier.addClusterStateChangedHandler(this);
_bucketStateChangedNotifier.addBucketStateChangedHandler(this);
+ _diskMemUsageNotifier.addDiskMemUsageListener(this);
}
@@ -182,6 +187,7 @@ BucketMoveJob::~BucketMoveJob()
_frozenBuckets.removeListener(this);
_clusterStateChangedNotifier.removeClusterStateChangedHandler(this);
_bucketStateChangedNotifier.removeBucketStateChangedHandler(this);
+ _diskMemUsageNotifier.removeDiskMemUsageListener(this);
}
@@ -321,6 +327,11 @@ BucketMoveJob::run()
return done();
}
+void
+BucketMoveJob::refreshRunnable()
+{
+ _runnable = _clusterUp && _nodeUp && !_nodeInitializing && _resourcesOK;
+}
void
BucketMoveJob::refreshDerivedClusterState()
@@ -328,7 +339,7 @@ BucketMoveJob::refreshDerivedClusterState()
_clusterUp = _calc.get() != NULL && _calc->clusterUp();
_nodeUp = _calc.get() != NULL && _calc->nodeUp();
_nodeInitializing = _calc.get() != NULL && _calc->nodeInitializing();
- _runnable = _clusterUp && _nodeUp && !_nodeInitializing;
+ refreshRunnable();
}
void
@@ -359,6 +370,15 @@ BucketMoveJob::notifyBucketStateChanged(const BucketId &bucketId,
}
}
-
+void BucketMoveJob::notifyDiskMemUsage(DiskMemUsageState state)
+{
+ // Called by master write thread
+ bool resourcesOK = !state.aboveDiskLimit();
+ _resourcesOK = resourcesOK;
+ refreshRunnable();
+ if (_runner && _runnable) {
+ _runner->run();
+ }
+}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
index d0adc4cefb0..086bbd33c48 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
@@ -10,6 +10,7 @@
#include "iclusterstatechangedhandler.h"
#include "ibucketfreezelistener.h"
#include "ibucketstatechangedhandler.h"
+#include "i_disk_mem_usage_listener.h"
#include <vespa/searchcore/proton/bucketdb/bucket_db_owner.h>
namespace proton
@@ -18,6 +19,7 @@ namespace proton
class IBucketStateChangedNotifier;
class IClusterStateChangedNotifier;
+class IDiskMemUsageNotifier;
/**
* Class used to control the moving of buckets between the ready and
@@ -26,8 +28,8 @@ class IClusterStateChangedNotifier;
class BucketMoveJob : public IMaintenanceJob,
public IClusterStateChangedHandler,
public IBucketFreezeListener,
- public IBucketStateChangedHandler
-
+ public IBucketStateChangedHandler,
+ public IDiskMemUsageListener
{
public:
struct ScanPosition
@@ -98,9 +100,11 @@ private:
bool _clusterUp;
bool _nodeUp;
bool _nodeInitializing;
+ bool _resourcesOK;
bool _runnable; // can try to perform work
IClusterStateChangedNotifier &_clusterStateChangedNotifier;
IBucketStateChangedNotifier &_bucketStateChangedNotifier;
+ IDiskMemUsageNotifier &_diskMemUsageNotifier;
ScanResult
scanBuckets(size_t maxBucketsToScan,
@@ -120,6 +124,7 @@ private:
DocumentBucketMover &mover,
IFrozenBucketHandler::ExclusiveBucketGuard::UP & bucketGuard);
+ void refreshRunnable();
void refreshDerivedClusterState();
/**
@@ -143,6 +148,7 @@ public:
IFrozenBucketHandler &frozenBuckets,
IClusterStateChangedNotifier &clusterStateChangedNotifier,
IBucketStateChangedNotifier &bucketStateChangedNotifier,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
const vespalib::string &docTypeName);
virtual ~BucketMoveJob();
@@ -180,6 +186,7 @@ public:
void notifyBucketStateChanged(const document::BucketId &bucketId,
storage::spi::BucketInfo::ActiveState newState) override;
+ virtual void notifyDiskMemUsage(DiskMemUsageState state) override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.cpp
index ddcd038077e..6baa99cacdf 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.cpp
@@ -2,6 +2,7 @@
#include <vespa/fastos/fastos.h>
#include "disk_mem_usage_filter.h"
+#include "i_disk_mem_usage_listener.h"
#include <sstream>
namespace proton {
@@ -52,12 +53,14 @@ makeDiskLimitMessage(std::ostream &os,
void
DiskMemUsageFilter::recalcState(const Guard &guard)
{
- (void) guard;
+ bool diskBlocked = false;
+ bool memoryBlocked = false;
bool hasMessage = false;
std::ostringstream message;
double memoryUsed = getMemoryUsedRatio(guard);
if (memoryUsed > _config._memoryLimit) {
hasMessage = true;
+ memoryBlocked = true;
makeMemoryLimitMessage(message, memoryUsed,
_config._memoryLimit, _memoryStats, _physicalMemory);
}
@@ -67,6 +70,7 @@ DiskMemUsageFilter::recalcState(const Guard &guard)
message << ", ";
}
hasMessage = true;
+ diskBlocked = true;
makeDiskLimitMessage(message, diskUsed, _config._diskLimit, _diskStats);
}
if (hasMessage) {
@@ -76,6 +80,8 @@ DiskMemUsageFilter::recalcState(const Guard &guard)
_state = State();
_acceptWrite = true;
}
+ DiskMemUsageState dmstate(diskBlocked, memoryBlocked);
+ notifyDiskMemUsage(guard, dmstate);
}
double
@@ -102,7 +108,9 @@ DiskMemUsageFilter::DiskMemUsageFilter(uint64_t physicalMemory_in)
_diskStats(),
_config(),
_state(),
- _acceptWrite(true)
+ _acceptWrite(true),
+ _dmstate(),
+ _listeners()
{
}
@@ -179,4 +187,40 @@ DiskMemUsageFilter::getAcceptState() const
return _state;
}
+
+void
+DiskMemUsageFilter::addDiskMemUsageListener(IDiskMemUsageListener *listener)
+{
+ Guard guard(_lock);
+ _listeners.push_back(listener);
+ listener->notifyDiskMemUsage(_dmstate);
+}
+
+void
+DiskMemUsageFilter::removeDiskMemUsageListener(IDiskMemUsageListener *listener)
+{
+ Guard guard(_lock);
+ for (auto itr = _listeners.begin(); itr != _listeners.end(); ++itr) {
+ if (*itr == listener) {
+ _listeners.erase(itr);
+ break;
+ }
+ }
+}
+
+void
+DiskMemUsageFilter::notifyDiskMemUsage(const Guard &guard,
+ DiskMemUsageState state)
+{
+ (void) guard;
+ if (_dmstate == state) {
+ return;
+ }
+ _dmstate = state;
+ for (const auto &listener : _listeners) {
+ listener->notifyDiskMemUsage(_dmstate);
+ }
+}
+
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.h
index 1879e7b9385..20cd436cc17 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.h
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_filter.h
@@ -7,6 +7,8 @@
#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h>
#include <mutex>
#include <atomic>
+#include "i_disk_mem_usage_notifier.h"
+#include "disk_mem_usage_state.h"
namespace proton {
@@ -15,7 +17,8 @@ namespace proton {
* usage. If resource limit is reached then further writes are denied
* in order to prevent entering an unrecoverable state.
*/
-class DiskMemUsageFilter : public IResourceWriteFilter {
+class DiskMemUsageFilter : public IResourceWriteFilter,
+ public IDiskMemUsageNotifier {
public:
using space_info = boost::filesystem::space_info;
using Mutex = std::mutex;
@@ -47,10 +50,13 @@ private:
Config _config;
State _state;
std::atomic<bool> _acceptWrite;
+ DiskMemUsageState _dmstate;
+ std::vector<IDiskMemUsageListener *> _listeners;
void recalcState(const Guard &guard); // called with _lock held
double getMemoryUsedRatio(const Guard &guard) const;
double getDiskUsedRatio(const Guard &guard) const;
+ void notifyDiskMemUsage(const Guard &guard, DiskMemUsageState state);
public:
DiskMemUsageFilter(uint64_t physicalMememory_in);
@@ -65,6 +71,8 @@ public:
double getDiskUsedRatio() const;
virtual bool acceptWriteOperation() const override;
virtual State getAcceptState() const override;
+ virtual void addDiskMemUsageListener(IDiskMemUsageListener *listener) override;
+ virtual void removeDiskMemUsageListener(IDiskMemUsageListener *listener) override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.cpp b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.cpp
new file mode 100644
index 00000000000..a6c339076a5
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.cpp
@@ -0,0 +1,63 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include "disk_mem_usage_forwarder.h"
+#include <vespa/searchlib/common/lambdatask.h>
+
+using search::makeLambdaTask;
+
+namespace proton
+{
+
+DiskMemUsageForwarder::DiskMemUsageForwarder(searchcorespi::index::IThreadService &executor)
+ : IDiskMemUsageNotifier(),
+ IDiskMemUsageListener(),
+ _executor(executor),
+ _listeners(),
+ _state()
+{
+}
+
+DiskMemUsageForwarder::~DiskMemUsageForwarder()
+{
+}
+
+void
+DiskMemUsageForwarder::addDiskMemUsageListener(IDiskMemUsageListener *listener)
+{
+ assert(_executor.isCurrentThread());
+ _listeners.push_back(listener);
+ listener->notifyDiskMemUsage(_state);
+}
+
+void
+DiskMemUsageForwarder::removeDiskMemUsageListener(IDiskMemUsageListener *listener)
+{
+ assert(_executor.isCurrentThread());
+ for (auto itr = _listeners.begin(); itr != _listeners.end(); ++itr) {
+ if (*itr == listener) {
+ _listeners.erase(itr);
+ break;
+ }
+ }
+}
+
+void
+DiskMemUsageForwarder::notifyDiskMemUsage(DiskMemUsageState state)
+{
+ _executor.execute(makeLambdaTask([=]() { forward(state); }));
+}
+
+
+void
+DiskMemUsageForwarder::forward(DiskMemUsageState state)
+{
+ if (_state != state) {
+ _state = state;
+ for (const auto &listener : _listeners) {
+ listener->notifyDiskMemUsage(state);
+ }
+ }
+}
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.h
new file mode 100644
index 00000000000..8c5b28854a1
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_forwarder.h
@@ -0,0 +1,31 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchcorespi/index/i_thread_service.h>
+#include "i_disk_mem_usage_notifier.h"
+#include "i_disk_mem_usage_listener.h"
+//#include "disk_mem_usage_state.h"
+
+namespace proton
+{
+
+/**
+ * Forwarder for disk/memory usage state changes.
+ */
+class DiskMemUsageForwarder : public IDiskMemUsageNotifier,
+ public IDiskMemUsageListener
+{
+ searchcorespi::index::IThreadService &_executor;
+ std::vector<IDiskMemUsageListener *> _listeners;
+ DiskMemUsageState _state;
+ void forward(DiskMemUsageState state);
+public:
+ DiskMemUsageForwarder(searchcorespi::index::IThreadService &executor);
+ virtual ~DiskMemUsageForwarder();
+ virtual void addDiskMemUsageListener(IDiskMemUsageListener *listener) override;
+ virtual void removeDiskMemUsageListener(IDiskMemUsageListener *listener) override;
+ virtual void notifyDiskMemUsage(DiskMemUsageState state) override;
+};
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
index 89dab8c4ff8..0e95a52ba46 100644
--- a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_sampler.h
@@ -48,6 +48,7 @@ public:
void setConfig(const Config &config);
const DiskMemUsageFilter &writeFilter() const { return _filter; }
+ IDiskMemUsageNotifier &notifier() { return _filter; }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h
new file mode 100644
index 00000000000..51e977ea380
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/disk_mem_usage_state.h
@@ -0,0 +1,37 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace proton {
+
+class DiskMemUsageState
+{
+ bool _aboveDiskLimit;
+ bool _aboveMemoryLimit;
+
+public:
+ DiskMemUsageState()
+ : _aboveDiskLimit(false),
+ _aboveMemoryLimit(false)
+ {
+ }
+
+ DiskMemUsageState(bool aboveDiskLimit_in, bool aboveMemoryLimit_in)
+ : _aboveDiskLimit(aboveDiskLimit_in),
+ _aboveMemoryLimit(aboveMemoryLimit_in)
+ {
+ }
+
+ bool operator==(const DiskMemUsageState &rhs) const {
+ return ((_aboveDiskLimit == rhs._aboveDiskLimit) &&
+ (_aboveMemoryLimit == rhs._aboveMemoryLimit));
+ }
+ bool operator!=(const DiskMemUsageState &rhs) const {
+ return ((_aboveDiskLimit != rhs._aboveDiskLimit) ||
+ (_aboveMemoryLimit != rhs._aboveMemoryLimit));
+ }
+ bool aboveDiskLimit() const { return _aboveDiskLimit; }
+ bool aboveMemoryLimit() const { return _aboveMemoryLimit; }
+};
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index f2d54d8fbf2..2f105fc8315 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -121,6 +121,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_syncFeedViewEnabled(false),
_owner(owner),
_state(),
+ _dmUsageForwarder(_writeService.master()),
_writeFilter(),
_feedHandler(_writeService,
tlsSpec,
@@ -1075,6 +1076,7 @@ DocumentDB::injectMaintenanceJobs(const DocumentDBMaintenanceConfig &config)
_clusterStateHandler, // IClusterStateChangedNotifier
_bucketHandler, // IBucketStateChangedNotifier
_calc, // IBucketStateCalculator::SP
+ _dmUsageForwarder,
_jobTrackers,
_visibility, // ICommitable
_subDBs.getReadySubDB()->getAttributeManager(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index cabb4434591..9f592a292fd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -38,6 +38,7 @@
#include <vespa/searchlib/transactionlog/syncproxy.h>
#include <vespa/vespalib/util/varholder.h>
#include <vespa/searchcore/proton/attribute/attribute_usage_filter.h>
+#include "disk_mem_usage_forwarder.h"
using vespa::config::search::core::ProtonConfig;
@@ -126,6 +127,7 @@ private:
bool _syncFeedViewEnabled;
IDocumentDBOwner &_owner;
DDBState _state;
+ DiskMemUsageForwarder _dmUsageForwarder;
AttributeUsageFilter _writeFilter;
FeedHandler _feedHandler;
@@ -469,7 +471,7 @@ public:
void enterReprocessState();
void enterOnlineState();
void waitForOnlineState();
-
+ IDiskMemUsageListener *diskMemUsageListener() { return &_dmUsageForwarder; }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_listener.h b/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_listener.h
new file mode 100644
index 00000000000..3482d046289
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_listener.h
@@ -0,0 +1,20 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "disk_mem_usage_state.h"
+
+namespace proton {
+
+/**
+ * Interface used to receive notification when disk/memory usage state
+ * has changed.
+ */
+class IDiskMemUsageListener
+{
+public:
+ virtual ~IDiskMemUsageListener() {}
+ virtual void notifyDiskMemUsage(DiskMemUsageState state) = 0;
+};
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_notifier.h b/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_notifier.h
new file mode 100644
index 00000000000..d62027ad30a
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/i_disk_mem_usage_notifier.h
@@ -0,0 +1,22 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace proton
+{
+
+class IDiskMemUsageListener;
+
+/**
+ * Interface used to request notification when disk/memory usage state
+ * has changed.
+ */
+class IDiskMemUsageNotifier
+{
+public:
+ virtual ~IDiskMemUsageNotifier() {}
+ virtual void addDiskMemUsageListener(IDiskMemUsageListener *listener) = 0;
+ virtual void removeDiskMemUsageListener(IDiskMemUsageListener *listener) = 0;
+};
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
index d56fff31bee..5cb1dd5d170 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
@@ -58,7 +58,8 @@ injectBucketMoveJob(MaintenanceController &controller,
IClusterStateChangedNotifier &clusterStateChangedNotifier,
IBucketStateChangedNotifier &bucketStateChangedNotifier,
const std::shared_ptr<IBucketStateCalculator> &calc,
- DocumentDBJobTrackers &jobTrackers)
+ DocumentDBJobTrackers &jobTrackers,
+ IDiskMemUsageNotifier &diskMemUsageNotifier)
{
IMaintenanceJob::UP bmj;
bmj.reset(new BucketMoveJob(calc,
@@ -69,6 +70,7 @@ injectBucketMoveJob(MaintenanceController &controller,
fbHandler,
clusterStateChangedNotifier,
bucketStateChangedNotifier,
+ diskMemUsageNotifier,
docTypeName));
controller.registerJob(std::move(trackJob(jobTrackers.getBucketMove(),
std::move(bmj))));
@@ -95,6 +97,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
IBucketStateChangedNotifier &
bucketStateChangedNotifier,
const std::shared_ptr<IBucketStateCalculator> & calc,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
DocumentDBJobTrackers &jobTrackers,
ICommitable & commit,
IAttributeManagerSP readyAttributeManager,
@@ -116,7 +119,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller,
injectLidSpaceCompactionJobs(controller, config, lscHandlers, opStorer,
fbHandler, jobTrackers.getLidSpaceCompact());
injectBucketMoveJob(controller, fbHandler, docTypeName, moveHandler, bucketModifiedHandler,
- clusterStateChangedNotifier, bucketStateChangedNotifier, calc, jobTrackers);
+ clusterStateChangedNotifier, bucketStateChangedNotifier, calc, jobTrackers, diskMemUsageNotifier);
controller.registerJob(std::make_unique<SampleAttributeUsageJob>
(readyAttributeManager,
notReadyAttributeManager,
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h
index 423459af162..dfad0947c8c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h
@@ -21,6 +21,7 @@ class IBucketStateChangedNotifier;
class IBucketStateCalculator;
class IAttributeManager;
class AttributeUsageFilter;
+class IDiskMemUsageNotifier;
/**
* Class that injects all concrete maintenance jobs used in document db
@@ -46,6 +47,7 @@ struct MaintenanceJobsInjector
IBucketStateChangedNotifier &
bucketStateChangedNotifier,
const std::shared_ptr<IBucketStateCalculator> &calc,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
DocumentDBJobTrackers &jobTrackers,
ICommitable & commit,
IAttributeManagerSP readyAttributeManager,
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 2750515661f..424029ba8a4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -797,6 +797,7 @@ Proton::addDocumentDB(const document::DocumentType &docType,
_matchEngine->putSearchHandler(docTypeName, searchHandler);
FlushHandlerProxy::SP flushHandler(new FlushHandlerProxy(ret));
_flushEngine->putFlushHandler(docTypeName, flushHandler);
+ _diskMemUsageSampler->notifier().addDiskMemUsageListener(ret->diskMemUsageListener());
return ret;
}
@@ -834,6 +835,7 @@ Proton::removeDocumentDB(const DocTypeName &docTypeName)
_flushEngine->removeFlushHandler(docTypeName);
_metricsEngine->removeMetricsHook(old->getMetricsUpdateHook());
_metricsEngine->removeDocumentDBMetrics(old->getMetricsCollection());
+ _diskMemUsageSampler->notifier().removeDiskMemUsageListener(old->diskMemUsageListener());
// Caller should have removed & drained relevant timer tasks
old->close();
}
diff --git a/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h b/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h
new file mode 100644
index 00000000000..4e3e455fa4d
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/test/disk_mem_usage_notifier.h
@@ -0,0 +1,47 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchcore/proton/server/i_disk_mem_usage_notifier.h>
+#include <vespa/searchcore/proton/server/i_disk_mem_usage_listener.h>
+
+namespace proton
+{
+
+namespace test
+{
+
+/**
+ * Test notifier for disk/mem usage.
+ */
+class DiskMemUsageNotifier : public IDiskMemUsageNotifier
+{
+ std::vector<IDiskMemUsageListener *> _listeners;
+ DiskMemUsageState _state;
+public:
+ DiskMemUsageNotifier() : IDiskMemUsageNotifier(), _listeners(), _state() { }
+ virtual ~DiskMemUsageNotifier() { }
+ virtual void addDiskMemUsageListener(IDiskMemUsageListener *listener) override {
+ _listeners.push_back(listener);
+ listener->notifyDiskMemUsage(_state);
+ }
+ virtual void removeDiskMemUsageListener(IDiskMemUsageListener *listener) override {
+ for (auto itr = _listeners.begin(); itr != _listeners.end(); ++itr) {
+ if (*itr == listener) {
+ _listeners.erase(itr);
+ break;
+ }
+ }
+ }
+ void notify(DiskMemUsageState state) {
+ if (_state != state) {
+ _state = state;
+ for (const auto &listener : _listeners) {
+ listener->notifyDiskMemUsage(state);
+ }
+ }
+ }
+};
+
+} // namespace proton::test
+} // namespace proton