1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "socket.h"
#include <vespa/vespalib/net/socket_options.h>
#include <vespa/vespalib/net/socket_spec.h>
namespace vbench {
namespace {

/**
 * Build a socket spec from the given host and port, take its client
 * address and attempt to connect to it. The resulting handle is
 * returned as-is; the caller is expected to check it with valid().
 **/
vespalib::SocketHandle connect(const string &host, int port) {
    const auto spec = vespalib::SocketSpec::from_host_port(host, port);
    return spec.client_address().connect();
}

} // namespace vbench::<unnamed>
// Number of bytes reserved in the input buffer for each socket read
// performed by Socket::obtain() (32 KiB).
constexpr size_t READ_SIZE = 32 * 1024;
Socket::Socket(vespalib::SocketHandle socket)
: _socket(std::move(socket)),
_input(),
_output(),
_taint(),
_eof(false)
{
}
Socket::Socket(const string &host, int port)
: _socket(connect(host, port)),
_input(),
_output(),
_taint(),
_eof(false)
{
if (!_socket.valid() || !_socket.set_linger(false, 0)) {
_taint.reset(strfmt("socket connect failed: host: %s, port: %d",
host.c_str(), port));
_socket.reset();
}
}
// Nothing to clean up explicitly; members are destroyed as usual.
Socket::~Socket() = default;
/**
 * Obtain the currently buffered input.
 *
 * When the input buffer is empty and the socket has neither reached
 * end-of-file nor been tainted, a single read of up to the reserved
 * size (at least READ_SIZE bytes are requested from the buffer) is
 * attempted first:
 *  - a positive result is committed to the input buffer,
 *  - a negative result taints the socket with "socket read error",
 *  - a zero result marks end-of-file.
 *
 * @return whatever the input buffer hands out after the optional read
 **/
Memory
Socket::obtain()
{
    const bool buffer_empty = (_input.get().size == 0);
    if (buffer_empty && !_eof && !_taint) {
        WritableMemory dst = _input.reserve(READ_SIZE);
        const ssize_t got = _socket.read(dst.data, dst.size);
        if (got < 0) {
            _taint.reset("socket read error");
        } else if (got == 0) {
            _eof = true;
        } else {
            _input.commit(got);
        }
    }
    return _input.obtain();
}
/**
 * Drop the given number of bytes from the input buffer.
 *
 * @param bytes number of bytes forwarded to the input buffer's evict
 * @return this object, to allow chaining
 **/
Input &Socket::evict(size_t bytes)
{
    _input.evict(bytes);
    return *this;
}
/**
 * Reserve space in the output buffer.
 *
 * @param bytes number of bytes forwarded to the output buffer's reserve
 * @return the writable region handed out by the output buffer
 **/
WritableMemory Socket::reserve(size_t bytes)
{
    WritableMemory region = _output.reserve(bytes);
    return region;
}
/**
 * Commit bytes to the output buffer and write the buffered output to
 * the socket.
 *
 * Writing repeats until the output buffer is empty or the socket is
 * tainted. Each positive write result is evicted from the output
 * buffer; any non-positive result taints the socket with
 * "socket write error", which ends the loop.
 *
 * @param bytes number of bytes forwarded to the output buffer's commit
 * @return this object, to allow chaining
 **/
Output &Socket::commit(size_t bytes)
{
    _output.commit(bytes);
    for (;;) {
        if ((_output.get().size == 0) || _taint) {
            break;
        }
        Memory pending = _output.obtain();
        const ssize_t sent = _socket.write(pending.data, pending.size);
        if (sent <= 0) {
            _taint.reset("socket write error");
        } else {
            _output.evict(sent);
        }
    }
    return *this;
}
} // namespace vbench
|