aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
blob: 0a916d208adb6e23c1ea8c5233f2085cebfd6a86 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "removebucketoperation.h"
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>

#include <vespa/log/log.h>

LOG_SETUP(".distributor.operation.idealstate.remove");

namespace storage::distributor {

RemoveBucketOperation::~RemoveBucketOperation() = default;

bool
RemoveBucketOperation::onStartInternal(DistributorStripeMessageSender& sender)
{
    std::vector<std::pair<uint16_t, std::shared_ptr<api::DeleteBucketCommand> > > msgs;

    BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());

    for (uint16_t node : getNodes()) {
        const BucketCopy* copy(entry->getNode(node));
        if (!copy) {
            LOG(debug, "Node %u was removed between scheduling remove operation and starting it; not sending DeleteBucket to it", node);
            continue;
        }
        LOG(debug, "Sending DeleteBucket for %s to node %u", getBucketId().toString().c_str(), node);
        auto msg = std::make_shared<api::DeleteBucketCommand>(getBucket());
        setCommandMeta(*msg);
        msg->setBucketInfo(copy->getBucketInfo());
        msgs.emplace_back(node, msg);
    }

    _ok = true;
    if (!getNodes().empty()) {
        _manager->operation_context().remove_nodes_from_bucket_database(getBucket(), getNodes());
        for (auto & msg : msgs) {
            _tracker.queueCommand(std::move(msg.second), msg.first);
        }
        _tracker.flushQueue(sender);
    }

    return _tracker.finished();
}


void
RemoveBucketOperation::onStart(DistributorStripeMessageSender& sender)
{
    if (onStartInternal(sender)) {
        done();
    }
}

bool
RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply> &msg)
{
    auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get());

    const uint16_t node = _tracker.handleReply(*rep);

    LOG(debug, "Got DeleteBucket reply for %s from node %u", getBucketId().toString().c_str(), node);

    if (_cancel_scope.node_is_cancelled(node)) {
        LOG(debug, "DeleteBucket operation for %s has been cancelled", getBucketId().toString().c_str());
        _ok = false;
    } else if (rep->getResult().failed()) {
        if (rep->getResult().getResult() == api::ReturnCode::REJECTED
            && rep->getBucketInfo().valid())
        {
            LOG(debug, "Got DeleteBucket rejection reply from storage for "
                "%s on node %u: %s. Reinserting node into bucket db with %s",
                getBucketId().toString().c_str(),
                node,
                vespalib::string(rep->getResult().getMessage()).c_str(),
                rep->getBucketInfo().toString().c_str());

            _manager->operation_context().update_bucket_database(
                    getBucket(),
                    BucketCopy(_manager->operation_context().generate_unique_timestamp(),
                               node,
                               rep->getBucketInfo()),
                    DatabaseUpdate::CREATE_IF_NONEXISTING);
        } else {
            LOG(info,
                "Remove operation on bucket %s failed. This distributor "
                "has already removed the bucket from the bucket database, "
                "so it is not possible to retry this operation. Failure code: %s",
                getBucketId().toString().c_str(),
                rep->getResult().toString().c_str());
        }

        _ok = false;
    }

    return _tracker.finished();
}


void
RemoveBucketOperation::onReceive(DistributorStripeMessageSender&, const std::shared_ptr<api::StorageReply> &msg)
{
    if (onReceiveInternal(msg)) {
        done();
    }
}

bool
RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint16_t target_node, uint8_t) const
{
    // Number of nodes is expected to be 1 in the vastly common case (and a highly bounded
    // number in the worst case), so a simple linear scan suffices.
    for (uint16_t node : getNodes()) {
        if (target_node == node) {
            return true;
        }
    }
    return false;
}

} // storage::distributor