diff options
author | Martin Polden <mpolden@mpolden.no> | 2020-01-11 13:41:23 +0100 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2020-01-11 15:14:56 +0100 |
commit | 7bc6b84b2ddd3cdf6fad6ab2568f3e298530f0b0 (patch) | |
tree | 114c3b783d69f991d99eb35a2aa8e94a0c198ffb | |
parent | 541ec0a86c0d29c469bc48bce34493daf20a0ee4 (diff) |
Implement persistent cache backend
-rw-r--r-- | sql/cache.go | 104 | ||||
-rw-r--r-- | sql/cache_test.go | 47 | ||||
-rw-r--r-- | sql/sql.go | 64 |
3 files changed, 215 insertions, 0 deletions
diff --git a/sql/cache.go b/sql/cache.go new file mode 100644 index 0000000..82259bc --- /dev/null +++ b/sql/cache.go @@ -0,0 +1,104 @@ +package sql + +import ( + "log" + "sync" + + "github.com/mpolden/zdns/cache" +) + +const ( + setOp = iota + removeOp + resetOp +) + +type query struct { + op int + key uint32 + value cache.Value +} + +// Cache is a persistent cache. Entries are written to a SQL database. +type Cache struct { + wg sync.WaitGroup + queue chan query + client *Client + logger *log.Logger +} + +// NewCache creates a new cache using client to persist entries. +func NewCache(client *Client, logger *log.Logger) *Cache { + c := &Cache{ + queue: make(chan query, 100), + client: client, + logger: logger, + } + go c.readQueue() + return c +} + +// Close drains and persist queued requests in this cache. +func (c *Cache) Close() error { + c.wg.Wait() + return nil +} + +// Set associates the value v with key. +func (c *Cache) Set(key uint32, v cache.Value) { c.enqueue(query{op: setOp, key: key, value: v}) } + +// Evict removes any value associated with key. +func (c *Cache) Evict(key uint32) { c.enqueue(query{op: removeOp, key: key}) } + +// Reset removes all entries from the cache. +func (c *Cache) Reset() { c.enqueue(query{op: resetOp}) } + +// Read returns all entries in the cache. +func (c *Cache) Read() []cache.Value { + c.wg.Wait() + entries, err := c.client.readCache() + if err != nil { + c.logger.Print(err) + return nil + } + values := make([]cache.Value, 0, len(entries)) + for _, entry := range entries { + unpacked, err := cache.Unpack(entry.Data) + if err != nil { + panic(err) // Should never happen + } + values = append(values, unpacked) + } + return values +} + +func (c *Cache) enqueue(q query) { + c.wg.Add(1) + c.queue <- q +} + +func (c *Cache) readQueue() { + for q := range c.queue { + switch q.op { + case setOp: + packed, err := q.value.Pack() + if err != nil { + c.logger.Fatalf("failed to pack value: %w", err) + } + if err := c.client.writeCacheValue(q.key, packed); err != nil { + c.logger.Printf("failed to write key=%d data=%q: %w", q.key, packed, err) + } + case removeOp: + if err := c.client.removeCacheValue(q.key); err != nil { + c.logger.Printf("failed to remove key=%d: %w", q.key, err) + } + case resetOp: + if err := c.client.truncateCache(); err != nil { + c.logger.Printf("failed to truncate cache: %w", err) + } + default: + c.logger.Printf("unhandled operation %d", q.op) + } + c.wg.Add(-1) + } +} diff --git a/sql/cache_test.go b/sql/cache_test.go new file mode 100644 index 0000000..55dd369 --- /dev/null +++ b/sql/cache_test.go @@ -0,0 +1,47 @@ +package sql + +import ( + "io/ioutil" + "log" + "reflect" + "testing" + + "github.com/mpolden/zdns/cache" +) + +func TestCache(t *testing.T) { + data := "3980405151 1578680472 00000100000100000000000003777777076578616d706c6503636f6d0000010001" + v, err := cache.Unpack(data) + if err != nil { + t.Fatal(err) + } + client := testClient() + logger := log.New(ioutil.Discard, "", 0) + c := NewCache(client, logger) + + // Set and read + c.Set(1, v) + values := c.Read() + if got, want := len(values), 1; got != want { + t.Fatalf("len(values) = %d, want %d", got, want) + } + if got, want := values[0], v; !reflect.DeepEqual(got, want) { + t.Errorf("got %+v, want %+v", got, want) + } + + // Reset and read + c.Reset() + values = c.Read() + if got, want := len(values), 0; got != want { + t.Fatalf("len(values) = %d, want %d", got, want) + } + + // Insert, remove and read + c.Set(1, v) + c.Set(2, v) + c.Evict(1) + values = c.Read() + if got, want := len(values), 1; got != want { + t.Fatalf("len(values) = %d, want %d", got, want) + } +} @@ -53,6 +53,13 @@ CREATE TABLE IF NOT EXISTS log_rr_answer ( FOREIGN KEY (log_id) REFERENCES log(id), FOREIGN KEY (rr_answer_id) REFERENCES rr_answer(id) ); + +CREATE TABLE IF NOT EXISTS cache ( + id INTEGER PRIMARY KEY, + key INTEGER NOT NULL, + data TEXT NOT NULL, + CONSTRAINT key_unique UNIQUE(key) +); ` // Client implements a client for a SQLite database. @@ -72,6 +79,11 @@ type LogEntry struct { Answer string `db:"answer"` } +type cacheEntry struct { + Key uint32 `db:"key"` + Data string `db:"data"` +} + // New creates a new database client for given filename. func New(filename string) (*Client, error) { db, err := sqlx.Connect("sqlite3", filename) @@ -218,3 +230,55 @@ func (c *Client) DeleteLogBefore(t time.Time) (err error) { } return tx.Commit() } + +func (c *Client) writeCacheValue(key uint32, data string) error { + c.mu.Lock() + defer c.mu.Unlock() + tx, err := c.db.Beginx() + if err != nil { + return nil + } + defer tx.Rollback() + query := `INSERT INTO cache (key, data) VALUES ($1, $2) + ON CONFLICT(key) DO UPDATE SET data=excluded.data` + if _, err := tx.Exec(query, key, data); err != nil { + return err + } + return tx.Commit() +} + +func (c *Client) removeCacheValue(key uint32) error { + c.mu.Lock() + defer c.mu.Unlock() + tx, err := c.db.Beginx() + if err != nil { + return nil + } + defer tx.Rollback() + if _, err := tx.Exec("DELETE FROM cache WHERE key = $1", key); err != nil { + return err + } + return tx.Commit() +} + +func (c *Client) truncateCache() error { + c.mu.Lock() + defer c.mu.Unlock() + tx, err := c.db.Beginx() + if err != nil { + return nil + } + defer tx.Rollback() + if _, err := tx.Exec("DELETE FROM cache"); err != nil { + return err + } + return tx.Commit() +} + +func (c *Client) readCache() ([]cacheEntry, error) { + c.mu.RLock() + defer c.mu.RUnlock() + var entries []cacheEntry + err := c.db.Select(&entries, "SELECT key, data FROM cache ORDER BY id ASC") + return entries, err +} |