aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp
blob: 784bb583fb8d49fe3094a419995e87faf23f78dc (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include "bucketownershipnotifier.h"
#include <vespa/storage/common/nodestateupdater.h>
#include <vespa/storage/common/content_bucket_space_repo.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/backtrace.h>

#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".persistence.bucketownershipnotifier");

using document::BucketSpace;

namespace storage {

// Resolves the index of the distributor that is the ideal owner of the given
// bucket, using the distribution of the bucket's bucket space and the derived
// cluster state for that same space.
//
// Returns FAILED_TO_RESOLVE if resolving the owner throws. No exception
// derived from std::exception escapes this function.
uint16_t
BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket &bucket) const
{
    // If we get exceptions there aren't any distributors, so they'll have
    // to explicitly fetch all bucket info eventually anyway.
    try {
        auto distribution = _component.getBucketSpaceRepo().get(bucket.getBucketSpace()).getDistribution();
        const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle();
        const auto &clusterState = *clusterStateBundle->getDerivedClusterState(bucket.getBucketSpace());
        return distribution->getIdealDistributorNode(clusterState, bucket.getBucketId());
    } catch (const lib::TooFewBucketBitsInUseException&) {
        // The exception object itself is not needed; the bucket is enough context.
        LOGBP(debug, "Too few bucket bits used for %s to be assigned to a distributor."
                     " Not notifying any distributor of bucket change.",
                     bucket.toString().c_str());
    } catch (const lib::NoDistributorsAvailableException&) {
        LOGBP(debug, "No distributors available. Not notifying any distributor of bucket change.");
    } catch (const std::exception& e) {
        // Anything else is unexpected, so it is reported at error level.
        LOG(error, "Got unknown exception while resolving distributor: %s", e.what());
    }
    return FAILED_TO_RESOLVE;
}

// Tells whether `distributor` is the distributor currently resolved as owner
// of `bucket`. The comparison is made against whatever
// getOwnerDistributorForBucket() returns, including its FAILED_TO_RESOLVE
// result when the owner cannot be determined.
bool
BucketOwnershipNotifier::distributorOwns(uint16_t distributor, const document::Bucket &bucket) const
{
    const uint16_t currentOwner = getOwnerDistributorForBucket(bucket);
    return currentOwner == distributor;
}

// Builds a NotifyBucketChangeCommand carrying `infoToSend` for `bucket`,
// addresses it to the distributor with index `distributorIndex` in this
// node's cluster, and hands it to the message sender.
//
// Invalid bucket info is never sent; it is logged as an error together with a
// stack trace and the call returns without sending anything.
void
BucketOwnershipNotifier::sendNotifyBucketToDistributor(
        uint16_t distributorIndex,
        const document::Bucket &bucket,
        const api::BucketInfo& infoToSend)
{
    if (!infoToSend.valid()) {
        LOG(error, "Trying to send invalid bucket info to distributor %u: %s. %s",
            distributorIndex, infoToSend.toString().c_str(), vespalib::getStackTrace(0).c_str());
        return;
    }

    auto command = std::make_shared<api::NotifyBucketChangeCommand>(bucket, infoToSend);

    // Address the command to the target distributor within our own cluster.
    const auto *clusterName = _component.cluster_context().cluster_name_ptr();
    const auto destination = api::StorageMessageAddress::create(clusterName, lib::NodeType::DISTRIBUTOR, distributorIndex);
    command->setAddress(destination);
    // Mark this node as the origin of the notification.
    command->setSourceIndex(_component.getIndex());

    LOG(debug, "Sending notify to distributor %u: %s", distributorIndex, command->toString().c_str());
    _sender.sendCommand(command);
}

// Emits a debug log line explaining why a NotifyBucketChange is about to be
// sent: the bucket is now owned by `currentOwnerIndex`, while the reply for
// the operation goes to `sourceIndex`. Has no effect besides logging.
void
BucketOwnershipNotifier::logNotification(const document::Bucket &bucket,
                                         uint16_t sourceIndex,
                                         uint16_t currentOwnerIndex,
                                         const api::BucketInfo& newInfo)
{
    // Argument order follows the format string: bucket id, new owner, reply target, info.
    LOG(debug,
        "%s now owned by distributor %u, but reply for operation is scheduled "
        "to go to distributor %u. Sending NotifyBucketChange with %s to ensure "
        "new owner knows bucket exists",
        bucket.getBucketId().toString().c_str(), currentOwnerIndex, sourceIndex, newInfo.toString().c_str());
}

// Sends a NotifyBucketChange for `bucket` to its currently resolved owner
// distributor, but only when that owner differs from `sourceIndex`.
//
// Nothing is sent when the owner cannot be resolved, when the owner equals
// `sourceIndex`, or when `sourceIndex` itself is FAILED_TO_RESOLVE (the latter
// is logged at debug level with a stack trace).
void
BucketOwnershipNotifier::notifyIfOwnershipChanged(
        const document::Bucket &bucket,
        uint16_t sourceIndex,
        const api::BucketInfo& infoToSend)
{
    const uint16_t currentOwner = getOwnerDistributorForBucket(bucket);

    if (currentOwner == FAILED_TO_RESOLVE) {
        return; // No known owner to notify.
    }
    if (currentOwner == sourceIndex) {
        return; // Ownership unchanged.
    }
    if (sourceIndex == FAILED_TO_RESOLVE) {
        LOG(debug,
            "Got an invalid source index of %u; impossible to know if "
            "bucket ownership has changed. %s",
            sourceIndex, vespalib::getStackTrace(0).c_str());
        return;
    }

    logNotification(bucket, sourceIndex, currentOwner, infoToSend);
    sendNotifyBucketToDistributor(currentOwner, bucket, infoToSend);
}

// Unconditionally notifies the currently resolved owner distributor of
// `bucket` about `infoToSend`. Does nothing if the owner cannot be resolved.
void
BucketOwnershipNotifier::sendNotifyBucketToCurrentOwner(
        const document::Bucket &bucket,
        const api::BucketInfo& infoToSend)
{
    const uint16_t currentOwner = getOwnerDistributorForBucket(bucket);
    if (currentOwner != FAILED_TO_RESOLVE) {
        sendNotifyBucketToDistributor(currentOwner, bucket, infoToSend);
    }
}

// On destruction, processes every queued bucket in insertion order: entries
// flagged alwaysSend are sent to the current owner unconditionally, all others
// only if ownership differs from the recorded source index.
NotificationGuard::~NotificationGuard()
{
    for (const BucketToCheck& pending : _bucketsToCheck) {
        if (!pending.alwaysSend) {
            _notifier.notifyIfOwnershipChanged(pending.bucket, pending.sourceIndex, pending.info);
        } else {
            _notifier.sendNotifyBucketToCurrentOwner(pending.bucket, pending.info);
        }
    }
}

// Queues a conditional notification for `bucket`. Nothing is sent here; the
// entry is evaluated when the guard is destroyed.
void
NotificationGuard::notifyIfOwnershipChanged(const document::Bucket &bucket,
                                            uint16_t sourceIndex,
                                            const api::BucketInfo& infoToSend)
{
    _bucketsToCheck.emplace_back(bucket, sourceIndex, infoToSend);
}

// Queues an unconditional notification for `bucket`. Nothing is sent here; the
// entry is processed when the guard is destroyed.
void
NotificationGuard::notifyAlways(const document::Bucket &bucket,
                                const api::BucketInfo& infoToSend)
{
    // 0xffff is only a placeholder source index: the destructor does not read
    // sourceIndex for entries with alwaysSend set.
    _bucketsToCheck.emplace_back(bucket, 0xffff, infoToSend);
    _bucketsToCheck.back().alwaysSend = true;
}

} // storage