aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2020-01-11 13:41:23 +0100
committerMartin Polden <mpolden@mpolden.no>2020-01-11 15:14:56 +0100
commit7bc6b84b2ddd3cdf6fad6ab2568f3e298530f0b0 (patch)
tree114c3b783d69f991d99eb35a2aa8e94a0c198ffb
parent541ec0a86c0d29c469bc48bce34493daf20a0ee4 (diff)
Implement persistent cache backend
-rw-r--r--sql/cache.go104
-rw-r--r--sql/cache_test.go47
-rw-r--r--sql/sql.go64
3 files changed, 215 insertions, 0 deletions
diff --git a/sql/cache.go b/sql/cache.go
new file mode 100644
index 0000000..82259bc
--- /dev/null
+++ b/sql/cache.go
@@ -0,0 +1,104 @@
+package sql
+
+import (
+ "log"
+ "sync"
+
+ "github.com/mpolden/zdns/cache"
+)
+
+const (
+ setOp = iota
+ removeOp
+ resetOp
+)
+
+type query struct {
+ op int
+ key uint32
+ value cache.Value
+}
+
+// Cache is a persistent cache. Entries are written to a SQL database.
+type Cache struct {
+ wg sync.WaitGroup
+ queue chan query
+ client *Client
+ logger *log.Logger
+}
+
+// NewCache creates a new cache using client to persist entries.
+func NewCache(client *Client, logger *log.Logger) *Cache {
+ c := &Cache{
+ queue: make(chan query, 100),
+ client: client,
+ logger: logger,
+ }
+ go c.readQueue()
+ return c
+}
+
+// Close drains and persist queued requests in this cache.
+func (c *Cache) Close() error {
+ c.wg.Wait()
+ return nil
+}
+
+// Set associates the value v with key.
+func (c *Cache) Set(key uint32, v cache.Value) { c.enqueue(query{op: setOp, key: key, value: v}) }
+
+// Evict removes any value associated with key.
+func (c *Cache) Evict(key uint32) { c.enqueue(query{op: removeOp, key: key}) }
+
+// Reset removes all entries from the cache.
+func (c *Cache) Reset() { c.enqueue(query{op: resetOp}) }
+
+// Read returns all entries in the cache.
+func (c *Cache) Read() []cache.Value {
+ c.wg.Wait()
+ entries, err := c.client.readCache()
+ if err != nil {
+ c.logger.Print(err)
+ return nil
+ }
+ values := make([]cache.Value, 0, len(entries))
+ for _, entry := range entries {
+ unpacked, err := cache.Unpack(entry.Data)
+ if err != nil {
+ panic(err) // Should never happen
+ }
+ values = append(values, unpacked)
+ }
+ return values
+}
+
+func (c *Cache) enqueue(q query) {
+ c.wg.Add(1)
+ c.queue <- q
+}
+
+func (c *Cache) readQueue() {
+ for q := range c.queue {
+ switch q.op {
+ case setOp:
+ packed, err := q.value.Pack()
+ if err != nil {
+ c.logger.Fatalf("failed to pack value: %w", err)
+ }
+ if err := c.client.writeCacheValue(q.key, packed); err != nil {
+ c.logger.Printf("failed to write key=%d data=%q: %w", q.key, packed, err)
+ }
+ case removeOp:
+ if err := c.client.removeCacheValue(q.key); err != nil {
+ c.logger.Printf("failed to remove key=%d: %w", q.key, err)
+ }
+ case resetOp:
+ if err := c.client.truncateCache(); err != nil {
+ c.logger.Printf("failed to truncate cache: %w", err)
+ }
+ default:
+ c.logger.Printf("unhandled operation %d", q.op)
+ }
+ c.wg.Add(-1)
+ }
+}
diff --git a/sql/cache_test.go b/sql/cache_test.go
new file mode 100644
index 0000000..55dd369
--- /dev/null
+++ b/sql/cache_test.go
@@ -0,0 +1,47 @@
+package sql
+
+import (
+ "io/ioutil"
+ "log"
+ "reflect"
+ "testing"
+
+ "github.com/mpolden/zdns/cache"
+)
+
+func TestCache(t *testing.T) {
+ data := "3980405151 1578680472 00000100000100000000000003777777076578616d706c6503636f6d0000010001"
+ v, err := cache.Unpack(data)
+ if err != nil {
+ t.Fatal(err)
+ }
+ client := testClient()
+ logger := log.New(ioutil.Discard, "", 0)
+ c := NewCache(client, logger)
+
+ // Set and read
+ c.Set(1, v)
+ values := c.Read()
+ if got, want := len(values), 1; got != want {
+ t.Fatalf("len(values) = %d, want %d", got, want)
+ }
+ if got, want := values[0], v; !reflect.DeepEqual(got, want) {
+ t.Errorf("got %+v, want %+v", got, want)
+ }
+
+ // Reset and read
+ c.Reset()
+ values = c.Read()
+ if got, want := len(values), 0; got != want {
+ t.Fatalf("len(values) = %d, want %d", got, want)
+ }
+
+ // Insert, remove and read
+ c.Set(1, v)
+ c.Set(2, v)
+ c.Evict(1)
+ values = c.Read()
+ if got, want := len(values), 1; got != want {
+ t.Fatalf("len(values) = %d, want %d", got, want)
+ }
+}
diff --git a/sql/sql.go b/sql/sql.go
index eeafd90..ac39e09 100644
--- a/sql/sql.go
+++ b/sql/sql.go
@@ -53,6 +53,13 @@ CREATE TABLE IF NOT EXISTS log_rr_answer (
FOREIGN KEY (log_id) REFERENCES log(id),
FOREIGN KEY (rr_answer_id) REFERENCES rr_answer(id)
);
+
+CREATE TABLE IF NOT EXISTS cache (
+ id INTEGER PRIMARY KEY,
+ key INTEGER NOT NULL,
+ data TEXT NOT NULL,
+ CONSTRAINT key_unique UNIQUE(key)
+);
`
// Client implements a client for a SQLite database.
@@ -72,6 +79,11 @@ type LogEntry struct {
Answer string `db:"answer"`
}
+type cacheEntry struct {
+ Key uint32 `db:"key"`
+ Data string `db:"data"`
+}
+
// New creates a new database client for given filename.
func New(filename string) (*Client, error) {
db, err := sqlx.Connect("sqlite3", filename)
@@ -218,3 +230,55 @@ func (c *Client) DeleteLogBefore(t time.Time) (err error) {
}
return tx.Commit()
}
+
+func (c *Client) writeCacheValue(key uint32, data string) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ tx, err := c.db.Beginx()
+ if err != nil {
+ return nil
+ }
+ defer tx.Rollback()
+ query := `INSERT INTO cache (key, data) VALUES ($1, $2)
+ ON CONFLICT(key) DO UPDATE SET data=excluded.data`
+ if _, err := tx.Exec(query, key, data); err != nil {
+ return err
+ }
+ return tx.Commit()
+}
+
+func (c *Client) removeCacheValue(key uint32) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ tx, err := c.db.Beginx()
+ if err != nil {
+ return nil
+ }
+ defer tx.Rollback()
+ if _, err := tx.Exec("DELETE FROM cache WHERE key = $1", key); err != nil {
+ return err
+ }
+ return tx.Commit()
+}
+
+func (c *Client) truncateCache() error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ tx, err := c.db.Beginx()
+ if err != nil {
+ return nil
+ }
+ defer tx.Rollback()
+ if _, err := tx.Exec("DELETE FROM cache"); err != nil {
+ return err
+ }
+ return tx.Commit()
+}
+
+func (c *Client) readCache() ([]cacheEntry, error) {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ var entries []cacheEntry
+ err := c.db.Select(&entries, "SELECT key, data FROM cache ORDER BY id ASC")
+ return entries, err
+}