aboutsummaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2021-03-18 09:59:58 +0100
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2021-03-18 09:59:58 +0100
commit157520ffdf9b4e5c9a0fe9589e99b7e06e42c6d7 (patch)
tree78a5d84df098c7f5d6b8ab434432488dfa273446 /configserver
parent99f2cdcefeecf6a1254c021a72c475905216fc37 (diff)
Rewrite ZKMetricUpdater to support ZK with Vespa mTLS
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java63
1 files changed, 35 insertions, 28 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java
index 401588f478c..9b387da1fa4 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/monitoring/ZKMetricUpdater.java
@@ -3,24 +3,25 @@ package com.yahoo.vespa.config.server.monitoring;
import com.yahoo.cloud.config.ZookeeperServerConfig;
import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.security.tls.MixedMode;
+import com.yahoo.security.tls.TlsContext;
+import com.yahoo.security.tls.TransportSecurityUtils;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousSocketChannel;
+import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -37,7 +38,6 @@ public class ZKMetricUpdater implements Runnable {
public static final String METRIC_ZK_OUTSTANDING_REQUESTS = getMetricName("zkOutstandingRequests");
private static final int CONNECTION_TIMEOUT_MS = 1000;
- private static final int WRITE_TIMEOUT_MS = 500;
private static final int READ_TIMEOUT_MS = 1000;
private final AtomicReference<Map<String, Long>> zkMetrics = new AtomicReference<>(new HashMap<>());
@@ -82,32 +82,39 @@ public class ZKMetricUpdater implements Runnable {
}
private Optional<String> retrieveReport() {
- try (AsynchronousSocketChannel chan = AsynchronousSocketChannel.open()) {
- InetSocketAddress zkAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), zkPort);
- Future<Void> connected = chan.connect(zkAddress);
- connected.get(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-
- Future<Integer> written = chan.write(ByteBuffer.wrap("mntr\n".getBytes(StandardCharsets.UTF_8)));
- written.get(WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-
- int nread = -1;
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ByteBuffer buffer = ByteBuffer.allocate(4096);
- do {
- Future<Integer> read = chan.read(buffer);
- nread = read.get(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- buffer.flip();
- baos.write(buffer.array());
- buffer.clear();
- } while (nread >= 0);
-
- return Optional.of(baos.toString(StandardCharsets.UTF_8));
- } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
+ try {
+ Socket socket = null;
+ InputStream in = null;
+ OutputStream out = null;
+ try {
+ socket = createSocket();
+ socket.setSoTimeout(READ_TIMEOUT_MS);
+ socket.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), zkPort), CONNECTION_TIMEOUT_MS);
+ in = socket.getInputStream();
+ out = socket.getOutputStream();
+ out.write("mntr\n".getBytes(StandardCharsets.UTF_8));
+ out.flush();
+ return Optional.of(new String(in.readAllBytes(), StandardCharsets.UTF_8));
+ } finally {
+ if (in != null) in.close();
+ if (out != null) out.close();
+ if (socket != null && socket.isConnected()) socket.close();
+ }
+ } catch (Exception e) {
log.warning("Failure in retrieving monitoring data: (" + e.getClass().getSimpleName() + ") " + e.getMessage());
+ log.log(Level.FINE, e, e::toString);
return Optional.empty();
}
}
+ private static Socket createSocket() throws IOException {
+ TlsContext tlsContext = TransportSecurityUtils.getSystemTlsContext().orElse(null);
+ if (tlsContext == null || TransportSecurityUtils.getInsecureMixedMode() != MixedMode.DISABLED) {
+ return new Socket();
+ }
+ return tlsContext.context().getSocketFactory().createSocket();
+ }
+
private static final Pattern MONITORING_REPORT = Pattern.compile("^(\\w+)\\s+(\\d+)$", Pattern.MULTILINE);
private void parseReport(String report) {