aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
blob: 647812f2bde4ebe4709dfba3a5d1f4e5ef4d9152 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "splitoperation.h"
#include <vespa/storage/distributor/idealstatemanager.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>

#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".distributor.operation.idealstate.split");

using namespace storage::distributor;

// Builds a split operation for the bucket/node set described by `nodes`.
//
// `cluster_ctx` is handed to the message tracker. The three numeric limits are
// only stored here; onStart() forwards them to each SplitBucketCommand as the
// max split bits, the min doc count and the min byte size respectively.
SplitOperation::SplitOperation(const ClusterContext& cluster_ctx,
                               const BucketAndNodes& nodes,
                               uint32_t maxBits,
                               uint32_t splitCount,
                               uint32_t splitSize)
    : IdealStateOperation(nodes),
      _tracker(cluster_ctx),
      _maxBits(maxBits),
      _splitCount(splitCount),
      _splitSize(splitSize)
{
}

// Defaulted out of line; no custom teardown logic is defined in this file.
SplitOperation::~SplitOperation() = default;

// Starts the split: queues one SplitBucketCommand for every node listed for
// this bucket in the bucket database and then flushes the queue via `sender`.
// If the database entry lists no nodes, nothing is sent and the operation is
// completed right away with _ok left as false.
void
SplitOperation::onStart(DistributorStripeMessageSender& sender)
{
    // Pessimistic default; flipped as soon as at least one command is queued.
    _ok = false;

    BucketDatabase::Entry db_entry = _bucketSpace->getBucketDatabase().get(getBucketId());
    const auto replica_count = db_entry->getNodeCount();

    for (uint32_t idx = 0; idx < replica_count; ++idx) {
        const auto target_node = db_entry->getNodeRef(idx).getNode();

        auto split_cmd = std::make_shared<api::SplitBucketCommand>(getBucket());
        split_cmd->setMaxSplitBits(_maxBits);
        split_cmd->setMinDocCount(_splitCount);
        split_cmd->setMinByteSize(_splitSize);
        split_cmd->setTimeout(MAX_TIMEOUT);
        setCommandMeta(*split_cmd);

        _tracker.queueCommand(std::move(split_cmd), target_node);
        _ok = true;
    }

    if (_ok) {
        _tracker.flushQueue(sender);
        return;
    }

    // Nothing was queued, so there is no reply to wait for.
    LOGBP(debug, "Unable to split bucket %s, since no copies are available (some in maintenance?)", getBucketId().toString().c_str());
    done();
}

// Handles a single SplitBucketReply.
//
// The reply is first handed to the message tracker, which yields the node it
// belongs to; a node value of 0xffff causes the reply to be ignored. Otherwise:
//  - cancelled node:  the operation is marked as failed, the database is untouched.
//  - success:         the replying node is removed from the source bucket's entry
//                     (dropping the entry when no nodes remain), and every bucket
//                     listed in the reply's split info is written to the database.
//  - bucket not found while the database still lists the node: a recheck of the
//                     bucket info is requested and the operation is marked failed.
//  - busy / critical-for-maintenance: logged and marked failed.
//  - anything else:   logged only; _ok is left as it was.
// Once the tracker reports that all replies are in, done() is invoked.
void
SplitOperation::onReceive(DistributorStripeMessageSender&, const api::StorageReply::SP& msg)
{
    auto& reply = dynamic_cast<api::SplitBucketReply&>(*msg);
    const uint16_t from_node = _tracker.handleReply(reply);

    if (from_node == 0xffff) {
        LOG(debug, "Ignored reply since node was max uint16_t for unknown reasons");
        return;
    }

    const auto& result = reply.getResult();

    // Collects "<source> => <target>,<target>," for the trailing debug log line.
    std::ostringstream summary;

    if (_cancel_scope.node_is_cancelled(from_node)) {
        _ok = false;
    } else if (result.success()) {
        auto& bucket_db = _bucketSpace->getBucketDatabase();
        BucketDatabase::Entry source_entry = bucket_db.get(reply.getBucketId());

        if (source_entry.valid()) {
            // This node no longer holds the pre-split bucket.
            source_entry->removeNode(from_node);

            if (source_entry->getNodeCount() != 0) {
                bucket_db.update(source_entry);
            } else {
                LOG(spam, "Removing split bucket %s", getBucketId().toString().c_str());
                bucket_db.remove(reply.getBucketId());
            }

            summary << getBucketId() << " => ";
        }

        // Register every bucket produced by the split on this node.
        for (const auto& split_target : reply.getSplitInfo()) {
            const auto& target_id = split_target.first;
            const auto& target_info = split_target.second;

            // An invalid info is only reported; the bucket is still recorded below.
            if (!target_info.valid()) {
                LOG(error, "Received invalid bucket %s from node %d as reply to split bucket",
                    target_id.toString().c_str(), from_node);
            }

            summary << target_id << ",";

            BucketCopy replica(_manager->operation_context().generate_unique_timestamp(), from_node, target_info);

            // Trusted state has to be reset here: otherwise, for inconsistent
            // copies, which one ends up trusted would depend purely on which
            // node happened to complete its split first.
            _manager->operation_context().update_bucket_database(
                    document::Bucket(msg->getBucket().getBucketSpace(), target_id), replica,
                    (DatabaseUpdate::CREATE_IF_NONEXISTING | DatabaseUpdate::RESET_TRUSTED));
        }
    } else if (
            result.getResult() == api::ReturnCode::BUCKET_NOT_FOUND
            && _bucketSpace->getBucketDatabase().get(reply.getBucketId())->getNode(from_node) != nullptr)
    {
        // The database lists this node for the bucket, yet the node reports it missing.
        _manager->operation_context().recheck_bucket_info(from_node, getBucket());
        LOGBP(debug, "Split failed for %s: bucket not found. Storage and distributor bucket databases might be out of sync: %s",
              getBucketId().toString().c_str(), vespalib::string(result.getMessage()).c_str());
        _ok = false;
    } else if (result.isBusy()) {
        LOG(debug, "Split failed for %s, node was busy. Will retry later", getBucketId().toString().c_str());
        _ok = false;
    } else if (result.isCriticalForMaintenance()) {
        LOGBP(warning, "Split failed for %s: %s with error '%s'",
              getBucketId().toString().c_str(), msg->toString().c_str(),
              result.toString().c_str());
        _ok = false;
    } else {
        LOG(debug, "Split failed for %s with non-critical failure: %s",
            getBucketId().toString().c_str(),
            result.toString().c_str());
    }

    if (!_tracker.finished()) {
        LOG(debug, "Split done on node %d: %s still pending on other nodes", from_node, summary.str().c_str());
        return;
    }

    LOG(debug, "Split done on node %d: %s completed operation", from_node, summary.str().c_str());
    done();
}

bool
SplitOperation::isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer& op_seq) const
{
    return checkBlockForAllNodes(getBucket(), ctx, op_seq);
}

// Decides whether a message of type `msgType` with priority value `pri`
// blocks this split.
//
// Returns true for any join-buckets message, and for a split-bucket message
// whose `pri` does not exceed this operation's own _priority value. Every
// other message type yields false. `node` is not consulted.
bool
SplitOperation::shouldBlockThisOperation(uint32_t msgType,
                                         [[maybe_unused]] uint16_t node,
                                         uint8_t pri) const
{
    const bool is_join = (msgType == api::MessageType::JOINBUCKETS_ID);
    const bool is_blocking_split = (msgType == api::MessageType::SPLITBUCKET_ID) && (_priority >= pri);

    return is_blocking_split || is_join;
}