aboutsummaryrefslogtreecommitdiffstats
path: root/vespalog/src/test/threads/testthreads.cpp
blob: d23e00930fd494046c8c41716bb5cde2e34a480f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/fastos/app.h>
#include <vespa/fastos/timestamp.h>
#include <vespa/fastos/thread.h>
#include <vespa/log/bufferedlogger.h>
#include <array>
#include <iostream>
#include <thread>
#include <chrono>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>

using std::string;
using namespace std::chrono_literals;

LOG_SETUP(".threadtest");

class FileThread : public FastOS_Runnable
{
    bool _done;
    string _file;
public:
    FileThread(string file) : _done(false), _file(file) {}
    void Run(FastOS_ThreadInterface *thread, void *arg) override;
    void stop() {_done = true; }
};

class LoggerThread : public FastOS_Runnable
{
    bool _done;
public:
    bool _useLogBuffer;
    LoggerThread() : _done(false), _useLogBuffer(false) {}
    void Run(FastOS_ThreadInterface *thread, void *arg) override;
    void stop() {_done = true; }
};

void
FileThread::Run(FastOS_ThreadInterface *, void *)
{
    unlink(_file.c_str());
    while (!_done) {
        int fd = open(_file.c_str(), O_RDWR | O_CREAT | O_APPEND, 0644);
        if (fd == -1) {
            fprintf(stderr, "open failed: %s\n", strerror(errno));
            exit(1);
        }
        std::this_thread::sleep_for(5ms);
        struct stat buf;
        fstat(fd, &buf);
        if (buf.st_size != 0) {
            fprintf(stderr, "%s isn't empty anymore\n", _file.c_str());
            exit(1);
        }
        if (close(fd) != 0) {
            fprintf(stderr, "close of %d failed: %s\n", fd, strerror(errno));
            exit(1);
        }
    }
}


void
LoggerThread::Run(FastOS_ThreadInterface *, void *)
{
    int counter = 0;
    while (!_done) {
        if (_useLogBuffer) {
            LOGBM(info, "bla bla bla %u", ++counter);
        } else {
            LOG(info, "bla bla bla");
        }
    }
}


class ThreadTester : public FastOS_Application
{
public:
    int Main() override;
};

int
ThreadTester::Main()
{
    std::cerr << "Testing that logging is threadsafe. 30 sec test.\n";
    FastOS_ThreadPool pool(128 * 1024);

    const int numWriters = 30;
    const int numLoggers = 10;

    auto writers = std::array<std::unique_ptr<FileThread>, numWriters>();
    auto loggers = std::array<std::unique_ptr<LoggerThread>, numLoggers>();

    for (int i = 0; i < numWriters; i++) {
        char filename[100];
        sprintf(filename, "empty.%d", i);
        writers[i] = std::make_unique<FileThread>(filename);
        pool.NewThread(writers[i].get());
    }
    for (int i = 0; i < numLoggers; i++) {
        loggers[i] = std::make_unique<LoggerThread>();
        pool.NewThread(loggers[i].get());
    }

    fastos::StopWatch timer;
    // Reduced runtime to half as the test now repeats itself to test with
    // buffering. (To avoid test taking a minute)
    while (timer.elapsed().ms() < 15 * 1000) {
        unlink(_argv[1]);
        std::this_thread::sleep_for(1ms);
    }
        // Then set to use logbuffer and continue
    for (int i = 0; i < numLoggers; i++) {
        loggers[i]->_useLogBuffer = true;
    }
    timer.restart();
    while (timer.elapsed().ms() < 15 * 1000) {
        unlink(_argv[1]);
        std::this_thread::sleep_for(1ms);
    }

    for (int i = 0; i < numLoggers; i++) {
        loggers[i]->stop();
    }
    for (int i = 0; i < numWriters; i++) {
        writers[i]->stop();
    }

    pool.Close();

    return 0;
}

int main(int argc, char **argv)
{
    ThreadTester app;
    return app.Entry(argc, argv);
}