aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/server/feedstates.h
blob: 865d53b4799a33b90a8d4ca2c5af3708c4fd876b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include "feedhandler.h"
#include "feedstate.h"
#include "packetwrapper.h"
#include "ireplaypackethandler.h"
#include <vespa/searchcore/proton/common/commit_time_tracker.h>

namespace proton {

/**
 * There are 3 feed states, and one paths through the state machine:
 *
 *  INIT -> REPLAY_TRANSACTION_LOG -> NORMAL.
 */


/**
 * The feed handler owner is initializing components.
 */
class InitState : public FeedState {
    vespalib::string _doc_type_name;

public:
    InitState(const vespalib::string &name) noexcept
        : FeedState(INIT),
          _doc_type_name(name)
    {
    }

    void handleOperation(FeedToken, FeedOperationUP op) override {
        throwExceptionInHandleOperation(_doc_type_name, *op);
    }

    void receive(const PacketWrapperSP &wrap, vespalib::Executor &) override {
        throwExceptionInReceive(_doc_type_name.c_str(), wrap->packet.range().from(),
                                wrap->packet.range().to(), wrap->packet.size());
    }
};


/**
 * The feed handler is replaying the transaction log.
 * Replayed messages from the transaction log are sent to the active feed view.
 */
class ReplayTransactionLogState : public FeedState {
    vespalib::string _doc_type_name;
    std::unique_ptr<IReplayPacketHandler> _packet_handler;

public:
    ReplayTransactionLogState(const vespalib::string &name,
            IFeedView *& feed_view_ptr,
            bucketdb::IBucketDBHandler &bucketDBHandler,
            IReplayConfig &replay_config,
            FeedConfigStore &config_store,
            const ReplayThrottlingPolicy &replay_throttling_policy,
            IIncSerialNum &inc_serial_num);

    ~ReplayTransactionLogState() override;
    void handleOperation(FeedToken, FeedOperationUP op) override {
        throwExceptionInHandleOperation(_doc_type_name, *op);
    }

    void receive(const PacketWrapperSP &wrap, vespalib::Executor &executor) override;
};


/**
 * Normal feed state.
 * Incoming messages are sent to the active feed view.
 */
class NormalState : public FeedState {
    FeedHandler         &_handler;

public:
    NormalState(FeedHandler &handler) noexcept
        : FeedState(NORMAL),
          _handler(handler) {
    }

    void handleOperation(FeedToken token, FeedOperationUP op) override {
        _handler.performOperation(std::move(token), std::move(op));
    }

    void receive(const PacketWrapperSP &wrap, vespalib::Executor &) override {
        throwExceptionInReceive(_handler.getDocTypeName().c_str(), wrap->packet.range().from(),
                                wrap->packet.range().to(), wrap->packet.size());
    }
};
}  // namespace proton