aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/visiting/visitorthread.h
blob: 4463a62fdd935a012e31b3d0e291a08487369cdd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * @class VisitorThread
 * @ingroup visiting
 *
 * @brief Thread running visitors.
 *
 * This thread ensures that everything concerning one visitor runs in a
 * single thread. This simplifies the visitors as they don't have to
 * worry about locking, and it is a lot easier to abort visitors when you
 * know other threads isn't using the visitors.
 */

#pragma once

#include "visitor.h"
#include "visitormetrics.h"
#include "visitormessagesessionfactory.h"
#include <vespa/storage/persistence/messages.h>
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageframework/generic/thread/runnable.h>
#include <vespa/storageapi/messageapi/messagehandler.h>
#include <vespa/metrics/metrictimer.h>
#include <atomic>
#include <deque>

namespace storage {

namespace framework { class HttpUrlPath; }

class VisitorThread : public framework::Runnable,
                      private api::MessageHandler,
                      private framework::MetricUpdateHook
{
    using LibMap = std::map<std::string, std::shared_ptr<VisitorEnvironment>>;
    LibMap _libs;

    using VisitorMap = std::map<api::VisitorId, std::shared_ptr<Visitor>>;
    VisitorMap _visitors;
    std::deque<std::pair<api::VisitorId, vespalib::steady_time>> _recentlyCompleted;

    struct Event {
        enum class Type {
            MBUS,
            PERSISTENCE,
            NONE
        };

        api::VisitorId _visitorId;
        std::shared_ptr<api::StorageMessage> _message;
        mbus::Reply::UP _mbusReply;
        metrics::MetricTimer _timer;
        Type _type;

        Event() noexcept : _visitorId(0), _message(), _timer(), _type(Type::NONE) {}
        Event(Event&& other) noexcept;
        Event& operator= (Event&& other) noexcept;
        Event(const Event& other) = delete;
        Event& operator= (const Event& other) = delete;
        Event(api::VisitorId visitor, mbus::Reply::UP reply);
        Event(api::VisitorId visitor, std::shared_ptr<api::StorageMessage> msg);
        ~Event();

        [[nodiscard]] bool empty() const noexcept {
            return (_type == Type::NONE);
        }
    };

    std::deque<Event>       _queue;
    std::mutex              _lock;
    std::condition_variable _cond;

    VisitorMap::iterator _currentlyRunningVisitor;
    VisitorMessageHandler& _messageSender;
    VisitorThreadMetrics& _metrics;
    uint32_t _threadIndex;
    uint32_t _disconnectedVisitorTimeout;
    uint32_t _ignoreNonExistingVisitorTimeLimit;
    uint32_t _defaultParallelIterators;
    uint32_t _iteratorsPerBucket;
    uint32_t _defaultPendingMessages;
    uint32_t _defaultDocBlockSize;
    uint32_t _visitorMemoryUsageLimit;
    vespalib::duration _defaultDocBlockTimeout;
    vespalib::duration _defaultVisitorInfoTimeout;
    std::atomic<uint32_t> _timeBetweenTicks;
    StorageComponent _component;
    std::unique_ptr<framework::Thread> _thread;
    VisitorMessageSessionFactory& _messageSessionFactory;
    VisitorFactory::Map& _visitorFactories;

public:
    VisitorThread(uint32_t threadIndex,
                  StorageComponentRegister&,
                  VisitorMessageSessionFactory&,
                  VisitorFactory::Map&,
                  VisitorThreadMetrics& metrics,
                  VisitorMessageHandler& sender);
    ~VisitorThread() override;

    void processMessage(api::VisitorId visitorId, const std::shared_ptr<api::StorageMessage>& msg);
    void shutdown();
    void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks.store(time, std::memory_order_relaxed); }
    void handleMessageBusReply(std::unique_ptr<mbus::Reply> reply, Visitor& visitor);

    const VisitorThreadMetrics& getMetrics() const noexcept {
        return _metrics;
    }

private:
    void run(framework::ThreadHandle&) override;
    /**
     * Attempt to fetch an event from the visitor thread's queue. If an event
     * was available, pop it from the queue and return it. If not, return
     * an empty event. This may be checked with the .empty() method on
     * the returned event object.
     */
    Event popNextQueuedEventIfAvailable();
    void tick();
    void trimRecentlyCompletedList(vespalib::steady_time currentTime);
    void handleNonExistingVisitorCall(const Event& entry, api::ReturnCode& code);

    std::shared_ptr<Visitor> createVisitor(vespalib::stringref libName,
                                           const vdslib::Parameters& params,
                                           vespalib::asciistream & error);

    bool onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>&) override;
    bool onInternal(const std::shared_ptr<api::InternalCommand>&) override;
    bool onInternalReply(const std::shared_ptr<api::InternalReply>&) override;

    /** Deletes a visitor instance. */
    void close();
    void getStatus(vespalib::asciistream & out, const framework::HttpUrlPath& path) const;
    void updateMetrics(const MetricLockGuard &) override;

};

} // storage