summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2017-10-24 12:40:19 +0000
committerTor Egge <Tor.Egge@oath.com>2017-10-24 12:40:19 +0000
commitff645a32f1a216f66bd8805e7ae2ced7b6544cd8 (patch)
tree466a3de8fc15e7a48daee2552c9f4bd0fce1b219 /storage
parent823100d9d380c2749c466bdfe3ef640fa5d57cae (diff)
Use document::Bucket instead of document::BucketId as key for
locking buckets and remapping queued operations after split/join.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp6
-rw-r--r--storage/src/vespa/storage/common/CMakeLists.txt2
-rw-r--r--storage/src/vespa/storage/common/bucketmessages.cpp6
-rw-r--r--storage/src/vespa/storage/common/bucketmessages.h1
-rw-r--r--storage/src/vespa/storage/common/messagebucket.cpp (renamed from storage/src/vespa/storage/common/messagebucketid.cpp)62
-rw-r--r--storage/src/vespa/storage/common/messagebucket.h (renamed from storage/src/vespa/storage/common/messagebucketid.h)4
-rw-r--r--storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h16
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp166
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h36
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp56
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h2
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h4
-rw-r--r--storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp6
17 files changed, 200 insertions, 191 deletions
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index bbfe2c19c7b..67b93108139 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -905,9 +905,9 @@ FileStorManagerTest::testRemapSplit()
"BucketId(0x40000000000011d7): Put(BucketId(0x40000000000011d7), userdoc:footype:4567:bar, timestamp 13, size 108) (priority: 127)\n"),
filestorHandler.dumpQueue(0));
- FileStorHandler::RemapInfo a(document::BucketId(17, 1234), 0);
- FileStorHandler::RemapInfo b(document::BucketId(17, 1234 | 1 << 16), 0);
- filestorHandler.remapQueueAfterSplit(FileStorHandler::RemapInfo(bucket1, 0), a, b);
+ FileStorHandler::RemapInfo a(makeDocumentBucket(document::BucketId(17, 1234)), 0);
+ FileStorHandler::RemapInfo b(makeDocumentBucket(document::BucketId(17, 1234 | 1 << 16)), 0);
+ filestorHandler.remapQueueAfterSplit(FileStorHandler::RemapInfo(makeDocumentBucket(bucket1), 0), a, b);
CPPUNIT_ASSERT(a.foundInQueue);
CPPUNIT_ASSERT(!b.foundInQueue);
diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt
index 9014c9fdd46..aff675b2a65 100644
--- a/storage/src/vespa/storage/common/CMakeLists.txt
+++ b/storage/src/vespa/storage/common/CMakeLists.txt
@@ -5,7 +5,7 @@ vespa_add_library(storage_common OBJECT
storagelink.cpp
storagelinkqueued.cpp
bucketoperationlogger.cpp
- messagebucketid.cpp
+ messagebucket.cpp
bucketmessages.cpp
statusmessages.cpp
messagesender.cpp
diff --git a/storage/src/vespa/storage/common/bucketmessages.cpp b/storage/src/vespa/storage/common/bucketmessages.cpp
index 1317f5e0814..1d9d64ad24f 100644
--- a/storage/src/vespa/storage/common/bucketmessages.cpp
+++ b/storage/src/vespa/storage/common/bucketmessages.cpp
@@ -15,6 +15,12 @@ ReadBucketList::ReadBucketList(BucketSpace bucketSpace, spi::PartitionId partiti
ReadBucketList::~ReadBucketList() { }
+document::Bucket
+ReadBucketList::getBucket() const
+{
+ return document::Bucket(_bucketSpace, document::BucketId());
+}
+
void
ReadBucketList::print(std::ostream& out, bool verbose, const std::string& indent) const {
out << "ReadBucketList(" << _partition << ")";
diff --git a/storage/src/vespa/storage/common/bucketmessages.h b/storage/src/vespa/storage/common/bucketmessages.h
index 36e3a18c6b7..0ff7a22aa4d 100644
--- a/storage/src/vespa/storage/common/bucketmessages.h
+++ b/storage/src/vespa/storage/common/bucketmessages.h
@@ -28,6 +28,7 @@ public:
~ReadBucketList();
document::BucketSpace getBucketSpace() const { return _bucketSpace; }
spi::PartitionId getPartition() const { return _partition; }
+ document::Bucket getBucket() const override;
std::unique_ptr<api::StorageReply> makeReply() override;
diff --git a/storage/src/vespa/storage/common/messagebucketid.cpp b/storage/src/vespa/storage/common/messagebucket.cpp
index 8cb72040ed2..f23bece015f 100644
--- a/storage/src/vespa/storage/common/messagebucketid.cpp
+++ b/storage/src/vespa/storage/common/messagebucket.cpp
@@ -1,6 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "messagebucketid.h"
+#include "messagebucket.h"
#include "statusmessages.h"
#include "bucketmessages.h"
#include <vespa/storageapi/message/bucket.h>
@@ -16,76 +16,76 @@
namespace storage {
-document::BucketId
-getStorageMessageBucketId(const api::StorageMessage& msg)
+document::Bucket
+getStorageMessageBucket(const api::StorageMessage& msg)
{
switch (msg.getType().getId()) {
case api::MessageType::GET_ID:
- return static_cast<const api::GetCommand&>(msg).getBucketId();
+ return static_cast<const api::GetCommand&>(msg).getBucket();
case api::MessageType::PUT_ID:
- return static_cast<const api::PutCommand&>(msg).getBucketId();
+ return static_cast<const api::PutCommand&>(msg).getBucket();
case api::MessageType::UPDATE_ID:
- return static_cast<const api::UpdateCommand&>(msg).getBucketId();
+ return static_cast<const api::UpdateCommand&>(msg).getBucket();
case api::MessageType::REMOVE_ID:
- return static_cast<const api::RemoveCommand&>(msg).getBucketId();
+ return static_cast<const api::RemoveCommand&>(msg).getBucket();
case api::MessageType::REVERT_ID:
- return static_cast<const api::RevertCommand&>(msg).getBucketId();
+ return static_cast<const api::RevertCommand&>(msg).getBucket();
case api::MessageType::STATBUCKET_ID:
- return static_cast<const api::StatBucketCommand&>(msg).getBucketId();
+ return static_cast<const api::StatBucketCommand&>(msg).getBucket();
case api::MessageType::MULTIOPERATION_ID:
return static_cast<const api::MultiOperationCommand&>(msg)
- .getBucketId();
+ .getBucket();
case api::MessageType::BATCHPUTREMOVE_ID:
return static_cast<const api::BatchPutRemoveCommand&>(msg)
- .getBucketId();
+ .getBucket();
case api::MessageType::REMOVELOCATION_ID:
return static_cast<const api::RemoveLocationCommand&>(msg)
- .getBucketId();
+ .getBucket();
case api::MessageType::CREATEBUCKET_ID:
- return static_cast<const api::CreateBucketCommand&>(msg).getBucketId();
+ return static_cast<const api::CreateBucketCommand&>(msg).getBucket();
case api::MessageType::DELETEBUCKET_ID:
- return static_cast<const api::DeleteBucketCommand&>(msg).getBucketId();
+ return static_cast<const api::DeleteBucketCommand&>(msg).getBucket();
case api::MessageType::MERGEBUCKET_ID:
- return static_cast<const api::MergeBucketCommand&>(msg).getBucketId();
+ return static_cast<const api::MergeBucketCommand&>(msg).getBucket();
case api::MessageType::GETBUCKETDIFF_ID:
- return static_cast<const api::GetBucketDiffCommand&>(msg).getBucketId();
+ return static_cast<const api::GetBucketDiffCommand&>(msg).getBucket();
case api::MessageType::GETBUCKETDIFF_REPLY_ID:
- return static_cast<const api::GetBucketDiffReply&>(msg).getBucketId();
+ return static_cast<const api::GetBucketDiffReply&>(msg).getBucket();
case api::MessageType::APPLYBUCKETDIFF_ID:
return static_cast<const api::ApplyBucketDiffCommand&>(msg)
- .getBucketId();
+ .getBucket();
case api::MessageType::APPLYBUCKETDIFF_REPLY_ID:
- return static_cast<const api::ApplyBucketDiffReply&>(msg).getBucketId();
+ return static_cast<const api::ApplyBucketDiffReply&>(msg).getBucket();
case api::MessageType::JOINBUCKETS_ID:
- return static_cast<const api::JoinBucketsCommand&>(msg).getBucketId();
+ return static_cast<const api::JoinBucketsCommand&>(msg).getBucket();
case api::MessageType::SPLITBUCKET_ID:
- return static_cast<const api::SplitBucketCommand&>(msg).getBucketId();
+ return static_cast<const api::SplitBucketCommand&>(msg).getBucket();
case api::MessageType::SETBUCKETSTATE_ID:
- return static_cast<const api::SetBucketStateCommand&>(msg).getBucketId();
+ return static_cast<const api::SetBucketStateCommand&>(msg).getBucket();
case api::MessageType::INTERNAL_ID:
switch(static_cast<const api::InternalCommand&>(msg).getType()) {
case RequestStatusPage::ID:
- return document::BucketId();
+ return document::Bucket();
case GetIterCommand::ID:
- return static_cast<const GetIterCommand&>(msg).getBucketId();
+ return static_cast<const GetIterCommand&>(msg).getBucket();
case CreateIteratorCommand::ID:
return static_cast<const CreateIteratorCommand&>(msg)
- .getBucketId();
+ .getBucket();
case ReadBucketList::ID:
- return document::BucketId();
+ return static_cast<const ReadBucketList&>(msg).getBucket();
case ReadBucketInfo::ID:
- return static_cast<const ReadBucketInfo&>(msg).getBucketId();
+ return static_cast<const ReadBucketInfo&>(msg).getBucket();
case RepairBucketCommand::ID:
- return static_cast<const RepairBucketCommand&>(msg).getBucketId();
+ return static_cast<const RepairBucketCommand&>(msg).getBucket();
case BucketDiskMoveCommand::ID:
- return static_cast<const BucketDiskMoveCommand&>(msg).getBucketId();
+ return static_cast<const BucketDiskMoveCommand&>(msg).getBucket();
case InternalBucketJoinCommand::ID:
return static_cast<const InternalBucketJoinCommand&>(msg)
- .getBucketId();
+ .getBucket();
case RecheckBucketInfoCommand::ID:
- return static_cast<const RecheckBucketInfoCommand&>(msg).getBucketId();
+ return static_cast<const RecheckBucketInfoCommand&>(msg).getBucket();
default:
break;
}
diff --git a/storage/src/vespa/storage/common/messagebucketid.h b/storage/src/vespa/storage/common/messagebucket.h
index 863ee234f82..c383a14d541 100644
--- a/storage/src/vespa/storage/common/messagebucketid.h
+++ b/storage/src/vespa/storage/common/messagebucket.h
@@ -1,7 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/document/bucket/bucketid.h>
+#include <vespa/document/bucket/bucket.h>
namespace storage {
@@ -14,7 +14,7 @@ class StorageMessage;
* @throws vespalib::IllegalArgumentException if msg does not
* have a bucket id.
*/
-document::BucketId getStorageMessageBucketId(
+document::Bucket getStorageMessageBucket(
const api::StorageMessage& msg);
}
diff --git a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp
index 299917138db..4f4d47cacac 100644
--- a/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp
+++ b/storage/src/vespa/storage/persistence/diskmoveoperationhandler.cpp
@@ -72,7 +72,7 @@ DiskMoveOperationHandler::handleBucketDiskMove(BucketDiskMoveCommand& cmd,
// Move queued operations in bucket to new thread. Hold bucket lock
// while doing it, so filestor manager can't put in other operations
// first, such that operations change order.
- _env._fileStorHandler.remapQueueAfterDiskMove(bucket.getBucketId(), deviceIndex, targetDisk);
+ _env._fileStorHandler.remapQueueAfterDiskMove(bucket, deviceIndex, targetDisk);
if (entry.exist()) {
entry->setBucketInfo(bInfo);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 27831d5edff..7286b6e3586 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -77,14 +77,14 @@ FileStorHandler::getNextMessage(uint16_t thread,
}
FileStorHandler::BucketLockInterface::SP
-FileStorHandler::lock(const document::BucketId& bucket, uint16_t disk)
+FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk)
{
return _impl->lock(bucket, disk);
}
void
FileStorHandler::remapQueueAfterDiskMove(
- const document::BucketId& bucket,
+ const document::Bucket& bucket,
uint16_t sourceDisk, uint16_t targetDisk)
{
RemapInfo target(bucket, targetDisk);
@@ -111,10 +111,10 @@ FileStorHandler::remapQueueAfterSplit(
}
void
-FileStorHandler::failOperations(const document::BucketId& bid,
+FileStorHandler::failOperations(const document::Bucket &bucket,
uint16_t fromDisk, const api::ReturnCode& err)
{
- _impl->failOperations(bid, fromDisk, err);
+ _impl->failOperations(bucket, fromDisk, err);
}
void
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 04eec67a8c4..f7e32554c18 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -14,7 +14,7 @@
#pragma once
#include "mergestatus.h"
-#include <vespa/document/bucket/bucketid.h>
+#include <vespa/document/bucket/bucket.h>
#include <ostream>
#include <vespa/storage/storageutil/resumeguard.h>
#include <vespa/storage/common/messagesender.h>
@@ -43,12 +43,12 @@ class AbortBucketOperationsCommand;
class FileStorHandler : public MessageSender {
public:
struct RemapInfo {
- document::BucketId bid;
+ document::Bucket bucket;
uint16_t diskIndex;
bool foundInQueue;
- RemapInfo(const document::BucketId& bucketId, uint16_t diskIdx)
- : bid(bucketId),
+ RemapInfo(const document::Bucket &bucket_, uint16_t diskIdx)
+ : bucket(bucket_),
diskIndex(diskIdx),
foundInQueue(false)
{}
@@ -58,7 +58,7 @@ public:
public:
typedef std::shared_ptr<BucketLockInterface> SP;
- virtual const document::BucketId& getBucketId() const = 0;
+ virtual const document::Bucket &getBucket() const = 0;
virtual ~BucketLockInterface() {};
};
@@ -156,7 +156,7 @@ public:
*
*
*/
- BucketLockInterface::SP lock(const document::BucketId&, uint16_t disk);
+ BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk);
/**
* Called by FileStorThread::onBucketDiskMove() after moving file, in case
@@ -168,7 +168,7 @@ public:
* requeststatus - Ignore
* readbucketinfo/bucketdiskmove/internalbucketjoin - Fail and log errors
*/
- void remapQueueAfterDiskMove(const document::BucketId& bucket,
+ void remapQueueAfterDiskMove(const document::Bucket &bucket,
uint16_t sourceDisk, uint16_t targetDisk);
/**
@@ -219,7 +219,7 @@ public:
* Fail all operations towards a single bucket currently queued to the
* given thread with the given error code.
*/
- void failOperations(const document::BucketId&, uint16_t fromDisk,
+ void failOperations(const document::Bucket&, uint16_t fromDisk,
const api::ReturnCode&);
/**
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index e7fa4eadda0..a887d7d3791 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -10,7 +10,7 @@
#include <vespa/storage/common/bucketmessages.h>
#include <vespa/storage/common/statusmessages.h>
#include <vespa/storage/common/bucketoperationlogger.h>
-#include <vespa/storage/common/messagebucketid.h>
+#include <vespa/storage/common/messagebucket.h>
#include <vespa/storage/persistence/messages.h>
#include <vespa/storageapi/message/stat.h>
#include <vespa/storageapi/message/batch.h>
@@ -20,6 +20,8 @@
#include <vespa/log/log.h>
LOG_SETUP(".persistence.filestor.handler.impl");
+using document::BucketSpace;
+
namespace storage {
FileStorHandlerImpl::FileStorHandlerImpl(
@@ -248,7 +250,7 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg,
{
assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
- MessageEntry messageEntry(msg, getStorageMessageBucketId(*msg));
+ MessageEntry messageEntry(msg, getStorageMessageBucket(*msg));
vespalib::MonitorGuard lockGuard(t.lock);
if (t.getState() == FileStorHandler::AVAILABLE) {
@@ -335,11 +337,11 @@ FileStorHandlerImpl::abortQueuedCommandsForBuckets(
"bucket operation was bound to");
for (iter_t it(t.queue.begin()), e(t.queue.end()); it != e;) {
api::StorageMessage& msg(*it->_command);
- if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucketId)) {
+ if (messageMayBeAborted(msg) && cmd.shouldAbort(it->_bucket.getBucketId())) {
LOG(debug,
"Aborting operation %s as it is bound for bucket %s",
msg.toString().c_str(),
- it->_bucketId.toString().c_str());
+ it->_bucket.getBucketId().toString().c_str());
std::shared_ptr<api::StorageReply> msgReply(
static_cast<api::StorageCommand&>(msg).makeReply().release());
msgReply->setResult(abortedCode);
@@ -358,7 +360,7 @@ FileStorHandlerImpl::diskHasActiveOperationForAbortedBucket(
const AbortBucketOperationsCommand& cmd) const
{
for (auto& lockedBucket : disk.lockedBuckets) {
- if (cmd.shouldAbort(lockedBucket.first)) {
+ if (cmd.shouldAbort(lockedBucket.first.getBucketId())) {
LOG(spam,
"Disk had active operation for aborted bucket %s, "
"waiting for it to complete...",
@@ -423,12 +425,12 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk,
FileStorHandler::LockedMessage& lck,
uint8_t maxPriority)
{
- document::BucketId id(lck.first->getBucketId());
+ document::Bucket bucket(lck.first->getBucket());
LOG(spam,
"Disk %d retrieving message for buffered bucket %s",
disk,
- id.toString().c_str());
+ bucket.getBucketId().toString().c_str());
assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
@@ -440,7 +442,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk,
vespalib::MonitorGuard lockGuard(t.lock);
BucketIdx& idx = boost::multi_index::get<2>(t.queue);
- std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(id);
+ std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket);
// No more for this bucket.
if (range.first == range.second) {
@@ -467,7 +469,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk,
LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), "
"timeout %d",
- m.toString().c_str(), waitTime, id.toString().c_str(),
+ m.toString().c_str(), waitTime, bucket.getBucketId().toString().c_str(),
static_cast<api::StorageCommand&>(m).getTimeout());
if (m.getType().isReply() ||
@@ -540,11 +542,11 @@ std::unique_ptr<FileStorHandler::BucketLockInterface>
FileStorHandlerImpl::takeDiskBucketLockOwnership(
const vespalib::MonitorGuard & guard,
Disk& disk,
- const document::BucketId& id,
+ const document::Bucket &bucket,
const api::StorageMessage& msg)
{
return std::unique_ptr<FileStorHandler::BucketLockInterface>(
- new BucketLock(guard, disk, id, msg.getPriority(), msg.getSummary()));
+ new BucketLock(guard, disk, bucket, msg.getPriority(), msg.getSummary()));
}
std::unique_ptr<api::StorageReply>
@@ -561,8 +563,8 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) const
namespace {
bool
- bucketIsLockedOnDisk(const document::BucketId &id, const FileStorHandlerImpl::Disk &t) {
- return (id.getRawId() != 0 && t.isLocked(id));
+ bucketIsLockedOnDisk(const document::Bucket &id, const FileStorHandlerImpl::Disk &t) {
+ return (id.getBucketId().getRawId() != 0 && t.isLocked(id));
}
/**
@@ -598,7 +600,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority)
PriorityIdx& idx(boost::multi_index::get<1>(t.queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
- while (iter != end && bucketIsLockedOnDisk(iter->_bucketId, t)) {
+ while (iter != end && bucketIsLockedOnDisk(iter->_bucket, t)) {
iter++;
}
if (iter != end) {
@@ -632,11 +634,11 @@ FileStorHandlerImpl::getMessage(vespalib::MonitorGuard & guard, Disk & t, Priori
m.toString().c_str(), waitTime, static_cast<api::StorageCommand &>(m).getTimeout());
std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
- document::BucketId bucketId(iter->_bucketId);
+ document::Bucket bucket(iter->_bucket);
idx.erase(iter); // iter not used after this point.
if (!messageTimedOutInQueue(*msg, waitTime)) {
- auto locker = takeDiskBucketLockOwnership(guard, t, bucketId, *msg);
+ auto locker = takeDiskBucketLockOwnership(guard, t, bucket, *msg);
guard.unlock();
MBUS_TRACE(trace, 9, "FileStorHandler: Got lock on bucket");
return std::move(FileStorHandler::LockedMessage(std::move(locker), std::move(msg)));
@@ -650,22 +652,22 @@ FileStorHandlerImpl::getMessage(vespalib::MonitorGuard & guard, Disk & t, Priori
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
-FileStorHandlerImpl::lock(const document::BucketId& bucket, uint16_t disk)
+FileStorHandlerImpl::lock(const document::Bucket &bucket, uint16_t disk)
{
assert(disk < _diskInfo.size());
Disk& t(_diskInfo[disk]);
LOG(spam,
"Acquiring filestor lock for %s on disk %d",
- bucket.toString().c_str(),
+ bucket.getBucketId().toString().c_str(),
disk);
vespalib::MonitorGuard lockGuard(t.lock);
- while (bucket.getRawId() != 0 && t.isLocked(bucket)) {
+ while (bucket.getBucketId().getRawId() != 0 && t.isLocked(bucket)) {
LOG(spam,
"Contending for filestor lock for %s",
- bucket.toString().c_str());
+ bucket.getBucketId().toString().c_str());
lockGuard.wait(100);
}
@@ -740,10 +742,12 @@ FileStorHandlerImpl::calculateTargetBasedOnDocId(
std::vector<RemapInfo*>& targets)
{
document::DocumentId id(getDocId(msg));
- document::BucketId bucket(_bucketIdFactory.getBucketId(id));
+ document::Bucket bucket(BucketSpace::placeHolder(), _bucketIdFactory.getBucketId(id));
for (uint32_t i = 0; i < targets.size(); i++) {
- if (targets[i]->bid.getRawId() != 0 && targets[i]->bid.contains(bucket)) {
+ if (targets[i]->bucket.getBucketId().getRawId() != 0 &&
+ targets[i]->bucket.getBucketSpace() == bucket.getBucketSpace() &&
+ targets[i]->bucket.getBucketId().contains(bucket.getBucketId())) {
return i;
}
}
@@ -751,15 +755,15 @@ FileStorHandlerImpl::calculateTargetBasedOnDocId(
return -1;
}
-document::BucketId
+document::Bucket
FileStorHandlerImpl::remapMessage(
api::StorageMessage& msg,
- const document::BucketId& source,
+ const document::Bucket& source,
Operation op,
std::vector<RemapInfo*>& targets,
uint16_t& targetDisk, api::ReturnCode& returnCode)
{
- document::BucketId newBucketId = source;
+ document::Bucket newBucket = source;
switch (msg.getType().getId()) {
case api::MessageType::GET_ID:
@@ -771,12 +775,12 @@ FileStorHandlerImpl::remapMessage(
api::BucketCommand& cmd(
static_cast<api::BucketCommand&>(msg));
- if (cmd.getBucketId() == source) {
+ if (cmd.getBucket() == source) {
if (op == SPLIT) {
int idx = calculateTargetBasedOnDocId(msg, targets);
if (idx > -1) {
- cmd.remapBucketId(targets[idx]->bid);
+ cmd.remapBucketId(targets[idx]->bucket.getBucketId());
targets[idx]->foundInQueue = true;
targetDisk = targets[idx]->diskIndex;
#if defined(ENABLE_BUCKET_OPERATION_LOGGING)
@@ -789,19 +793,19 @@ FileStorHandlerImpl::remapMessage(
LOG_BUCKET_OPERATION_NO_LOCK(targets[idx]->bid, desc);
}
#endif
- newBucketId = targets[idx]->bid;
+ newBucket = targets[idx]->bucket;
} else {
document::DocumentId did(getDocId(msg));
document::BucketId bucket = _bucketIdFactory.getBucketId(did);
uint32_t commonBits(
- findCommonBits(targets[0]->bid, bucket));
- if (commonBits < source.getUsedBits()) {
+ findCommonBits(targets[0]->bucket.getBucketId(), bucket));
+ if (commonBits < source.getBucketId().getUsedBits()) {
std::ostringstream ost;
ost << bucket << " belongs in neither "
- << targets[0]->bid << " nor " << targets[1]->bid
+ << targets[0]->bucket.getBucketId() << " nor " << targets[1]->bucket.getBucketId()
<< ". Cannot remap it after split. It "
<< "did not belong in the original "
- << "bucket " << source;
+ << "bucket " << source.getBucketId();
LOG(error, "Error remapping %s after split %s",
cmd.getType().toString().c_str(),
ost.str().c_str());
@@ -810,9 +814,9 @@ FileStorHandlerImpl::remapMessage(
} else {
std::ostringstream ost;
assert(targets.size() == 2);
- ost << "Bucket " << source << " was split and "
- << "neither bucket " << targets[0]->bid << " nor "
- << targets[1]->bid << " fit for this operation. "
+ ost << "Bucket " << source.getBucketId() << " was split and "
+ << "neither bucket " << targets[0]->bucket.getBucketId() << " nor "
+ << targets[1]->bucket.getBucketId() << " fit for this operation. "
<< "Failing operation so distributor can create "
<< "bucket on correct node.";
LOG(debug, "%s", ost.str().c_str());
@@ -823,9 +827,9 @@ FileStorHandlerImpl::remapMessage(
}
} else {
LOG(debug, "Remapping %s operation to bucket %s",
- cmd.toString().c_str(), targets[0]->bid.toString().c_str());
- cmd.remapBucketId(targets[0]->bid);
- newBucketId = targets[0]->bid;
+ cmd.toString().c_str(), targets[0]->bucket.getBucketId().toString().c_str());
+ cmd.remapBucketId(targets[0]->bucket.getBucketId());
+ newBucket = targets[0]->bucket;
targetDisk = targets[0]->diskIndex;
#ifdef ENABLE_BUCKET_OPERATION_LOGGING
{
@@ -857,7 +861,7 @@ FileStorHandlerImpl::remapMessage(
{
api::BucketCommand& cmd(
static_cast<api::BucketCommand&>(msg));
- if (cmd.getBucketId() == source) {
+ if (cmd.getBucket() == source) {
if (op != MOVE) {
std::ostringstream ost;
ost << "Bucket " << (op == SPLIT ? "split" : "joined")
@@ -877,7 +881,7 @@ FileStorHandlerImpl::remapMessage(
{
api::BucketCommand& cmd(
static_cast<api::BucketCommand&>(msg));
- if (cmd.getBucketId() == source) {
+ if (cmd.getBucket() == source) {
if (op == MOVE) {
targetDisk = targets[0]->diskIndex;
} else if (op == SPLIT) {
@@ -903,7 +907,7 @@ FileStorHandlerImpl::remapMessage(
// Fail with bucket not found if op != MOVE
api::BucketCommand& cmd(
static_cast<api::BucketCommand&>(msg));
- if (cmd.getBucketId() == source) {
+ if (cmd.getBucket() == source) {
if (op == MOVE) {
targetDisk = targets[0]->diskIndex;
} else {
@@ -922,7 +926,7 @@ FileStorHandlerImpl::remapMessage(
{
api::BucketCommand& cmd(
static_cast<api::BucketCommand&>(msg));
- if (cmd.getBucketId() == source) {
+ if (cmd.getBucket() == source) {
if (op == MOVE) {
targetDisk = targets[0]->diskIndex;
}
@@ -933,13 +937,13 @@ FileStorHandlerImpl::remapMessage(
{
const api::InternalCommand& icmd(
static_cast<const api::InternalCommand&>(msg));
- document::BucketId bucket;
+ document::Bucket bucket;
switch(icmd.getType()) {
case RequestStatusPage::ID:
// Ignore
break;
case CreateIteratorCommand::ID:
- bucket = static_cast<CreateIteratorCommand&>(msg).getBucketId();
+ bucket = static_cast<CreateIteratorCommand&>(msg).getBucket();
// Move to correct queue if op == MOVE
// Fail with bucket not found if op != MOVE
if (bucket == source) {
@@ -955,12 +959,12 @@ FileStorHandlerImpl::remapMessage(
}
break;
case GetIterCommand::ID:
- bucket = static_cast<GetIterCommand&>(msg).getBucketId();
+ bucket = static_cast<GetIterCommand&>(msg).getBucket();
//@fallthrough@
case RepairBucketCommand::ID:
- if (bucket.getRawId() == 0) {
+ if (bucket.getBucketId().getRawId() == 0) {
bucket = static_cast<RepairBucketCommand&>(msg)
- .getBucketId();
+ .getBucket();
}
// Move to correct queue if op == MOVE
// Fail with bucket not found if op != MOVE
@@ -982,7 +986,7 @@ FileStorHandlerImpl::remapMessage(
{
api::BucketCommand& cmd(
static_cast<api::BucketCommand&>(msg));
- if (cmd.getBucketId() == source) {
+ if (cmd.getBucket() == source) {
if (op == MOVE) {
returnCode = api::ReturnCode(
api::ReturnCode::INTERNAL_FAILURE,
@@ -1005,7 +1009,7 @@ FileStorHandlerImpl::remapMessage(
{
LOG(debug, "While remapping load for bucket %s for reason %u, "
"we abort read bucket info request for this bucket.",
- source.toString().c_str(), op);
+ source.getBucketId().toString().c_str(), op);
break;
}
case InternalBucketJoinCommand::ID:
@@ -1035,7 +1039,7 @@ FileStorHandlerImpl::remapMessage(
}
} // End of switch
- return newBucketId;
+ return newBucket;
}
void
@@ -1047,13 +1051,13 @@ FileStorHandlerImpl::remapQueueNoLock(
{
BucketIdx& idx(boost::multi_index::get<2>(from.queue));
std::pair<BucketIdx::iterator, BucketIdx::iterator> range(
- idx.equal_range(source.bid));
+ idx.equal_range(source.bucket));
std::vector<MessageEntry> entriesFound;
// Find all the messages for the given bucket.
for (BucketIdx::iterator i = range.first; i != range.second; ++i) {
- assert(i->_bucketId == source.bid);
+ assert(i->_bucket == source.bucket);
entriesFound.push_back(std::move(*i));
}
@@ -1071,14 +1075,14 @@ FileStorHandlerImpl::remapQueueNoLock(
// If not OK, reply to this message with the following message
api::ReturnCode returnCode(api::ReturnCode::OK);
api::StorageMessage& msg(*entry._command);
- assert(entry._bucketId == source.bid);
+ assert(entry._bucket == source.bucket);
- document::BucketId bid = remapMessage(msg,
- source.bid,
- op,
- targets,
- targetDisk,
- returnCode);
+ document::Bucket bucket = remapMessage(msg,
+ source.bucket,
+ op,
+ targets,
+ targetDisk,
+ returnCode);
if (returnCode.getResult() != api::ReturnCode::OK) {
// Fail message if errorcode set
@@ -1094,7 +1098,7 @@ FileStorHandlerImpl::remapQueueNoLock(
_messageSender.sendReply(rep);
}
} else {
- entry._bucketId = bid;
+ entry._bucket = bucket;
// Move to correct disk queue if needed
_diskInfo[targetDisk].queue.emplace_back(std::move(entry));
}
@@ -1115,7 +1119,7 @@ FileStorHandlerImpl::remapQueue(
guard.addLock(from.lock, source.diskIndex);
Disk& to1(_diskInfo[target.diskIndex]);
- if (target.bid.getRawId() != 0) {
+ if (target.bucket.getBucketId().getRawId() != 0) {
guard.addLock(to1.lock, target.diskIndex);
}
@@ -1142,12 +1146,12 @@ FileStorHandlerImpl::remapQueue(
guard.addLock(from.lock, source.diskIndex);
Disk& to1(_diskInfo[target1.diskIndex]);
- if (target1.bid.getRawId() != 0) {
+ if (target1.bucket.getBucketId().getRawId() != 0) {
guard.addLock(to1.lock, target1.diskIndex);
}
Disk& to2(_diskInfo[target2.diskIndex]);
- if (target2.bid.getRawId() != 0) {
+ if (target2.bucket.getBucketId().getRawId() != 0) {
guard.addLock(to2.lock, target2.diskIndex);
}
@@ -1162,7 +1166,7 @@ FileStorHandlerImpl::remapQueue(
void
FileStorHandlerImpl::failOperations(
- const document::BucketId& bucket, uint16_t fromDisk,
+ const document::Bucket &bucket, uint16_t fromDisk,
const api::ReturnCode& err)
{
Disk& from(_diskInfo[fromDisk]);
@@ -1204,10 +1208,10 @@ FileStorHandlerImpl::sendReply(const std::shared_ptr<api::StorageReply>& msg)
}
FileStorHandlerImpl::MessageEntry::MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd,
- const document::BucketId& bId)
+ const document::Bucket &bucket)
: _command(cmd),
_timer(),
- _bucketId(bId),
+ _bucket(bucket),
_priority(cmd->getPriority())
{ }
@@ -1215,7 +1219,7 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(const std::shared_ptr<api::Stora
FileStorHandlerImpl::MessageEntry::MessageEntry(const MessageEntry& entry)
: _command(entry._command),
_timer(entry._timer),
- _bucketId(entry._bucketId),
+ _bucket(entry._bucket),
_priority(entry._priority)
{ }
@@ -1223,7 +1227,7 @@ FileStorHandlerImpl::MessageEntry::MessageEntry(const MessageEntry& entry)
FileStorHandlerImpl::MessageEntry::MessageEntry(MessageEntry && entry) noexcept
: _command(std::move(entry._command)),
_timer(entry._timer),
- _bucketId(entry._bucketId),
+ _bucket(entry._bucket),
_priority(entry._priority)
{ }
@@ -1240,7 +1244,7 @@ FileStorHandlerImpl::Disk::Disk()
FileStorHandlerImpl::Disk::~Disk() { }
bool
-FileStorHandlerImpl::Disk::isLocked(const document::BucketId& bucket) const noexcept
+FileStorHandlerImpl::Disk::isLocked(const document::Bucket& bucket) const noexcept
{
return (lockedBuckets.find(bucket) != lockedBuckets.end());
}
@@ -1262,25 +1266,25 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const
FileStorHandlerImpl::BucketLock::BucketLock(
const vespalib::MonitorGuard & guard,
Disk& disk,
- const document::BucketId& id,
+ const document::Bucket &bucket,
uint8_t priority,
const vespalib::stringref & statusString)
: _disk(disk),
- _id(id)
+ _bucket(bucket)
{
(void) guard;
- if (_id.getRawId() != 0) {
+ if (_bucket.getBucketId().getRawId() != 0) {
// Lock the bucket and wait until it is not the current operation for
// the disk itself.
_disk.lockedBuckets.insert(
- std::make_pair(_id, Disk::LockEntry(priority, statusString)));
+ std::make_pair(_bucket, Disk::LockEntry(priority, statusString)));
LOG(debug,
"Locked bucket %s with priority %u",
- id.toString().c_str(),
+ bucket.getBucketId().toString().c_str(),
priority);
LOG_BUCKET_OPERATION_SET_LOCK_STATE(
- _id, "acquired filestor lock", false,
+ _bucket.getBucketId(), "acquired filestor lock", false,
debug::BucketOperationLogger::State::BUCKET_LOCKED);
}
}
@@ -1288,12 +1292,12 @@ FileStorHandlerImpl::BucketLock::BucketLock(
FileStorHandlerImpl::BucketLock::~BucketLock()
{
- if (_id.getRawId() != 0) {
+ if (_bucket.getBucketId().getRawId() != 0) {
vespalib::MonitorGuard lockGuard(_disk.lock);
- _disk.lockedBuckets.erase(_id);
- LOG(debug, "Unlocked bucket %s", _id.toString().c_str());
+ _disk.lockedBuckets.erase(_bucket);
+ LOG(debug, "Unlocked bucket %s", _bucket.getBucketId().toString().c_str());
LOG_BUCKET_OPERATION_SET_LOCK_STATE(
- _id, "released filestor lock", true,
+ _bucket.getBucketId(), "released filestor lock", true,
debug::BucketOperationLogger::State::BUCKET_UNLOCKED);
lockGuard.broadcast();
}
@@ -1312,7 +1316,7 @@ FileStorHandlerImpl::dumpQueue(uint16_t disk) const
it != idx.end();
it++)
{
- ost << it->_bucketId << ": " << it->_command->toString() << " (priority: "
+ ost << it->_bucket.getBucketId() << ": " << it->_command->toString() << " (priority: "
<< (int)it->_command->getPriority() << ")\n";
}
@@ -1339,7 +1343,7 @@ FileStorHandlerImpl::getStatus(std::ostream& out,
out << "<h4>Active operations</h4>\n";
for (const auto& lockedBucket : t.lockedBuckets) {
out << lockedBucket.second.statusString
- << " (" << lockedBucket.first
+ << " (" << lockedBucket.first.getBucketId()
<< ") Running for "
<< (_component.getClock().getTimeInSeconds().getTime()
- lockedBucket.second.timestamp)
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 7d051a3a631..58e93d733fd 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -47,10 +47,10 @@ public:
struct MessageEntry {
std::shared_ptr<api::StorageMessage> _command;
metrics::MetricTimer _timer;
- document::BucketId _bucketId;
+ document::Bucket _bucket;
uint8_t _priority;
- MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, const document::BucketId& bId);
+ MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, const document::Bucket &bId);
MessageEntry(MessageEntry &&) noexcept ;
MessageEntry(const MessageEntry &);
MessageEntry & operator = (const MessageEntry &) = delete;
@@ -66,8 +66,8 @@ public:
typedef boost::multi_index::ordered_non_unique<
boost::multi_index::member<MessageEntry,
- document::BucketId,
- &MessageEntry::_bucketId> > BucketOrder;
+ document::Bucket,
+ &MessageEntry::_bucket> > BucketOrder;
typedef boost::multi_index::multi_index_container<
MessageEntry,
@@ -101,7 +101,7 @@ public:
{ }
};
- typedef vespalib::hash_map<document::BucketId, LockEntry, document::BucketId::hash> LockedBuckets;
+ typedef vespalib::hash_map<document::Bucket, LockEntry, document::Bucket::hash> LockedBuckets;
LockedBuckets lockedBuckets;
FileStorDiskMetrics* metrics;
@@ -123,7 +123,7 @@ public:
Disk();
~Disk();
- bool isLocked(const document::BucketId&) const noexcept;
+ bool isLocked(const document::Bucket&) const noexcept;
uint32_t getQueueSize() const noexcept;
private:
std::atomic<DiskState> state;
@@ -131,15 +131,15 @@ public:
class BucketLock : public FileStorHandler::BucketLockInterface {
public:
- BucketLock(const vespalib::MonitorGuard & guard, Disk& disk, const document::BucketId& id, uint8_t priority,
+ BucketLock(const vespalib::MonitorGuard & guard, Disk& disk, const document::Bucket &bucket, uint8_t priority,
const vespalib::stringref & statusString);
~BucketLock();
- const document::BucketId& getBucketId() const override { return _id; }
+ const document::Bucket &getBucket() const override { return _bucket; }
private:
Disk& _disk;
- document::BucketId _id;
+ document::Bucket _bucket;
};
FileStorHandlerImpl(MessageSender&,
@@ -170,7 +170,7 @@ public:
void remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op);
- void failOperations(const document::BucketId&, uint16_t fromDisk, const api::ReturnCode&);
+ void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&);
void sendCommand(const std::shared_ptr<api::StorageCommand>&) override;
void sendReply(const std::shared_ptr<api::StorageReply>&) override;
@@ -180,7 +180,7 @@ public:
uint32_t getQueueSize(uint16_t disk) const;
std::shared_ptr<FileStorHandler::BucketLockInterface>
- lock(const document::BucketId&, uint16_t disk);
+ lock(const document::Bucket&, uint16_t disk);
void addMergeStatus(const document::BucketId&, MergeStatus::SP);
MergeStatus& editMergeStatus(const document::BucketId&);
@@ -255,7 +255,7 @@ private:
*/
std::unique_ptr<FileStorHandler::BucketLockInterface>
takeDiskBucketLockOwnership(const vespalib::MonitorGuard & guard,
- Disk& disk, const document::BucketId& id, const api::StorageMessage& msg);
+ Disk& disk, const document::Bucket &bucket, const api::StorageMessage& msg);
/**
* Creates and returns a reply with api::TIMEOUT return code for msg.
@@ -271,12 +271,12 @@ private:
// Update hook
void updateMetrics(const MetricLockGuard &) override;
- document::BucketId remapMessage(api::StorageMessage& msg,
- const document::BucketId& source,
- Operation op,
- std::vector<RemapInfo*>& targets,
- uint16_t& targetDisk,
- api::ReturnCode& returnCode);
+ document::Bucket remapMessage(api::StorageMessage& msg,
+ const document::Bucket &source,
+ Operation op,
+ std::vector<RemapInfo*>& targets,
+ uint16_t& targetDisk,
+ api::ReturnCode& returnCode);
void remapQueueNoLock(Disk& from, const RemapInfo& source, std::vector<RemapInfo*>& targets, Operation op);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index ac3d901fd65..77f6d912306 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -11,7 +11,7 @@
#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/persistence/persistencethread.h>
#include <vespa/storage/storageutil/log.h>
-#include <vespa/storage/common/messagebucketid.h>
+#include <vespa/storage/common/messagebucket.h>
#include <vespa/storage/persistence/bucketownershipnotifier.h>
#include <vespa/storageapi/message/batch.h>
#include <vespa/storage/common/bucketoperationlogger.h>
@@ -259,7 +259,7 @@ FileStorManager::handlePersistenceMessage(
msg->getType().getName().c_str(), disk);
LOG_BUCKET_OPERATION_NO_LOCK(
- getStorageMessageBucketId(*msg),
+ getStorageMessageBucketId(*msg).getBucketId(),
vespalib::make_string("Attempting to queue %s to disk %u",
msg->toString().c_str(), disk));
@@ -514,7 +514,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
disk = entry->disk;
entry.remove();
}
- _filestorHandler->failOperations(cmd->getBucketId(), disk,
+ _filestorHandler->failOperations(cmd->getBucket(), disk,
api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
vespalib::make_string("Bucket %s about to be deleted anyway",
cmd->getBucketId().toString().c_str())));
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index d0255405dda..2f1dafbb388 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -545,8 +545,8 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
LOG(debug, "split(%s -> %s, %s)", cmd.getBucketId().toString().c_str(),
target1.getBucketId().toString().c_str(), target2.getBucketId().toString().c_str());
- PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(target1.getBucketId()));
- PersistenceUtil::LockResult lock2(_env.lockAndGetDisk(target2.getBucketId()));
+ PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(target1));
+ PersistenceUtil::LockResult lock2(_env.lockAndGetDisk(target2));
#ifdef ENABLE_BUCKET_OPERATION_LOGGING
{
@@ -590,7 +590,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
_env.getBucketDatabase().get(
target.getBucketId(), "PersistenceThread::handleSplitBucket - Target",
StorBucketDatabase::CREATE_IF_NONEXISTING),
- FileStorHandler::RemapInfo(target.getBucketId(), disk)));
+ FileStorHandler::RemapInfo(target, disk)));
targets.back().first->setBucketInfo(
_env.getBucketInfo(target, disk));
targets.back().first->disk = disk;
@@ -606,7 +606,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
target2.getBucketId().toString().c_str(),
targ2.getMetaCount());
}
- FileStorHandler::RemapInfo source(cmd.getBucketId(), _env._partition);
+ FileStorHandler::RemapInfo source(cmd.getBucket(), _env._partition);
_env._fileStorHandler.remapQueueAfterSplit(
source, targets[0].second, targets[1].second);
bool ownershipChanged(
@@ -615,7 +615,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
// Now release all the bucketdb locks.
for (uint32_t i = 0; i < targets.size(); i++) {
if (ownershipChanged) {
- notifyGuard.notifyAlways(document::Bucket(spiBucket.getBucketSpace(), targets[i].second.bid),
+ notifyGuard.notifyAlways(targets[i].second.bucket,
targets[i].first->getBucketInfo());
}
// The entries vector has the source bucket in element zero, so indexing
@@ -630,7 +630,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
// to an empty target bucket, since the provider will have
// implicitly erased it by this point.
spi::Bucket createTarget(
- spi::Bucket(document::Bucket(spiBucket.getBucketSpace(), targets[i].second.bid),
+ spi::Bucket(targets[i].second.bucket,
spi::PartitionId(targets[i].second.diskIndex)));
LOG(debug,
"Split target %s was empty, but re-creating it since "
@@ -640,7 +640,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd)
}
splitReply->getSplitInfo().push_back(
api::SplitBucketReply::Entry(
- targets[i].second.bid,
+ targets[i].second.bucket.getBucketId(),
targets[i].first->getBucketInfo()));
targets[i].first.write();
} else {
@@ -715,8 +715,9 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd)
entry.write();
}
- document::BucketId firstBucket(cmd.getSourceBuckets()[0]);
- document::BucketId secondBucket(cmd.getSourceBuckets()[1]);
+ document::Bucket destBucket = cmd.getBucket();
+ document::Bucket firstBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[0]);
+ document::Bucket secondBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[1]);
PersistenceUtil::LockResult lock1(_env.lockAndGetDisk(firstBucket));
PersistenceUtil::LockResult lock2;
@@ -729,20 +730,19 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd)
vespalib::string desc(
vespalib::make_string(
"join(%s, %s -> %s)",
- firstBucket.toString().c_str(),
- secondBucket.toString().c_str(),
+ firstBucket.getBucketId().toString().c_str(),
+ secondBucket.getBucketId().toString().c_str(),
cmd.getBucketId().toString().c_str()));
LOG_BUCKET_OPERATION(cmd.getBucketId(), desc);
- LOG_BUCKET_OPERATION(firstBucket, desc);
+ LOG_BUCKET_OPERATION(firstBucket.getBucketId(), desc);
if (firstBucket != secondBucket) {
- LOG_BUCKET_OPERATION(secondBucket, desc);
+ LOG_BUCKET_OPERATION(secondBucket.getBucketId(), desc);
}
}
#endif
- document::Bucket destBucket = cmd.getBucket();
spi::Result result =
- _spi.join(spi::Bucket(document::Bucket(destBucket.getBucketSpace(), firstBucket), spi::PartitionId(lock1.disk)),
- spi::Bucket(document::Bucket(destBucket.getBucketSpace(), secondBucket), spi::PartitionId(lock2.disk)),
+ _spi.join(spi::Bucket(firstBucket, spi::PartitionId(lock1.disk)),
+ spi::Bucket(secondBucket, spi::PartitionId(lock2.disk)),
spi::Bucket(destBucket, spi::PartitionId(_env._partition)),
_context);
if (!checkForError(result, *tracker)) {
@@ -754,17 +754,17 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd)
}
uint64_t lastModified = 0;
for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) {
- document::BucketId bId = cmd.getSourceBuckets()[i];
+ document::Bucket srcBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[i]);
uint16_t disk = (i == 0) ? lock1.disk : lock2.disk;
- FileStorHandler::RemapInfo target(cmd.getBucketId(),
+ FileStorHandler::RemapInfo target(cmd.getBucket(),
_env._partition);
_env._fileStorHandler.remapQueueAfterJoin(
- FileStorHandler::RemapInfo(bId, disk),
+ FileStorHandler::RemapInfo(srcBucket, disk),
target);
// Remove source from bucket db.
StorBucketDatabase::WrappedEntry entry(
_env.getBucketDatabase().get(
- bId, "join-remove-source"));
+ srcBucket.getBucketId(), "join-remove-source"));
if (entry.exist()) {
lastModified = std::max(lastModified,
entry->info.getLastModified());
@@ -1101,7 +1101,7 @@ bool hasBucketInfo(const api::StorageMessage& msg)
void
PersistenceThread::flushAllReplies(
- const document::BucketId& bucketId,
+ const document::Bucket& bucket,
std::vector<std::unique_ptr<MessageTracker> >& replies)
{
if (replies.empty()) {
@@ -1128,14 +1128,14 @@ PersistenceThread::flushAllReplies(
}
}
LOG_BUCKET_OPERATION(
- bucketId,
+ bucket.getBucketId(),
vespalib::make_string(
"flushing %zu operations (%zu puts, %zu removes, "
"%zu other)",
replies.size(), nputs, nremoves, nother));
}
#endif
- spi::Bucket b(document::Bucket(document::BucketSpace::placeHolder(), bucketId), spi::PartitionId(_env._partition));
+ spi::Bucket b(bucket, spi::PartitionId(_env._partition));
spi::Result result = _spi.flush(b, _context);
uint32_t errorCode = _env.convertErrorCode(result);
if (errorCode != 0) {
@@ -1167,7 +1167,7 @@ PersistenceThread::flushAllReplies(
void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
{
std::vector<MessageTracker::UP> trackers;
- document::BucketId bucketId = lock.first->getBucketId();
+ document::Bucket bucket = lock.first->getBucket();
while (lock.second.get() != 0) {
LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p",
@@ -1178,7 +1178,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
// If the next operation wasn't batchable, we should flush
// everything that came before.
if (!batchable) {
- flushAllReplies(bucketId, trackers);
+ flushAllReplies(bucket, trackers);
}
std::unique_ptr<MessageTracker> tracker = processMessage(*msg);
@@ -1189,15 +1189,13 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
if (hasBucketInfo(*msg)) {
if (tracker->getReply()->getResult().success()) {
- document::Bucket bucket = msg->getBucket();
- assert(bucket.getBucketId() == bucketId);
_env.setBucketInfo(*tracker, bucket);
}
}
if (batchable) {
LOG(spam, "Adding reply %s to batch for bucket %s",
tracker->getReply()->toString().c_str(),
- bucketId.toString().c_str());
+ bucket.getBucketId().toString().c_str());
trackers.push_back(std::move(tracker));
@@ -1220,7 +1218,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock)
}
}
- flushAllReplies(bucketId, trackers);
+ flushAllReplies(bucket, trackers);
}
void
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 4d714dc878a..1e844735fff 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -87,7 +87,7 @@ private:
bool checkForError(const spi::Result& response, MessageTracker& tracker);
spi::Bucket getBucket(const DocumentId& id, const BucketId& bucket) const;
- void flushAllReplies(const document::BucketId& bucketId, std::vector<MessageTracker::UP>& trackers);
+ void flushAllReplies(const document::Bucket& bucket, std::vector<MessageTracker::UP>& trackers);
friend class TestAndSetHelper;
bool tasConditionExists(const api::TestAndSetCommand & cmd);
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index e28281a5819..25578048f69 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -113,13 +113,13 @@ PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket,
}
uint16_t
-PersistenceUtil::getPreferredAvailableDisk(const document::BucketId& id) const
+PersistenceUtil::getPreferredAvailableDisk(const document::Bucket &bucket) const
{
- return _component.getPreferredAvailablePartition(id);
+ return _component.getPreferredAvailablePartition(bucket.getBucketId());
}
PersistenceUtil::LockResult
-PersistenceUtil::lockAndGetDisk(const document::BucketId& bucket,
+PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket,
StorBucketDatabase::Flag flags)
{
// To lock the bucket, we need to ensure that we don't conflict with
@@ -135,7 +135,7 @@ PersistenceUtil::lockAndGetDisk(const document::BucketId& bucket,
_fileStorHandler.lock(bucket, result.disk));
StorBucketDatabase::WrappedEntry entry(getBucketDatabase().get(
- bucket, "join-lockAndGetDisk-1", flags));
+ bucket.getBucketId(), "join-lockAndGetDisk-1", flags));
if (entry.exist() && entry->disk != result.disk) {
result.disk = entry->disk;
continue;
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index c8ec6767c4b..49ea5c82ab4 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -91,7 +91,7 @@ struct PersistenceUtil {
void updateBucketDatabase(const document::Bucket &bucket,
const api::BucketInfo& info);
- uint16_t getPreferredAvailableDisk(const document::BucketId& id) const;
+ uint16_t getPreferredAvailableDisk(const document::Bucket &bucket) const;
/** Lock the given bucket in the file stor handler. */
struct LockResult {
@@ -104,7 +104,7 @@ struct PersistenceUtil {
};
LockResult lockAndGetDisk(
- const document::BucketId& bucket,
+ const document::Bucket &bucket,
StorBucketDatabase::Flag flags = StorBucketDatabase::NONE);
api::BucketInfo getBucketInfo(const document::Bucket &bucket, int disk = -1) const;
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
index c69a8cc06f7..e981e29e806 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
@@ -3,7 +3,7 @@
#include "changedbucketownershiphandler.h"
#include <vespa/storageapi/message/state.h>
#include <vespa/storage/bucketdb/storbucketdb.h>
-#include <vespa/storage/common/messagebucketid.h>
+#include <vespa/storage/common/messagebucket.h>
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/vespalib/util/exceptions.h>
@@ -323,8 +323,8 @@ ChangedBucketOwnershipHandler::sendingDistributorOwnsBucketInCurrentState(
}
try {
- document::BucketId opBucket(getStorageMessageBucketId(cmd));
- return (current->ownerOf(opBucket) == cmd.getSourceIndex());
+ document::Bucket opBucket(getStorageMessageBucket(cmd));
+ return (current->ownerOf(opBucket.getBucketId()) == cmd.getSourceIndex());
} catch (vespalib::IllegalArgumentException& e) {
LOG(error,
"Precondition violation: unable to get bucket from "