aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
blob: 07112add6e365a118ce2c84ba4d9bf9700fcab55 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "removelocationoperation.h"
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/document/bucket/bucketselector.h>
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/select/parser.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/vdslib/state/clusterstate.h>

#include <vespa/log/log.h>
LOG_SETUP(".distributor.operations.external.remove_location");

using document::BucketSpace;

namespace storage::distributor {

/**
 * Constructs the operation for a single RemoveLocationCommand.
 *
 * The message tracker is handed a RemoveLocationReply created from the
 * incoming command, and the command itself is then stored in _msg.
 *
 * @param node_ctx    node context; forwarded to the tracker and kept for later use
 * @param op_ctx      stripe operation context; only forwarded to the tracker here
 * @param parser      document selection parser, kept by reference
 * @param bucketSpace bucket space this operation works on, kept by reference
 * @param msg         the command being processed; ownership is moved into _msg
 * @param metric      metric set passed on to the tracker
 */
RemoveLocationOperation::RemoveLocationOperation(
        const DistributorNodeContext& node_ctx,
        DistributorStripeOperationContext& op_ctx,
        const DocumentSelectionParser& parser,
        DistributorBucketSpace &bucketSpace,
        std::shared_ptr<api::RemoveLocationCommand> msg,
        PersistenceOperationMetricSet& metric)
    : Operation(),
      // NOTE(review): *msg is dereferenced here while msg is moved into _msg on
      // the next line. This is only safe if _tracker is declared before _msg in
      // the class (members initialize in declaration order) - confirm in header.
      _tracker(metric, std::make_shared<api::RemoveLocationReply>(*msg), node_ctx, op_ctx, _cancel_scope),
      _msg(std::move(msg)),
      _node_ctx(node_ctx),
      _parser(parser),
      _bucketSpace(bucketSpace)
{}

// Destructor is defaulted, but defined out of line in this translation unit.
RemoveLocationOperation::~RemoveLocationOperation() = default;

/**
 * Resolves the document selection of a remove-location command to a bucket.
 *
 * The selection string is parsed and run through a BucketSelector built from
 * the node context's bucket id factory.
 *
 * @param node_ctx supplies the bucket id factory used by the selector
 * @param parser   parses the command's document selection string
 * @param cmd      command whose document selection is evaluated
 * @param bid      output; assigned only when exactly one bucket was selected
 * @return 0 if the selector produced no result, otherwise the number of
 *         buckets selected (1 means bid has been set)
 */
int
RemoveLocationOperation::getBucketId(
        const DistributorNodeContext& node_ctx,
        const DocumentSelectionParser& parser,
        const api::RemoveLocationCommand& cmd, document::BucketId& bid)
{
    document::BucketSelector selector(node_ctx.bucket_id_factory());
    std::unique_ptr<document::BucketSelector::BucketVector> selected
        = selector.select(*parser.parse_selection(cmd.getDocumentSelection()));

    // No result at all from the selector.
    if (!selected) {
        return 0;
    }

    // Exactly one bucket: hand it back to the caller.
    if (selected->size() == 1) {
        bid = (*selected)[0];
        return 1;
    }

    // Zero or several buckets: report how many, leave bid untouched.
    return selected->size();
}

/**
 * Starts the operation: maps the command's document selection to a single
 * bucket, then queues one RemoveLocationCommand per node of every bucket
 * database entry returned for that bucket.
 *
 * Outcomes:
 *  - The selection does not map to exactly one bucket: the tracker is failed
 *    with ILLEGAL_PARAMETERS and nothing is queued.
 *  - No command was queued (no entries, or entries without nodes): the tracker
 *    is failed with an OK return code.
 *  - Otherwise the queued commands are flushed through the sender.
 *
 * @param sender message sender used for failing the tracker and flushing commands
 */
void
RemoveLocationOperation::onStart(DistributorStripeMessageSender& sender)
{
    document::BucketId bid;
    const int count = getBucketId(_node_ctx, _parser, *_msg, bid);

    if (count != 1) {
        _tracker.fail(sender,
                      api::ReturnCode(api::ReturnCode::ILLEGAL_PARAMETERS,
                                      "Document selection could not be mapped to a single location"));
        // Must stop here: bid was never assigned by getBucketId, so continuing
        // would query the database with a default-constructed bucket id and
        // could queue remove commands for a request that was already failed.
        return;
    }

    std::vector<BucketDatabase::Entry> entries;
    _bucketSpace.getBucketDatabase().getAll(bid, entries);

    bool sent = false;
    for (const BucketDatabase::Entry& e : entries) {
        const std::vector<uint16_t> nodes = e->getNodes();

        for (const uint16_t node : nodes) {
            auto command = std::make_shared<api::RemoveLocationCommand>(_msg->getDocumentSelection(),
                                                                        document::Bucket(_msg->getBucket().getBucketSpace(), e.getBucketId()));

            copyMessageSettings(*_msg, *command);
            _tracker.queueCommand(std::move(command), node);
            sent = true;
        }
    }

    if (!sent) {
        LOG(debug,
            "Remove location %s failed since no available nodes found. "
            "System state is %s",
            _msg->toString().c_str(),
            _bucketSpace.getClusterState().toString().c_str());

        // Nothing was queued; the tracker is completed with an OK code here.
        _tracker.fail(sender, api::ReturnCode(api::ReturnCode::OK));
    } else {
        _tracker.flushQueue(sender);
    }
}


/**
 * Handles a reply belonging to this operation by handing it to the tracker.
 *
 * @param sender message sender passed through to the tracker
 * @param msg    the received reply; it is downcast to api::BucketInfoReply
 *               without a runtime type check
 */
void
RemoveLocationOperation::onReceive(
        DistributorStripeMessageSender& sender,
        const std::shared_ptr<api::StorageReply> & msg)
{
    auto& bucketInfoReply = static_cast<api::BucketInfoReply&>(*msg);
    _tracker.receiveReply(sender, bucketInfoReply);
}

/**
 * Called when the operation is closed; fails the tracker with an ABORTED
 * return code.
 *
 * @param sender message sender passed through to the tracker
 */
void
RemoveLocationOperation::onClose(DistributorStripeMessageSender& sender)
{
    const api::ReturnCode aborted(api::ReturnCode::ABORTED,
                                  "Process is shutting down");
    _tracker.fail(sender, aborted);
}

}