summaryrefslogtreecommitdiffstats
path: root/service-monitor/src/main/scala/com/yahoo/vespa/service/monitor/config/InstancesObservables.scala
blob: ac9b8f5e5b0104b033d7802a154dbb851d3d22d0 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.service.monitor.config

import java.util.concurrent.TimeUnit
import java.util.logging.{Level, Logger}

import com.yahoo.cloud.config.LbServicesConfig
import com.yahoo.cloud.config.LbServicesConfig.Tenants.Applications.Hosts
import com.yahoo.cloud.config.LbServicesConfig.Tenants.Applications.Hosts.Services.Ports
import com.yahoo.config.subscription.ConfigSourceSet
import com.yahoo.vespa.applicationmodel.{ApplicationInstance, ApplicationInstanceId, ApplicationInstanceReference, ClusterId, ConfigId, HostName, ServiceCluster, ServiceClusterKey, ServiceInstance, ServiceType, TenantId}
import com.yahoo.vespa.service.monitor.config.InstancesObservables._
import rx.lang.scala.JavaConversions._
import rx.lang.scala.{Observable, Subscription}
import rx.schedulers.Schedulers

import scala.collection.JavaConverters._
import scala.concurrent.duration._

/**
 * Provides streams of slobroks and services per application instance.
 * @author tonytv
 */
class InstancesObservables(configSourceSet: ConfigSourceSet) {
  private val lbServicesConfigObservable =
    toScalaObservable(ConfigObservableCreator.
      create(configSourceSet, classOf[LbServicesConfig], "*")).
      retryWhen(
        _.flatMap { throwable =>
          val delay = Duration(30, TimeUnit.SECONDS)
          log.log(Level.WARNING, s"Subscription to LbServicesConfig failed, sources=$configSourceSet. Retrying in $delay", throwable)
          Observable.timer(delay)
        }).
      map(asInstanceReferenceToHostConfigMap).
      subscribeOn(Schedulers.io()).
      publish


  val slobroksPerInstance: Observable[Map[ApplicationInstanceReference, Traversable[SlobrokService]]] =
    lbServicesConfigObservable.map { _.mapValues(extractSlobrokServices).toMap  }


  val servicesPerInstance: Observable[Map[ApplicationInstanceReference, ApplicationInstance[Void]]] =
    lbServicesConfigObservable.map( _.map { case (applicationInstanceReference, x) =>
      val serviceClusters: java.util.Set[ServiceCluster[Void]] = asServiceClusterSet(x)
      val applicationInstance = new ApplicationInstance(
        applicationInstanceReference.tenantId,
        applicationInstanceReference.applicationInstanceId,
        serviceClusters)
      applicationInstanceReference -> applicationInstance
    }.toMap)

  def connect(): Subscription = lbServicesConfigObservable.connect
}

object InstancesObservables {
  private val log = Logger.getLogger(getClass.getName)

  case class SlobrokService(hostName: String, port: Integer)



  private def asInstanceReferenceToHostConfigMap(config: LbServicesConfig) = {
    for {
      (tenantIdString, tenantConfig) <- config.tenants().asScala
      (applicationIdString, applicationConfig) <- tenantConfig.applications().asScala
    } yield {
      val applicationInstanceReference: ApplicationInstanceReference = new ApplicationInstanceReference(
        new TenantId(tenantIdString),
        new ApplicationInstanceId(applicationIdString))

      (applicationInstanceReference, applicationConfig.hosts())
    }
  }

  private def extractSlobrokServices(hostsConfigs: java.util.Map[String, Hosts]): Traversable[SlobrokService] = {
    def rpcPort(ports: Traversable[Ports]) = ports.collectFirst {
      case port if port.tags().contains("rpc") => port.number()
    }

    for {
      (hostName, hostConfig) <- hostsConfigs.asScala
      slobrokService <- Option(hostConfig.services("slobrok"))
    } yield SlobrokService(
      hostName,
      rpcPort(slobrokService.ports().asScala).getOrElse(throw new RuntimeException("Found slobrok without rpc port")))
  }

  private def asServiceClusterSet(hostsConfigs: java.util.Map[String, Hosts])
  : java.util.Set[ServiceCluster[Void]] = {

    val serviceInstancesGroupedByCluster: Map[ServiceClusterKey, Iterable[ServiceInstance[Void]]] = (for {
      (hostName, hostConfig) <- hostsConfigs.asScala.view
      (serviceName, servicesConfig) <- hostConfig.services().asScala
    } yield {
      (new ServiceClusterKey(new ClusterId(servicesConfig.clustername()), new ServiceType(servicesConfig.`type`())),
        new ServiceInstance(new ConfigId(servicesConfig.configId()), new HostName(hostName), null.asInstanceOf[Void]))
    }).groupByKeyWithValue(_._1, _._2)

    val serviceClusterSet: Set[ServiceCluster[Void]] = serviceInstancesGroupedByCluster.map {
      case (serviceClusterKey, serviceInstances) =>
        new ServiceCluster(
          serviceClusterKey.clusterId,
          serviceClusterKey.serviceType,
          serviceInstances.toSet.asJava)
    }.toSet

    serviceClusterSet.asJava
  }
  
  implicit class IterableWithImprovedGrouping[A](val iterable: Iterable[A]) {
    def groupByKeyWithValue[K, V](keyExtractor: (A) => K, valueExtractor: (A) => V): Map[K, Iterable[V]] = {
      val groupedByKey: Map[K, Iterable[A]] = iterable.groupBy(keyExtractor)
      groupedByKey.mapValues(_.map(valueExtractor))
    }
  }
}