1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
#include <vespa/storageapi/defs.h>
#include <vespa/storage/distributor/operations/operation.h>
#include <vespa/storage/bucketdb/bucketdatabase.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/storageframework/generic/clock/timer.h>
namespace document { class Document; }
namespace storage {
namespace api { class GetCommand; }
class PersistenceOperationMetricSet;
namespace distributor {
class DistributorComponent;
class DistributorBucketSpace;
class GetOperation : public Operation
{
public:
GetOperation(DistributorComponent& manager,
const DistributorBucketSpace &bucketSpace,
std::shared_ptr<BucketDatabase::ReadGuard> read_guard,
std::shared_ptr<api::GetCommand> msg,
PersistenceOperationMetricSet& metric);
void onClose(DistributorMessageSender& sender) override;
void onStart(DistributorMessageSender& sender) override;
void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) override;
const char* getName() const override { return "get"; }
std::string getStatus() const override { return ""; }
bool hasConsistentCopies() const;
// Exposed for unit testing. TODO feels a bit dirty :I
const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; }
private:
class GroupId {
public:
// Node should be set only if bucket is incomplete
GroupId(const document::BucketId& id, uint32_t checksum, int node);
bool operator<(const GroupId& other) const;
bool operator==(const GroupId& other) const;
const document::BucketId& getBucketId() const { return _id; }
int getNode() const { return _node; }
private:
document::BucketId _id;
uint32_t _checksum;
int _node;
};
class BucketChecksumGroup {
public:
BucketChecksumGroup(const BucketCopy& c) :
copy(c),
sent(0), received(false), returnCode(api::ReturnCode::OK)
{}
BucketCopy copy;
api::StorageMessage::Id sent;
bool received;
api::ReturnCode returnCode;
};
typedef std::vector<BucketChecksumGroup> GroupVector;
// Organize the different copies by bucket/checksum pairs. We should
// try to request GETs from each bucket and each different checksum
// within that bucket.
std::map<GroupId, GroupVector> _responses;
DistributorComponent& _manager;
const DistributorBucketSpace &_bucketSpace;
std::shared_ptr<api::GetCommand> _msg;
api::ReturnCode _returnCode;
std::shared_ptr<document::Document> _doc;
api::Timestamp _lastModified;
PersistenceOperationMetricSet& _metric;
framework::MilliSecTimer _operationTimer;
void sendReply(DistributorMessageSender& sender);
bool sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res);
void assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard);
bool copyIsOnLocalNode(const BucketCopy&) const;
/**
* Returns the vector index of the target to send to, or -1 if none
* could be found (i.e. all targets have already been sent to).
*/
int findBestUnsentTarget(const GroupVector& candidates) const;
void update_internal_metrics();
};
}
}
|