diff options
author | Arne H Juul <arnej@yahoo-inc.com> | 2017-05-05 14:38:10 +0200 |
---|---|---|
committer | Arne H Juul <arnej@yahoo-inc.com> | 2017-05-08 10:38:23 +0200 |
commit | e81817db16aa9ef5470cacb2a760ff78f62532de (patch) | |
tree | 5c70750fe2dd8bc2befbcf00f20ad633549ff867 /fbench | |
parent | 7c4e89e0a4f39f715933eb0794c078d1b50406f8 (diff) |
super ugly add post mode
* also, fix some logic with byte offsets and -r option
Diffstat (limited to 'fbench')
-rw-r--r-- | fbench/src/fbench/client.cpp | 142 | ||||
-rw-r--r-- | fbench/src/fbench/client.h | 14 | ||||
-rw-r--r-- | fbench/src/fbench/fbench.cpp | 24 | ||||
-rw-r--r-- | fbench/src/fbench/fbench.h | 3 | ||||
-rw-r--r-- | fbench/src/httpclient/httpclient.cpp | 142 | ||||
-rw-r--r-- | fbench/src/httpclient/httpclient.h | 8 | ||||
-rw-r--r-- | fbench/src/util/filereader.cpp | 9 | ||||
-rw-r--r-- | fbench/src/util/filereader.h | 6 |
8 files changed, 307 insertions, 41 deletions
diff --git a/fbench/src/fbench/client.cpp b/fbench/src/fbench/client.cpp index f539e990ad9..8dc6cb3b8de 100644 --- a/fbench/src/fbench/client.cpp +++ b/fbench/src/fbench/client.cpp @@ -17,6 +17,8 @@ Client::Client(ClientArguments *args) _output(), _linebufsize(args->_maxLineSize), _linebuf(new char[_linebufsize]), + _contentbufsize(16 * args->_maxLineSize), + _contentbuf(NULL), _stop(false), _done(false), _thread() @@ -27,6 +29,7 @@ Client::Client(ClientArguments *args) Client::~Client() { + delete [] _contentbuf; delete [] _linebuf; } @@ -34,6 +37,96 @@ void Client::runMe(Client * me) { me->run(); } + +class UrlReader { + FileReader &_reader; + const ClientArguments &_args; + int _restarts; + char *_leftOvers; + int _leftOversLen; +public: + UrlReader(FileReader& reader, const ClientArguments &args) + : _reader(reader), _args(args), _restarts(args._restartLimit), + _leftOvers(NULL), _leftOversLen(0) + {} + int nextUrl(char *buf, int bufLen); + int getContent(char *buf, int bufLen); + ~UrlReader() {} +}; + +int UrlReader::nextUrl(char *buf, int buflen) +{ + if (_leftOvers) { + if (_leftOversLen < buflen) { + strncpy(buf, _leftOvers, _leftOversLen); + buf[_leftOversLen] = '\0'; + } else { + strncpy(buf, _leftOvers, buflen); + buf[buflen-1] = '\0'; + } + _leftOvers = NULL; + return _leftOversLen; + } + // Read maximum to _queryfileOffsetEnd + if ( _args._singleQueryFile && _reader.GetFilePos() >= _args._queryfileEndOffset ) { + _reader.SetFilePos(_args._queryfileOffset); + if (_restarts == 0) { + return 0; + } else if (_restarts > 0) { + _restarts--; + } + } + int ll = _reader.ReadLine(buf, buflen); + while (ll > 0 && _args._usePostMode && buf[0] != '/') { + ll = _reader.ReadLine(buf, buflen); + } + if (ll > 0 && (buf[0] == '/' || !_args._usePostMode)) { + return ll; + } + if (_restarts == 0) { + return 0; + } else if (_restarts > 0) { + _restarts--; + } + if (ll < 0) { + _reader.Reset(); + // Start reading from offset + if (_args._singleQueryFile) { + _reader.SetFilePos(_args._queryfileOffset); + } + } + ll = _reader.ReadLine(buf, buflen); + while (ll > 0 && _args._usePostMode && buf[0] != '/') { + ll = _reader.ReadLine(buf, buflen); + } + if (ll > 0 && (buf[0] == '/' || !_args._usePostMode)) { + return ll; + } + return 0; +} + +int UrlReader::getContent(char *buf, int bufLen) +{ + int totLen = 0; + while (totLen < bufLen) { + int len = _reader.ReadLine(buf, bufLen); + if (len > 0) { + if (buf[0] == '/') { + _leftOvers = buf; + _leftOversLen = len; + return totLen; + } + buf += len; + bufLen -= len; + totLen += len; + } else { + return totLen; + } + } + return totLen; +} + + void Client::run() { @@ -43,6 +136,10 @@ Client::run() int linelen; /// int reslen; + if (_args->_usePostMode) { + _contentbuf = new char[_contentbufsize]; + } + std::this_thread::sleep_for(std::chrono::milliseconds(_args->_delay)); // open query file @@ -73,37 +170,24 @@ Client::run() if ( _args->_singleQueryFile ) _reader->SetFilePos(_args->_queryfileOffset); + UrlReader urlSource(*_reader, *_args); + size_t urlNumber = 0; + // run queries while (!_stop) { _cycleTimer->Start(); - linelen = _reader->ReadLine(_linebuf, _linebufsize); - - // Read maximum to _queryfileOffsetEnd - if ( _args->_singleQueryFile && _reader->GetBufPos() >= _args->_queryfileBytes ) { - _reader->SetFilePos(_args->_queryfileOffset); - } - - if (linelen < 0) { - _reader->Reset(); - // Start reading from offset - if ( _args->_singleQueryFile ) { - _reader->SetFilePos(_args->_queryfileOffset); - } - - linelen = _reader->ReadLine(_linebuf, _linebufsize); - if (linelen < 0) { + linelen = urlSource.nextUrl(_linebuf, _linebufsize); + if (linelen > 0) { + ++urlNumber; + } else { + if (urlNumber == 0) { fprintf(stderr, "Client %d: ERROR: could not read any lines from '%s'\n", _args->_myNum, inputFilename); _status->SetError("Could not read any lines from query file."); - break; - } - if (_args->_restartLimit == 0) { - break; - } else if (_args->_restartLimit > 0) { - _args->_restartLimit--; } + break; } if (linelen < _linebufsize) { if (_output) { @@ -114,9 +198,17 @@ Client::run() if (linelen + (int)_args->_queryStringToAppend.length() < _linebufsize) { strcat(_linebuf, _args->_queryStringToAppend.c_str()); } - _reqTimer->Start(); - auto fetch_status = _http->Fetch(_linebuf, _output.get()); - _reqTimer->Stop(); + HTTPClient::FetchStatus fetch_status(false, -1, -1, -1); + if (_args->_usePostMode) { + int cLen = urlSource.getContent(_contentbuf, _contentbufsize); + _reqTimer->Start(); + fetch_status = _http->Post(_linebuf, _contentbuf, cLen, _output.get()); + _reqTimer->Stop(); + } else { + _reqTimer->Start(); + fetch_status = _http->Fetch(_linebuf, _output.get()); + _reqTimer->Stop(); + } _status->AddRequestStatus(fetch_status.RequestStatus()); if (fetch_status.Ok() && fetch_status.TotalHitCount() == 0) ++_status->_zeroHitQueries; diff --git a/fbench/src/fbench/client.h b/fbench/src/fbench/client.h index 4e78d2d6adc..4b33edb8939 100644 --- a/fbench/src/fbench/client.h +++ b/fbench/src/fbench/client.h @@ -95,13 +95,16 @@ struct ClientArguments **/ bool _keepAlive; + /** Whether we should use POST in requests */ + bool _usePostMode; + /** * Indicate whether to add benchmark data coverage headers **/ bool _headerBenchmarkdataCoverage; uint64_t _queryfileOffset; - uint64_t _queryfileBytes; + uint64_t _queryfileEndOffset; bool _singleQueryFile; std::string _queryStringToAppend; std::string _extraHeaders; @@ -115,9 +118,9 @@ struct ClientArguments int ignoreCount, int byteLimit, int restartLimit, int maxLineSize, bool keepAlive, bool headerBenchmarkdataCoverage, - uint64_t queryfileOffset, uint64_t queryfileBytes, bool singleQueryFile, + uint64_t queryfileOffset, uint64_t queryfileEndOffset, bool singleQueryFile, const std::string & queryStringToAppend, const std::string & extraHeaders, - const std::string &authority) + const std::string &authority, bool postMode) : _myNum(myNum), _totNum(totNum), _filenamePattern(filenamePattern), @@ -131,9 +134,10 @@ struct ClientArguments _restartLimit(restartLimit), _maxLineSize(maxLineSize), _keepAlive(keepAlive), + _usePostMode(postMode), _headerBenchmarkdataCoverage(headerBenchmarkdataCoverage), _queryfileOffset(queryfileOffset), - _queryfileBytes(queryfileBytes), + _queryfileEndOffset(queryfileEndOffset), _singleQueryFile(singleQueryFile), _queryStringToAppend(queryStringToAppend), _extraHeaders(extraHeaders), @@ -165,6 +169,8 @@ private: std::unique_ptr<std::ofstream> _output; int _linebufsize; char *_linebuf; + int _contentbufsize; + char *_contentbuf; std::atomic<bool> _stop; std::atomic<bool> _done; std::thread _thread; diff --git a/fbench/src/fbench/fbench.cpp b/fbench/src/fbench/fbench.cpp index 14a0fd4106d..f473eb608d5 100644 --- a/fbench/src/fbench/fbench.cpp +++ b/fbench/src/fbench/fbench.cpp @@ -20,6 +20,7 @@ FBench::FBench() _restartLimit(0), _maxLineSize(0), _keepAlive(true), + _usePostMode(false), _headerBenchmarkdataCoverage(false), _seconds(60), _singleQueryFile(false) @@ -39,7 +40,7 @@ FBench::InitBenchmark(int numClients, int ignoreCount, int cycle, int byteLimit, int restartLimit, int maxLineSize, bool keepAlive, bool headerBenchmarkdataCoverage, int seconds, bool singleQueryFile, const std::string & queryStringToAppend, const std::string & extraHeaders, - const std::string &authority) + const std::string &authority, bool postMode) { _clients.resize(numClients); _ignoreCount = ignoreCount; @@ -57,6 +58,7 @@ FBench::InitBenchmark(int numClients, int ignoreCount, int cycle, _restartLimit = restartLimit; _maxLineSize = maxLineSize; _keepAlive = keepAlive; + _usePostMode = postMode; _headerBenchmarkdataCoverage = headerBenchmarkdataCoverage; _seconds = seconds; _singleQueryFile = singleQueryFile; @@ -69,6 +71,12 @@ FBench::CreateClients() int i(0); for(auto & client : _clients) { + uint64_t off_beg = 0; + uint64_t off_end = 0; + if (_singleQueryFile) { + off_beg = _queryfileOffset[i]; + off_end = _queryfileOffset[i+1]; + } client = std::make_unique<Client>( new ClientArguments(i, _clients.size(), _filenamePattern, _outputPattern, _hostnames[i % _hostnames.size()].c_str(), @@ -76,9 +84,8 @@ FBench::CreateClients() random() % spread, _ignoreCount, _byteLimit, _restartLimit, _maxLineSize, _keepAlive, _headerBenchmarkdataCoverage, - _queryfileOffset[i % _queryfileOffset.size()], - _queryfileOffset[i+1 % _queryfileOffset.size()]-_queryfileOffset[i % _queryfileOffset.size()], - _singleQueryFile, _queryStringToAppend, _extraHeaders, _authority)); + off_beg, off_end, + _singleQueryFile, _queryStringToAppend, _extraHeaders, _authority, _usePostMode)); ++i; } } @@ -205,6 +212,7 @@ FBench::Usage() printf(" [-r restartLimit] [-m maxLineSize] [-k] <hostname> <port>\n\n"); printf(" -H <str> : append extra header to each get request.\n"); printf(" -A <str> : assign autority. <str> should be hostname:port format. Overrides Host: header sent.\n"); + printf(" -P : use POST for requests instead of get.\n"); printf(" -a <str> : append string to each query\n"); printf(" -n <num> : run with <num> parallel clients [10]\n"); printf(" -c <num> : each client will make a request each <num> milliseconds [1000]\n"); @@ -258,6 +266,7 @@ FBench::Main(int argc, char *argv[]) int restartLimit = -1; bool keepAlive = true; bool headerBenchmarkdataCoverage = false; + bool usePostMode = false; bool singleQueryFile = false; std::string authority; @@ -272,7 +281,7 @@ FBench::Main(int argc, char *argv[]) idx = 1; optError = false; - while((opt = GetOpt(argc, argv, "H:A:a:n:c:l:i:s:q:o:r:m:p:kxyz", arg, idx)) != -1) { + while((opt = GetOpt(argc, argv, "H:A:a:n:c:l:i:s:q:o:r:m:p:kxyzP", arg, idx)) != -1) { switch(opt) { case 'A': authority = arg; @@ -317,6 +326,9 @@ FBench::Main(int argc, char *argv[]) maxLineSize = minLineSize; } break; + case 'P': + usePostMode = true; + break; case 'p': printInterval = atoi(arg); if (printInterval < 0) @@ -410,7 +422,7 @@ FBench::Main(int argc, char *argv[]) keepAlive, headerBenchmarkdataCoverage, seconds, singleQueryFile, queryStringToAppend, extraHeaders, - authority); + authority, usePostMode); CreateClients(); StartClients(); diff --git a/fbench/src/fbench/fbench.h b/fbench/src/fbench/fbench.h index 636d2b7e8b7..dcf56ce6336 100644 --- a/fbench/src/fbench/fbench.h +++ b/fbench/src/fbench/fbench.h @@ -22,6 +22,7 @@ private: int _restartLimit; int _maxLineSize; bool _keepAlive; + bool _usePostMode; bool _headerBenchmarkdataCoverage; int _seconds; std::vector<uint64_t> _queryfileOffset; @@ -36,7 +37,7 @@ private: int byteLimit, int restartLimit, int maxLineSize, bool keepAlive, bool headerBenchmarkdataCoverage, int seconds, bool singleQueryFile, const std::string & queryStringToAppend, const std::string & extraHeaders, - const std::string &authority); + const std::string &authority, bool postMode); void CreateClients(); void StartClients(); diff --git a/fbench/src/httpclient/httpclient.cpp b/fbench/src/httpclient/httpclient.cpp index ce2157335e4..1a9564c5dd4 100644 --- a/fbench/src/httpclient/httpclient.cpp +++ b/fbench/src/httpclient/httpclient.cpp @@ -187,6 +187,103 @@ HTTPClient::Connect(const char *url) return false; } +bool +HTTPClient::ConnectForPost(const char *url, const char *content, int cLen) +{ + char tmp[4096]; + char *req = NULL; + uint32_t req_max = 0; + uint32_t url_len = strlen(url); + uint32_t host_len = _hostname.size(); + + // Add additional headers + std::string headers = _extraHeaders; + + // this is always requested to get robust info on total hit count. + headers += "X-Yahoo-Vespa-Benchmarkdata: true\r\n"; + + if ( _headerBenchmarkdataCoverage ) { + headers += "X-Yahoo-Vespa-Benchmarkdata-Coverage: true\r\n"; + } + + if (url_len + host_len + headers.length() + FIXED_REQ_MAX < sizeof(tmp)) { + req = tmp; + req_max = sizeof(tmp); + } else { + req_max = url_len + host_len + headers.length() + FIXED_REQ_MAX; + req = new char[req_max]; + assert(req != NULL); + } + + // create request + if(_keepAlive) { + snprintf(req, req_max, + "POST %s HTTP/1.1\r\n" + "Host: %s\r\n" + "Content-Length: %d\r\n" + "User-Agent: fbench/4.2.10\r\n" + "%s" + "\r\n", + url, _authority.c_str(), cLen, headers.c_str()); + } else { + snprintf(req, req_max, + "POST %s HTTP/1.1\r\n" + "Host: %s\r\n" + "Connection: close\r\n" + "Content-Length: %d\r\n" + "User-Agent: fbench/4.2.10\r\n" + "%s" + "\r\n", + url, _authority.c_str(), cLen, headers.c_str()); + } + + // try to reuse connection if keep-alive is enabled + if (_keepAlive + && _socket->IsOpened() + && _socket->Write(req, strlen(req)) == (ssize_t)strlen(req) + && _socket->Write(content, cLen) == (ssize_t)cLen + && FillBuffer() > 0) { + + // DEBUG + // printf("Socket Connection reused!\n"); + _reuseCount++; + if (req != tmp) { + delete [] req; + } + return true; + } else { + _socket->Close(); + ResetBuffer(); + } + + // try to open new connection to server + if (_socket->SetSoBlocking(true) + && _socket->Connect() + && _socket->SetNoDelay(true) + && _socket->SetSoLinger(false, 0) + && _socket->Write(req, strlen(req)) == (ssize_t)strlen(req) + && _socket->Write(content, cLen) == (ssize_t)cLen) + { + + // DEBUG + // printf("New Socket connection!\n"); + if (req != tmp) { + delete [] req; + } + return true; + } else { + _socket->Close(); + } + + // DEBUG + // printf("Connect FAILED!\n"); + if (req != tmp) { + delete [] req; + } + return false; +} + + char * HTTPClient::SplitString(char *input, int &argc, char **argv, int maxargs) { @@ -338,7 +435,7 @@ HTTPClient::ReadChunkHeader() } bool -HTTPClient::Open(const char *url) +HTTPClient::Open(const char *url, bool usePost, const char *content, int cLen) { if (_isOpen) Close(); @@ -346,7 +443,7 @@ HTTPClient::Open(const char *url) ResetBuffer(); _dataRead = 0; _dataDone = false; - _isOpen = Connect(url); + _isOpen = usePost ? ConnectForPost(url, content, cLen) : Connect(url); if(!_isOpen || !ReadHTTPHeader()) { Close(); return false; @@ -547,3 +644,44 @@ HTTPClient::Fetch(const char *url, std::ostream *file) _totalHitCount, written); } + +HTTPClient::FetchStatus +HTTPClient::Post(const char *url, const char *content, int contentLen, std::ostream *file) +{ + size_t buflen = FETCH_BUFLEN; + char buf[FETCH_BUFLEN]; // NB: ensure big enough thread stack. + ssize_t readRes = 0; + ssize_t written = 0; + + if (!Open(url, true, content, contentLen)) { + return FetchStatus(false, _requestStatus, _totalHitCount, 0); + } + + // Write headerinfo + if (file) { + file->write(_headerinfo.c_str(), _headerinfo.length()); + if (file->fail()) { + Close(); + return FetchStatus(false, _requestStatus, _totalHitCount, 0); + } + file->write("\r\n", 2); + // Reset header data. + _headerinfo = ""; + } + + while((readRes = Read(buf, buflen)) > 0) { + if(file != NULL) { + if (!file->write(buf, readRes)) { + Close(); + return FetchStatus(false, _requestStatus, _totalHitCount, written); + } + } + written += readRes; + } + Close(); + + return FetchStatus(_requestStatus == 200 && readRes == 0 && _totalHitCount >= 0, + _requestStatus, + _totalHitCount, + written); +} diff --git a/fbench/src/httpclient/httpclient.h b/fbench/src/httpclient/httpclient.h index 831ff83651d..52a9511fc76 100644 --- a/fbench/src/httpclient/httpclient.h +++ b/fbench/src/httpclient/httpclient.h @@ -159,6 +159,9 @@ protected: **/ bool Connect(const char *url); + /** connect for post */ + bool ConnectForPost(const char *url, const char *content, int cLen); + /** * Read the next line of text from the data stream into 'buf'. If * the line is longer than ('bufsize' - 1), the first ('bufsize' - @@ -244,7 +247,7 @@ public: * @return success(true)/failure(false) * @param url the url you want to connect to **/ - bool Open(const char *url); + bool Open(const char *url, bool usePost = false, const char *content = 0, int cLen = 0); /** * Read data from the url we are currently connected to. This method @@ -328,5 +331,8 @@ public: * is NULL, the content will be read and then discarded. **/ FetchStatus Fetch(const char *url, std::ostream *file = NULL); + + /** post some content to URL */ + FetchStatus Post(const char *url, const char *content, int contentLen, std::ostream *file = NULL); }; diff --git a/fbench/src/util/filereader.cpp b/fbench/src/util/filereader.cpp index b1eebbcb2f0..30469e18e0c 100644 --- a/fbench/src/util/filereader.cpp +++ b/fbench/src/util/filereader.cpp @@ -20,6 +20,8 @@ FileReader::FileReader() _file(&std::cin), _bufsize(1024*1024), _buf(_bufsize), + _lastReadPos(0), + _nextReadPos(0), _bufused(0), _bufpos(0) { @@ -49,6 +51,7 @@ FileReader::Reset() { _file->clear(); _file->seekg(0); + _nextReadPos = 0; return bool(*_file); } @@ -57,6 +60,7 @@ FileReader::SetFilePos(int64_t pos) { _bufpos = 0; _file->seekg(pos); + _nextReadPos = pos; return bool(*_file); } @@ -81,8 +85,11 @@ FileReader::FindNewline(int64_t pos) void FileReader::FillBuffer() { + _lastReadPos = _nextReadPos; _file->read(&_buf[0], _bufsize); - _bufused = _file->gcount(); // may be -1 + auto wasRead = _file->gcount(); // may be -1 + _nextReadPos += wasRead; + _bufused = wasRead; _bufpos = 0; } diff --git a/fbench/src/util/filereader.h b/fbench/src/util/filereader.h index b553c73a262..6b85ad3ba51 100644 --- a/fbench/src/util/filereader.h +++ b/fbench/src/util/filereader.h @@ -21,6 +21,8 @@ private: std::istream *_file; int _bufsize; std::vector<char> _buf; + uint64_t _lastReadPos; + uint64_t _nextReadPos; int _bufused; int _bufpos; @@ -57,7 +59,7 @@ public: { if(_bufpos == _bufused) FillBuffer(); - return (_bufused > _bufpos) ? _buf[_bufpos++] & 0x0ff : -1; + return (_bufused > _bufpos) ? (_buf[_bufpos++] & 0x0ff) : -1; } /** @@ -98,6 +100,8 @@ public: **/ uint64_t GetBufPos() const { return _bufpos; } + uint64_t GetFilePos() const { return _lastReadPos + _bufpos; } + /** * @returns offset of next newline from pos **/ |