diff options
4 files changed, 231 insertions, 3 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java index a7a93e75293..bd98ccea8ea 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/monitoring/VespaMetricSet.java @@ -68,6 +68,12 @@ public class VespaMetricSet { metrics.add(new Metric("configserver.delayedResponses.count", "configserver.delayedResponses")); metrics.add(new Metric("configserver.sessionChangeErrors.count", "configserver.sessionChangeErrors")); + metrics.add(new Metric("configserver.zkZNodes.last", "configserver.zkZNodes")); + metrics.add(new Metric("configserver.zkAvgLatency.last", "configserver.zkAvgLatency")); + metrics.add(new Metric("configserver.zkMaxLatency.last", "configserver.zkMaxLatency")); + metrics.add(new Metric("configserver.zkConnections.last", "configserver.zkConnections")); + metrics.add(new Metric("configserver.zkOutstandingRequests.last", "configserver.zkOutstandingRequests")); + return metrics; } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/Metrics.java b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/Metrics.java index fe9364ed33e..5813c3eb04a 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/Metrics.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/Metrics.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.config.server.monitoring; import com.google.inject.Inject; +import com.yahoo.cloud.config.ZookeeperServerConfig; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.TenantName; import com.yahoo.container.jdisc.config.HealthMonitorConfig; @@ -32,19 +33,23 @@ public class Metrics extends TimerTask implements MetricUpdaterFactory { private final Counter failedRequests; private final Counter procTimeCounter; private final Metric metric; + private final ZKMetricUpdater zkMetricUpdater; // TODO The map is the key for now private final Map<Map<String, String>, MetricUpdater> metricUpdaters = new ConcurrentHashMap<>(); private final Timer timer = new Timer(); @Inject - public Metrics(Metric metric, Statistics statistics, HealthMonitorConfig healthMonitorConfig) { + public Metrics(Metric metric, Statistics statistics, HealthMonitorConfig healthMonitorConfig, ZookeeperServerConfig zkServerConfig) { this.metric = metric; requests = createCounter(METRIC_REQUESTS, statistics); failedRequests = createCounter(METRIC_FAILED_REQUESTS, statistics); procTimeCounter = createCounter("procTime", statistics); - timer.scheduleAtFixedRate(this, 5000, (long) (healthMonitorConfig.snapshot_interval() * 1000)); + log.log(LogLevel.DEBUG, "Metric update interval is " + healthMonitorConfig.snapshot_interval() + " seconds"); + long intervalMs = (long) (healthMonitorConfig.snapshot_interval() * 1000); + timer.scheduleAtFixedRate(this, 5000, intervalMs); + zkMetricUpdater = new ZKMetricUpdater(zkServerConfig, 4500, intervalMs); } public static Metrics createTestMetrics() { @@ -52,7 +57,8 @@ public class Metrics extends TimerTask implements MetricUpdaterFactory { Statistics.NullImplementation statistics = new Statistics.NullImplementation(); HealthMonitorConfig.Builder builder = new HealthMonitorConfig.Builder(); builder.snapshot_interval(60.0); - return new Metrics(metric, statistics, new HealthMonitorConfig(builder)); + ZookeeperServerConfig.Builder zkBuilder = new ZookeeperServerConfig.Builder().myid(1); + return new Metrics(metric, statistics, new HealthMonitorConfig(builder), new ZookeeperServerConfig(zkBuilder)); } private Counter createCounter(String name, Statistics statistics) { @@ -120,6 +126,7 @@ public class Metrics extends TimerTask implements MetricUpdaterFactory { } } setRegularMetrics(); + zkMetricUpdater.getZKMetrics().forEach((attr, val) -> metric.set(attr, val, null)); timer.purge(); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java new file mode 100644 index 00000000000..a08be228e48 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java @@ -0,0 +1,122 @@ +package com.yahoo.vespa.config.server.monitoring; + +import com.yahoo.cloud.config.ZookeeperServerConfig; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.yahoo.vespa.config.server.monitoring.Metrics.getMetricName; + +public class ZKMetricUpdater extends TimerTask { + private static final Logger log = Logger.getLogger(ZKMetricUpdater.class.getName()); + + public static final String METRIC_ZK_ZNODES = getMetricName("zkZNodes"); + public static final String METRIC_ZK_LATENCY_AVERAGE = getMetricName("zkAvgLatency"); + public static final String METRIC_ZK_LATENCY_MAX = getMetricName("zkMaxLatency"); + public static final String METRIC_ZK_CONNECTIONS = getMetricName("zkConnections"); + public static final String METRIC_ZK_OUTSTANDING_REQUESTS = getMetricName("zkOutstandingRequests"); + + private final int CONNECTION_TIMEOUT_MS = 500; + private final int WRITE_TIMEOUT_MS = 250; + private final int READ_TIMEOUT_MS = 500; + + private AtomicReference<Map<String, Long>> zkMetrics = new AtomicReference<>(new HashMap<>()); + private final Timer timer = new Timer(); + private final int zkPort; + + public ZKMetricUpdater(ZookeeperServerConfig zkServerConfig, long delay, long interval) { + this.zkPort = zkServerConfig.clientPort(); + if (interval > 0) { + timer.scheduleAtFixedRate(this, delay, interval); + } + } + + private void setMetricAttribute(String attribute, long value, Map<String, Long> data) { + switch (attribute) { + case "zk_znode_count": + data.put(METRIC_ZK_ZNODES, value); + break; + case "zk_avg_latency": + data.put(METRIC_ZK_LATENCY_AVERAGE, value); + break; + case "zk_max_latency": + data.put(METRIC_ZK_LATENCY_MAX, value); + break; + case "zk_num_alive_connections": + data.put(METRIC_ZK_CONNECTIONS, value); + break; + case "zk_outstanding_requests": + data.put(METRIC_ZK_OUTSTANDING_REQUESTS, value); + break; + } + } + + @Override + public void run() { + Optional<String> report = retrieveReport(); + report.ifPresent(this::parseReport); + timer.purge(); + } + + private Optional<String> retrieveReport() { + try (AsynchronousSocketChannel chan = AsynchronousSocketChannel.open()) { + InetSocketAddress zkAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), zkPort); + Future<Void> connected = chan.connect(zkAddress); + connected.get(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + Future<Integer> written = chan.write(ByteBuffer.wrap("mntr\n".getBytes(StandardCharsets.UTF_8))); + written.get(WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + int nread = -1; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ByteBuffer buffer = ByteBuffer.allocate(4096); + do { + Future<Integer> read = chan.read(buffer); + nread = read.get(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS); + buffer.flip(); + baos.write(buffer.array()); + buffer.clear(); + } while (nread >= 0); + + return Optional.of(baos.toString("UTF-8")); + } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { + log.warning("Failure in retrieving monitoring data: (" + e.getClass().getName() + ") " + e.getMessage()); + return Optional.empty(); + } + } + + private static final Pattern MONITORING_REPORT = Pattern.compile("^(\\w+)\\s+(\\d+)$", Pattern.MULTILINE); + + private void parseReport(String report) { + Matcher matcher = MONITORING_REPORT.matcher(report); + Map<String, Long> data = new HashMap<>(); + while (matcher.find()) { + String attribute = matcher.group(1); + long value = Long.parseLong(matcher.group(2)); + setMetricAttribute(attribute, value, data); + } + zkMetrics.set(data); + } + + public Map<String, Long> getZKMetrics() { + return zkMetrics.get(); + } +} diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdaterTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdaterTest.java new file mode 100644 index 00000000000..46a6047bb68 --- /dev/null +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdaterTest.java @@ -0,0 +1,93 @@ +package com.yahoo.vespa.config.server.monitoring; + +import com.yahoo.cloud.config.ZookeeperServerConfig; +import org.junit.After; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.function.Supplier; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class ZKMetricUpdaterTest { + private Thread serverThread; + private int serverPort; + + @After + public void terminate() throws InterruptedException { + if (serverThread != null) { + serverThread.interrupt(); + serverThread.join(500); + } + } + + @Test + public void zk_monitoring_data_is_parsed_and_reported() throws IOException { + setupTcpServer(() -> "zk_version\t3.4.0\n" + // + "zk_avg_latency\t444\n" + // + "zk_max_latency\t1234\n" + // + "zk_min_latency\t0\n" + // + "zk_packets_received\t71\n" + // + "zk_packets_sent\t70\n" + // + "zk_outstanding_requests\t12\n" + // + "zk_num_alive_connections\t2\n" + // + "zk_server_state\tleader\n" + // + "zk_znode_count\t4\n" + // + "zk_watch_count\t0\n" + // + "zk_ephemerals_count\t0\n" + // + "zk_approximate_data_size\t27\n"); + + ZKMetricUpdater updater = buildUpdater(); + updater.run(); + + Map<String, Long> reportedMetrics = updater.getZKMetrics(); + + assertThat(reportedMetrics.get(ZKMetricUpdater.METRIC_ZK_CONNECTIONS), equalTo(2L)); + assertThat(reportedMetrics.get(ZKMetricUpdater.METRIC_ZK_LATENCY_AVERAGE), equalTo(444L)); + assertThat(reportedMetrics.get(ZKMetricUpdater.METRIC_ZK_LATENCY_MAX), equalTo(1234L)); + assertThat(reportedMetrics.get(ZKMetricUpdater.METRIC_ZK_OUTSTANDING_REQUESTS), equalTo(12L)); + assertThat(reportedMetrics.get(ZKMetricUpdater.METRIC_ZK_ZNODES), equalTo(4L)); + } + + private ZKMetricUpdater buildUpdater() { + ZookeeperServerConfig zkServerConfig = new ZookeeperServerConfig( + new ZookeeperServerConfig.Builder().clientPort(serverPort).myid(12345)); + ZKMetricUpdater updater = new ZKMetricUpdater(zkServerConfig, 0, -1); + return updater; + } + + private void setupTcpServer(Supplier<String> reportProvider) throws IOException { + ServerSocket serverSocket = new ServerSocket(0); + serverPort = serverSocket.getLocalPort(); + serverThread = Executors.defaultThreadFactory().newThread(() -> { + while (!Thread.interrupted()) { + try (Socket connection = serverSocket.accept()) { + BufferedReader input = new BufferedReader(new InputStreamReader(connection.getInputStream())); + String verb = input.readLine(); + if ("mntr".equals(verb)) { + DataOutputStream output = new DataOutputStream(connection.getOutputStream()); + output.write(reportProvider.get().getBytes(StandardCharsets.UTF_8)); + output.close(); + } + } catch (IOException e) { + System.out.println("Error in fake ZK server: " + e.toString()); + } + } + try { + serverSocket.close(); + } catch (IOException e) { + System.out.println("Error closing server socket in fake ZK server: " + e.toString()); + } + }); + serverThread.start(); + } +} |