summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@oath.com>2018-03-19 11:52:04 +0100
committerHenning Baldersheim <balder@oath.com>2018-03-19 13:01:17 +0100
commitb9c175537bc743a0bf457d579d03b59867d1a1f1 (patch)
treeaed9fcb4c0ceeb66101898c2fc413821daa2c244
parentfcbaf85bc896b2664a64bda95f47c2e45f64a08b (diff)
- Use num_threads directly as it has a sane default.
- Use std::make_ insteda of explicit new. - std::shared_ptr can be assigned from std::unque_ptr. Avoid using release().
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp160
1 files changed, 55 insertions, 105 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 604acebcf38..b4735c2961a 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -29,10 +29,8 @@ using document::BucketSpace;
namespace storage {
FileStorManager::
-FileStorManager(const config::ConfigUri & configUri,
- const spi::PartitionStateList& partitions,
- spi::PersistenceProvider& provider,
- ServiceLayerComponentRegister& compReg)
+FileStorManager(const config::ConfigUri & configUri, const spi::PartitionStateList& partitions,
+ spi::PersistenceProvider& provider, ServiceLayerComponentRegister& compReg)
: StorageLinkQueued("File store manager", compReg),
framework::HtmlStatusReporter("filestorman", "File store manager"),
_compReg(compReg),
@@ -83,8 +81,7 @@ FileStorManager::~FileStorManager()
}
}
}
- LOG(debug, "Closing all filestor queues, answering queued messages. "
- "New messages will be refused.");
+ LOG(debug, "Closing all filestor queues, answering queued messages. New messages will be refused.");
_filestorHandler->close();
LOG(debug, "Deleting filestor threads. Waiting for their current operation "
"to finish. Stop their threads and delete objects.");
@@ -116,7 +113,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
_disks.resize(_component.getDiskCount());
- size_t numThreads = (_config->numThreads) ? 6 : _config->numThreads;
+ size_t numThreads = _config->numThreads;
_metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numThreads);
_filestorHandler.reset(new FileStorHandler(*this, *_metrics, _partitions, _compReg));
@@ -124,10 +121,8 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
if (_partitions[i].isUp()) {
LOG(spam, "Setting up disk %u", i);
for (uint32_t j = 0; j < numThreads; j++) {
- _disks[i].push_back(DiskThread::SP(
- new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
- *_metrics->disks[i]->threads[j], i)));
-
+ _disks[i].push_back(std::make_shared<PersistenceThread>(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i));
}
} else {
_filestorHandler->disable(i);
@@ -137,36 +132,28 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
}
void
-FileStorManager::replyDroppedOperation(api::StorageMessage& msg,
- const document::Bucket& bucket,
- api::ReturnCode::Result returnCode,
- vespalib::stringref reason)
+FileStorManager::replyDroppedOperation(api::StorageMessage& msg, const document::Bucket& bucket,
+ api::ReturnCode::Result returnCode, vespalib::stringref reason)
{
std::ostringstream error;
error << "Dropping " << msg.getType() << " to bucket "
<< bucket.toString() << ". Reason: " << reason;
LOGBT(debug, bucket.toString(), "%s", error.str().c_str());
if (!msg.getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(msg).makeReply();
reply->setResult(api::ReturnCode(returnCode, error.str()));
sendUp(reply);
}
}
void
-FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg,
- const document::Bucket& bucket)
+FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg, const document::Bucket& bucket)
{
- replyDroppedOperation(msg,
- bucket,
- api::ReturnCode::BUCKET_NOT_FOUND,
- "bucket does not exist");
+ replyDroppedOperation(msg, bucket, api::ReturnCode::BUCKET_NOT_FOUND, "bucket does not exist");
}
StorBucketDatabase::WrappedEntry
-FileStorManager::mapOperationToDisk(api::StorageMessage& msg,
- const document::Bucket& bucket)
+FileStorManager::mapOperationToDisk(api::StorageMessage& msg, const document::Bucket& bucket)
{
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
bucket.getBucketId(), "FileStorManager::mapOperationToDisk"));
@@ -177,8 +164,7 @@ FileStorManager::mapOperationToDisk(api::StorageMessage& msg,
}
StorBucketDatabase::WrappedEntry
-FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
- const document::DocumentId* docId)
+FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const document::DocumentId* docId)
{
StorBucketDatabase &database = _component.getBucketDatabase(cmd.getBucket().getBucketSpace());
StorBucketDatabase::WrappedEntry entry(database.get(
@@ -188,17 +174,12 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
if (docId) {
specific = _bucketIdFactory.getBucketId(*docId);
}
- typedef std::map<document::BucketId,
- StorBucketDatabase::WrappedEntry> BucketMap;
+ typedef std::map<document::BucketId, StorBucketDatabase::WrappedEntry> BucketMap;
std::shared_ptr<api::StorageReply> reply;
{
- BucketMap results(
- database.getContained(
- specific, "FileStorManager::mapOperationToBucketAndDisk-2"));
+ BucketMap results( database.getContained( specific, "FileStorManager::mapOperationToBucketAndDisk-2"));
if (results.size() == 1) {
- LOG(debug,
- "Remapping %s operation to specific %s versus "
- "non-existing %s to %s.",
+ LOG(debug, "Remapping %s operation to specific %s versus non-existing %s to %s.",
cmd.toString().c_str(), specific.toString().c_str(),
cmd.getBucketId().toString().c_str(),
results.begin()->first.toString().c_str());
@@ -223,10 +204,8 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
}
LOGBT(debug, cmd.getBucketId().toString(), "%s", error.str().c_str());
- reply.reset(static_cast<api::StorageCommand&>(cmd).makeReply().release());
- reply->setResult(
- api::ReturnCode(
- api::ReturnCode::BUCKET_NOT_FOUND, error.str()));
+ reply = static_cast<api::StorageCommand&>(cmd).makeReply();
+ reply->setResult( api::ReturnCode( api::ReturnCode::BUCKET_NOT_FOUND, error.str()));
}
sendUp(reply);
}
@@ -234,18 +213,15 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd,
}
bool
-FileStorManager::handlePersistenceMessage(
- const shared_ptr<api::StorageMessage>& msg, uint16_t disk)
+FileStorManager::handlePersistenceMessage( const shared_ptr<api::StorageMessage>& msg, uint16_t disk)
{
api::ReturnCode errorCode(api::ReturnCode::OK);
do {
- LOG(spam, "Received %s. Attempting to queue it to disk %u.",
- msg->getType().getName().c_str(), disk);
+ LOG(spam, "Received %s. Attempting to queue it to disk %u.", msg->getType().getName().c_str(), disk);
LOG_BUCKET_OPERATION_NO_LOCK(
getStorageMessageBucket(*msg).getBucketId(),
- vespalib::make_string("Attempting to queue %s to disk %u",
- msg->toString().c_str(), disk));
+ vespalib::make_string("Attempting to queue %s to disk %u", msg->toString().c_str(), disk));
if (_filestorHandler->schedule(msg, disk)) {
@@ -255,12 +231,10 @@ FileStorManager::handlePersistenceMessage(
}
switch (_filestorHandler->getDiskState(disk)) {
case FileStorHandler::DISABLED:
- errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE,
- "Disk disabled");
+ errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled");
break;
case FileStorHandler::CLOSED:
- errorCode = api::ReturnCode(api::ReturnCode::ABORTED,
- "Shutting down storage node.");
+ errorCode = api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.");
break;
case FileStorHandler::AVAILABLE:
assert(false);
@@ -269,8 +243,7 @@ FileStorManager::handlePersistenceMessage(
// If we get here, we failed to schedule message. errorCode says why
// We need to reply to message (while not having bucket lock)
if (!msg->getType().isReply()) {
- std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(*msg).makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(*msg).makeReply();
reply->setResult(errorCode);
LOG(spam, "Received persistence message %s. Returning reply: %s",
msg->getType().getName().c_str(), errorCode.toString().c_str());
@@ -283,7 +256,7 @@ bool
FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd)
{
if (cmd->getTimestamp() == 0) {
- shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ shared_ptr<api::StorageReply> reply = cmd->makeReply();
std::string msg("Put command received without timestamp set. "
"Distributor need to set timestamp to ensure equal "
"timestamps between storage nodes. Rejecting.");
@@ -291,8 +264,7 @@ FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd)
sendUp(reply);
return true;
}
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -303,7 +275,7 @@ bool
FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd)
{
if (cmd->getTimestamp() == 0) {
- shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ shared_ptr<api::StorageReply> reply = cmd->makeReply();
std::string msg("Update command received without timestamp set. "
"Distributor need to set timestamp to ensure equal "
"timestamps between storage nodes. Rejecting.");
@@ -311,8 +283,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd)
sendUp(reply);
return true;
}
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -322,8 +293,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd)
bool
FileStorManager::onGet(const shared_ptr<api::GetCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -334,7 +304,7 @@ bool
FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd)
{
if (cmd->getTimestamp() == 0) {
- shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ shared_ptr<api::StorageReply> reply = cmd->makeReply();
std::string msg("Remove command received without timestamp set. "
"Distributor need to set timestamp to ensure equal "
"timestamps between storage nodes. Rejecting.");
@@ -342,8 +312,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd)
sendUp(reply);
return true;
}
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, &cmd->getDocumentId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -353,8 +322,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd)
bool
FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, 0));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -403,12 +371,10 @@ FileStorManager::onCreateBucket(
bucket.getBucketId(), "FileStorManager::onCreateBucket",
StorBucketDatabase::CREATE_IF_NONEXISTING));
if (entry.preExisted()) {
- LOG(debug,
- "Got create bucket request for %s which already exists: %s",
+ LOG(debug, "Got create bucket request for %s which already exists: %s",
cmd->getBucketId().toString().c_str(),
entry->getBucketInfo().toString().c_str());
- code = api::ReturnCode(api::ReturnCode::EXISTS,
- "Bucket already exist");
+ code = api::ReturnCode(api::ReturnCode::EXISTS, "Bucket already exist");
} else {
entry->disk = _component.getIdealPartition(cmd->getBucket());
if (_partitions[entry->disk].isUp()) {
@@ -425,12 +391,9 @@ FileStorManager::onCreateBucket(
return true;
} else {
entry.remove();
- code = api::ReturnCode(
- api::ReturnCode::IO_FAILURE,
- vespalib::make_string(
- "Trying to create bucket %s on disabled disk %d",
- cmd->getBucketId().toString().c_str(),
- entry->disk));
+ code = api::ReturnCode(api::ReturnCode::IO_FAILURE,
+ vespalib::make_string("Trying to create bucket %s on disabled disk %d",
+ cmd->getBucketId().toString().c_str(), entry->disk));
}
}
}
@@ -451,7 +414,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
"FileStorManager::onDeleteBucket"));
if (!entry.exist()) {
LOG(debug, "%s was already deleted", cmd->getBucketId().toString().c_str());
- std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = cmd->makeReply();
sendUp(reply);
return true;
}
@@ -473,7 +436,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
<< entry->getBucketInfo().toString();
LOG(debug, "Rejecting bucket delete: %s", ost.str().c_str());
- std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply = cmd->makeReply();
static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(entry->getBucketInfo());
reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str()));
entry.unlock();
@@ -523,8 +486,7 @@ FileStorManager::ensureConsistentBucket(
bool
FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd,
- "FileStorManager::onMergeBucket"));
+ StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onMergeBucket"));
if (!entry.exist()) {
return true;
}
@@ -547,7 +509,7 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
cmd->toString().c_str(), entry->disk, cmd->getClusterStateVersion(),
_component.getStateUpdater().getClusterStateBundle()->getVersion()));
LOGBT(debug, cmd->getBucketId().toString(), "%s", code.getMessage().c_str());
- api::MergeBucketReply::SP reply(new api::MergeBucketReply(*cmd));
+ auto reply = std::make_shared<api::MergeBucketReply>(*cmd);
reply->setResult(code);
sendUp(reply);
return true;
@@ -562,39 +524,30 @@ bool
FileStorManager::onGetBucketDiff(
const shared_ptr<api::GetBucketDiffCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(
- ensureConsistentBucket(cmd->getBucket(),
- *cmd,
- "FileStorManager::onGetBucketDiff"));
+ StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onGetBucketDiff"));
if (!entry.exist()) {
return true;
}
if (!entry.preExisted()) {
entry->disk = _component.getIdealPartition(cmd->getBucket());
if (_partitions[entry->disk].isUp()) {
- LOG(debug, "Created bucket %s on disk %d (node index is %d) due "
- "to get bucket diff being received.",
- cmd->getBucketId().toString().c_str(),
- entry->disk, _component.getIndex());
+ LOG(debug, "Created bucket %s on disk %d (node index is %d) due to get bucket diff being received.",
+ cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex());
entry->info.setTotalDocumentSize(0);
entry->info.setUsedFileSize(0);
entry->info.setReady(true);
- // Call before writing bucket entry as we need to have bucket
- // lock while calling
+ // Call before writing bucket entry as we need to have bucket
+ // lock while calling
handlePersistenceMessage(cmd, entry->disk);
entry.write();
} else {
entry.remove();
api::ReturnCode code(api::ReturnCode::IO_FAILURE,
vespalib::make_string(
- "Trying to merge non-existing bucket %s, which "
- "can't be created because target disk %d is down",
- cmd->getBucketId().toString().c_str(),
- entry->disk));
- LOGBT(warning, cmd->getBucketId().toString(),
- "%s", code.getMessage().c_str());
- api::GetBucketDiffReply::SP reply(
- new api::GetBucketDiffReply(*cmd));
+ "Trying to merge non-existing bucket %s, which can't be created because target disk %d is down",
+ cmd->getBucketId().toString().c_str(), entry->disk));
+ LOGBT(warning, cmd->getBucketId().toString(), "%s", code.getMessage().c_str());
+ auto reply = std::make_shared<api::GetBucketDiffReply>(*cmd);
reply->setResult(code);
sendUp(reply);
return true;
@@ -614,8 +567,7 @@ FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const
BucketSpace bucketSpace(msg.getBucket().getBucketSpace());
if (!_component.getBucketDatabase(bucketSpace).isConsistent(entry)) {
document::Bucket bucket(bucketSpace, entry.getBucketId());
- replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED,
- "bucket became inconsistent during merging");
+ replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket became inconsistent during merging");
return false;
}
return true;
@@ -695,8 +647,7 @@ FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& c
}
bool
-FileStorManager::onSetBucketState(
- const std::shared_ptr<api::SetBucketStateCommand>& cmd)
+FileStorManager::onSetBucketState(const std::shared_ptr<api::SetBucketStateCommand>& cmd)
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket()));
if (entry.exist()) {
@@ -801,7 +752,7 @@ void
FileStorManager::handleAbortBucketOperations(const shared_ptr<AbortBucketOperationsCommand>& cmd)
{
_filestorHandler->abortQueuedOperations(*cmd);
- sendReply(api::StorageReply::SP(cmd->makeReply().release()));
+ sendReply(api::StorageReply::SP(cmd->makeReply()));
}
bool
@@ -893,8 +844,7 @@ void FileStorManager::onFlush(bool downwards)
}
void
-FileStorManager::reportHtmlStatus(std::ostream& out,
- const framework::HttpUrlPath& path) const
+FileStorManager::reportHtmlStatus(std::ostream& out, const framework::HttpUrlPath& path) const
{
bool showStatus = !path.hasAttribute("thread");
bool verbose = path.hasAttribute("verbose");
@@ -942,7 +892,7 @@ FileStorManager::updateState()
bool nodeUp = state->getNodeState(node).getState().oneOf("uir");
LOG(debug, "FileStorManager received cluster state '%s'", state->toString().c_str());
- // If edge where we go down
+ // If edge where we go down
if (_nodeUpInLastNodeStateSeenByProvider && !nodeUp) {
LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database");
Deactivator deactivator;