1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/fastos/fastos.h>
#include <algorithm>
#include <vespa/fnet/frt/frt.h>
#include "oosmanager.h"
#include "rpcnetwork.h"
namespace mbus {
/**
 * Returns the client to use for the given connection spec. If one of the
 * clients currently held in _clients reports the same spec through getSpec(),
 * that existing handle is returned. Otherwise a new OOSClient is constructed
 * for the spec. This method does not add the new client to _clients itself.
 *
 * @param spec The connection spec to find or create a client for.
 * @return Handle to the matching existing client, or to a newly created one.
 */
OOSClient::SP
OOSManager::getClient(const string &spec)
{
    for (uint32_t idx = 0; idx != _clients.size(); ++idx) {
        const OOSClient::SP &candidate = _clients[idx];
        if (candidate->getSpec() == spec) {
            // Reuse the client that already targets this spec.
            return candidate;
        }
    }
    // No existing client matched, so create a fresh one.
    OOSClient::SP created(new OOSClient(_orb, spec));
    return created;
}
void
OOSManager::PerformTask()
{
bool changed = false;
if (_slobrokGen != _mirror.updates()) {
_slobrokGen = _mirror.updates();
SpecList newServices = _mirror.lookup(_servicePattern);
std::sort(newServices.begin(), newServices.end());
if (newServices != _services) {
ClientList newClients;
for (uint32_t i = 0; i < newServices.size(); ++i) {
newClients.push_back(getClient(newServices[i].second));
}
_services.swap(newServices);
_clients.swap(newClients);
changed = true;
}
}
bool allOk = _mirror.ready();
for (uint32_t i = 0; i < _clients.size(); ++i) {
if (_clients[i]->isChanged()) {
changed = true;
}
if (!_clients[i]->isReady()) {
allOk = false;
}
}
if (changed) {
OOSSet oos(new StringSet());
for (uint32_t i = 0; i < _clients.size(); ++i) {
_clients[i]->dumpState(*oos);
}
vespalib::LockGuard guard(_lock);
_oosSet.swap(oos);
}
if (allOk && !_ready) {
_ready = true;
}
Schedule(_ready ? 1.0 : 0.1);
}
/**
 * Constructs a manager that tracks the services matching the given pattern.
 * An empty pattern disables the manager: it then starts out ready and is
 * never scheduled. A non-empty pattern schedules the task immediately.
 *
 * @param orb            Supervisor whose scheduler runs this task; also kept
 *                       for creating clients.
 * @param mirror         Mirror API used to resolve the service pattern.
 * @param servicePattern Pattern to look up in the mirror; empty disables the
 *                       manager.
 */
OOSManager::OOSManager(FRT_Supervisor &orb,
                       slobrok::api::MirrorAPI &mirror,
                       const string &servicePattern)
    : FNET_Task(orb.GetScheduler()),
      _orb(orb),
      _mirror(mirror),
      _disabled(servicePattern.empty()),
      // NOTE(review): reads _disabled, so this relies on _disabled being
      // declared before _ready in the class - confirm against the header.
      _ready(_disabled),
      _lock("mbus::OOSManager::_lock", false),
      _servicePattern(servicePattern),
      _slobrokGen(0),
      _clients(),
      _oosSet()
{
    if (_disabled) {
        // Nothing to poll when there is no service pattern.
        return;
    }
    ScheduleNow();
}
/**
 * Destructor. Its only explicit action is to call Kill().
 */
OOSManager::~OOSManager()
{
    // NOTE(review): Kill() is not defined in this file; presumably it comes
    // from the FNET_Task base and stops this task from being run again -
    // confirm against the FNET headers.
    Kill();
}
/**
 * Tells whether the given service is present in the current out-of-service
 * set. Always answers false when the manager is disabled or when no set has
 * been installed yet. The set is inspected while holding _lock.
 *
 * @param service The service name to look for.
 * @return True if the service is found in the current set, false otherwise.
 */
bool
OOSManager::isOOS(const string &service)
{
    if (!_disabled) {
        // The disabled check above is made before the lock is taken.
        vespalib::LockGuard guard(_lock);
        if (_oosSet.get() != NULL) {
            return _oosSet->find(service) != _oosSet->end();
        }
    }
    return false;
}
} // namespace mbus
|