summaryrefslogtreecommitdiffstats
path: root/fbench
diff options
context:
space:
mode:
authorArne H Juul <arnej@yahoo-inc.com>2017-05-05 14:38:10 +0200
committerArne H Juul <arnej@yahoo-inc.com>2017-05-08 10:38:23 +0200
commite81817db16aa9ef5470cacb2a760ff78f62532de (patch)
tree5c70750fe2dd8bc2befbcf00f20ad633549ff867 /fbench
parent7c4e89e0a4f39f715933eb0794c078d1b50406f8 (diff)
super ugly add post mode
* also, fix some logic with byte offsets and -r option
Diffstat (limited to 'fbench')
-rw-r--r--fbench/src/fbench/client.cpp142
-rw-r--r--fbench/src/fbench/client.h14
-rw-r--r--fbench/src/fbench/fbench.cpp24
-rw-r--r--fbench/src/fbench/fbench.h3
-rw-r--r--fbench/src/httpclient/httpclient.cpp142
-rw-r--r--fbench/src/httpclient/httpclient.h8
-rw-r--r--fbench/src/util/filereader.cpp9
-rw-r--r--fbench/src/util/filereader.h6
8 files changed, 307 insertions, 41 deletions
diff --git a/fbench/src/fbench/client.cpp b/fbench/src/fbench/client.cpp
index f539e990ad9..8dc6cb3b8de 100644
--- a/fbench/src/fbench/client.cpp
+++ b/fbench/src/fbench/client.cpp
@@ -17,6 +17,8 @@ Client::Client(ClientArguments *args)
_output(),
_linebufsize(args->_maxLineSize),
_linebuf(new char[_linebufsize]),
+ _contentbufsize(16 * args->_maxLineSize),
+ _contentbuf(NULL),
_stop(false),
_done(false),
_thread()
@@ -27,6 +29,7 @@ Client::Client(ClientArguments *args)
Client::~Client()
{
+ delete [] _contentbuf;
delete [] _linebuf;
}
@@ -34,6 +37,96 @@ void Client::runMe(Client * me) {
me->run();
}
+
+class UrlReader {
+ FileReader &_reader;
+ const ClientArguments &_args;
+ int _restarts;
+ char *_leftOvers;
+ int _leftOversLen;
+public:
+ UrlReader(FileReader& reader, const ClientArguments &args)
+ : _reader(reader), _args(args), _restarts(args._restartLimit),
+ _leftOvers(NULL), _leftOversLen(0)
+ {}
+ int nextUrl(char *buf, int bufLen);
+ int getContent(char *buf, int bufLen);
+ ~UrlReader() {}
+};
+
+int UrlReader::nextUrl(char *buf, int buflen)
+{
+ if (_leftOvers) {
+ if (_leftOversLen < buflen) {
+ strncpy(buf, _leftOvers, _leftOversLen);
+ buf[_leftOversLen] = '\0';
+ } else {
+ strncpy(buf, _leftOvers, buflen);
+ buf[buflen-1] = '\0';
+ }
+ _leftOvers = NULL;
+ return _leftOversLen;
+ }
+ // Read maximum to _queryfileOffsetEnd
+ if ( _args._singleQueryFile && _reader.GetFilePos() >= _args._queryfileEndOffset ) {
+ _reader.SetFilePos(_args._queryfileOffset);
+ if (_restarts == 0) {
+ return 0;
+ } else if (_restarts > 0) {
+ _restarts--;
+ }
+ }
+ int ll = _reader.ReadLine(buf, buflen);
+ while (ll > 0 && _args._usePostMode && buf[0] != '/') {
+ ll = _reader.ReadLine(buf, buflen);
+ }
+ if (ll > 0 && (buf[0] == '/' || !_args._usePostMode)) {
+ return ll;
+ }
+ if (_restarts == 0) {
+ return 0;
+ } else if (_restarts > 0) {
+ _restarts--;
+ }
+ if (ll < 0) {
+ _reader.Reset();
+ // Start reading from offset
+ if (_args._singleQueryFile) {
+ _reader.SetFilePos(_args._queryfileOffset);
+ }
+ }
+ ll = _reader.ReadLine(buf, buflen);
+ while (ll > 0 && _args._usePostMode && buf[0] != '/') {
+ ll = _reader.ReadLine(buf, buflen);
+ }
+ if (ll > 0 && (buf[0] == '/' || !_args._usePostMode)) {
+ return ll;
+ }
+ return 0;
+}
+
+int UrlReader::getContent(char *buf, int bufLen)
+{
+ int totLen = 0;
+ while (totLen < bufLen) {
+ int len = _reader.ReadLine(buf, bufLen);
+ if (len > 0) {
+ if (buf[0] == '/') {
+ _leftOvers = buf;
+ _leftOversLen = len;
+ return totLen;
+ }
+ buf += len;
+ bufLen -= len;
+ totLen += len;
+ } else {
+ return totLen;
+ }
+ }
+ return totLen;
+}
+
+
void
Client::run()
{
@@ -43,6 +136,10 @@ Client::run()
int linelen;
/// int reslen;
+ if (_args->_usePostMode) {
+ _contentbuf = new char[_contentbufsize];
+ }
+
std::this_thread::sleep_for(std::chrono::milliseconds(_args->_delay));
// open query file
@@ -73,37 +170,24 @@ Client::run()
if ( _args->_singleQueryFile )
_reader->SetFilePos(_args->_queryfileOffset);
+ UrlReader urlSource(*_reader, *_args);
+ size_t urlNumber = 0;
+
// run queries
while (!_stop) {
_cycleTimer->Start();
- linelen = _reader->ReadLine(_linebuf, _linebufsize);
-
- // Read maximum to _queryfileOffsetEnd
- if ( _args->_singleQueryFile && _reader->GetBufPos() >= _args->_queryfileBytes ) {
- _reader->SetFilePos(_args->_queryfileOffset);
- }
-
- if (linelen < 0) {
- _reader->Reset();
- // Start reading from offset
- if ( _args->_singleQueryFile ) {
- _reader->SetFilePos(_args->_queryfileOffset);
- }
-
- linelen = _reader->ReadLine(_linebuf, _linebufsize);
- if (linelen < 0) {
+ linelen = urlSource.nextUrl(_linebuf, _linebufsize);
+ if (linelen > 0) {
+ ++urlNumber;
+ } else {
+ if (urlNumber == 0) {
fprintf(stderr, "Client %d: ERROR: could not read any lines from '%s'\n",
_args->_myNum, inputFilename);
_status->SetError("Could not read any lines from query file.");
- break;
- }
- if (_args->_restartLimit == 0) {
- break;
- } else if (_args->_restartLimit > 0) {
- _args->_restartLimit--;
}
+ break;
}
if (linelen < _linebufsize) {
if (_output) {
@@ -114,9 +198,17 @@ Client::run()
if (linelen + (int)_args->_queryStringToAppend.length() < _linebufsize) {
strcat(_linebuf, _args->_queryStringToAppend.c_str());
}
- _reqTimer->Start();
- auto fetch_status = _http->Fetch(_linebuf, _output.get());
- _reqTimer->Stop();
+ HTTPClient::FetchStatus fetch_status(false, -1, -1, -1);
+ if (_args->_usePostMode) {
+ int cLen = urlSource.getContent(_contentbuf, _contentbufsize);
+ _reqTimer->Start();
+ fetch_status = _http->Post(_linebuf, _contentbuf, cLen, _output.get());
+ _reqTimer->Stop();
+ } else {
+ _reqTimer->Start();
+ fetch_status = _http->Fetch(_linebuf, _output.get());
+ _reqTimer->Stop();
+ }
_status->AddRequestStatus(fetch_status.RequestStatus());
if (fetch_status.Ok() && fetch_status.TotalHitCount() == 0)
++_status->_zeroHitQueries;
diff --git a/fbench/src/fbench/client.h b/fbench/src/fbench/client.h
index 4e78d2d6adc..4b33edb8939 100644
--- a/fbench/src/fbench/client.h
+++ b/fbench/src/fbench/client.h
@@ -95,13 +95,16 @@ struct ClientArguments
**/
bool _keepAlive;
+ /** Whether we should use POST in requests */
+ bool _usePostMode;
+
/**
* Indicate whether to add benchmark data coverage headers
**/
bool _headerBenchmarkdataCoverage;
uint64_t _queryfileOffset;
- uint64_t _queryfileBytes;
+ uint64_t _queryfileEndOffset;
bool _singleQueryFile;
std::string _queryStringToAppend;
std::string _extraHeaders;
@@ -115,9 +118,9 @@ struct ClientArguments
int ignoreCount, int byteLimit,
int restartLimit, int maxLineSize,
bool keepAlive, bool headerBenchmarkdataCoverage,
- uint64_t queryfileOffset, uint64_t queryfileBytes, bool singleQueryFile,
+ uint64_t queryfileOffset, uint64_t queryfileEndOffset, bool singleQueryFile,
const std::string & queryStringToAppend, const std::string & extraHeaders,
- const std::string &authority)
+ const std::string &authority, bool postMode)
: _myNum(myNum),
_totNum(totNum),
_filenamePattern(filenamePattern),
@@ -131,9 +134,10 @@ struct ClientArguments
_restartLimit(restartLimit),
_maxLineSize(maxLineSize),
_keepAlive(keepAlive),
+ _usePostMode(postMode),
_headerBenchmarkdataCoverage(headerBenchmarkdataCoverage),
_queryfileOffset(queryfileOffset),
- _queryfileBytes(queryfileBytes),
+ _queryfileEndOffset(queryfileEndOffset),
_singleQueryFile(singleQueryFile),
_queryStringToAppend(queryStringToAppend),
_extraHeaders(extraHeaders),
@@ -165,6 +169,8 @@ private:
std::unique_ptr<std::ofstream> _output;
int _linebufsize;
char *_linebuf;
+ int _contentbufsize;
+ char *_contentbuf;
std::atomic<bool> _stop;
std::atomic<bool> _done;
std::thread _thread;
diff --git a/fbench/src/fbench/fbench.cpp b/fbench/src/fbench/fbench.cpp
index 14a0fd4106d..f473eb608d5 100644
--- a/fbench/src/fbench/fbench.cpp
+++ b/fbench/src/fbench/fbench.cpp
@@ -20,6 +20,7 @@ FBench::FBench()
_restartLimit(0),
_maxLineSize(0),
_keepAlive(true),
+ _usePostMode(false),
_headerBenchmarkdataCoverage(false),
_seconds(60),
_singleQueryFile(false)
@@ -39,7 +40,7 @@ FBench::InitBenchmark(int numClients, int ignoreCount, int cycle,
int byteLimit, int restartLimit, int maxLineSize,
bool keepAlive, bool headerBenchmarkdataCoverage, int seconds,
bool singleQueryFile, const std::string & queryStringToAppend, const std::string & extraHeaders,
- const std::string &authority)
+ const std::string &authority, bool postMode)
{
_clients.resize(numClients);
_ignoreCount = ignoreCount;
@@ -57,6 +58,7 @@ FBench::InitBenchmark(int numClients, int ignoreCount, int cycle,
_restartLimit = restartLimit;
_maxLineSize = maxLineSize;
_keepAlive = keepAlive;
+ _usePostMode = postMode;
_headerBenchmarkdataCoverage = headerBenchmarkdataCoverage;
_seconds = seconds;
_singleQueryFile = singleQueryFile;
@@ -69,6 +71,12 @@ FBench::CreateClients()
int i(0);
for(auto & client : _clients) {
+ uint64_t off_beg = 0;
+ uint64_t off_end = 0;
+ if (_singleQueryFile) {
+ off_beg = _queryfileOffset[i];
+ off_end = _queryfileOffset[i+1];
+ }
client = std::make_unique<Client>(
new ClientArguments(i, _clients.size(), _filenamePattern,
_outputPattern, _hostnames[i % _hostnames.size()].c_str(),
@@ -76,9 +84,8 @@ FBench::CreateClients()
random() % spread, _ignoreCount,
_byteLimit, _restartLimit, _maxLineSize,
_keepAlive, _headerBenchmarkdataCoverage,
- _queryfileOffset[i % _queryfileOffset.size()],
- _queryfileOffset[i+1 % _queryfileOffset.size()]-_queryfileOffset[i % _queryfileOffset.size()],
- _singleQueryFile, _queryStringToAppend, _extraHeaders, _authority));
+ off_beg, off_end,
+ _singleQueryFile, _queryStringToAppend, _extraHeaders, _authority, _usePostMode));
++i;
}
}
@@ -205,6 +212,7 @@ FBench::Usage()
printf(" [-r restartLimit] [-m maxLineSize] [-k] <hostname> <port>\n\n");
printf(" -H <str> : append extra header to each get request.\n");
printf(" -A <str> : assign autority. <str> should be hostname:port format. Overrides Host: header sent.\n");
+ printf(" -P : use POST for requests instead of get.\n");
printf(" -a <str> : append string to each query\n");
printf(" -n <num> : run with <num> parallel clients [10]\n");
printf(" -c <num> : each client will make a request each <num> milliseconds [1000]\n");
@@ -258,6 +266,7 @@ FBench::Main(int argc, char *argv[])
int restartLimit = -1;
bool keepAlive = true;
bool headerBenchmarkdataCoverage = false;
+ bool usePostMode = false;
bool singleQueryFile = false;
std::string authority;
@@ -272,7 +281,7 @@ FBench::Main(int argc, char *argv[])
idx = 1;
optError = false;
- while((opt = GetOpt(argc, argv, "H:A:a:n:c:l:i:s:q:o:r:m:p:kxyz", arg, idx)) != -1) {
+ while((opt = GetOpt(argc, argv, "H:A:a:n:c:l:i:s:q:o:r:m:p:kxyzP", arg, idx)) != -1) {
switch(opt) {
case 'A':
authority = arg;
@@ -317,6 +326,9 @@ FBench::Main(int argc, char *argv[])
maxLineSize = minLineSize;
}
break;
+ case 'P':
+ usePostMode = true;
+ break;
case 'p':
printInterval = atoi(arg);
if (printInterval < 0)
@@ -410,7 +422,7 @@ FBench::Main(int argc, char *argv[])
keepAlive,
headerBenchmarkdataCoverage, seconds,
singleQueryFile, queryStringToAppend, extraHeaders,
- authority);
+ authority, usePostMode);
CreateClients();
StartClients();
diff --git a/fbench/src/fbench/fbench.h b/fbench/src/fbench/fbench.h
index 636d2b7e8b7..dcf56ce6336 100644
--- a/fbench/src/fbench/fbench.h
+++ b/fbench/src/fbench/fbench.h
@@ -22,6 +22,7 @@ private:
int _restartLimit;
int _maxLineSize;
bool _keepAlive;
+ bool _usePostMode;
bool _headerBenchmarkdataCoverage;
int _seconds;
std::vector<uint64_t> _queryfileOffset;
@@ -36,7 +37,7 @@ private:
int byteLimit, int restartLimit, int maxLineSize,
bool keepAlive, bool headerBenchmarkdataCoverage, int seconds,
bool singleQueryFile, const std::string & queryStringToAppend, const std::string & extraHeaders,
- const std::string &authority);
+ const std::string &authority, bool postMode);
void CreateClients();
void StartClients();
diff --git a/fbench/src/httpclient/httpclient.cpp b/fbench/src/httpclient/httpclient.cpp
index ce2157335e4..1a9564c5dd4 100644
--- a/fbench/src/httpclient/httpclient.cpp
+++ b/fbench/src/httpclient/httpclient.cpp
@@ -187,6 +187,103 @@ HTTPClient::Connect(const char *url)
return false;
}
+bool
+HTTPClient::ConnectForPost(const char *url, const char *content, int cLen)
+{
+ char tmp[4096];
+ char *req = NULL;
+ uint32_t req_max = 0;
+ uint32_t url_len = strlen(url);
+ uint32_t host_len = _hostname.size();
+
+ // Add additional headers
+ std::string headers = _extraHeaders;
+
+ // this is always requested to get robust info on total hit count.
+ headers += "X-Yahoo-Vespa-Benchmarkdata: true\r\n";
+
+ if ( _headerBenchmarkdataCoverage ) {
+ headers += "X-Yahoo-Vespa-Benchmarkdata-Coverage: true\r\n";
+ }
+
+ if (url_len + host_len + headers.length() + FIXED_REQ_MAX < sizeof(tmp)) {
+ req = tmp;
+ req_max = sizeof(tmp);
+ } else {
+ req_max = url_len + host_len + headers.length() + FIXED_REQ_MAX;
+ req = new char[req_max];
+ assert(req != NULL);
+ }
+
+ // create request
+ if(_keepAlive) {
+ snprintf(req, req_max,
+ "POST %s HTTP/1.1\r\n"
+ "Host: %s\r\n"
+ "Content-Length: %d\r\n"
+ "User-Agent: fbench/4.2.10\r\n"
+ "%s"
+ "\r\n",
+ url, _authority.c_str(), cLen, headers.c_str());
+ } else {
+ snprintf(req, req_max,
+ "POST %s HTTP/1.1\r\n"
+ "Host: %s\r\n"
+ "Connection: close\r\n"
+ "Content-Length: %d\r\n"
+ "User-Agent: fbench/4.2.10\r\n"
+ "%s"
+ "\r\n",
+ url, _authority.c_str(), cLen, headers.c_str());
+ }
+
+ // try to reuse connection if keep-alive is enabled
+ if (_keepAlive
+ && _socket->IsOpened()
+ && _socket->Write(req, strlen(req)) == (ssize_t)strlen(req)
+ && _socket->Write(content, cLen) == (ssize_t)cLen
+ && FillBuffer() > 0) {
+
+ // DEBUG
+ // printf("Socket Connection reused!\n");
+ _reuseCount++;
+ if (req != tmp) {
+ delete [] req;
+ }
+ return true;
+ } else {
+ _socket->Close();
+ ResetBuffer();
+ }
+
+ // try to open new connection to server
+ if (_socket->SetSoBlocking(true)
+ && _socket->Connect()
+ && _socket->SetNoDelay(true)
+ && _socket->SetSoLinger(false, 0)
+ && _socket->Write(req, strlen(req)) == (ssize_t)strlen(req)
+ && _socket->Write(content, cLen) == (ssize_t)cLen)
+ {
+
+ // DEBUG
+ // printf("New Socket connection!\n");
+ if (req != tmp) {
+ delete [] req;
+ }
+ return true;
+ } else {
+ _socket->Close();
+ }
+
+ // DEBUG
+ // printf("Connect FAILED!\n");
+ if (req != tmp) {
+ delete [] req;
+ }
+ return false;
+}
+
+
char *
HTTPClient::SplitString(char *input, int &argc, char **argv, int maxargs)
{
@@ -338,7 +435,7 @@ HTTPClient::ReadChunkHeader()
}
bool
-HTTPClient::Open(const char *url)
+HTTPClient::Open(const char *url, bool usePost, const char *content, int cLen)
{
if (_isOpen)
Close();
@@ -346,7 +443,7 @@ HTTPClient::Open(const char *url)
ResetBuffer();
_dataRead = 0;
_dataDone = false;
- _isOpen = Connect(url);
+ _isOpen = usePost ? ConnectForPost(url, content, cLen) : Connect(url);
if(!_isOpen || !ReadHTTPHeader()) {
Close();
return false;
@@ -547,3 +644,44 @@ HTTPClient::Fetch(const char *url, std::ostream *file)
_totalHitCount,
written);
}
+
+HTTPClient::FetchStatus
+HTTPClient::Post(const char *url, const char *content, int contentLen, std::ostream *file)
+{
+ size_t buflen = FETCH_BUFLEN;
+ char buf[FETCH_BUFLEN]; // NB: ensure big enough thread stack.
+ ssize_t readRes = 0;
+ ssize_t written = 0;
+
+ if (!Open(url, true, content, contentLen)) {
+ return FetchStatus(false, _requestStatus, _totalHitCount, 0);
+ }
+
+ // Write headerinfo
+ if (file) {
+ file->write(_headerinfo.c_str(), _headerinfo.length());
+ if (file->fail()) {
+ Close();
+ return FetchStatus(false, _requestStatus, _totalHitCount, 0);
+ }
+ file->write("\r\n", 2);
+ // Reset header data.
+ _headerinfo = "";
+ }
+
+ while((readRes = Read(buf, buflen)) > 0) {
+ if(file != NULL) {
+ if (!file->write(buf, readRes)) {
+ Close();
+ return FetchStatus(false, _requestStatus, _totalHitCount, written);
+ }
+ }
+ written += readRes;
+ }
+ Close();
+
+ return FetchStatus(_requestStatus == 200 && readRes == 0 && _totalHitCount >= 0,
+ _requestStatus,
+ _totalHitCount,
+ written);
+}
diff --git a/fbench/src/httpclient/httpclient.h b/fbench/src/httpclient/httpclient.h
index 831ff83651d..52a9511fc76 100644
--- a/fbench/src/httpclient/httpclient.h
+++ b/fbench/src/httpclient/httpclient.h
@@ -159,6 +159,9 @@ protected:
**/
bool Connect(const char *url);
+ /** connect for post */
+ bool ConnectForPost(const char *url, const char *content, int cLen);
+
/**
* Read the next line of text from the data stream into 'buf'. If
* the line is longer than ('bufsize' - 1), the first ('bufsize' -
@@ -244,7 +247,7 @@ public:
* @return success(true)/failure(false)
* @param url the url you want to connect to
**/
- bool Open(const char *url);
+ bool Open(const char *url, bool usePost = false, const char *content = 0, int cLen = 0);
/**
* Read data from the url we are currently connected to. This method
@@ -328,5 +331,8 @@ public:
* is NULL, the content will be read and then discarded.
**/
FetchStatus Fetch(const char *url, std::ostream *file = NULL);
+
+ /** post some content to URL */
+ FetchStatus Post(const char *url, const char *content, int contentLen, std::ostream *file = NULL);
};
diff --git a/fbench/src/util/filereader.cpp b/fbench/src/util/filereader.cpp
index b1eebbcb2f0..30469e18e0c 100644
--- a/fbench/src/util/filereader.cpp
+++ b/fbench/src/util/filereader.cpp
@@ -20,6 +20,8 @@ FileReader::FileReader()
_file(&std::cin),
_bufsize(1024*1024),
_buf(_bufsize),
+ _lastReadPos(0),
+ _nextReadPos(0),
_bufused(0),
_bufpos(0)
{
@@ -49,6 +51,7 @@ FileReader::Reset()
{
_file->clear();
_file->seekg(0);
+ _nextReadPos = 0;
return bool(*_file);
}
@@ -57,6 +60,7 @@ FileReader::SetFilePos(int64_t pos)
{
_bufpos = 0;
_file->seekg(pos);
+ _nextReadPos = pos;
return bool(*_file);
}
@@ -81,8 +85,11 @@ FileReader::FindNewline(int64_t pos)
void
FileReader::FillBuffer()
{
+ _lastReadPos = _nextReadPos;
_file->read(&_buf[0], _bufsize);
- _bufused = _file->gcount(); // may be -1
+ auto wasRead = _file->gcount(); // may be -1
+ _nextReadPos += wasRead;
+ _bufused = wasRead;
_bufpos = 0;
}
diff --git a/fbench/src/util/filereader.h b/fbench/src/util/filereader.h
index b553c73a262..6b85ad3ba51 100644
--- a/fbench/src/util/filereader.h
+++ b/fbench/src/util/filereader.h
@@ -21,6 +21,8 @@ private:
std::istream *_file;
int _bufsize;
std::vector<char> _buf;
+ uint64_t _lastReadPos;
+ uint64_t _nextReadPos;
int _bufused;
int _bufpos;
@@ -57,7 +59,7 @@ public:
{
if(_bufpos == _bufused)
FillBuffer();
- return (_bufused > _bufpos) ? _buf[_bufpos++] & 0x0ff : -1;
+ return (_bufused > _bufpos) ? (_buf[_bufpos++] & 0x0ff) : -1;
}
/**
@@ -98,6 +100,8 @@ public:
**/
uint64_t GetBufPos() const { return _bufpos; }
+ uint64_t GetFilePos() const { return _lastReadPos + _bufpos; }
+
/**
* @returns offset of next newline from pos
**/