summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorOlli Virtanen <ovirtanen@gmail.com>2018-08-16 14:06:42 +0200
committerGitHub <noreply@github.com>2018-08-16 14:06:42 +0200
commit7a60ce7e38475b65ab9e5a7ed751293ace040e3b (patch)
treeedf3d17bc49c9846c46536c890d8d7ef6915d078 /configserver
parent9368de3002354e487bd9050a04c4ab228dffb8aa (diff)
parentc5c26faf89e5f41918bcd3984b113f1a4ede89bd (diff)
Merge pull request #6381 from vespa-engine/ovirtanen/zookeeper-metrics-in-configserver
Report zookeeper monitoring statistics in configserver; proposed metrics keys approved
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/Metrics.java13
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java122
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdaterTest.java93
3 files changed, 225 insertions, 3 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/Metrics.java b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/Metrics.java
index fe9364ed33e..5813c3eb04a 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/Metrics.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/Metrics.java
@@ -2,6 +2,7 @@
package com.yahoo.vespa.config.server.monitoring;
import com.google.inject.Inject;
+import com.yahoo.cloud.config.ZookeeperServerConfig;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.TenantName;
import com.yahoo.container.jdisc.config.HealthMonitorConfig;
@@ -32,19 +33,23 @@ public class Metrics extends TimerTask implements MetricUpdaterFactory {
private final Counter failedRequests;
private final Counter procTimeCounter;
private final Metric metric;
+ private final ZKMetricUpdater zkMetricUpdater;
// TODO The map is the key for now
private final Map<Map<String, String>, MetricUpdater> metricUpdaters = new ConcurrentHashMap<>();
private final Timer timer = new Timer();
@Inject
- public Metrics(Metric metric, Statistics statistics, HealthMonitorConfig healthMonitorConfig) {
+ public Metrics(Metric metric, Statistics statistics, HealthMonitorConfig healthMonitorConfig, ZookeeperServerConfig zkServerConfig) {
this.metric = metric;
requests = createCounter(METRIC_REQUESTS, statistics);
failedRequests = createCounter(METRIC_FAILED_REQUESTS, statistics);
procTimeCounter = createCounter("procTime", statistics);
- timer.scheduleAtFixedRate(this, 5000, (long) (healthMonitorConfig.snapshot_interval() * 1000));
+
log.log(LogLevel.DEBUG, "Metric update interval is " + healthMonitorConfig.snapshot_interval() + " seconds");
+ long intervalMs = (long) (healthMonitorConfig.snapshot_interval() * 1000);
+ timer.scheduleAtFixedRate(this, 5000, intervalMs);
+ zkMetricUpdater = new ZKMetricUpdater(zkServerConfig, 4500, intervalMs);
}
public static Metrics createTestMetrics() {
@@ -52,7 +57,8 @@ public class Metrics extends TimerTask implements MetricUpdaterFactory {
Statistics.NullImplementation statistics = new Statistics.NullImplementation();
HealthMonitorConfig.Builder builder = new HealthMonitorConfig.Builder();
builder.snapshot_interval(60.0);
- return new Metrics(metric, statistics, new HealthMonitorConfig(builder));
+ ZookeeperServerConfig.Builder zkBuilder = new ZookeeperServerConfig.Builder().myid(1);
+ return new Metrics(metric, statistics, new HealthMonitorConfig(builder), new ZookeeperServerConfig(zkBuilder));
}
private Counter createCounter(String name, Statistics statistics) {
@@ -120,6 +126,7 @@ public class Metrics extends TimerTask implements MetricUpdaterFactory {
}
}
setRegularMetrics();
+ zkMetricUpdater.getZKMetrics().forEach((attr, val) -> metric.set(attr, val, null));
timer.purge();
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java
new file mode 100644
index 00000000000..a08be228e48
--- /dev/null
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java
@@ -0,0 +1,122 @@
+package com.yahoo.vespa.config.server.monitoring;
+
+import com.yahoo.cloud.config.ZookeeperServerConfig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.yahoo.vespa.config.server.monitoring.Metrics.getMetricName;
+
+public class ZKMetricUpdater extends TimerTask {
+ private static final Logger log = Logger.getLogger(ZKMetricUpdater.class.getName());
+
+ public static final String METRIC_ZK_ZNODES = getMetricName("zkZNodes");
+ public static final String METRIC_ZK_LATENCY_AVERAGE = getMetricName("zkAvgLatency");
+ public static final String METRIC_ZK_LATENCY_MAX = getMetricName("zkMaxLatency");
+ public static final String METRIC_ZK_CONNECTIONS = getMetricName("zkConnections");
+ public static final String METRIC_ZK_OUTSTANDING_REQUESTS = getMetricName("zkOutstandingRequests");
+
+ private final int CONNECTION_TIMEOUT_MS = 500;
+ private final int WRITE_TIMEOUT_MS = 250;
+ private final int READ_TIMEOUT_MS = 500;
+
+ private AtomicReference<Map<String, Long>> zkMetrics = new AtomicReference<>(new HashMap<>());
+ private final Timer timer = new Timer();
+ private final int zkPort;
+
+ public ZKMetricUpdater(ZookeeperServerConfig zkServerConfig, long delay, long interval) {
+ this.zkPort = zkServerConfig.clientPort();
+ if (interval > 0) {
+ timer.scheduleAtFixedRate(this, delay, interval);
+ }
+ }
+
+ private void setMetricAttribute(String attribute, long value, Map<String, Long> data) {
+ switch (attribute) {
+ case "zk_znode_count":
+ data.put(METRIC_ZK_ZNODES, value);
+ break;
+ case "zk_avg_latency":
+ data.put(METRIC_ZK_LATENCY_AVERAGE, value);
+ break;
+ case "zk_max_latency":
+ data.put(METRIC_ZK_LATENCY_MAX, value);
+ break;
+ case "zk_num_alive_connections":
+ data.put(METRIC_ZK_CONNECTIONS, value);
+ break;
+ case "zk_outstanding_requests":
+ data.put(METRIC_ZK_OUTSTANDING_REQUESTS, value);
+ break;
+ }
+ }
+
+ @Override
+ public void run() {
+ Optional<String> report = retrieveReport();
+ report.ifPresent(this::parseReport);
+ timer.purge();
+ }
+
+ private Optional<String> retrieveReport() {
+ try (AsynchronousSocketChannel chan = AsynchronousSocketChannel.open()) {
+ InetSocketAddress zkAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), zkPort);
+ Future<Void> connected = chan.connect(zkAddress);
+ connected.get(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ Future<Integer> written = chan.write(ByteBuffer.wrap("mntr\n".getBytes(StandardCharsets.UTF_8)));
+ written.get(WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ int nread = -1;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ do {
+ Future<Integer> read = chan.read(buffer);
+ nread = read.get(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ buffer.flip();
+ baos.write(buffer.array());
+ buffer.clear();
+ } while (nread >= 0);
+
+ return Optional.of(baos.toString("UTF-8"));
+ } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
+ log.warning("Failure in retrieving monitoring data: (" + e.getClass().getName() + ") " + e.getMessage());
+ return Optional.empty();
+ }
+ }
+
+ private static final Pattern MONITORING_REPORT = Pattern.compile("^(\\w+)\\s+(\\d+)$", Pattern.MULTILINE);
+
+ private void parseReport(String report) {
+ Matcher matcher = MONITORING_REPORT.matcher(report);
+ Map<String, Long> data = new HashMap<>();
+ while (matcher.find()) {
+ String attribute = matcher.group(1);
+ long value = Long.parseLong(matcher.group(2));
+ setMetricAttribute(attribute, value, data);
+ }
+ zkMetrics.set(data);
+ }
+
+ public Map<String, Long> getZKMetrics() {
+ return zkMetrics.get();
+ }
+}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdaterTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdaterTest.java
new file mode 100644
index 00000000000..46a6047bb68
--- /dev/null
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdaterTest.java
@@ -0,0 +1,93 @@
+package com.yahoo.vespa.config.server.monitoring;
+
+import com.yahoo.cloud.config.ZookeeperServerConfig;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.function.Supplier;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+public class ZKMetricUpdaterTest {
+ private Thread serverThread;
+ private int serverPort;
+
+ @After
+ public void terminate() throws InterruptedException {
+ if (serverThread != null) {
+ serverThread.interrupt();
+ serverThread.join(500);
+ }
+ }
+
+ @Test
+ public void zk_monitoring_data_is_parsed_and_reported() throws IOException {
+ setupTcpServer(() -> "zk_version\t3.4.0\n" + //
+ "zk_avg_latency\t444\n" + //
+ "zk_max_latency\t1234\n" + //
+ "zk_min_latency\t0\n" + //
+ "zk_packets_received\t71\n" + //
+ "zk_packets_sent\t70\n" + //
+ "zk_outstanding_requests\t12\n" + //
+ "zk_num_alive_connections\t2\n" + //
+ "zk_server_state\tleader\n" + //
+ "zk_znode_count\t4\n" + //
+ "zk_watch_count\t0\n" + //
+ "zk_ephemerals_count\t0\n" + //
+ "zk_approximate_data_size\t27\n");
+
+ ZKMetricUpdater updater = buildUpdater();
+ updater.run();
+
+ Map<String, Long> reportedMetrics = updater.getZKMetrics();
+
+ assertThat(reportedMetrics.get(ZKMetricUpdater.METRIC_ZK_CONNECTIONS), equalTo(2L));
+ assertThat(reportedMetrics.get(ZKMetricUpdater.METRIC_ZK_LATENCY_AVERAGE), equalTo(444L));
+ assertThat(reportedMetrics.get(ZKMetricUpdater.METRIC_ZK_LATENCY_MAX), equalTo(1234L));
+ assertThat(reportedMetrics.get(ZKMetricUpdater.METRIC_ZK_OUTSTANDING_REQUESTS), equalTo(12L));
+ assertThat(reportedMetrics.get(ZKMetricUpdater.METRIC_ZK_ZNODES), equalTo(4L));
+ }
+
+ private ZKMetricUpdater buildUpdater() {
+ ZookeeperServerConfig zkServerConfig = new ZookeeperServerConfig(
+ new ZookeeperServerConfig.Builder().clientPort(serverPort).myid(12345));
+ ZKMetricUpdater updater = new ZKMetricUpdater(zkServerConfig, 0, -1);
+ return updater;
+ }
+
+ private void setupTcpServer(Supplier<String> reportProvider) throws IOException {
+ ServerSocket serverSocket = new ServerSocket(0);
+ serverPort = serverSocket.getLocalPort();
+ serverThread = Executors.defaultThreadFactory().newThread(() -> {
+ while (!Thread.interrupted()) {
+ try (Socket connection = serverSocket.accept()) {
+ BufferedReader input = new BufferedReader(new InputStreamReader(connection.getInputStream()));
+ String verb = input.readLine();
+ if ("mntr".equals(verb)) {
+ DataOutputStream output = new DataOutputStream(connection.getOutputStream());
+ output.write(reportProvider.get().getBytes(StandardCharsets.UTF_8));
+ output.close();
+ }
+ } catch (IOException e) {
+ System.out.println("Error in fake ZK server: " + e.toString());
+ }
+ }
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ System.out.println("Error closing server socket in fake ZK server: " + e.toString());
+ }
+ });
+ serverThread.start();
+ }
+}