aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/common/storagelinkqueued.h
blob: 3f7a831d9fe969338f0958699dc99d9639c31bce (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * Storage link implementing a separate thread for dispatching replies.
 * Using this class you can use dispatchUp instead of sendUp to have the
 * replies sent through another thread.
 */

#pragma once

#include "storagelink.h"
#include <vespa/storageframework/generic/thread/runnable.h>
#include <condition_variable>
#include <deque>
#include <limits>
#include <mutex>

namespace storage {

// Forward declarations are enough for this header: ComponentRegister is only
// used by reference, and Component/Thread are only held via std::unique_ptr.
namespace framework {
    class Component;
    class Thread;
    struct ComponentRegister;
}

/**
 * Storage link that owns a reply dispatcher, so messages can be queued with
 * dispatchUp() and forwarded with sendUp() by the dispatcher rather than by
 * the caller.
 *
 * The onClose() and onFlush() hooks defined here record in _closeState that
 * they have run. Subclasses overriding them must call these versions so the
 * bits still get set.
 */
class StorageLinkQueued : public StorageLink {
public:
    StorageLinkQueued(const std::string& name, framework::ComponentRegister& cr);
    ~StorageLinkQueued() override;

    /**
     * Add message to internal queue, to be dispatched upstream
     * in separate thread.
     */
    void dispatchUp(const std::shared_ptr<api::StorageMessage>&);

    /** Remember to call this method if you override it. */
    void onClose() override {
        _closeState |= ON_CLOSE_CALLED;
    }

    /**
     * Remember to call this method if you override it.
     *
     * @param downwards When true only the downward-flush bit is recorded.
     *                  When false the reply dispatcher is flushed first and
     *                  then the upward-flush bit is recorded.
     */
    void onFlush(bool downwards) override {
        if (downwards) {
            _closeState |= ON_FLUSH_DOWN_CALLED;
        } else {
            // Flush the queue before marking the upward flush as done.
            _replyDispatcher.flush();
            _closeState |= ON_FLUSH_UP_CALLED;
        }
    }

    // Logging helpers; their definitions are not part of this header.
    void logError(const char* error);
    void logDebug(const char* error);

    /** Returns the component register this link was constructed with. */
    framework::ComponentRegister& getComponentRegister() { return _compReg; }

private:
    // Bits recorded in _closeState. The numeric values are unchanged from the
    // literals 1, 2 and 4 previously used inline.
    static constexpr uint16_t ON_CLOSE_CALLED      = 1; // onClose() has run
    static constexpr uint16_t ON_FLUSH_DOWN_CALLED = 2; // onFlush(true) has run
    static constexpr uint16_t ON_FLUSH_UP_CALLED   = 4; // onFlush(false) has run

    /**
     * Base for a queue of shared Message pointers that is also a
     * framework::Runnable. Subclasses supply send() to deliver one message.
     * The member function definitions are not part of this header.
     */
    template<typename Message>
    class Dispatcher : public framework::Runnable
    {
    protected:
        StorageLinkQueued&                    _parent;
        // NOTE(review): presumably an upper bound on _messages.size();
        // enforcement is not visible in this header.
        unsigned int                          _maxQueueSize;
        std::mutex                            _sync;
        std::condition_variable               _syncCond;
        std::deque<std::shared_ptr<Message>>  _messages;
        // Set from the constructor argument of the same name; how it is used
        // is not visible in this header.
        bool                                  _replyDispatcher;
        std::unique_ptr<framework::Component> _component;
        std::unique_ptr<framework::Thread>    _thread;

        void shutdown();

    public:
        Dispatcher(StorageLinkQueued& parent, unsigned int maxQueueSize, bool replyDispatcher);

        ~Dispatcher() override;

        void start();
        void run(framework::ThreadHandle&) override;

        void add(const std::shared_ptr<Message>&);
        void flush();

        /** Deliver a single message; implemented by the concrete dispatcher. */
        virtual void send(const std::shared_ptr<Message> & ) = 0;
    };

    /**
     * Dispatcher for api::StorageMessage that delivers each message by
     * calling sendUp() on the owning link. It is constructed with the
     * largest representable maxQueueSize.
     */
    class ReplyDispatcher : public Dispatcher<api::StorageMessage> {
    public:
        explicit ReplyDispatcher(StorageLinkQueued& parent)
            : Dispatcher<api::StorageMessage>(
                    parent, std::numeric_limits<unsigned int>::max(), true)
        {
        }
        void send(const std::shared_ptr<api::StorageMessage> & reply) override {
            _parent.sendUp(reply);
        }
    };

    // Declaration order below is significant: it fixes the construction order.
    framework::ComponentRegister& _compReg;
    ReplyDispatcher               _replyDispatcher;
    // Defaulted to 0 so the |= in onClose()/onFlush() never reads an
    // indeterminate value, whatever the out-of-line constructor does.
    uint16_t                      _closeState{0};
};

}