1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "removebucketoperation.h"
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/log/log.h>
LOG_SETUP(".distributor.operation.idealstate.remove");
namespace storage::distributor {
// Destructor is compiler-generated (defaulted), but defined out-of-line in this translation unit.
RemoveBucketOperation::~RemoveBucketOperation() = default;
// Starts the remove operation.
//
// For every node returned by getNodes() that still has a copy in the bucket
// database entry, a DeleteBucketCommand carrying that copy's bucket info is
// prepared. The nodes are then removed from the bucket database, after which
// the prepared commands are queued on the tracker and flushed via `sender`.
//
// Returns the value of _tracker.finished() after the commands (if any) have
// been handed to the tracker.
bool
RemoveBucketOperation::onStartInternal(DistributorStripeMessageSender& sender)
{
    using NodeAndCommand = std::pair<uint16_t, std::shared_ptr<api::DeleteBucketCommand>>;
    std::vector<NodeAndCommand> pending;

    BucketDatabase::Entry db_entry = _bucketSpace->getBucketDatabase().get(getBucketId());

    for (uint16_t target : getNodes()) {
        const BucketCopy* replica = db_entry->getNode(target);
        if (replica != nullptr) {
            LOG(debug, "Sending DeleteBucket for %s to node %u", getBucketId().toString().c_str(), target);
            auto cmd = std::make_shared<api::DeleteBucketCommand>(getBucket());
            setCommandMeta(*cmd);
            // The bucket info is captured here, before the database is modified below.
            cmd->setBucketInfo(replica->getBucketInfo());
            pending.emplace_back(target, std::move(cmd));
        } else {
            LOG(debug, "Node %u was removed between scheduling remove operation and starting it; not sending DeleteBucket to it", target);
        }
    }

    _ok = true;
    if (getNodes().empty()) {
        // Nothing to remove and nothing to send.
        return _tracker.finished();
    }

    // Database entries are removed first; the commands are only sent afterwards.
    _manager->operation_context().remove_nodes_from_bucket_database(getBucket(), getNodes());
    for (auto& [target, cmd] : pending) {
        _tracker.queueCommand(std::move(cmd), target);
    }
    _tracker.flushQueue(sender);

    return _tracker.finished();
}
// Entry point for starting the operation: delegates to onStartInternal() and
// calls done() right away when that reports the operation as finished.
void
RemoveBucketOperation::onStart(DistributorStripeMessageSender& sender)
{
    const bool finished = onStartInternal(sender);
    if (!finished) {
        return; // Still waiting; onReceive() decides when to call done().
    }
    done();
}
// Processes one reply to a previously sent DeleteBucketCommand.
//
// The reply is registered with the tracker, which yields the node it came from.
// Afterwards:
//  - if that node has been cancelled, the operation is marked as not OK;
//  - if the reply failed with REJECTED and carries valid bucket info, the node is
//    reinserted into the bucket database with that info, and the operation is
//    marked as not OK;
//  - any other failure is logged and the operation is marked as not OK.
//
// A null message, or one that is not a DeleteBucketReply, is logged and ignored
// without touching tracker or database state.
//
// Returns the value of _tracker.finished().
bool
RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply> &msg)
{
    auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get());
    if (rep == nullptr) {
        // dynamic_cast yields nullptr for a null pointer or a mismatching dynamic type;
        // dereferencing it below would be undefined behaviour.
        LOG(error, "Received a reply that is not a DeleteBucketReply for %s; ignoring it",
            getBucketId().toString().c_str());
        return _tracker.finished();
    }

    const uint16_t node = _tracker.handleReply(*rep);

    LOG(debug, "Got DeleteBucket reply for %s from node %u", getBucketId().toString().c_str(), node);

    if (_cancel_scope.node_is_cancelled(node)) {
        LOG(debug, "DeleteBucket operation for %s has been cancelled", getBucketId().toString().c_str());
        _ok = false;
    } else if (rep->getResult().failed()) {
        if (rep->getResult().getResult() == api::ReturnCode::REJECTED
            && rep->getBucketInfo().valid())
        {
            LOG(debug, "Got DeleteBucket rejection reply from storage for "
                "%s on node %u: %s. Reinserting node into bucket db with %s",
                getBucketId().toString().c_str(),
                node,
                vespalib::string(rep->getResult().getMessage()).c_str(),
                rep->getBucketInfo().toString().c_str());

            // The node was removed from the database when the operation started;
            // put it back using the bucket info returned with the rejection.
            _manager->operation_context().update_bucket_database(
                    getBucket(),
                    BucketCopy(_manager->operation_context().generate_unique_timestamp(),
                               node,
                               rep->getBucketInfo()),
                    DatabaseUpdate::CREATE_IF_NONEXISTING);
        } else {
            LOG(info,
                "Remove operation on bucket %s failed. This distributor "
                "has already removed the bucket from the bucket database, "
                "so it is not possible to retry this operation. Failure code: %s",
                getBucketId().toString().c_str(),
                rep->getResult().toString().c_str());
        }

        _ok = false;
    }

    return _tracker.finished();
}
// Entry point for incoming replies: delegates to onReceiveInternal() and calls
// done() when that reports the operation as finished. The sender argument is unused.
void
RemoveBucketOperation::onReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply> &msg)
{
    const bool finished = onReceiveInternal(msg);
    if (!finished) {
        return; // More replies are still outstanding.
    }
    done();
}
// Returns true iff target_node is one of the nodes returned by getNodes().
// The first and third (unnamed) parameters are ignored.
bool
RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint16_t target_node, uint8_t) const
{
    // The node set is expected to hold a single element in the vastly common case
    // (and a highly bounded number in the worst case), so a plain linear scan is enough.
    bool found = false;
    for (uint16_t candidate : getNodes()) {
        if (candidate == target_node) {
            found = true;
            break;
        }
    }
    return found;
}
} // storage::distributor
|