summaryrefslogtreecommitdiffstats
path: root/searchlib/src/vespa/searchlib/transactionlog/translogclient.h
blob: e219f5cffbc05c739b6353d2d4b1cbe5f4ec301c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once

#include "common.h"
#include <vespa/document/util/bytebuffer.h>
#include <vespa/fnet/frt/frt.h>
#include <map>
#include <vector>
#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/buffer.h>

namespace search {
namespace transactionlog {

/**
 * Client for a transaction log server, addressed by the rpctarget string
 * given at construction.
 *
 * The client hands out Session, Subscriber and Visitor objects that are bound
 * to a named domain. Sessions are tracked in an internal map keyed on
 * (domain, session id) so that incoming RPC callbacks can be routed to them
 * (see findSession() and the *CallbackRPC() members).
 *
 * The class is not copyable.
 */
class TransLogClient : private FRT_Invokable
{
private:
    // Copying is disabled. Deleted rather than merely left undefined so that
    // misuse is diagnosed at compile time instead of at link time.
    TransLogClient(const TransLogClient &) = delete;
    TransLogClient& operator=(const TransLogClient &) = delete;

public:
    /**
     * A handle bound to one domain of a TransLogClient.
     *
     * The virtual hooks visit(), inSync() and eof() have default
     * implementations that accept the packet / do nothing; subclasses
     * override them to observe the data stream.
     */
    class Session
    {
    public:
        /**
         * Receiver interface used by Subscriber (and thereby Visitor).
         * Only receive() must be implemented; inSync() and eof() default
         * to no-ops.
         */
        class Callback {
        public:
            virtual ~Callback() { }
            /// Invoked with each packet handed to Subscriber::visit().
            virtual RPC::Result receive(const Packet & packet) = 0;
            /// Invoked from Subscriber::inSync(). Default: no-op.
            virtual void inSync() { }
            /// Invoked from Subscriber::eof(). Default: no-op.
            virtual void eof() { }
        };
    public:
        using UP = std::unique_ptr<Session>;
        using SP = std::shared_ptr<Session>;

        /// Binds the session to the given domain of the given client.
        Session(const vespalib::string & domain, TransLogClient & tlc);
        virtual ~Session();
        /// You can commit data of any registered type to any channel.
        bool commit(const vespalib::ConstBufferRef & packet);
        /// Will erase all entries prior to <to>
        bool erase(const SerialNum & to);
        /// Reports the domain status through the out-parameters b, e and count.
        /// NOTE(review): presumably first serial, last serial and entry count - confirm in the implementation.
        bool status(SerialNum & b, SerialNum & e, size_t & count);

        /// Requests a sync up to syncTo; the reached serial number is returned in syncedTo.
        /// NOTE(review): exact semantics live in the implementation file - confirm there.
        bool sync(const SerialNum &syncTo, SerialNum &syncedTo);

        /// Hook for an incoming packet. The default accepts it and returns RPC::OK.
        virtual RPC::Result visit(const Packet & ) { return RPC::OK; }
        /// Hook for the in-sync notification. Default: no-op.
        virtual void inSync() { }
        /// Hook for the end-of-stream notification. Default: no-op.
        virtual void eof()    { }
        bool close();
        void clear();
        /// Name of the domain this session is bound to.
        const vespalib::string & getDomain() const { return _domain; }
        /// The client this session belongs to.
        const TransLogClient & getTLC() const { return _tlc; }
    protected:
        bool init(FRT_RPCRequest * req);
        bool run();
        TransLogClient & _tlc;       // Owning client; held by reference, so it must outlive the session.
        vespalib::string _domain;    // Domain name.
        int              _sessionId; // Session id; together with the domain it matches the shape of SessionKey.
    };
    /// Here you connect to the incoming data getting everything from <from>
    class Subscriber : public Session
    {
    public:
        using UP = std::unique_ptr<Subscriber>;
        using SP = std::shared_ptr<Subscriber>;

