summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
author Geir Storli <geirstorli@yahoo.no> 2017-06-28 16:03:53 +0200
committer GitHub <noreply@github.com> 2017-06-28 16:03:53 +0200
commit 2242acb1197103febb9524d4eb3e9f66e8f7458f (patch)
tree fd655639b0cdbd132ffccb547065f4d585bcf089 /searchcore
parent 6a1a7705ca63ef26e87f66b9c6da41e68477a564 (diff)
parent 7e9fd7da532597e719ceaeecb3a8c3e5edd808d9 (diff)
Merge pull request #2898 from yahoo/geirst/consolidate-handling-of-blockable-maintenance-jobs
Geirst/consolidate handling of blockable maintenance jobs
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp29
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp3
-rw-r--r--searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp16
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.cpp97
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h56
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp95
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h38
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp31
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h42
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h12
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/ibucketfreezelistener.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp56
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h5
18 files changed, 316 insertions, 194 deletions
diff --git a/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp b/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp
index 63de9f156ff..0a4b83350ba 100644
--- a/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp
@@ -2,18 +2,20 @@
#include <vespa/log/log.h>
LOG_SETUP("job_tracked_maintenance_test");
+#include <vespa/searchcore/proton/server/i_blockable_maintenance_job.h>
#include <vespa/searchcore/proton/server/job_tracked_maintenance_job.h>
#include <vespa/searchcore/proton/test/simple_job_tracker.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/closuretask.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/sync.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
using namespace proton;
using namespace vespalib;
using test::SimpleJobTracker;
-typedef std::unique_ptr<Gate> GateUP;
-typedef std::vector<GateUP> GateVector;
+using GateUP = std::unique_ptr<Gate>;
+using GateVector = std::vector<GateUP>;
+using BlockedReason = IBlockableMaintenanceJob::BlockedReason;
GateVector
getGateVector(size_t size)
@@ -25,18 +27,22 @@ getGateVector(size_t size)
return retval;
}
-struct MyMaintenanceJob : public IMaintenanceJob
+struct MyMaintenanceJob : public IBlockableMaintenanceJob
{
GateVector _runGates;
size_t _runIdx;
+ bool _blocked;
MyMaintenanceJob(size_t numRuns)
- : IMaintenanceJob("myjob", 10, 20),
+ : IBlockableMaintenanceJob("myjob", 10, 20),
_runGates(getGateVector(numRuns)),
- _runIdx(0)
+ _runIdx(0),
+ _blocked(false)
{}
- void block() {
- setBlocked(true);
- }
+ void block() { setBlocked(BlockedReason::RESOURCE_LIMITS); }
+ void unBlock() { unBlock(BlockedReason::RESOURCE_LIMITS); }
+ virtual void setBlocked(BlockedReason) override { _blocked = true; }
+ virtual void unBlock(BlockedReason) override { _blocked = false; }
+ virtual bool isBlocked() const override { return _blocked; }
virtual bool run() override {
_runGates[_runIdx++]->await(5000);
return _runIdx == _runGates.size();
@@ -119,13 +125,14 @@ TEST_F("require that maintenance job that is destroyed is tracked", Fixture(2))
f.assertTracker(0, 0);
}
-TEST_F("require that block calls are sent to underlying job", Fixture)
+TEST_F("require that block calls are sent to underlying jobs", Fixture)
{
EXPECT_FALSE(f._trackedJob->isBlocked());
+ EXPECT_TRUE(f._trackedJob->asBlockable() != nullptr);
f._myJob->block();
EXPECT_TRUE(f._myJob->isBlocked());
EXPECT_TRUE(f._trackedJob->isBlocked());
- f._trackedJob->unBlock();
+ f._myJob->unBlock();
EXPECT_FALSE(f._myJob->isBlocked());
EXPECT_FALSE(f._trackedJob->isBlocked());
}
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
index f2f0029b924..494ae406484 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
@@ -20,6 +20,7 @@ using namespace search;
using namespace search::index;
using namespace vespalib;
using storage::spi::Timestamp;
+using BlockedReason = IBlockableMaintenanceJob::BlockedReason;
const uint32_t SUBDB_ID = 2;
const double JOB_DELAY = 1.0;
@@ -393,7 +394,7 @@ TEST_F("require that job is blocked if trying to move document for frozen bucket
EXPECT_TRUE(f._job.isBlocked());
f._frozenHandler._bucket = BUCKET_ID_2;
- f._job.unBlock();
+ f._job.unBlock(BlockedReason::FROZEN_BUCKET);
EXPECT_FALSE(f.run()); // unblocked
TEST_DO(f.assertJobContext(2, 9, 1, 0, 0));
diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
index 4b0c8d4b1b5..222e9e1a6fb 100644
--- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
@@ -8,6 +8,7 @@
#include <vespa/searchcore/proton/feedoperation/pruneremoveddocumentsoperation.h>
#include <vespa/searchcore/proton/feedoperation/putoperation.h>
#include <vespa/searchcore/proton/feedoperation/removeoperation.h>
+#include <vespa/searchcore/proton/server/blockable_maintenance_job.h>
#include <vespa/searchcore/proton/server/executor_thread_service.h>
#include <vespa/searchcore/proton/server/i_operation_storer.h>
#include <vespa/searchcore/proton/server/ibucketmodifiedhandler.h>
@@ -32,6 +33,7 @@
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/log/log.h>
+
LOG_SETUP("maintenancecontroller_test");
using namespace proton;
@@ -52,6 +54,8 @@ using vespalib::makeClosure;
using vespalib::Slime;
using proton::matching::ISessionCachePruner;
+using BlockedReason = IBlockableMaintenanceJob::BlockedReason;
+
typedef BucketId::List BucketIdVector;
typedef std::set<BucketId> BucketIdSet;
@@ -277,7 +281,7 @@ public:
}
};
-struct MySimpleJob : public IMaintenanceJob
+struct MySimpleJob : public BlockableMaintenanceJob
{
vespalib::CountDownLatch _latch;
size_t _runCnt;
@@ -285,12 +289,12 @@ struct MySimpleJob : public IMaintenanceJob
MySimpleJob(double delay,
double interval,
uint32_t finishCount)
- : IMaintenanceJob("my_job", delay, interval),
+ : BlockableMaintenanceJob("my_job", delay, interval),
_latch(finishCount),
_runCnt(0)
{
}
- void block() { setBlocked(true); }
+ void block() { setBlocked(BlockedReason::FROZEN_BUCKET); }
virtual bool run() override {
LOG(info, "MySimpleJob::run()");
_latch.countDown();
@@ -315,17 +319,17 @@ struct MySplitJob : public MySimpleJob
}
};
-struct MyLongRunningJob : public IMaintenanceJob
+struct MyLongRunningJob : public BlockableMaintenanceJob
{
vespalib::Gate _firstRun;
MyLongRunningJob(double delay,
double interval)
- : IMaintenanceJob("long_running_job", delay, interval),
+ : BlockableMaintenanceJob("long_running_job", delay, interval),
_firstRun()
{
}
- void block() { setBlocked(true); }
+ void block() { setBlocked(BlockedReason::FROZEN_BUCKET); }
virtual bool run() override {
_firstRun.countDown();
usleep(10000);
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index c98bc6be8f4..7208a3695e7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -1,6 +1,7 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(searchcore_server STATIC
SOURCES
+ blockable_maintenance_job.cpp
bootstrapconfig.cpp
bootstrapconfigmanager.cpp
buckethandler.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.cpp b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.cpp
new file mode 100644
index 00000000000..b7dfd8196c8
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.cpp
@@ -0,0 +1,97 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "blockable_maintenance_job.h"
+#include "disk_mem_usage_state.h"
+#include "imaintenancejobrunner.h"
+
+namespace proton {
+
+void
+BlockableMaintenanceJob::updateBlocked(const LockGuard &)
+{
+ _blocked = !_blockReasons.empty();
+}
+
+void
+BlockableMaintenanceJob::internalNotifyDiskMemUsage(const DiskMemUsageState &state)
+{
+ bool resourcesOK = !state.aboveDiskLimit(_resourceLimitFactor) && !state.aboveMemoryLimit(_resourceLimitFactor);
+ if (resourcesOK) {
+ if (isBlocked(BlockedReason::RESOURCE_LIMITS)) {
+ unBlock(BlockedReason::RESOURCE_LIMITS);
+ }
+ } else {
+ setBlocked(BlockedReason::RESOURCE_LIMITS);
+ }
+}
+
+BlockableMaintenanceJob::BlockableMaintenanceJob(const vespalib::string &name,
+ double delay,
+ double interval)
+ : BlockableMaintenanceJob(name, delay, interval, 1.0)
+{
+}
+
+BlockableMaintenanceJob::BlockableMaintenanceJob(const vespalib::string &name,
+ double delay,
+ double interval,
+ double resourceLimitFactor)
+ : IBlockableMaintenanceJob(name, delay, interval),
+ _mutex(),
+ _blockReasons(),
+ _blocked(false),
+ _runner(nullptr),
+ _resourceLimitFactor(resourceLimitFactor)
+{
+}
+
+BlockableMaintenanceJob::~BlockableMaintenanceJob()
+{
+}
+
+bool
+BlockableMaintenanceJob::isBlocked(BlockedReason reason)
+{
+ LockGuard guard(_mutex);
+ return (_blockReasons.find(reason) != _blockReasons.end());
+}
+
+void
+BlockableMaintenanceJob::considerRun()
+{
+ if (_runner && !isBlocked()) {
+ _runner->run();
+ }
+}
+
+void
+BlockableMaintenanceJob::setBlocked(BlockedReason reason)
+{
+ LockGuard guard(_mutex);
+ _blockReasons.insert(reason);
+ updateBlocked(guard);
+}
+
+void
+BlockableMaintenanceJob::unBlock(BlockedReason reason)
+{
+ bool considerRun = false;
+ {
+ LockGuard guard(_mutex);
+ _blockReasons.erase(reason);
+ updateBlocked(guard);
+ considerRun = !_blocked;
+ }
+ if (_runner && considerRun) {
+ _runner->run();
+ }
+}
+
+bool
+BlockableMaintenanceJob::isBlocked() const
+{
+ LockGuard guard(_mutex);
+ return _blocked;
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h
new file mode 100644
index 00000000000..a2f84ebe8fc
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h
@@ -0,0 +1,56 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "i_blockable_maintenance_job.h"
+#include <mutex>
+#include <unordered_set>
+
+namespace proton {
+
+class DiskMemUsageState;
+class IMaintenanceJobRunner;
+
+/**
+ * Implementation of a maintenance job that can be blocked and unblocked due to various external reasons.
+ * A blocked job is not executed by the IMaintenanceJobRunner wrapping the job.
+ * When unblocked for a given reason, the job is scheduled for execution again if it is totally unblocked.
+ */
+class BlockableMaintenanceJob : public IBlockableMaintenanceJob {
+private:
+ using LockGuard = std::lock_guard<std::mutex>;
+ using ReasonSet = std::unordered_set<BlockedReason>;
+
+ mutable std::mutex _mutex;
+ ReasonSet _blockReasons;
+ bool _blocked;
+ IMaintenanceJobRunner *_runner;
+ double _resourceLimitFactor;
+
+ void updateBlocked(const LockGuard &guard);
+
+protected:
+ void internalNotifyDiskMemUsage(const DiskMemUsageState &state);
+
+public:
+ BlockableMaintenanceJob(const vespalib::string &name,
+ double delay,
+ double interval);
+
+ BlockableMaintenanceJob(const vespalib::string &name,
+ double delay,
+ double interval,
+ double resourceLimitFactor);
+
+ virtual ~BlockableMaintenanceJob();
+
+ bool isBlocked(BlockedReason reason);
+ void considerRun();
+
+ virtual void setBlocked(BlockedReason reason) override;
+ virtual void unBlock(BlockedReason reason) override;
+ virtual bool isBlocked() const override;
+ virtual void registerRunner(IMaintenanceJobRunner *runner) override { _runner = runner; }
+
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
index cb2932e3066..4972c33fdd5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
@@ -25,7 +25,6 @@ const char * bool2str(bool v) { return (v ? "T" : "F"); }
}
-
BucketMoveJob::ScanIterator::
ScanIterator(BucketDBOwner::Guard db, uint32_t pass, BucketId lastBucket, BucketId endBucket)
: _db(std::move(db)),
@@ -35,7 +34,6 @@ ScanIterator(BucketDBOwner::Guard db, uint32_t pass, BucketId lastBucket, Bucket
{
}
-
BucketMoveJob::ScanIterator::
ScanIterator(BucketDBOwner::Guard db, BucketId bucket)
: _db(std::move(db)),
@@ -44,7 +42,6 @@ ScanIterator(BucketDBOwner::Guard db, BucketId bucket)
{
}
-
BucketMoveJob::ScanIterator::ScanIterator(ScanIterator &&rhs)
: _db(std::move(rhs._db)),
_itr(rhs._itr),
@@ -95,7 +92,6 @@ BucketMoveJob::checkBucket(const BucketId &bucket,
_moveHandler, _ready._metaStore->getBucketDB());
}
-
BucketMoveJob::ScanResult
BucketMoveJob::scanBuckets(size_t maxBucketsToScan, IFrozenBucketHandler::ExclusiveBucketGuard::UP & bucketGuard)
{
@@ -119,7 +115,6 @@ BucketMoveJob::scanBuckets(size_t maxBucketsToScan, IFrozenBucketHandler::Exclus
return ScanResult(bucketsScanned, passDone);
}
-
void
BucketMoveJob::moveDocuments(DocumentBucketMover &mover,
size_t maxDocsToMove,
@@ -139,6 +134,18 @@ BucketMoveJob::moveDocuments(DocumentBucketMover &mover,
}
}
+namespace {
+
+bool
+blockedDueToClusterState(const IBucketStateCalculator::SP &calc)
+{
+ bool clusterUp = calc.get() != nullptr && calc->clusterUp();
+ bool nodeUp = calc.get() != nullptr && calc->nodeUp();
+ bool nodeInitializing = calc.get() != nullptr && calc->nodeInitializing();
+ return !(clusterUp && nodeUp && !nodeInitializing);
+}
+
+}
BucketMoveJob::
BucketMoveJob(const IBucketStateCalculator::SP &calc,
@@ -152,7 +159,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc,
IDiskMemUsageNotifier &diskMemUsageNotifier,
double resourceLimitFactor,
const vespalib::string &docTypeName)
- : IMaintenanceJob("move_buckets." + docTypeName, 0.0, 0.0),
+ : BlockableMaintenanceJob("move_buckets." + docTypeName, 0.0, 0.0, resourceLimitFactor),
IClusterStateChangedHandler(),
IBucketFreezeListener(),
_calc(calc),
@@ -169,26 +176,20 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc,
_delayedBucketsFrozen(),
_frozenBuckets(frozenBuckets),
_delayedMover(),
- _runner(nullptr),
- _clusterUp(false),
- _nodeUp(false),
- _nodeInitializing(false),
- _resourcesOK(false),
- _runnable(false),
_clusterStateChangedNotifier(clusterStateChangedNotifier),
_bucketStateChangedNotifier(bucketStateChangedNotifier),
- _diskMemUsageNotifier(diskMemUsageNotifier),
- _resourceLimitFactor(resourceLimitFactor)
+ _diskMemUsageNotifier(diskMemUsageNotifier)
{
- refreshDerivedClusterState();
-
+ if (blockedDueToClusterState(_calc)) {
+ setBlocked(BlockedReason::CLUSTER_STATE);
+ }
+
_frozenBuckets.addListener(this);
_clusterStateChangedNotifier.addClusterStateChangedHandler(this);
_bucketStateChangedNotifier.addBucketStateChangedHandler(this);
_diskMemUsageNotifier.addDiskMemUsageListener(this);
}
-
BucketMoveJob::~BucketMoveJob()
{
_frozenBuckets.removeListener(this);
@@ -197,21 +198,19 @@ BucketMoveJob::~BucketMoveJob()
_diskMemUsageNotifier.removeDiskMemUsageListener(this);
}
-
void
BucketMoveJob::maybeCancelMover(DocumentBucketMover &mover)
{
// Cancel bucket if moving in wrong direction
if (!mover.bucketDone()) {
bool ready = mover.getSource() == &_ready;
- if (!_runnable ||
+ if (isBlocked() ||
_calc->shouldBeReady(mover.getBucket()) == ready) {
mover.cancel();
}
}
}
-
void
BucketMoveJob::maybeDelayMover(DocumentBucketMover &mover, BucketId bucket)
{
@@ -228,20 +227,16 @@ BucketMoveJob::notifyThawedBucket(const BucketId &bucket)
{
if (_delayedBucketsFrozen.erase(bucket) != 0u) {
_delayedBuckets.insert(bucket);
- if (_runner && _runnable) {
- _runner->run();
- }
+ considerRun();
}
}
-
void
BucketMoveJob::deactivateBucket(BucketId bucket)
{
_delayedBuckets.insert(bucket);
}
-
void
BucketMoveJob::activateBucket(BucketId bucket)
{
@@ -268,7 +263,6 @@ BucketMoveJob::changedCalculator()
maybeCancelMover(_delayedMover);
}
-
void
BucketMoveJob::scanAndMove(size_t maxBucketsToScan,
size_t maxDocsToMove)
@@ -318,47 +312,26 @@ BucketMoveJob::scanAndMove(size_t maxBucketsToScan,
}
}
-void
-BucketMoveJob::registerRunner(IMaintenanceJobRunner *runner)
-{
- _runner = runner;
-}
-
-
bool
BucketMoveJob::run()
{
- if (!_runnable)
+ if (isBlocked()) {
return true; // indicate work is done, since node state is bad
+ }
scanAndMove(200, 1);
return done();
}
void
-BucketMoveJob::refreshRunnable()
-{
- _runnable = _clusterUp && _nodeUp && !_nodeInitializing && _resourcesOK;
-}
-
-void
-BucketMoveJob::refreshDerivedClusterState()
-{
- _clusterUp = _calc.get() != NULL && _calc->clusterUp();
- _nodeUp = _calc.get() != NULL && _calc->nodeUp();
- _nodeInitializing = _calc.get() != NULL && _calc->nodeInitializing();
- refreshRunnable();
-}
-
-void
-BucketMoveJob::notifyClusterStateChanged(const IBucketStateCalculator::SP &
- newCalc)
+BucketMoveJob::notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc)
{
// Called by master write thread
_calc = newCalc;
- refreshDerivedClusterState();
changedCalculator();
- if (_runner && _runnable) {
- _runner->run();
+ if (blockedDueToClusterState(_calc)) {
+ setBlocked(BlockedReason::CLUSTER_STATE);
+ } else {
+ unBlock(BlockedReason::CLUSTER_STATE);
}
}
@@ -372,20 +345,16 @@ BucketMoveJob::notifyBucketStateChanged(const BucketId &bucketId,
} else {
activateBucket(bucketId);
}
- if (!done() && _runner && _runnable) {
- _runner->run();
+ if (!done()) {
+ considerRun();
}
}
-void BucketMoveJob::notifyDiskMemUsage(DiskMemUsageState state)
+void
+BucketMoveJob::notifyDiskMemUsage(DiskMemUsageState state)
{
// Called by master write thread
- bool resourcesOK = !state.aboveDiskLimit(_resourceLimitFactor) && !state.aboveMemoryLimit(_resourceLimitFactor);
- _resourcesOK = resourcesOK;
- refreshRunnable();
- if (_runner && _runnable) {
- _runner->run();
- }
+ internalNotifyDiskMemUsage(state);
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
index 16e3ad5ad93..ffbcf68f769 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
@@ -2,15 +2,15 @@
#pragma once
-#include "ibucketstatecalculator.h"
-#include "ibucketmodifiedhandler.h"
-#include "ifrozenbuckethandler.h"
+#include "blockable_maintenance_job.h"
#include "documentbucketmover.h"
-#include "i_maintenance_job.h"
-#include "iclusterstatechangedhandler.h"
+#include "i_disk_mem_usage_listener.h"
#include "ibucketfreezelistener.h"
+#include "ibucketmodifiedhandler.h"
+#include "ibucketstatecalculator.h"
#include "ibucketstatechangedhandler.h"
-#include "i_disk_mem_usage_listener.h"
+#include "iclusterstatechangedhandler.h"
+#include "ifrozenbuckethandler.h"
#include <vespa/searchcore/proton/bucketdb/bucket_db_owner.h>
#include <set>
@@ -24,9 +24,9 @@ class IDiskMemUsageNotifier;
/**
* Class used to control the moving of buckets between the ready and
- * not ready sub databases.
+ * not ready sub databases based on the readiness of buckets according to the cluster state.
*/
-class BucketMoveJob : public IMaintenanceJob,
+class BucketMoveJob : public BlockableMaintenanceJob,
public IClusterStateChangedHandler,
public IBucketFreezeListener,
public IBucketStateChangedHandler,
@@ -97,16 +97,9 @@ private:
DelayedBucketSet _delayedBucketsFrozen;
IFrozenBucketHandler &_frozenBuckets;
DocumentBucketMover _delayedMover;
- IMaintenanceJobRunner *_runner;
- bool _clusterUp;
- bool _nodeUp;
- bool _nodeInitializing;
- bool _resourcesOK;
- bool _runnable; // can try to perform work
IClusterStateChangedNotifier &_clusterStateChangedNotifier;
IBucketStateChangedNotifier &_bucketStateChangedNotifier;
IDiskMemUsageNotifier &_diskMemUsageNotifier;
- double _resourceLimitFactor;
ScanResult
scanBuckets(size_t maxBucketsToScan,
@@ -126,9 +119,6 @@ private:
DocumentBucketMover &mover,
IFrozenBucketHandler::ExclusiveBucketGuard::UP & bucketGuard);
- void refreshRunnable();
- void refreshDerivedClusterState();
-
/**
* Signal that the given bucket should be de-activated.
* An active bucket is not considered for moving from ready to not ready sub database.
@@ -169,25 +159,17 @@ public:
}
// IMaintenanceJob API
- virtual void registerRunner(IMaintenanceJobRunner *runner) override;
-
- // IMaintenanceJob API
virtual bool run() override;
// IClusterStateChangedHandler API
virtual void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override;
-
// IBucketFreezeListener API
- /**
- * Signal that the given bucket has been thawed.
- * A thawed bucket can be considered for moving.
- */
virtual void notifyThawedBucket(const document::BucketId &bucket) override;
// IBucketStateChangedHandler API
- void notifyBucketStateChanged(const document::BucketId &bucketId,
- storage::spi::BucketInfo::ActiveState newState) override;
+ virtual void notifyBucketStateChanged(const document::BucketId &bucketId,
+ storage::spi::BucketInfo::ActiveState newState) override;
virtual void notifyDiskMemUsage(DiskMemUsageState state) override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp b/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp
index d6353784fd7..3b3437b4346 100644
--- a/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/clusterstatehandler.cpp
@@ -25,32 +25,27 @@ class ClusterStateAdapter : public IBucketStateCalculator
{
private:
ClusterState _calc;
+ bool _clusterUp;
+ bool _nodeUp;
+ bool _nodeInitializing;
+ bool _nodeRetired;
public:
ClusterStateAdapter(const ClusterState &calc)
- : _calc(calc)
+ : _calc(calc),
+ _clusterUp(_calc.clusterUp()),
+ _nodeUp(_calc.nodeUp()),
+ _nodeInitializing(_calc.nodeInitializing()),
+ _nodeRetired(_calc.nodeRetired())
{
}
-
bool shouldBeReady(const document::BucketId &bucket) const override {
return _calc.shouldBeReady(Bucket(bucket, PartitionId(0)));
}
-
- bool clusterUp() const override {
- return _calc.clusterUp();
- }
-
- bool nodeUp() const override {
- return _calc.nodeUp();
- }
-
- bool nodeInitializing() const override {
- return _calc.nodeInitializing();
- }
-
- bool nodeRetired() const override {
- return _calc.nodeRetired();
- }
+ bool clusterUp() const override { return _clusterUp; }
+ bool nodeUp() const override { return _nodeUp; }
+ bool nodeInitializing() const override { return _nodeInitializing; }
+ bool nodeRetired() const override { return _nodeRetired; }
};
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h
new file mode 100644
index 00000000000..aa4d5ed79b6
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/i_blockable_maintenance_job.h
@@ -0,0 +1,42 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "i_maintenance_job.h"
+#include <string>
+
+namespace proton {
+
+/**
+ * Interface for a maintenance job that can be blocked and unblocked due to various external reasons.
+ * A blocked job is not executed. When unblocked, the job should be scheduled for execution again.
+ */
+class IBlockableMaintenanceJob : public IMaintenanceJob {
+public:
+ enum class BlockedReason {
+ RESOURCE_LIMITS = 0,
+ FROZEN_BUCKET = 1,
+ CLUSTER_STATE = 2
+ };
+
+ IBlockableMaintenanceJob(const vespalib::string &name,
+ double delay,
+ double interval)
+ : IMaintenanceJob(name, delay, interval)
+ {}
+
+ /**
+ * Block this job due to the given reason.
+ * Should be called from the same executor thread as the one used in IMaintenanceJobRunner.
+ */
+ virtual void setBlocked(BlockedReason reason) = 0;
+
+ /**
+ * Unblock this job for the given reason and consider running the job again if not blocked anymore.
+ * Can be called from any thread.
+ */
+ virtual void unBlock(BlockedReason reason) = 0;
+
+ virtual IBlockableMaintenanceJob *asBlockable() override { return this; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h
index 1c83e2022c6..e32fd01df63 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h
@@ -5,6 +5,7 @@
namespace proton {
+class IBlockableMaintenanceJob;
class IMaintenanceJobRunner;
/**
@@ -17,10 +18,6 @@ private:
const vespalib::string _name;
const double _delay;
const double _interval;
- bool _blocked;
-
-protected:
- void setBlocked(bool v) { _blocked = v; }
public:
typedef std::unique_ptr<IMaintenanceJob> UP;
@@ -30,8 +27,7 @@ public:
double interval)
: _name(name),
_delay(delay),
- _interval(interval),
- _blocked(false)
+ _interval(interval)
{}
virtual ~IMaintenanceJob() {}
@@ -39,8 +35,8 @@ public:
virtual const vespalib::string &getName() const { return _name; }
virtual double getDelay() const { return _delay; }
virtual double getInterval() const { return _interval; }
- virtual bool isBlocked() const { return _blocked; }
- virtual void unBlock() { setBlocked(false); }
+ virtual bool isBlocked() const { return false; }
+ virtual IBlockableMaintenanceJob *asBlockable() { return nullptr; }
/**
* Register maintenance job runner, in case event passed to the
diff --git a/searchcore/src/vespa/searchcore/proton/server/ibucketfreezelistener.h b/searchcore/src/vespa/searchcore/proton/server/ibucketfreezelistener.h
index b6c2898d5e4..719e7746326 100644
--- a/searchcore/src/vespa/searchcore/proton/server/ibucketfreezelistener.h
+++ b/searchcore/src/vespa/searchcore/proton/server/ibucketfreezelistener.h
@@ -14,6 +14,11 @@ class IBucketFreezeListener
{
public:
virtual ~IBucketFreezeListener() {}
+
+ /**
+ * Signal that the given bucket has been thawed.
+ * A thawed bucket can be considered for moving.
+ */
virtual void notifyThawedBucket(const document::BucketId &bucket) = 0;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h
index 8358d65087f..37b4426ee18 100644
--- a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h
@@ -23,7 +23,7 @@ public:
// Implements IMaintenanceJob
virtual bool isBlocked() const override { return _job->isBlocked(); }
- virtual void unBlock() override { _job->unBlock(); }
+ virtual IBlockableMaintenanceJob *asBlockable() override { return _job->asBlockable(); }
virtual void registerRunner(IMaintenanceJobRunner *runner) override {
_job->registerRunner(runner);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
index be5225c5acf..3cddfcc6510 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
@@ -48,7 +48,7 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats)
IFrozenBucketHandler::ExclusiveBucketGuard::UP bucketGuard = _frozenHandler.acquireExclusiveBucket(document.bucketId);
if ( ! bucketGuard ) {
// the job is blocked until the bucket for this document is thawed
- setBlocked(true);
+ setBlocked(BlockedReason::FROZEN_BUCKET);
_retryFrozenDocument = true;
return true;
} else {
@@ -88,8 +88,8 @@ LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionC
double resourceLimitFactor,
IClusterStateChangedNotifier &clusterStateChangedNotifier,
bool nodeRetired)
- : IMaintenanceJob("lid_space_compaction." + handler.getName(),
- config.getDelay(), config.getInterval()),
+ : BlockableMaintenanceJob("lid_space_compaction." + handler.getName(),
+ config.getDelay(), config.getInterval(), resourceLimitFactor),
_cfg(config),
_handler(handler),
_opStorer(opStorer),
@@ -97,17 +97,14 @@ LidSpaceCompactionJob::LidSpaceCompactionJob(const DocumentDBLidSpaceCompactionC
_scanItr(),
_retryFrozenDocument(false),
_shouldCompactLidSpace(false),
- _resourcesOK(true),
- _nodeRetired(nodeRetired),
- _runnable(true),
- _runner(nullptr),
_diskMemUsageNotifier(diskMemUsageNotifier),
- _resourceLimitFactor(resourceLimitFactor),
_clusterStateChangedNotifier(clusterStateChangedNotifier)
{
_diskMemUsageNotifier.addDiskMemUsageListener(this);
_clusterStateChangedNotifier.addClusterStateChangedHandler(this);
- refreshRunnable();
+ if (nodeRetired) {
+ setBlocked(BlockedReason::CLUSTER_STATE);
+ }
}
LidSpaceCompactionJob::~LidSpaceCompactionJob()
@@ -119,7 +116,7 @@ LidSpaceCompactionJob::~LidSpaceCompactionJob()
bool
LidSpaceCompactionJob::run()
{
- if (!_runnable) {
+ if (isBlocked()) {
return true; // indicate work is done since no work can be done
}
LidUsageStats stats = _handler.getLidStatus();
@@ -136,28 +133,10 @@ LidSpaceCompactionJob::run()
}
void
-LidSpaceCompactionJob::refreshRunnable()
-{
- _runnable = (_resourcesOK && !_nodeRetired);
-}
-
-void
-LidSpaceCompactionJob::refreshAndConsiderRunnable()
-{
- bool oldRunnable = _runnable;
- refreshRunnable();
- if (_runner && _runnable && !oldRunnable) {
- _runner->run();
- }
-}
-
-void
LidSpaceCompactionJob::notifyDiskMemUsage(DiskMemUsageState state)
{
// Called by master write thread
- bool resourcesOK = !state.aboveDiskLimit(_resourceLimitFactor) && !state.aboveMemoryLimit(_resourceLimitFactor);
- _resourcesOK = resourcesOK;
- refreshAndConsiderRunnable();
+ internalNotifyDiskMemUsage(state);
}
void
@@ -165,20 +144,15 @@ LidSpaceCompactionJob::notifyClusterStateChanged(const IBucketStateCalculator::S
{
// Called by master write thread
bool nodeRetired = newCalc->nodeRetired();
- if (nodeRetired && !_nodeRetired) {
+ if (!nodeRetired) {
+ if (isBlocked(BlockedReason::CLUSTER_STATE)) {
+ LOG(info, "notifyClusterStateChanged(): Node is no longer retired -> lid space compaction job re-enabled");
+ unBlock(BlockedReason::CLUSTER_STATE);
+ }
+ } else {
LOG(info, "notifyClusterStateChanged(): Node is retired -> lid space compaction job disabled");
+ setBlocked(BlockedReason::CLUSTER_STATE);
}
- if (!nodeRetired && _nodeRetired) {
- LOG(info, "notifyClusterStateChanged(): Node is no longer retired -> lid space compaction job re-enabled");
- }
- _nodeRetired = nodeRetired;
- refreshAndConsiderRunnable();
-}
-
-void
-LidSpaceCompactionJob::registerRunner(IMaintenanceJobRunner *runner)
-{
- _runner = runner;
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
index 32178ac69d8..f260671344c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.h
@@ -1,10 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "blockable_maintenance_job.h"
#include "document_db_maintenance_config.h"
#include "i_disk_mem_usage_listener.h"
#include "i_lid_space_compaction_handler.h"
-#include "i_maintenance_job.h"
#include "i_operation_storer.h"
#include "ibucketstatecalculator.h"
#include "iclusterstatechangedhandler.h"
@@ -23,7 +23,7 @@ class IClusterStateChangedNotifier;
* Compaction is handled by moving documents from high lids to low free lids.
* A handler is typically working over a single document sub db.
*/
-class LidSpaceCompactionJob : public IMaintenanceJob,
+class LidSpaceCompactionJob : public BlockableMaintenanceJob,
public IDiskMemUsageListener,
public IClusterStateChangedHandler
{
@@ -35,12 +35,7 @@ private:
IDocumentScanIterator::UP _scanItr;
bool _retryFrozenDocument;
bool _shouldCompactLidSpace;
- bool _resourcesOK;
- bool _nodeRetired;
- bool _runnable; // can try to perform work
- IMaintenanceJobRunner *_runner;
IDiskMemUsageNotifier &_diskMemUsageNotifier;
- double _resourceLimitFactor;
IClusterStateChangedNotifier &_clusterStateChangedNotifier;
bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const;
@@ -70,7 +65,6 @@ public:
// Implements IMaintenanceJob
virtual bool run() override;
- virtual void registerRunner(IMaintenanceJobRunner *runner) override;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
index 3d029c26982..79135d27e5c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
@@ -3,6 +3,7 @@
#include "maintenancecontroller.h"
#include "maintenancejobrunner.h"
#include "document_db_maintenance_config.h"
+#include "i_blockable_maintenance_job.h"
#include <vespa/searchcorespi/index/i_thread_service.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/timer.h>
@@ -219,10 +220,9 @@ MaintenanceController::notifyThawedBucket(const BucketId &bucket)
(void) bucket;
// No need to take _jobsLock as modification of _jobs also happens in master write thread.
for (const auto &jw : _jobs) {
- IMaintenanceJob &job = jw->getJob();
- if (job.isBlocked()) {
- job.unBlock();
- jw->run();
+ IBlockableMaintenanceJob *job = jw->getJob().asBlockable();
+ if (job && job->isBlocked()) {
+ job->unBlock(IBlockableMaintenanceJob::BlockedReason::FROZEN_BUCKET);
}
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp
index 637fa3363b6..96952cf9f13 100644
--- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp
@@ -22,7 +22,7 @@ PruneRemovedDocumentsJob(const Config &config,
const vespalib::string &docTypeName,
IPruneRemovedDocumentsHandler &handler,
IFrozenBucketHandler &frozenHandler)
- : IMaintenanceJob("prune_removed_documents." + docTypeName,
+ : BlockableMaintenanceJob("prune_removed_documents." + docTypeName,
config.getDelay(), config.getInterval()),
_metaStore(metaStore),
_subDbId(subDbId),
@@ -85,7 +85,7 @@ PruneRemovedDocumentsJob::run()
BucketId bucket(metaData.getBucketId());
IFrozenBucketHandler::ExclusiveBucketGuard::UP bucketGuard = _frozenHandler.acquireExclusiveBucket(bucket);
if ( ! bucketGuard ) {
- setBlocked(true);
+ setBlocked(BlockedReason::FROZEN_BUCKET);
_nextLid = lid;
flush(olid, lid, ageLimit);
return true;
diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h
index 4f97268cd35..e2621bc6118 100644
--- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h
+++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h
@@ -1,10 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "blockable_maintenance_job.h"
#include "document_db_maintenance_config.h"
-#include "i_maintenance_job.h"
#include <persistence/spi/types.h>
-// #include <vespa/searchlib/common/idocumentmetastore.h>
namespace proton
{
@@ -17,7 +16,7 @@ class IFrozenBucketHandler;
* Job that regularly checks whether old removed documents should be
* forgotten.
*/
-class PruneRemovedDocumentsJob : public IMaintenanceJob
+class PruneRemovedDocumentsJob : public BlockableMaintenanceJob
{
private:
const IDocumentMetaStore &_metaStore; // external ownership