aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/vespa/messagebus/network/oosmanager.cpp
blob: 97d2fc9e9600e08a6aaf1043eb2bb4ede2b16b93 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/fastos/fastos.h>
#include <algorithm>
#include <vespa/fnet/frt/frt.h>
#include "oosmanager.h"
#include "rpcnetwork.h"

namespace mbus {

/**
 * Returns the client to use for the given connection spec.
 *
 * If one of the clients currently held in _clients reports the same spec, that
 * instance is returned. Otherwise a new OOSClient is created for the spec. Note
 * that a newly created client is NOT added to _clients by this method; that is
 * left to the caller.
 *
 * @param spec The connection spec to find or create a client for.
 * @return Shared pointer to the matching or newly created client.
 */
OOSClient::SP
OOSManager::getClient(const string &spec)
{
    for (ClientList::const_iterator it = _clients.begin(); it != _clients.end(); ++it) {
        const OOSClient::SP &candidate = *it;
        if (candidate->getSpec() == spec) {
            return candidate;
        }
    }
    // No existing client talks to this spec; make a fresh one.
    return OOSClient::SP(new OOSClient(_orb, spec));
}

void
OOSManager::PerformTask()
{
    bool changed = false;
    if (_slobrokGen != _mirror.updates()) {
        _slobrokGen = _mirror.updates();
        SpecList newServices = _mirror.lookup(_servicePattern);
        std::sort(newServices.begin(), newServices.end());
        if (newServices != _services) {
            ClientList newClients;
            for (uint32_t i = 0; i < newServices.size(); ++i) {
                newClients.push_back(getClient(newServices[i].second));
            }
            _services.swap(newServices);
            _clients.swap(newClients);
            changed = true;
        }
    }
    bool allOk = _mirror.ready();
    for (uint32_t i = 0; i < _clients.size(); ++i) {
        if (_clients[i]->isChanged()) {
            changed = true;
        }
        if (!_clients[i]->isReady()) {
            allOk = false;
        }
    }
    if (changed) {
        OOSSet oos(new StringSet());
        for (uint32_t i = 0; i < _clients.size(); ++i) {
            _clients[i]->dumpState(*oos);
        }
        vespalib::LockGuard guard(_lock);
        _oosSet.swap(oos);
    }
    if (allOk && !_ready) {
        _ready = true;
    }
    Schedule(_ready ? 1.0 : 0.1);
}

/**
 * Constructs a manager that tracks the services matching the given pattern.
 *
 * An empty pattern disables the manager: it is then considered ready
 * immediately and its task is never scheduled. Otherwise the task is
 * scheduled to run right away.
 *
 * @param orb            The supervisor whose scheduler runs this task; also
 *                       passed on to clients created by getClient().
 * @param mirror         The mirror api used to look up services.
 * @param servicePattern The pattern to look up in the mirror; may be empty.
 */
OOSManager::OOSManager(FRT_Supervisor &orb,
                       slobrok::api::MirrorAPI &mirror,
                       const string &servicePattern)
    : FNET_Task(orb.GetScheduler()),
      _orb(orb),
      _mirror(mirror),
      _disabled(servicePattern.empty()),
      _ready(_disabled), // NOTE(review): relies on _disabled being declared before _ready in the header - confirm
      _lock("mbus::OOSManager::_lock", false),
      _servicePattern(servicePattern),
      _slobrokGen(0),
      _clients(),
      _oosSet()
{
    if (_disabled) {
        // Nothing to monitor, so the task is never started.
        return;
    }
    ScheduleNow();
}

/**
 * Destructor. Calls Kill() before the members are torn down.
 * NOTE(review): Kill() is presumably inherited from FNET_Task and stops the
 * task from being run again - confirm against the FNET_Task declaration.
 */
OOSManager::~OOSManager()
{
    Kill();
}

/**
 * Tells whether the given service name is present in the current OOS set.
 *
 * A disabled manager always answers false, without taking the lock. Otherwise
 * the lookup is done while holding _lock, and a missing (null) set is treated
 * as "nothing is OOS".
 *
 * @param service The service name to look up.
 * @return True if the service is found in the current set, false otherwise.
 */
bool
OOSManager::isOOS(const string &service)
{
    if (_disabled) {
        return false;
    }
    vespalib::LockGuard guard(_lock);
    // Short-circuit evaluation keeps the null check ahead of the dereference.
    return (_oosSet.get() != NULL) && (_oosSet->find(service) != _oosSet->end());
}

} // namespace mbus