        /// The callback is held by reference and must outlive the subscriber.
        Subscriber(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack);
        bool subscribe(const SerialNum & from);
        ~Subscriber() override;
        /// Forwards the packet to Callback::receive().
        RPC::Result visit(const Packet & packet) override { return _callback.receive(packet); }
        /// Forwards to Callback::inSync().
        void inSync() override { _callback.inSync(); }
        /// Forwards to Callback::eof().
        void eof()    override { _callback.eof(); }
    private:
        Callback & _callback;
    };
    /// Here you read the incoming data getting everything from <from>
    class Visitor : public Subscriber
    {
    public:
        using UP = std::unique_ptr<Visitor>;
        using SP = std::shared_ptr<Visitor>;

        Visitor(const vespalib::string & domain, TransLogClient & tlc, Callback & callBack);
        /// Starts a visit over the serial number range given by <from> and <to>.
        /// NOTE: this overload hides the inherited visit(const Packet &) for name
        /// lookup through Visitor; virtual dispatch through Session/Subscriber
        /// is unaffected.
        bool visit(const SerialNum & from, const SerialNum & to);
        ~Visitor() override;
    };
public:
    using UP = std::unique_ptr<TransLogClient>;

    // NOTE(review): single-argument constructor is intentionally left
    // non-explicit here to keep source compatibility with existing callers.
    TransLogClient(const vespalib::string & rpctarget);
    virtual ~TransLogClient();

    /// Here you create a new domain
    bool create(const vespalib::string & domain);
    /// Here you remove a domain
    bool remove(const vespalib::string & domain);
    /// Here you open an existing domain
    Session::UP open(const vespalib::string & domain);
    /// Here you can get a list of available domains.
    bool listDomains(std::vector<vespalib::string> & dir);
    /// Here you get a subscriber
    Subscriber::UP createSubscriber(const vespalib::string & domain, Session::Callback & callBack);
    /// Here you get a visitor
    Visitor::UP createVisitor(const vespalib::string & domain, Session::Callback & callBack);

    /// True when a target exists and it reports itself as valid.
    bool isConnected()               const { return (_target != nullptr) && _target->IsValid(); }
    void disconnect();
    bool reconnect();
    /// The rpctarget string stored for this client.
    const vespalib::string &getRPCTarget() const { return _rpcTarget; }
private:
    void exportRPC(FRT_Supervisor & supervisor);
    // RPC entry points for visit / in-sync / eof notifications.
    // NOTE(review): presumably registered by exportRPC() and routed to a
    // Session via findSession() - confirm in the implementation file.
    void visitCallbackRPC(FRT_RPCRequest *req);
    void syncCallbackRPC(FRT_RPCRequest *req);
    void eofCallbackRPC(FRT_RPCRequest *req);
    int32_t rpc(FRT_RPCRequest * req);
    Session * findSession(const vespalib::string & domain, int sessionId);

    /// Key of the session map: a (domain, session id) pair ordered through cmp().
    class SessionKey
    {
    public:
        SessionKey(const vespalib::string & domain, int sessionId) : _domain(domain), _sessionId(sessionId) { }
        bool operator < (const SessionKey & b) const { return cmp(b) < 0; }
    private:
        /// Three-way comparison; a negative result means *this orders before b.
        int cmp(const SessionKey & b) const;
        vespalib::string _domain;
        int         _sessionId;
    };

    // Maps to raw, non-owning-looking Session pointers.
    // NOTE(review): ownership/lifetime of the mapped sessions is not visible in this header.
    using SessionMap = std::map<SessionKey, Session *>;

    vespalib::string _rpcTarget;
    SessionMap       _sessions;
    //Brute force lock for subscriptions. For multithread safety.
    vespalib::Lock   _lock;
    FRT_Supervisor   _supervisor;
    FRT_Target     * _target;
};

}
}