diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /service-monitor |
Publish
Diffstat (limited to 'service-monitor')
18 files changed, 992 insertions, 0 deletions
diff --git a/service-monitor/OWNERS b/service-monitor/OWNERS new file mode 100644 index 00000000000..90fdb511ae3 --- /dev/null +++ b/service-monitor/OWNERS @@ -0,0 +1 @@ +bakksjo diff --git a/service-monitor/README b/service-monitor/README new file mode 100644 index 00000000000..4d69c8ef3be --- /dev/null +++ b/service-monitor/README @@ -0,0 +1 @@ +a service that gives a list of node/service and their status(up/down) for all instances in a zone diff --git a/service-monitor/pom.xml b/service-monitor/pom.xml new file mode 100644 index 00000000000..89cb8a2b719 --- /dev/null +++ b/service-monitor/pom.xml @@ -0,0 +1,160 @@ +<?xml version="1.0"?> +<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.vespa</groupId> + <artifactId>parent</artifactId> + <version>6-SNAPSHOT</version> + <relativePath>../parent/pom.xml</relativePath> + </parent> + <artifactId>service-monitor</artifactId> + <packaging>container-plugin</packaging> + <version>6-SNAPSHOT</version> + <name>${project.artifactId}</name> + <description>Service monitor component for hosted vespa.</description> + + <dependencies> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> + <dependency> + <groupId>io.reactivex</groupId> + <artifactId>rxjava</artifactId> + <version>1.0.7</version> + </dependency> + <dependency> + <groupId>io.reactivex</groupId> + <artifactId>rxscala_${scala.major-version}</artifactId> + <version>0.23.1</version> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>config</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>configdefinitions</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>component</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>application-model</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.json4s</groupId> + <artifactId>json4s-native_${scala.major-version}</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>scalap</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>annotations</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.google.inject</groupId> + <artifactId>guice</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson2.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>add-source</goal> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </execution> + <execution> + <id>compile-scala-classes-for-use-in-java-classes</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + <configuration> + <args> + <arg>-unchecked</arg> + <arg>-deprecation</arg> + <arg>-feature</arg> + </args> + </configuration> + </plugin> + <plugin> + <groupId>com.yahoo.vespa</groupId> + <artifactId>bundle-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <Import-Package>sun.misc</Import-Package> <!-- needed by rx.internal.util.unsafe.UnsafeAccess --> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-plugin-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <compilerArgs> + <arg>-Xlint:rawtypes</arg> + <arg>-Xlint:unchecked</arg> + <arg>-Xlint:deprecation</arg> + </compilerArgs> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <forkMode>once</forkMode> + <systemPropertyVariables> + <expectedDefaultConfigGenVersion>${project.version}</expectedDefaultConfigGenVersion> + </systemPropertyVariables> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/JavaSlobrokMonitor.java b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/JavaSlobrokMonitor.java new file mode 100644 index 00000000000..70b4f05f45d --- /dev/null +++ b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/JavaSlobrokMonitor.java @@ -0,0 +1,48 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.service.monitor; + +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; +import com.yahoo.jrt.slobrok.api.Mirror; +import com.yahoo.jrt.slobrok.api.SlobrokList; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Thin wrapper around {@link Mirror} class, that is a bit nicer to work with. + * + * @author <a href="mailto:bakksjo@yahoo-inc.com">Oyvind Bakksjo</a> + */ +public class JavaSlobrokMonitor { + private final Mirror mirror; + + public JavaSlobrokMonitor(final List<String> slobroks) { + final Supervisor supervisor = new Supervisor(new Transport()); + final SlobrokList slobrokList = new SlobrokList(); + slobrokList.setup(slobroks.toArray(new String[0])); + mirror = new Mirror(supervisor, slobrokList); + } + + public Map<String, String> getRegisteredServices() throws ServiceTemporarilyUnavailableException { + if (!mirror.ready()) { + throw new ServiceTemporarilyUnavailableException("Slobrok mirror not ready"); + } + // TODO: Get _all_ services without resorting to a hack like this. + return Stream.iterate("*", pattern -> pattern + "/*").limit(10) + .flatMap(pattern -> Stream.of(mirror.lookup(pattern))) + .collect(Collectors.toMap(Mirror.Entry::getName, Mirror.Entry::getSpec)); + } + + public void shutdown() { + mirror.shutdown(); + } + + public static class ServiceTemporarilyUnavailableException extends Exception { + public ServiceTemporarilyUnavailableException(final String msg) { + super(msg); + } + } +} diff --git a/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/ServiceMonitor.java b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/ServiceMonitor.java new file mode 100644 index 00000000000..c0a856a8348 --- /dev/null +++ b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/ServiceMonitor.java @@ -0,0 +1,23 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.service.monitor; + +import com.yahoo.vespa.applicationmodel.ApplicationInstance; +import com.yahoo.vespa.applicationmodel.ApplicationInstanceReference; + +import java.util.Map; + +/** + * The service monitor interface. A service monitor provides up to date information about the liveness status + * (up, down or not known) of each service instance in a Vespa zone + * + * @author bratseth + */ +public interface ServiceMonitor { + + /** + * Returns the current liveness status (up, down or unknown) of all instances + * of all services of all clusters of all applications in a zone. + */ + Map<ApplicationInstanceReference, ApplicationInstance<ServiceMonitorStatus>> queryStatusOfAllApplicationInstances(); + +} diff --git a/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/ServiceMonitorStatus.java b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/ServiceMonitorStatus.java new file mode 100644 index 00000000000..b430574404d --- /dev/null +++ b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/ServiceMonitorStatus.java @@ -0,0 +1,11 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.service.monitor; + +/** + * @author oyving + */ +public enum ServiceMonitorStatus { + UP, + DOWN, + NOT_CHECKED; +} diff --git a/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/ServiceNameUtil.java b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/ServiceNameUtil.java new file mode 100644 index 00000000000..2fc63a24ba5 --- /dev/null +++ b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/ServiceNameUtil.java @@ -0,0 +1,88 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.service.monitor; + +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class provides utilities for working with slobrok-registered service names. + * + * @author <a href="mailto:bakksjo@yahoo-inc.com">Oyvind Bakksjo</a> + */ +public class ServiceNameUtil { + // Utility class; prevents instantiation. + private ServiceNameUtil() { + } + + static Set<String> convertSlobrokServicesToConfigIds(final Set<String> registeredServices) { + return registeredServices.stream() + .map(ALL_RECOGNIZER) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toSet()); + } + + // This is basically a typedef. + private interface ServiceNameRecognizer extends Function<String, Optional<String>> {} + + private static class RegexpServiceNameRecognizer implements ServiceNameRecognizer { + private final Pattern pattern; + private final Function<Matcher, String> nameConverter; + + public RegexpServiceNameRecognizer( + final String patternString, + final Function<Matcher, String> nameConverter) { + this.pattern = Pattern.compile(patternString); + this.nameConverter = nameConverter; + } + + @Override + public Optional<String> apply(final String serviceName) { + final Matcher matcher = pattern.matcher(serviceName); + if (!matcher.matches()) { + return Optional.empty(); + } + return Optional.of(nameConverter.apply(matcher)); + } + } + + private static class SingleGroupRegexpServiceNameRecognizer extends RegexpServiceNameRecognizer { + public SingleGroupRegexpServiceNameRecognizer(final String patternString) { + super(patternString, matcher -> matcher.group(1)); + } + } + + + // TODO: The regexps below almost certainly hard-code names that are dynamically set in config. + + // storage/cluster.basicsearch/storage/0 -> basicsearch/storage/0 + static final ServiceNameRecognizer STORAGENODE_RECOGNIZER = new SingleGroupRegexpServiceNameRecognizer( + "^storage/cluster\\.([^/]+/storage/[^/]+)$"); + + // storage/cluster.basicsearch/distributor/0 -> basicsearch/distributor/0 + static final ServiceNameRecognizer DISTRIBUTOR_RECOGNIZER = new SingleGroupRegexpServiceNameRecognizer( + "^storage/cluster\\.([^/]+/distributor/[^/]+)$"); + + // docproc/cluster.basicsearch.indexing/0/chain.indexing -> docproc/cluster.basicsearch.indexing/0 + static final ServiceNameRecognizer DOCPROC_RECOGNIZER = new SingleGroupRegexpServiceNameRecognizer( + "^(docproc/cluster\\.[^/.]+\\.indexing/[^/]+)/.*$"); + + // basicsearch/search/cluster.basicsearch/0/realtimecontroller -> basicsearch/search/cluster.basicsearch/0 + static final ServiceNameRecognizer SEARCH_RECOGNIZER = new SingleGroupRegexpServiceNameRecognizer( + "^(basicsearch/search/cluster.basicsearch/[^/.]+)/.*$"); + + static final ServiceNameRecognizer ALL_RECOGNIZER = serviceName -> Stream.of( + STORAGENODE_RECOGNIZER, + DISTRIBUTOR_RECOGNIZER, + DOCPROC_RECOGNIZER, + SEARCH_RECOGNIZER) + .map(recognizer -> recognizer.apply(serviceName)) + .filter(optional -> optional.isPresent()) + .findFirst() + .orElse(Optional.empty()); +} diff --git a/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/SingleClusterServiceMonitor.java b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/SingleClusterServiceMonitor.java new file mode 100644 index 00000000000..4c73d13dcfd --- /dev/null +++ b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/SingleClusterServiceMonitor.java @@ -0,0 +1,68 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.service.monitor; + +import com.yahoo.cloud.config.SlobroksConfig; +import com.yahoo.component.AbstractComponent; + +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * This class can be used as a component, and will regularly dump known slobrok entries to the log. It will use + * the slobrok service configured for the current application. + * + * It is quite noisy and not useful for anything but testing. + * + * @author <a href="mailto:bakksjo@yahoo-inc.com">Oyvind Bakksjo</a> + */ +public class SingleClusterServiceMonitor extends AbstractComponent { + private static final Logger logger = Logger.getLogger(SingleClusterServiceMonitor.class.getName()); + + private volatile JavaSlobrokMonitor slobrokMonitor; + + public SingleClusterServiceMonitor(final SlobroksConfig slobroksConfig) { + this.slobrokMonitor = new JavaSlobrokMonitor(getSlobrokSpecs(slobroksConfig)); + + new Thread(() -> { + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + final JavaSlobrokMonitor slobrok = slobrokMonitor; + if (slobrok == null) { + return; + } + final Map<String, String> services; + try { + services = slobrok.getRegisteredServices(); + } catch (JavaSlobrokMonitor.ServiceTemporarilyUnavailableException e) { + logger.info("Slobrok monitor temporarily unavailable"); + continue; + } + if (services.isEmpty()) { + logger.info("Slobrok lookup returned no entries"); + } + services.forEach((serviceName, serviceSpec) -> + logger.info("Slobrok entry: " + serviceName + " => " + serviceSpec)); + } + }).start(); + } + + @Override + public void deconstruct() { + final JavaSlobrokMonitor slobrok = slobrokMonitor; + slobrokMonitor = null; + // Nothing prevents the mirror from being in use while we are shutting down, but so be it (for now at least). + slobrok.shutdown(); + } + + private static List<String> getSlobrokSpecs(final SlobroksConfig slobroksConfig) { + return slobroksConfig.slobrok().stream() + .map(SlobroksConfig.Slobrok::connectionspec) + .collect(Collectors.toList()); + } +} diff --git a/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/SlobrokServiceNameUtil.java b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/SlobrokServiceNameUtil.java new file mode 100644 index 00000000000..8261d5e1965 --- /dev/null +++ b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/SlobrokServiceNameUtil.java @@ -0,0 +1,61 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.service.monitor; + +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.applicationmodel.ConfigId; +import com.yahoo.vespa.applicationmodel.ServiceType; +import com.yahoo.vespa.service.monitor.SlobrokMonitor.SlobrokServiceName; +import scala.Option; + +import java.util.logging.Logger; + +/** + * @author tonytv + */ +public class SlobrokServiceNameUtil { + private static final Logger log = Logger.getLogger(SlobrokServiceNameUtil.class.getName()); + + private static final String configServerServiceTypeString = "configserver"; + public static final ServiceType configServerServiceType = new ServiceType(configServerServiceTypeString); + + private SlobrokServiceNameUtil() {} + + /** + * Returns the name a service instance is registered with in slobrok, + * or empty if the service instance is never registered in slobrok. + */ + public static Option<SlobrokServiceName> serviceName(ServiceType serviceType, ConfigId configId) { + switch (serviceType.s()) { + case "adminserver": + case "config-sentinel": + case "configproxy": + case configServerServiceTypeString: + case "filedistributorservice": + case "logd": + case "logserver": + case "metricsproxy": + case "slobrok": + case "transactionlogserver": + case "ytracecleaner": + return Option.empty(); + + case "topleveldispatch": + return Option.apply(new SlobrokServiceName(configId.s())); + + case "qrserver": + case "container": + case "docprocservice": + case "container-clustercontroller": + return Option.apply(new SlobrokServiceName("vespa/service/" + configId.s())); + + case "searchnode": //TODO: handle only as storagenode instead of both as searchnode/storagenode + return Option.apply(new SlobrokServiceName(configId.s() + "/realtimecontroller")); + case "distributor": + case "storagenode": + return Option.apply(new SlobrokServiceName("storage/cluster." + configId.s())); + default: + log.log(LogLevel.DEBUG, "Unknown service type " + serviceType.s() + " with config id " + configId.s()); + return Option.empty(); + } + } +} diff --git a/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/config/ConfigObservableCreator.java b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/config/ConfigObservableCreator.java new file mode 100644 index 00000000000..3259c9d3ee9 --- /dev/null +++ b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/config/ConfigObservableCreator.java @@ -0,0 +1,61 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.service.monitor.config; + +import com.yahoo.config.ConfigInstance; +import com.yahoo.config.subscription.ConfigHandle; +import com.yahoo.config.subscription.ConfigSource; +import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.log.LogLevel; +import rx.Observable; + +import java.util.logging.Logger; + +/** + * @author oyving + */ +public class ConfigObservableCreator { + private static final Logger log = Logger.getLogger(ConfigObservableCreator.class.getName()); + + private ConfigObservableCreator() {} + + public static <T extends ConfigInstance> Observable<T> create( + ConfigSource configSource, + Class<T> configClass, + String configId) { + + return Observable.create( + subscriber -> { + try { + final ConfigSubscriber configSubscriber = new ConfigSubscriber(configSource); + + try { + final ConfigHandle<T> configHandle = configSubscriber.subscribe(configClass, configId); + + log.log(LogLevel.DEBUG, "Subscribing to configuration " + configClass + "@" + configId + " from " + configSource); + + while (!subscriber.isUnsubscribed() && !configSubscriber.isClosed()) { + if (configSubscriber.nextGeneration(1000) && configHandle.isChanged()) { + log.log(LogLevel.DEBUG, "Received new configuration: " + configHandle); + T configuration = configHandle.getConfig(); + log.log(LogLevel.DEBUG, "Received new configuration: " + configuration); + subscriber.onNext(configuration); + } else { + log.log(LogLevel.DEBUG, "Configuration tick with no change: " + configHandle); + } + } + } finally { + configSubscriber.close(); + } + + if (!subscriber.isUnsubscribed()) { + subscriber.onCompleted(); + } + } catch (Exception e) { + if (!subscriber.isUnsubscribed()) { + subscriber.onError(e); + } + } + } + ); + } +} diff --git a/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/package-info.java b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/package-info.java new file mode 100644 index 00000000000..02df0f88aea --- /dev/null +++ b/service-monitor/src/main/java/com/yahoo/vespa/service/monitor/package-info.java @@ -0,0 +1,8 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/* + * @author andreer + */ +@ExportPackage +package com.yahoo.vespa.service.monitor; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/service-monitor/src/main/resources/config-models/service-monitor.xml b/service-monitor/src/main/resources/config-models/service-monitor.xml new file mode 100644 index 00000000000..39dc4e57a3e --- /dev/null +++ b/service-monitor/src/main/resources/config-models/service-monitor.xml @@ -0,0 +1,4 @@ +<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> +<components> + <component id="com.yahoo.vespa.service.monitor.IntersectSlobrokAndConfig" bundle="service-monitor" /> +</components> diff --git a/service-monitor/src/main/scala/com/yahoo/vespa/service/monitor/SlobrokAndConfigIntersector.scala b/service-monitor/src/main/scala/com/yahoo/vespa/service/monitor/SlobrokAndConfigIntersector.scala new file mode 100644 index 00000000000..a39c82bb0e3 --- /dev/null +++ b/service-monitor/src/main/scala/com/yahoo/vespa/service/monitor/SlobrokAndConfigIntersector.scala @@ -0,0 +1,200 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.service.monitor + +import java.util.{Collections, Optional} +import java.util.logging.Logger + +import com.google.inject.Inject +import com.yahoo.cloud.config.ConfigserverConfig +import com.yahoo.component.AbstractComponent +import com.yahoo.config.subscription.ConfigSourceSet +import com.yahoo.log.LogLevel +import com.yahoo.vespa.applicationmodel._ +import com.yahoo.vespa.config.GenerationCounter +import com.yahoo.vespa.service.monitor.SlobrokAndConfigIntersector._ +import com.yahoo.vespa.service.monitor.SlobrokMonitor._ +import com.yahoo.vespa.service.monitor.config.InstancesObservables +import com.yahoo.vespa.service.monitor.config.InstancesObservables._ +import rx.lang.scala.{Subscription, Observable} + +import scala.collection.convert.decorateAsJava._ +import scala.collection.convert.decorateAsScala._ +import scala.collection.immutable.Set +import scala.concurrent.duration._ +import scala.language.postfixOps + +/** + * Quick and dirty intersection of slobrok and model config. + * @author tonytv + */ + // TODO: This class is the API of the service monitor. It should have a proper name rather than be named after an implementation detail + // TODO: For the same reason, add javadoc +class SlobrokAndConfigIntersector( + configSourceSet: ConfigSourceSet, + //Config servers that are part of an application instance, i.e. not multi-tenant, are included in lb-services config + multiTenantConfigServerHostNames: Set[HostName], + configCounter: GenerationCounter + ) extends AbstractComponent with ServiceMonitor +{ + private val instancesObservables = new InstancesObservables(configSourceSet) + + @Inject + def this(config: ConfigserverConfig, configCounter: GenerationCounter) = { + this( + new ConfigSourceSet(s"tcp/localhost:${config.rpcport()}"), + if (config.multitenant()) configServerHostNames(config) else Set(), + configCounter) + } + + @volatile + private var latestSlobrokMonitorMap: Map[ApplicationInstanceReference, SlobrokMonitor] = Map() + + private val zoneConfigServerCluster: Map[ApplicationInstanceReference, ApplicationInstance[Void]] = + if (multiTenantConfigServerHostNames.isEmpty) Map() + else Map( + ApplicationInstanceReference(syntheticHostedVespaTenantId, configServerApplicationInstanceId ) -> + ApplicationInstance[Void]( + syntheticHostedVespaTenantId, + configServerApplicationInstanceId, + Collections.singleton(ServiceCluster[Void]( + ClusterId("zone-config-servers"), + SlobrokServiceNameUtil.configServerServiceType, + configServer_ServerInstances(multiTenantConfigServerHostNames) + )))) + + @Override + def queryStatusOfAllApplicationInstances() + : java.util.Map[ApplicationInstanceReference, ApplicationInstance[ServiceMonitorStatus]] = + { + val applicationInstanceMap: Map[ApplicationInstanceReference, ApplicationInstance[ServiceMonitorStatus]] = for { + (applicationInstanceReference, applicationInstance) <- latestConfiguredServices() + } yield { + val slobrokMonitor = latestSlobrokMonitorMap.get(applicationInstanceReference) + + def monitoredStatus(serviceType: ServiceType, configId: ConfigId) = { + SlobrokServiceNameUtil.serviceName(serviceType, configId) map { name => + if (slobrokMonitor.exists(_.isRegistered(name))) ServiceMonitorStatus.UP + else ServiceMonitorStatus.DOWN + } getOrElse ServiceMonitorStatus.NOT_CHECKED + } + + val serviceClustersWithStatus = applicationInstance.serviceClusters.asScala.map { serviceCluster => + val serviceInstancesWithStatus = serviceCluster.serviceInstances.asScala.map { serviceInstance => + serviceInstance.copy(serviceStatus = monitoredStatus(serviceCluster.serviceType, serviceInstance.configId)) + } + serviceCluster.copy(serviceInstances = serviceInstancesWithStatus.asJava) + } + val applicationInstanceWithStatus: ApplicationInstance[ServiceMonitorStatus] = new ApplicationInstance( + applicationInstanceReference.tenantId, + applicationInstanceReference.applicationInstanceId, + serviceClustersWithStatus.asJava) + + applicationInstanceReference -> applicationInstanceWithStatus + } + applicationInstanceMap.asJava + } + + instancesObservables.slobroksPerInstance.subscribe { slobrokServiceMap => + val nextSlobrokMonitorMap = slobrokServiceMap.map { case (instanceReference, slobrokServices) => + val slobrokMonitor = latestSlobrokMonitorMap.getOrElse(instanceReference, new SlobrokMonitor()) + slobrokMonitor.setSlobrokConnectionSpecs(asConnectionSpecs(slobrokServices)) + (instanceReference, slobrokMonitor) + } + val removedSlobrokMonitors = (latestSlobrokMonitorMap -- nextSlobrokMonitorMap.keySet).values + latestSlobrokMonitorMap = nextSlobrokMonitorMap + removedSlobrokMonitors.foreach { _.shutdown() } + } + + @volatile private var subscription: Option[Subscription] = None + + private val waitForConfig = Observable.interval(10 seconds) + .map(ignored => configCounter.get()).filter(_ > 0).take(1).subscribe { _ => + subscription = Some(instancesObservables.connect()) + } + + Observable.interval(10 seconds).subscribe { _ => + val applicationInstances: Map[ApplicationInstanceReference, ApplicationInstance[ServiceMonitorStatus]] = + queryStatusOfAllApplicationInstances().asScala.toMap + logServiceStatus(applicationInstances) + } + + object latestConfiguredServices { + private val mostRecentServicesIterator = + instancesObservables.servicesPerInstance. + map(_ ++ zoneConfigServerCluster). + toBlocking. + mostRecent(initialValue = zoneConfigServerCluster). + iterator + + def apply() = mostRecentServicesIterator.synchronized { + mostRecentServicesIterator.next() + } + } + + override def deconstruct(): Unit = { + waitForConfig.unsubscribe() + subscription.foreach(sub => sub.unsubscribe()) + } +} + +object SlobrokAndConfigIntersector { + private val log = Logger.getLogger(getClass.getName) + + val syntheticHostedVespaTenantId = TenantId("hosted-vespa") + val configServerApplicationInstanceId = ApplicationInstanceId("zone-config-servers") + + implicit class AsJavaOptional[T <: AnyRef](private val option: Option[T]) extends AnyVal { + def asJava: Optional[T] = option match { + case Some(v) => Optional.of(v) + case None => Optional.empty() + } + } + + def selectFirst[A, B](a: A, b: B) = a + + private def convertSlobrokServicesToConfigIds(registeredServiceNames: Set[SlobrokServiceName]): Set[ConfigId] = { + val registeredServiceNamesJavaSet: java.util.Set[String] = registeredServiceNames.map { _.s }.asJava + val configIdsJavaSet: java.util.Set[String] = ServiceNameUtil.convertSlobrokServicesToConfigIds(registeredServiceNamesJavaSet) + configIdsJavaSet.asScala.toSet.map { x: String => new ConfigId(x) } + } + + private def logServiceStatus(instanceMap: Map[ApplicationInstanceReference, ApplicationInstance[ServiceMonitorStatus]]): Unit = { + instanceMap.values.foreach(logServiceStatus) + } + + private def logServiceStatus(applicationInstance: ApplicationInstance[ServiceMonitorStatus]): Unit = { + val serviceInstances = + for { + serviceCluster <- applicationInstance.serviceClusters.asScala + serviceInstance <- serviceCluster.serviceInstances.asScala + } yield serviceInstance + + val serviceInstancesGroupedByStatus = serviceInstances.groupBy(_.serviceStatus) + + def mkString(services: Traversable[ServiceInstance[ServiceMonitorStatus]]) = services.mkString("\t", "\n\t", "\n") + + log.log(LogLevel.DEBUG, s"For tenant ${applicationInstance.tenantId}, application instance ${applicationInstance.applicationInstanceId}\n" + + serviceInstancesGroupedByStatus.map { case (monitoredStatus, serviceInstances) => + s" $monitoredStatus\n" + mkString(serviceInstances) + }.mkString("\n")) + } + + private def asConnectionSpecs(slobroks: Traversable[SlobrokService]): Traversable[String] = + slobroks map { case SlobrokService(hostName, port) => s"tcp/$hostName:$port" } + + private def configServerHostNames(config: ConfigserverConfig): Set[HostName] = + //Each Zookeeper server in this config is started by a config server. + //Each config server starts a single zookeeper server. + config.zookeeperserver().asScala map {server => HostName(server.hostname())} toSet + + private def configServer_ServerInstances(multiTenantConfigServerHostNames: Set[HostName]) + : java.util.Set[ServiceInstance[Void]] = + { + def serviceInstance(hostName: HostName) = ServiceInstance[Void]( + ConfigId("configId." + hostName.s), + hostName, + serviceStatus = null) + + multiTenantConfigServerHostNames map serviceInstance asJava + } +} diff --git a/service-monitor/src/main/scala/com/yahoo/vespa/service/monitor/SlobrokMonitor.scala b/service-monitor/src/main/scala/com/yahoo/vespa/service/monitor/SlobrokMonitor.scala new file mode 100644 index 00000000000..652a59260b0 --- /dev/null +++ b/service-monitor/src/main/scala/com/yahoo/vespa/service/monitor/SlobrokMonitor.scala @@ -0,0 +1,45 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.service.monitor + +import com.yahoo.jrt.slobrok.api.Mirror.Entry +import com.yahoo.jrt.slobrok.api.{Mirror, SlobrokList} +import com.yahoo.jrt.{Transport, Supervisor} +import com.yahoo.vespa.service.monitor.SlobrokMonitor._ +import scala.collection.convert.wrapAsJava._ + +/** + * @author <a href="mailto:bakksjo@yahoo-inc.com">Oyvind Bakksjo</a> + */ +class SlobrokMonitor { + private val supervisor: Supervisor = new Supervisor(new Transport()) + private val slobrokList = new SlobrokList() + private val mirror = new Mirror(supervisor, slobrokList) + + def setSlobrokConnectionSpecs(slobrokConnectionSpecs: Traversable[String]): Unit = { + val slobrokConnectionSpecsJavaList: java.util.List[String] = slobrokConnectionSpecs.toList + slobrokList.setup(slobrokConnectionSpecsJavaList.toArray(new Array[java.lang.String](0))) + } + + def getRegisteredServices(): Map[SlobrokServiceName, SlobrokServiceSpec] = { + if (!mirror.ready()) { + return Map() + } + val mirrorEntries: Array[Entry] = mirror.lookup("**") + mirrorEntries.map { mirrorEntry => + (SlobrokServiceName(mirrorEntry.getName), SlobrokServiceSpec(mirrorEntry.getSpec)) + }.toMap + } + + def isRegistered(serviceName: SlobrokServiceName): Boolean = { + mirror.lookup(serviceName.s).length != 0 + } + + def shutdown(): Unit = { + mirror.shutdown() + } +} + +object SlobrokMonitor { + case class SlobrokServiceName(s: String) + case class SlobrokServiceSpec(s: String) +} diff --git a/service-monitor/src/main/scala/com/yahoo/vespa/service/monitor/config/InstancesObservables.scala b/service-monitor/src/main/scala/com/yahoo/vespa/service/monitor/config/InstancesObservables.scala new file mode 100644 index 00000000000..a78744d184a --- /dev/null +++ b/service-monitor/src/main/scala/com/yahoo/vespa/service/monitor/config/InstancesObservables.scala @@ -0,0 +1,118 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.service.monitor.config + +import java.util.concurrent.TimeUnit +import java.util.logging.{Level, Logger} + +import com.yahoo.cloud.config.LbServicesConfig +import com.yahoo.cloud.config.LbServicesConfig.Tenants.Applications.Hosts +import com.yahoo.cloud.config.LbServicesConfig.Tenants.Applications.Hosts.Services.Ports +import com.yahoo.config.subscription.ConfigSourceSet +import com.yahoo.vespa.applicationmodel.{ApplicationInstanceId, TenantId, ServiceClusterKey, ServiceCluster, ServiceInstance, ApplicationInstance, ServiceType, HostName, ConfigId, ClusterId, ApplicationInstanceReference} +import com.yahoo.vespa.service.monitor.config.InstancesObservables._ +import rx.lang.scala.JavaConversions._ +import rx.lang.scala.{Observable, Subscription} +import rx.schedulers.Schedulers + +import scala.collection.convert.decorateAsJava._ +import scala.collection.convert.wrapAsScala._ +import scala.concurrent.duration._ + +/** + * Provides streams of slobroks and services per application instance. + * @author tonytv + */ +class InstancesObservables(configSourceSet: ConfigSourceSet) { + private val lbServicesConfigObservable = + toScalaObservable(ConfigObservableCreator. + create(configSourceSet, classOf[LbServicesConfig], "*")). + retryWhen( + _.flatMap { throwable => + val delay = Duration(30, TimeUnit.SECONDS) + log.log(Level.WARNING, s"Subscription to LbServicesConfig failed, sources=$configSourceSet. Retrying in $delay", throwable) + Observable.timer(delay) + }). + map(asInstanceReferenceToHostConfigMap). + subscribeOn(Schedulers.io()). + publish + + + val slobroksPerInstance: Observable[Map[ApplicationInstanceReference, Traversable[SlobrokService]]] = + lbServicesConfigObservable.map { _.mapValues(extractSlobrokServices).toMap } + + + val servicesPerInstance: Observable[Map[ApplicationInstanceReference, ApplicationInstance[Void]]] = + lbServicesConfigObservable.map( _.map { case (applicationInstanceReference, x) => + val serviceClusters: java.util.Set[ServiceCluster[Void]] = asServiceClusterSet(x) + val applicationInstance = ApplicationInstance( + applicationInstanceReference.tenantId, + applicationInstanceReference.applicationInstanceId, + serviceClusters) + applicationInstanceReference -> applicationInstance + }.toMap) + + def connect(): Subscription = lbServicesConfigObservable.connect +} + +object InstancesObservables { + private val log = Logger.getLogger(getClass.getName) + + case class SlobrokService(hostName: String, port: Integer) + + + + private def asInstanceReferenceToHostConfigMap(config: LbServicesConfig) = { + for { + (tenantIdString, tenantConfig) <- config.tenants() + (applicationIdString, applicationConfig) <- tenantConfig.applications() + } yield { + val applicationInstanceReference: ApplicationInstanceReference = ApplicationInstanceReference( + TenantId(tenantIdString), + ApplicationInstanceId(applicationIdString)) + + (applicationInstanceReference, applicationConfig.hosts()) + } + } + + private def extractSlobrokServices(hostsConfigs: java.util.Map[String, Hosts]): Traversable[SlobrokService] = { + def rpcPort(ports: Traversable[Ports]) = ports.collectFirst { + case port if port.tags().contains("rpc") => port.number() + } + + for { + (hostName, hostConfig) <- hostsConfigs + slobrokService <- Option(hostConfig.services("slobrok")) + } yield SlobrokService( + hostName, + rpcPort(slobrokService.ports()).getOrElse(throw new RuntimeException("Found slobrok without rpc port"))) + } + + private def asServiceClusterSet(hostsConfigs: java.util.Map[String, Hosts]) + : java.util.Set[ServiceCluster[Void]] = { + + val serviceInstancesGroupedByCluster: Map[ServiceClusterKey, Iterable[ServiceInstance[Void]]] = (for { + (hostName, hostConfig) <- hostsConfigs.view + (serviceName, servicesConfig) <- hostConfig.services() + } yield { + (ServiceClusterKey(ClusterId(servicesConfig.clustername()), ServiceType(servicesConfig.`type`())), + ServiceInstance(ConfigId(servicesConfig.configId()), HostName(hostName), null.asInstanceOf[Void])) + }).groupByKeyWithValue(_._1, _._2) + + val serviceClusterSet: Set[ServiceCluster[Void]] = serviceInstancesGroupedByCluster.map { + case (serviceClusterKey, serviceInstances) => + ServiceCluster( + serviceClusterKey.clusterId, + serviceClusterKey.serviceType, + serviceInstances.toSet.asJava) + }.toSet + + serviceClusterSet.asJava + } + + implicit class IterableWithImprovedGrouping[A](val iterable: Iterable[A]) { + def groupByKeyWithValue[K, V](keyExtractor: (A) => K, valueExtractor: (A) => V): Map[K, Iterable[V]] = { + val groupedByKey: Map[K, Iterable[A]] = iterable.groupBy(keyExtractor) + groupedByKey.mapValues(_.map(valueExtractor)) + } + } +} diff --git a/service-monitor/src/test/java/com/yahoo/vespa/service/monitor/ServiceNameUtilTest.java b/service-monitor/src/test/java/com/yahoo/vespa/service/monitor/ServiceNameUtilTest.java new file mode 100644 index 00000000000..f339b149317 --- /dev/null +++ b/service-monitor/src/test/java/com/yahoo/vespa/service/monitor/ServiceNameUtilTest.java @@ -0,0 +1,38 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.service.monitor; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class ServiceNameUtilTest { + @Test + public void testConversionToConfigId() { + final Set<String> slobrokServices = new HashSet<>(); + slobrokServices.add("storage/cluster.basicsearch/storage/0"); + slobrokServices.add("storage/cluster.basicsearch/distributor/0"); + slobrokServices.add("docproc/cluster.basicsearch.indexing/0/chain.indexing"); + slobrokServices.add("storage/cluster.basicsearch/distributor/0/default"); + slobrokServices.add("storage/cluster.basicsearch/storage/0/default"); + slobrokServices.add("basicsearch/search/cluster.basicsearch/0/realtimecontroller"); + + final Set<String> configIds = ServiceNameUtil.convertSlobrokServicesToConfigIds(slobrokServices); + + assertThat(configIds, + is(setOf( + "basicsearch/search/cluster.basicsearch/0", + "basicsearch/distributor/0", + "basicsearch/storage/0", + "docproc/cluster.basicsearch.indexing/0"))); + } + + @SafeVarargs + private static <T> Set<T> setOf(T... t) { + return new HashSet<>(Arrays.asList(t)); + } +} diff --git a/service-monitor/src/test/scala/PrintInstanceObservables.scala b/service-monitor/src/test/scala/PrintInstanceObservables.scala new file mode 100644 index 00000000000..6a51fa0eb16 --- /dev/null +++ b/service-monitor/src/test/scala/PrintInstanceObservables.scala @@ -0,0 +1,33 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +import com.yahoo.config.subscription.ConfigSourceSet +import com.yahoo.vespa.applicationmodel.{ApplicationInstance, ApplicationInstanceReference} +import com.yahoo.vespa.service.monitor.config.InstancesObservables + +import org.json4s.native.Serialization +import org.json4s.{CustomKeySerializer, NoTypeHints} + + +/** + * @author tonytv + */ +object PrintInstanceObservables { + def main(args: Array[String]): Unit = { + val sourceSet = new ConfigSourceSet("tcp/test1-node:19070") + + val observables = new InstancesObservables(sourceSet) + + observables.servicesPerInstance.subscribe(prettyPrint _) + observables.slobroksPerInstance.subscribe(println(_)) + val subscription = observables.connect() + + Thread.sleep(100000) + subscription.unsubscribe() + } + + private def prettyPrint(map: Map[ApplicationInstanceReference, ApplicationInstance[Void]]): Unit = { + implicit val formats = Serialization.formats(NoTypeHints) + + new CustomKeySerializer[Object](formats => ({case string => ???} , { case ref: AnyRef => ref.toString })) + + println(Serialization.writePretty(map)) + } +} diff --git a/service-monitor/src/test/scala/PrintServiceStates.scala b/service-monitor/src/test/scala/PrintServiceStates.scala new file mode 100644 index 00000000000..455010165cf --- /dev/null +++ b/service-monitor/src/test/scala/PrintServiceStates.scala @@ -0,0 +1,24 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +import com.yahoo.config.subscription.ConfigSourceSet +import com.yahoo.vespa.config.GenerationCounter +import com.yahoo.vespa.applicationmodel.HostName +import com.yahoo.vespa.service.monitor.SlobrokAndConfigIntersector + +/** + * Quick hack to just see some results. + * + * @author <a href="mailto:bakksjo@yahoo-inc.com">Oyvind Bakksjo</a> + */ +object PrintServiceStates { + def main(args: Array[String]): Unit = { + val intersector = new SlobrokAndConfigIntersector( + new ConfigSourceSet("tcp/test1-node:19070"), + multiTenantConfigServerHostNames = Set("config-server1", "config-server2") map HostName, + new GenerationCounter { + override def increment = ??? + override def get = 1L + }) + Thread.sleep(100000) + intersector.deconstruct() + } +} |