// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config.benchmark;
import com.yahoo.collections.Tuple2;
import com.yahoo.vespa.config.PayloadChecksums;
import com.yahoo.io.IOUtils;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
import com.yahoo.jrt.TransportMetrics;
import com.yahoo.system.CommandLineParser;
import com.yahoo.vespa.config.ConfigDefinitionKey;
import com.yahoo.vespa.config.ConfigKey;
import com.yahoo.vespa.config.protocol.CompressionType;
import com.yahoo.vespa.config.protocol.DefContent;
import com.yahoo.vespa.config.protocol.JRTClientConfigRequest;
import com.yahoo.vespa.config.protocol.JRTClientConfigRequestV3;
import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory;
import com.yahoo.vespa.config.protocol.Trace;
import com.yahoo.vespa.config.util.ConfigUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import static com.yahoo.vespa.config.ConfigKey.createFull;
/**
* A client for generating load (config requests) against a config server or config proxy.
*
* Log messages from a run will have a # first in the line, the end result will not.
*
* @author Vegard Havdal
*/
public class LoadTester {
private final Transport transport = new Transport("rpc-client");
protected Supervisor supervisor = new Supervisor(transport);
private List> configs = new ArrayList<>();
private Map> defs = new HashMap<>();
private final CompressionType compressionType = JRTConfigRequestFactory.getCompressionType();
private final String host;
private final int port;
private final int iterations;
private final int threads;
private final String configFile;
private final String defPath;
private final boolean debug;
LoadTester(String host, int port, int iterations, int threads, String configFile, String defPath, boolean debug) {
this.host = host;
this.port = port;
this.iterations = iterations;
this.threads = threads;
this.configFile = configFile;
this.defPath = defPath;
this.debug = debug;
}
/**
* @param args command-line arguments
*/
public static void main(String[] args) throws IOException, InterruptedException {
CommandLineParser parser = new CommandLineParser("LoadTester", args);
parser.addLegalUnarySwitch("-d", "debug");
parser.addRequiredBinarySwitch("-c", "host (config proxy or server)");
parser.addRequiredBinarySwitch("-p", "port");
parser.addRequiredBinarySwitch("-i", "iterations per thread");
parser.addRequiredBinarySwitch("-t", "threads");
parser.addLegalBinarySwitch("-l", "config file, on form name,configid. (To get list: vespa-configproxy-cmd -m cache | cut -d ',' -f1-2)");
parser.addLegalBinarySwitch("-dd", "dir with def files, must be of form name.def");
parser.parse();
String host = parser.getBinarySwitches().get("-c");
int port = Integer.parseInt(parser.getBinarySwitches().get("-p"));
int iterations = Integer.parseInt(parser.getBinarySwitches().get("-i"));
int threads = Integer.parseInt(parser.getBinarySwitches().get("-t"));
String configFile = parser.getBinarySwitches().get("-l");
String defPath = parser.getBinarySwitches().get("-dd");
boolean debug = parser.getUnarySwitches().contains("-d");
new LoadTester(host, port, iterations, threads, configFile, defPath, debug)
.runLoad();
}
private void runLoad() throws IOException, InterruptedException {
configs = readConfigs(configFile);
defs = readDefs(defPath);
validateConfigs(configs, defs);
List threadList = new ArrayList<>();
Metrics m = new Metrics();
long startInNanos = System.nanoTime();
for (int i = 0; i < threads; i++) {
LoadThread lt = new LoadThread(iterations, host, port);
threadList.add(lt);
lt.start();
}
for (LoadThread lt : threadList) {
lt.join();
m.merge(lt.metrics);
}
float durationInSeconds = (float) (System.nanoTime() - startInNanos) / 1_000_000_000f;
printResults(durationInSeconds, threads, iterations, m);
}
private Map> readDefs(String defPath) throws IOException {
Map> ret = new HashMap<>();
if (defPath == null) return ret;
File defDir = new File(defPath);
if (!defDir.isDirectory()) {
System.out.println("# Given def file dir is not a directory: " +
defDir.getPath() + " , will not send def contents in requests.");
return ret;
}
File[] files = defDir.listFiles();
if (files == null) {
System.out.println("# Given def file dir has no files: " +
defDir.getPath() + " , will not send def contents in requests.");
return ret;
}
for (File f : files) {
String name = f.getName();
if (!name.endsWith(".def")) continue;
String contents = IOUtils.readFile(f);
ConfigDefinitionKey key = ConfigUtils.createConfigDefinitionKeyFromDefFile(f);
ret.put(key, new Tuple2<>(ConfigUtils.getDefMd5(Arrays.asList(contents.split("\n"))), contents.split("\n")));
}
System.out.println("# Read " + ret.size() + " def files from " + defDir.getPath());
return ret;
}
private void printResults(float durationInSeconds, long threads, long iterations, Metrics metrics) {
StringBuilder sb = new StringBuilder();
sb.append("#reqs/sec #avglatency #minlatency #maxlatency #failedrequests\n");
sb.append(((float) (iterations * threads)) / durationInSeconds).append(",");
sb.append((metrics.latencyInMillis / threads / iterations)).append(",");
sb.append((metrics.minLatency)).append(",");
sb.append((metrics.maxLatency)).append(",");
sb.append((metrics.failedRequests));
sb.append("\n");
sb.append('#').append(TransportMetrics.getInstance().snapshot().toString()).append('\n');
System.out.println(sb);
}
private List> readConfigs(String configsList) throws IOException {
List> ret = new ArrayList<>();
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(configsList), StandardCharsets.UTF_8));
String str = br.readLine();
while (str != null) {
String[] nameAndId = str.split(",");
Tuple2 nameAndNamespace = ConfigUtils.getNameAndNamespaceFromString(nameAndId[0]);
ConfigKey> key = new ConfigKey<>(nameAndNamespace.first, nameAndId[1], nameAndNamespace.second);
ret.add(key);
str = br.readLine();
}
br.close();
return ret;
}
private void validateConfigs(List> configs, Map> defs) {
for (ConfigKey> configKey : configs) {
ConfigDefinitionKey dKey = new ConfigDefinitionKey(configKey);
Tuple2 defContent = defs.get(dKey);
if (defContent == null)
throw new IllegalArgumentException("No matching config definition for " + configKey +
", known config definitions: " + defs.keySet());
}
}
private static class Metrics {
long latencyInMillis = 0;
long failedRequests = 0;
long maxLatency = Long.MIN_VALUE;
long minLatency = Long.MAX_VALUE;
public void merge(Metrics m) {
this.latencyInMillis += m.latencyInMillis;
this.failedRequests += m.failedRequests;
updateMin(m.minLatency);
updateMax(m.maxLatency);
}
public void update(long latency) {
this.latencyInMillis += latency;
updateMin(latency);
updateMax(latency);
}
private void updateMin(long latency) {
if (latency < minLatency)
minLatency = latency;
}
private void updateMax(long latency) {
if (latency > maxLatency)
maxLatency = latency;
}
private void incFailedRequests() {
failedRequests++;
}
}
private class LoadThread extends Thread {
private final int iterations;
private final Spec spec;
private final Metrics metrics = new Metrics();
LoadThread(int iterations, String host, int port) {
this.iterations = iterations;
this.spec = new Spec(host, port);
}
@Override
public void run() {
Target target = connect(spec);
int numberOfConfigs = configs.size();
for (int i = 0; i < iterations; i++) {
ConfigKey> reqKey = configs.get(ThreadLocalRandom.current().nextInt(numberOfConfigs));
JRTClientConfigRequest request = createRequest(reqKey);
if (debug)
System.out.println("# Requesting: " + reqKey);
long start = System.nanoTime();
target.invokeSync(request.getRequest(), Duration.ofSeconds(10));
long durationInMillis = (System.nanoTime() - start) / 1_000_000;
if (request.isError()) {
target = handleError(request, spec, target);
} else {
metrics.update(durationInMillis);
}
}
}
private JRTClientConfigRequest createRequest(ConfigKey> reqKey) {
ConfigDefinitionKey dKey = new ConfigDefinitionKey(reqKey);
Tuple2 defContent = defs.get(dKey);
ConfigKey> fullKey = createFull(reqKey.getName(), reqKey.getConfigId(), reqKey.getNamespace());
final long serverTimeout = 1000;
return JRTClientConfigRequestV3.createWithParams(fullKey, DefContent.fromList(List.of(defContent.second)),
ConfigUtils.getCanonicalHostName(), PayloadChecksums.empty(),
0, serverTimeout, Trace.createDummy(),
compressionType, Optional.empty());
}
private Target connect(Spec spec) {
return supervisor.connect(spec);
}
private Target handleError(JRTClientConfigRequest request, Spec spec, Target target) {
if (List.of("Connection lost", "Connection down").contains(request.errorMessage())) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("# Connection lost, reconnecting...");
target.close();
target = connect(spec);
} else {
System.err.println(request.errorMessage());
}
metrics.incFailedRequests();
return target;
}
}
}