diff options
author | Martin Polden <mpolden@mpolden.no> | 2020-05-09 15:14:15 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2020-05-09 15:14:15 +0200 |
commit | bc32071a962c93bb207e6da466e20d5c8239017c (patch) | |
tree | 28840f92d30b0611b5e33957de2926e103fcf52c | |
parent | b9bf1ed089ae218998534391d33eb65c31e136f6 (diff) |
Extract queue type
-rw-r--r-- | cache/cache.go | 38 |
1 files changed, 22 insertions, 16 deletions
diff --git a/cache/cache.go b/cache/cache.go index dc2b831..7505798 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -22,6 +22,11 @@ type Backend interface { Reset() } +type queue struct { + tasks chan func() + wg sync.WaitGroup +} + // Cache is a cache of DNS messages. type Cache struct { client dnsutil.Client @@ -31,8 +36,7 @@ type Cache struct { keys []uint32 mu sync.RWMutex now func() time.Time - queue chan func() - wg sync.WaitGroup + queue *queue } // Value wraps a DNS message stored in the cache. @@ -125,6 +129,8 @@ func NewWithBackend(capacity int, client dnsutil.Client, backend Backend) *Cache return newCache(capacity, client, backend, time.Now) } +func newQueue(capacity int) *queue { return &queue{tasks: make(chan func(), capacity)} } + func newCache(capacity int, client dnsutil.Client, backend Backend, now func() time.Time) *Cache { if capacity < 0 { capacity = 0 @@ -135,12 +141,12 @@ func newCache(capacity int, client dnsutil.Client, backend Backend, now func() t capacity: capacity, values: make(map[uint32]Value, capacity), keys: make([]uint32, 0, capacity), - queue: make(chan func(), 1024), + queue: newQueue(1024), } if backend != nil { c.load(backend) } - go c.readQueue() + go c.queue.consume() return c } @@ -178,7 +184,7 @@ func (c *Cache) load(backend Backend) { // Close consumes any outstanding cache operations. func (c *Cache) Close() error { - c.wg.Wait() + c.queue.wg.Wait() return nil } @@ -200,10 +206,10 @@ func (c *Cache) getValue(key uint32) (*Value, bool) { } if c.isExpired(&v) { if !c.prefetch() { - c.enqueue(func() { c.evictWithLock(key) }) + c.queue.add(func() { c.evictWithLock(key) }) return nil, false } - c.enqueue(func() { c.refresh(key, v.msg) }) + c.queue.add(func() { c.refresh(key, v.msg) }) } return &v, true } @@ -227,7 +233,7 @@ func (c *Cache) List(n int) []Value { // // If prefetching is disabled, the message will be evicted from the cache according to its TTL. // -// If prefetching is enabled, the message will never be evicted, but it will be refreshed when the TTL passes. +// If prefetching is enabled, the message will never be evicted, but it will be refreshed when its TTL passes. // // Setting a new key in a cache that has reached its capacity will evict values in a FIFO order. func (c *Cache) Set(key uint32, msg *dns.Msg) { @@ -243,7 +249,7 @@ func (c *Cache) Stats() Stats { return Stats{ Capacity: c.capacity, Size: len(c.values), - PendingTasks: len(c.queue), + PendingTasks: len(c.queue.tasks), } } @@ -339,15 +345,15 @@ func (c *Cache) isExpired(v *Value) bool { return c.now().After(expiresAt) } -func (c *Cache) enqueue(op func()) { - c.wg.Add(1) - c.queue <- op +func (q *queue) add(task func()) { + q.wg.Add(1) + q.tasks <- task } -func (c *Cache) readQueue() { - for op := range c.queue { - op() - c.wg.Done() +func (q *queue) consume() { + for task := range q.tasks { + task() + q.wg.Done() } } |