aboutsummaryrefslogtreecommitdiffstats
path: root/container-di/src/main/scala/com/yahoo/container/di/CloudSubscriberFactory.scala
blob: 23ae21ae84e9982623d9505a0524e8b691b7f0ab (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.di

import java.util.logging.Logger

import com.yahoo.config.ConfigInstance
import com.yahoo.config.subscription.{ConfigHandle, ConfigSource, ConfigSourceSet, ConfigSubscriber}
import com.yahoo.container.di.CloudSubscriberFactory._
import com.yahoo.container.di.config.{Subscriber, SubscriberFactory}
import com.yahoo.log.LogLevel
import com.yahoo.vespa.config.ConfigKey

import scala.collection.JavaConverters._
import scala.language.existentials


/**
 * Hands out [[Subscriber]]s that read their config from the given config source.
 *
 * Each subscriber created here is also recorded in a weak-keyed map, so that
 * `reloadActiveSubscribers` can reach it later without this factory keeping it alive.
 *
 * @param configSource the config source passed on to every subscriber this factory creates
 * @author Tony Vaagenes
 */
class CloudSubscriberFactory(configSource: ConfigSource) extends SubscriberFactory {

  // Generation most recently passed to reloadActiveSubscribers, if any. It is replayed onto
  // subscribers created afterwards.
  //TODO: test specific code, remove
  private var testGeneration: Option[Long] = None

  // Only the keys matter; the Int value is a placeholder. Keys are weakly referenced.
  private val activeSubscribers = new java.util.WeakHashMap[CloudSubscriber, Int]()

  /**
   * Creates a subscriber for the given config keys and records it as active.
   *
   * @param configKeys the keys the new subscriber should subscribe to
   * @return the newly created subscriber
   */
  override def getSubscriber(configKeys: java.util.Set[_ <: ConfigKey[_]]): Subscriber = {
    val keys = configKeys.asScala.toSet.asInstanceOf[Set[ConfigKeyT]]
    val created = new CloudSubscriber(keys, configSource)

    //TODO: test specific code, remove
    for (gen <- testGeneration) created.subscriber.reload(gen)

    activeSubscribers.put(created, 0)
    created
  }

  /**
   * Remembers the given generation and calls `reload` with it on every subscriber that is
   * currently recorded as active.
   *
   * @param generation the generation to reload
   */
  //TODO: test specific code, remove
  override def reloadActiveSubscribers(generation: Long): Unit = {
    testGeneration = Some(generation)

    // Copy the keys into an immutable set before iterating, rather than walking the weak map.
    val snapshot = activeSubscribers.keySet().asScala.toSet
    for (active <- snapshot) active.subscriber.reload(generation)
  }
}

object CloudSubscriberFactory {
  val log: Logger = Logger.getLogger(classOf[CloudSubscriberFactory].getName)

  // waitNextGeneration gives up once it has seen this many exceptions from the config library.
  private val ExceptionLimit = 5

  /**
   * A [[Subscriber]] that subscribes to all the given keys through a single [[ConfigSubscriber]].
   *
   * @param keys         the config keys to subscribe to; one handle is created per key
   * @param configSource the source the underlying `ConfigSubscriber` is created with
   */
  private class CloudSubscriber(keys: Set[ConfigKeyT], configSource: ConfigSource) extends Subscriber {
    // Must be initialized before `handles`, which subscribes through it.
    private[CloudSubscriberFactory] val subscriber = new ConfigSubscriber(configSource)
    private val handles: Map[ConfigKeyT, ConfigHandle[_ <: ConfigInstance]] = keys.map(subscribe).toMap

    // If waitNextGeneration has not yet been called, -1 should be returned.
    var generation: Long = -1

    // Subscribes to one key and pairs the key with the resulting handle.
    private def subscribe(key: ConfigKeyT) = (key, subscriber.subscribe(key.getConfigClass, key.getConfigId))

    /** @return true if at least one of the handles reports that it has changed */
    override def configChanged: Boolean = handles.values.exists(_.isChanged)

    /**
     * @return a Java map from each subscribed key to the config its handle holds at the
     *         time of this call
     */
    override def config: java.util.Map[ConfigKey[ConfigInstance], ConfigInstance] = {
      // Use a strict `map` rather than `mapValues`: the latter returns a lazy view that would
      // call getConfig again on every lookup instead of capturing the values now.
      val snapshot: Map[ConfigKeyT, ConfigInstance] =
        handles.map { case (key, handle) => (key, handle.getConfig) }
      snapshot.asInstanceOf[Map[ConfigKey[ConfigInstance], ConfigInstance]].asJava
    }

    /**
     * Loops until the underlying subscriber reports a new generation, then records and
     * returns that generation.
     *
     * @return the generation reported by the underlying subscriber
     * @throws IllegalArgumentException if there are no handles, or if the config library has
     *                                  thrown `ExceptionLimit` IllegalArgumentExceptions
     *                                  during this call
     */
    override def waitNextGeneration(): Long = {
      require(handles.nonEmpty)

      /* Catch and ignore config exceptions due to missing config values for parameters that do
       * not have a default value. These exceptions occur when the user has removed a component
       * from services.xml, and the component takes a config that has parameters without a
       * default value in the def-file. There is a new 'components' config underway, where the
       * component is removed, so this old config generation will soon be replaced by a new one. */
      var gotNextGen = false
      var numExceptions = 0
      while (!gotNextGen) {
        try {
          if (subscriber.nextGeneration())
            gotNextGen = true
        } catch {
          case e: IllegalArgumentException =>
            numExceptions += 1
            // Pass the exception to the logger instead of concatenating e.getStackTrace:
            // string-concatenating the array yields its identity string, not the frames.
            log.log(LogLevel.DEBUG, "Ignoring exception from the config library: " + e.getMessage, e)
            if (numExceptions >= ExceptionLimit)
              throw new IllegalArgumentException("Failed retrieving the next config generation.", e)
        }
      }

      generation = subscriber.getGeneration
      generation
    }

    /** Closes the underlying config subscriber. */
    override def close(): Unit = subscriber.close()
  }


  /** Guice provider that creates a [[CloudSubscriberFactory]] on the default config source set. */
  class Provider extends com.google.inject.Provider[SubscriberFactory] {
    override def get(): CloudSubscriberFactory = new CloudSubscriberFactory(ConfigSourceSet.createDefault())
  }
}