summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver/testvisitormessagesession.h
blob: 193b6be133f3f953f99d12704fca913785d4d99b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#pragma once

#include <vespa/storage/visiting/visitormessagesession.h>
#include <vespa/storage/visiting/visitorthread.h>
#include <vespa/documentapi/messagebus/messages/documentmessage.h>
#include <vespa/storage/storageserver/priorityconverter.h>
#include <vespa/config/subscription/configuri.h>
#include <deque>

namespace storage {

/**
 * VisitorMessageSession implementation used by the storage server tests.
 *
 * Only pending() and getMonitor() are defined inline; the constructor,
 * destructor, reply(), send() and waitForMessages() are defined outside
 * this header.
 */
class TestVisitorMessageSession : public VisitorMessageSession
{
    // Monitor handed out through getMonitor().
    vespalib::Monitor _waitMonitor;
    // NOTE(review): presumably copies of the constructor arguments with the
    // same names, consumed by the out-of-line implementation - confirm there.
    mbus::Error _autoReplyError;
    bool _autoReply;

public:
    using UP = std::unique_ptr<TestVisitorMessageSession>;

    // Publicly accessible state, so tests can inspect it directly.
    // NOTE(review): thread/visitor are presumably bound to the constructor's
    // t/v arguments - confirm against the constructor definition.
    VisitorThread& thread;
    Visitor& visitor;
    // Value reported by pending().
    uint32_t pendingCount;
    // Owned document messages.
    // NOTE(review): presumably filled by send() - confirm in the implementation.
    std::deque<std::unique_ptr<documentapi::DocumentMessage>> sentMessages;

    TestVisitorMessageSession(VisitorThread& t,
                              Visitor& v,
                              const mbus::Error& autoReplyError,
                              bool autoReply);
    ~TestVisitorMessageSession();

    // VisitorMessageSession overrides.
    mbus::Result send(std::unique_ptr<documentapi::DocumentMessage> message) override;
    /** Returns the current value of pendingCount. */
    uint32_t pending() override {
        return pendingCount;
    }

    // Test-side helpers (declared here, defined elsewhere).
    void reply(mbus::Reply::UP rep);
    void waitForMessages(unsigned int msgCount);

    /** Gives access to the monitor held by this session. */
    vespalib::Monitor& getMonitor() {
        return _waitMonitor;
    }
};

/**
 * Session factory that hands out TestVisitorMessageSession instances and
 * remembers a raw pointer to each one it has created.
 */
struct TestVisitorMessageSessionFactory : VisitorMessageSessionFactory
{
    // Held for the duration of createSession().
    vespalib::Lock _accessLock;
    // Non-owning pointers to every session created so far. Ownership goes to
    // the caller of createSession(); nothing in this header removes entries,
    // so a pointer may outlive the session it refers to.
    std::vector<TestVisitorMessageSession*> _visitorSessions;
    // Forwarded to the constructor of each new session.
    mbus::Error _autoReplyError;
    // Forwarded to each new session as its autoReply argument.
    bool _createAutoReplyVisitorSessions{false};
    // Backs toDocumentPriority().
    PriorityConverter _priConverter;

    /** @param configId passed on to the PriorityConverter member. */
    TestVisitorMessageSessionFactory(vespalib::stringref configId = "")
        : _priConverter(configId)
    {}

    /**
     * Creates a new test session for the given visitor and visitor thread,
     * records a pointer to it in _visitorSessions, and returns ownership of
     * it to the caller. The whole operation runs under _accessLock.
     */
    VisitorMessageSession::UP createSession(Visitor& v, VisitorThread& vt) override {
        vespalib::LockGuard guard(_accessLock);
        TestVisitorMessageSession::UP created(
                new TestVisitorMessageSession(vt, v, _autoReplyError, _createAutoReplyVisitorSessions));
        TestVisitorMessageSession* const observer = created.get();
        _visitorSessions.push_back(observer);
        return VisitorMessageSession::UP(std::move(created));
    }

    /** Delegates the storage-to-document priority mapping to _priConverter. */
    documentapi::Priority::Value toDocumentPriority(uint8_t storagePriority) const override {
        return _priConverter.toDocumentPriority(storagePriority);
    }

};

} // storage