1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.di
import java.util.logging.Logger
import com.yahoo.config.ConfigInstance
import com.yahoo.config.subscription.{ConfigHandle, ConfigSource, ConfigSourceSet, ConfigSubscriber}
import com.yahoo.container.di.CloudSubscriberFactory._
import com.yahoo.container.di.config.{Subscriber, SubscriberFactory}
import com.yahoo.log.LogLevel
import com.yahoo.vespa.config.ConfigKey
import scala.collection.JavaConverters._
import scala.language.existentials
/**
 * [[SubscriberFactory]] that hands out subscribers reading from the given config source.
 *
 * Every subscriber created here is remembered (through weak keys) so that
 * `reloadActiveSubscribers` can push a generation to all of them.
 *
 * @param configSource the source passed on to every subscriber this factory creates
 * @author Tony Vaagenes
 */
class CloudSubscriberFactory(configSource: ConfigSource) extends SubscriberFactory {

  // Generation most recently passed to reloadActiveSubscribers; None until that has been called.
  private var testGeneration: Option[Long] = None

  // Used as a set of the subscribers handed out so far; the Int value is never read.
  private val activeSubscribers = new java.util.WeakHashMap[CloudSubscriber, Int]()

  /**
   * Creates a subscriber for the given keys and registers it as active.
   *
   * If a generation has been set through `reloadActiveSubscribers`, the new subscriber
   * is reloaded to that generation before it is returned.
   *
   * @param configKeys the config keys the new subscriber should subscribe to
   * @return the newly created subscriber
   */
  override def getSubscriber(configKeys: java.util.Set[_ <: ConfigKey[_]]): Subscriber = {
    val typedKeys = configKeys.asScala.toSet.asInstanceOf[Set[ConfigKeyT]]
    val created = new CloudSubscriber(typedKeys, configSource)

    //TODO: test specific code, remove
    for (pinnedGeneration <- testGeneration) {
      created.subscriber.reload(pinnedGeneration)
    }

    activeSubscribers.put(created, 0)
    created
  }

  /**
   * Remembers `generation` for subscribers created later and reloads every
   * currently tracked subscriber to it.
   *
   * @param generation the generation to reload to
   */
  //TODO: test specific code, remove
  override def reloadActiveSubscribers(generation: Long): Unit = {
    testGeneration = Some(generation)

    // Copy the key set first so the reload calls iterate over a fixed snapshot.
    val snapshot = activeSubscribers.keySet().asScala.toSet
    for (active <- snapshot) {
      active.subscriber.reload(generation)
    }
  }
}
/**
 * Companion holding the logger, the subscriber implementation used by the factory,
 * and a Guice provider for the factory itself.
 */
object CloudSubscriberFactory {
  val log: Logger = Logger.getLogger(classOf[CloudSubscriberFactory].getName)

  /**
   * [[Subscriber]] backed by a single `ConfigSubscriber`, with one config handle per key.
   *
   * @param keys         the config keys to subscribe to
   * @param configSource the source the underlying `ConfigSubscriber` is created with
   */
  private class CloudSubscriber(keys: Set[ConfigKeyT], configSource: ConfigSource) extends Subscriber {
    // Must be initialized before `handles`, which subscribes through it.
    private[CloudSubscriberFactory] val subscriber = new ConfigSubscriber(configSource)
    private val handles: Map[ConfigKeyT, ConfigHandle[_ <: ConfigInstance]] = keys.map(subscribe).toMap

    //if waitNextGeneration has not yet been called, -1 should be returned
    var generation: Long = -1

    /** Subscribes to a single key and pairs the key with the resulting handle. */
    private def subscribe(key: ConfigKeyT) = (key, subscriber.subscribe(key.getConfigClass, key.getConfigId))

    /** @return true if at least one of the handles reports that its config has changed */
    override def configChanged: Boolean = handles.values.exists(_.isChanged)

    /**
     * @return the config currently held by each handle, keyed by config key
     */
    override def config: java.util.Map[ConfigKey[ConfigInstance], ConfigInstance] = {
      // A strict `map` reads every handle right here, so no lazy view needs forcing afterwards.
      val current = handles.map { case (key, handle) =>
        val instance: ConfigInstance = handle.getConfig
        key -> instance
      }
      current.asInstanceOf[Map[ConfigKey[ConfigInstance], ConfigInstance]].asJava
    }

    /**
     * Calls `subscriber.nextGeneration()` until it returns true, then records and
     * returns the subscriber's generation.
     *
     * @return the generation reported by the underlying subscriber
     * @throws IllegalArgumentException if there are no handles, or once the config library
     *                                  has thrown IllegalArgumentException five times
     */
    override def waitNextGeneration(): Long = {
      require(handles.nonEmpty)

      /* Catch and ignore config exceptions due to missing config values for parameters that do
       * not have a default value. These exceptions occur when the user has removed a component
       * from services.xml, and the component takes a config that has parameters without a
       * default value in the def-file. There is a new 'components' config underway, where the
       * component is removed, so this old config generation will soon be replaced by a new one. */
      var gotNextGen = false
      var numExceptions = 0
      while (!gotNextGen) {
        try {
          if (subscriber.nextGeneration())
            gotNextGen = true
        } catch {
          case e: IllegalArgumentException =>
            numExceptions += 1
            // Concatenating the stack trace array directly would only print its identity
            // string, so the frames are joined explicitly.
            log.log(LogLevel.DEBUG, "Ignoring exception from the config library: " + e.getMessage + "\n" +
              e.getStackTrace.mkString("\n"))
            if (numExceptions >= 5)
              throw new IllegalArgumentException("Failed retrieving the next config generation.", e)
        }
      }

      generation = subscriber.getGeneration
      generation
    }

    /** Closes the underlying `ConfigSubscriber`. */
    override def close(): Unit = {
      subscriber.close()
    }
  }

  /** Guice provider that creates a factory using the default config source set. */
  class Provider extends com.google.inject.Provider[SubscriberFactory] {
    override def get(): CloudSubscriberFactory = new CloudSubscriberFactory(ConfigSourceSet.createDefault())
  }
}
|