aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2020-05-09 15:14:15 +0200
committerMartin Polden <mpolden@mpolden.no>2020-05-09 15:14:15 +0200
commitbc32071a962c93bb207e6da466e20d5c8239017c (patch)
tree28840f92d30b0611b5e33957de2926e103fcf52c
parentb9bf1ed089ae218998534391d33eb65c31e136f6 (diff)
Extract queue type
-rw-r--r--cache/cache.go38
1 files changed, 22 insertions, 16 deletions
diff --git a/cache/cache.go b/cache/cache.go
index dc2b831..7505798 100644
--- a/cache/cache.go
+++ b/cache/cache.go
@@ -22,6 +22,11 @@ type Backend interface {
Reset()
}
+type queue struct {
+ tasks chan func()
+ wg sync.WaitGroup
+}
+
// Cache is a cache of DNS messages.
type Cache struct {
client dnsutil.Client
@@ -31,8 +36,7 @@ type Cache struct {
keys []uint32
mu sync.RWMutex
now func() time.Time
- queue chan func()
- wg sync.WaitGroup
+ queue *queue
}
// Value wraps a DNS message stored in the cache.
@@ -125,6 +129,8 @@ func NewWithBackend(capacity int, client dnsutil.Client, backend Backend) *Cache
return newCache(capacity, client, backend, time.Now)
}
+func newQueue(capacity int) *queue { return &queue{tasks: make(chan func(), capacity)} }
+
func newCache(capacity int, client dnsutil.Client, backend Backend, now func() time.Time) *Cache {
if capacity < 0 {
capacity = 0
@@ -135,12 +141,12 @@ func newCache(capacity int, client dnsutil.Client, backend Backend, now func() t
capacity: capacity,
values: make(map[uint32]Value, capacity),
keys: make([]uint32, 0, capacity),
- queue: make(chan func(), 1024),
+ queue: newQueue(1024),
}
if backend != nil {
c.load(backend)
}
- go c.readQueue()
+ go c.queue.consume()
return c
}
@@ -178,7 +184,7 @@ func (c *Cache) load(backend Backend) {
// Close consumes any outstanding cache operations.
func (c *Cache) Close() error {
- c.wg.Wait()
+ c.queue.wg.Wait()
return nil
}
@@ -200,10 +206,10 @@ func (c *Cache) getValue(key uint32) (*Value, bool) {
}
if c.isExpired(&v) {
if !c.prefetch() {
- c.enqueue(func() { c.evictWithLock(key) })
+ c.queue.add(func() { c.evictWithLock(key) })
return nil, false
}
- c.enqueue(func() { c.refresh(key, v.msg) })
+ c.queue.add(func() { c.refresh(key, v.msg) })
}
return &v, true
}
@@ -227,7 +233,7 @@ func (c *Cache) List(n int) []Value {
//
// If prefetching is disabled, the message will be evicted from the cache according to its TTL.
//
-// If prefetching is enabled, the message will never be evicted, but it will be refreshed when the TTL passes.
+// If prefetching is enabled, the message will never be evicted, but it will be refreshed when its TTL passes.
//
// Setting a new key in a cache that has reached its capacity will evict values in a FIFO order.
func (c *Cache) Set(key uint32, msg *dns.Msg) {
@@ -243,7 +249,7 @@ func (c *Cache) Stats() Stats {
return Stats{
Capacity: c.capacity,
Size: len(c.values),
- PendingTasks: len(c.queue),
+ PendingTasks: len(c.queue.tasks),
}
}
@@ -339,15 +345,15 @@ func (c *Cache) isExpired(v *Value) bool {
return c.now().After(expiresAt)
}
-func (c *Cache) enqueue(op func()) {
- c.wg.Add(1)
- c.queue <- op
+func (q *queue) add(task func()) {
+ q.wg.Add(1)
+ q.tasks <- task
}
-func (c *Cache) readQueue() {
- for op := range c.queue {
- op()
- c.wg.Done()
+func (q *queue) consume() {
+ for task := range q.tasks {
+ task()
+ q.wg.Done()
}
}