diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-03-18 09:59:58 +0100 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-03-18 09:59:58 +0100 |
commit | 157520ffdf9b4e5c9a0fe9589e99b7e06e42c6d7 (patch) | |
tree | 78a5d84df098c7f5d6b8ab434432488dfa273446 /configserver | |
parent | 99f2cdcefeecf6a1254c021a72c475905216fc37 (diff) |
Rewrite ZKMetricUpdater to support ZK with Vespa mTLS
Diffstat (limited to 'configserver')
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java | 63 |
1 files changed, 35 insertions, 28 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java index 401588f478c..9b387da1fa4 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java @@ -3,24 +3,25 @@ package com.yahoo.vespa.config.server.monitoring; import com.yahoo.cloud.config.ZookeeperServerConfig; import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.security.tls.MixedMode; +import com.yahoo.security.tls.TlsContext; +import com.yahoo.security.tls.TransportSecurityUtils; -import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousSocketChannel; +import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -37,7 +38,6 @@ public class ZKMetricUpdater implements Runnable { public static final String METRIC_ZK_OUTSTANDING_REQUESTS = getMetricName("zkOutstandingRequests"); private static final int CONNECTION_TIMEOUT_MS = 1000; - private static final int WRITE_TIMEOUT_MS = 500; private static final int READ_TIMEOUT_MS = 1000; private final AtomicReference<Map<String, Long>> zkMetrics = new AtomicReference<>(new HashMap<>()); @@ -82,32 +82,39 @@ public class ZKMetricUpdater implements Runnable { } private Optional<String> retrieveReport() { - try (AsynchronousSocketChannel chan = AsynchronousSocketChannel.open()) { - InetSocketAddress zkAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), zkPort); - Future<Void> connected = chan.connect(zkAddress); - connected.get(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); - - Future<Integer> written = chan.write(ByteBuffer.wrap("mntr\n".getBytes(StandardCharsets.UTF_8))); - written.get(WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS); - - int nread = -1; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ByteBuffer buffer = ByteBuffer.allocate(4096); - do { - Future<Integer> read = chan.read(buffer); - nread = read.get(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS); - buffer.flip(); - baos.write(buffer.array()); - buffer.clear(); - } while (nread >= 0); - - return Optional.of(baos.toString(StandardCharsets.UTF_8)); - } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { + try { + Socket socket = null; + InputStream in = null; + OutputStream out = null; + try { + socket = createSocket(); + socket.setSoTimeout(READ_TIMEOUT_MS); + socket.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), zkPort), CONNECTION_TIMEOUT_MS); + in = socket.getInputStream(); + out = socket.getOutputStream(); + out.write("mntr\n".getBytes(StandardCharsets.UTF_8)); + out.flush(); + return Optional.of(new String(in.readAllBytes(), StandardCharsets.UTF_8)); + } finally { + if (in != null) in.close(); + if (out != null) out.close(); + if (socket != null && socket.isConnected()) socket.close(); + } + } catch (Exception e) { log.warning("Failure in retrieving monitoring data: (" + e.getClass().getSimpleName() + ") " + e.getMessage()); + log.log(Level.FINE, e, e::toString); return Optional.empty(); } } + private static Socket createSocket() throws IOException { + TlsContext tlsContext = TransportSecurityUtils.getSystemTlsContext().orElse(null); + if (tlsContext == null || TransportSecurityUtils.getInsecureMixedMode() != MixedMode.DISABLED) { + return new Socket(); + } + return tlsContext.context().getSocketFactory().createSocket(); + } + private static final Pattern MONITORING_REPORT = Pattern.compile("^(\\w+)\\s+(\\d+)$", Pattern.MULTILINE); private void parseReport(String report) { |