1
0
mirror of https://gitea.com/xorm/cachestore synced 2025-10-05 15:52:47 +02:00
This commit is contained in:
商讯在线
2014-10-30 22:55:37 +08:00
parent c5835759d4
commit 7f6037a1b0
32 changed files with 5138 additions and 1 deletions

View File

@@ -1,2 +1,4 @@
cachestore
==========
======
beta 1

54
encdec.go Normal file
View File

@@ -0,0 +1,54 @@
package cachestore
import (
"bytes"
"crypto/md5"
"encoding/gob"
"encoding/json"
"fmt"
"io"
)
// md5 hash string
func Md5(str string) string {
m := md5.New()
io.WriteString(m, str)
return fmt.Sprintf("%x", m.Sum(nil))
}
func Encode(data interface{}) ([]byte, error) {
//return JsonEncode(data)
return GobEncode(data)
}
func Decode(data []byte, to interface{}) error {
//return JsonDecode(data, to)
return GobDecode(data, to)
}
func GobEncode(data interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(&data)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func GobDecode(data []byte, to interface{}) error {
buf := bytes.NewBuffer(data)
dec := gob.NewDecoder(buf)
return dec.Decode(to)
}
func JsonEncode(data interface{}) ([]byte, error) {
val, err := json.Marshal(data)
if err != nil {
return nil, err
}
return val, nil
}
func JsonDecode(data []byte, to interface{}) error {
return json.Unmarshal(data, to)
}

58
example/main.go Normal file
View File

@@ -0,0 +1,58 @@
package main
import (
"log"
"strings"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/go-xorm/cachestore"
//"github.com/go-xorm/core"
"github.com/go-xorm/xorm"
)
var (
cacher *xorm.LRUCacher
CacheDir string = "."
cfg []string = []string{"leveldb"}
)
func main() {
//cfg := strings.SplitN(cacherName, ":", 2)
engine, err := xorm.NewEngine("mysql", "root:root@/coscms?charset=utf8")
if err != nil {
log.Fatalf("The database connection failed: %v\n", err)
}
switch strings.ToLower(cfg[0]) {
case "memory":
ccStore := xorm.NewMemoryStore()
//this.CacheStore = ccStore
cacher = xorm.NewLRUCacher(ccStore, 1000) //NewLRUCacher(store core.CacheStore, maxElementSize int)
case "leveldb":
storagePath := CacheDir + "/leveldb/dbcache"
if len(cfg) == 2 {
storagePath = cfg[1]
}
ccStore := cachestore.NewLevelDBStore(storagePath)
//this.CacheStore = ccStore
//ccStore.Debug = this.ShowDebug
cacher = xorm.NewLRUCacher(ccStore, 999999999)
case "memcache":
conn := "127.0.0.1:11211"
if len(cfg) == 2 {
conn = cfg[1]
}
ccStore := cachestore.NewMemCache(conn)
//this.CacheStore = ccStore
//ccStore.Debug = this.ShowDebug
cacher = xorm.NewLRUCacher(ccStore, 999999999)
}
if cacher != nil {
cacher.Expired = 86400 * time.Second
engine.SetDefaultCacher(cacher)
}
//engine.Where(querystring, ...)
}

2
example/run.bat Normal file
View File

@@ -0,0 +1,2 @@
go run main.go
pause

73
leveldbstore.go Normal file
View File

@@ -0,0 +1,73 @@
package cachestore
import (
"fmt"
"github.com/syndtr/goleveldb/leveldb"
//"reflect"
)
// LevelDBStore implements CacheStore provide local machine
type LevelDBStore struct {
store *leveldb.DB
Debug bool
v interface{}
}
func NewLevelDBStore(dbfile string) *LevelDBStore {
db := &LevelDBStore{}
if h, err := leveldb.OpenFile(dbfile, nil); err != nil {
panic(err)
} else {
db.store = h
}
return db
}
func (s *LevelDBStore) Put(key string, value interface{}) error {
val, err := Encode(value)
if err != nil {
fmt.Println("[LevelDB]EncodeErr: ", err)
return err
}
err = s.store.Put([]byte(key), val, nil)
if err != nil {
fmt.Println("[LevelDB]PutErr: ", err)
}
if s.Debug {
fmt.Println("[LevelDB]Put: ", key)
}
return err
}
func (s *LevelDBStore) Get(key string) (interface{}, error) {
data, err := s.store.Get([]byte(key), nil)
if err != nil {
fmt.Println("[LevelDB]GetErr: ", err)
return nil, err
}
err = Decode(data, &s.v)
if err != nil {
fmt.Println("[LevelDB]DecodeErr: ", err)
}
if s.Debug {
fmt.Println("[LevelDB]Get: ", key, s.v)
}
return s.v, err
}
func (s *LevelDBStore) Del(key string) error {
err := s.store.Delete([]byte(key), nil)
if err != nil {
fmt.Println("[LevelDB]DelErr: ", err)
return err
}
if s.Debug {
fmt.Println("[LevelDB]Del: ", key)
}
return err
}
func (s *LevelDBStore) Close() {
s.store.Close()
}

175
memcache.go Normal file
View File

@@ -0,0 +1,175 @@
package cachestore
import (
"coscms/app/base/lib/cachestore/memcache"
"encoding/json"
"errors"
"fmt"
)
// Memcache adapter.
type MemcacheCache struct {
c *memcache.Connection
conninfo string
LifeTime uint64
Debug bool
}
// create new memcache adapter.
func NewMemCache(conn string) *MemcacheCache {
rc := &MemcacheCache{LifeTime: 86400}
rc.conninfo = conn
rc.c = rc.connectInit()
return rc
}
// get value from memcache.
func (rc *MemcacheCache) Get(key string) (interface{}, error) {
if rc.c == nil {
rc.c = rc.connectInit()
if rc.c == nil {
return nil, nil
}
}
val, err := rc.c.Get(Md5(key))
if err != nil || len(val) < 1 {
if err != nil {
fmt.Println("[Memcache]GetErr: ", err)
rc.c.Close()
rc.c = nil
}
return nil, err
}
var v interface{}
err = Decode(val[0].Value, &v)
if err != nil {
fmt.Println("[Memcache]DecodeErr: ", err)
}
if rc.Debug {
fmt.Println("[Memcache]Get: ", key)
}
return v, err
}
// put value to memcache. only support string.
func (rc *MemcacheCache) Put(key string, value interface{}) error {
if rc.c == nil {
rc.c = rc.connectInit()
if rc.c == nil {
return nil
}
}
val, err := Encode(value)
if err != nil {
fmt.Println("[Memcache]EncodeErr: ", err)
return err
}
stored, err := rc.c.Set(Md5(key), 0, rc.LifeTime, val)
if err != nil || stored == false {
if err != nil {
fmt.Println("[Memcache]PutErr: ", err)
rc.c.Close()
rc.c = nil
}
return errors.New("stored fail")
}
if rc.Debug {
fmt.Println("[Memcache]Put: ", key)
}
return err
}
// delete value in memcache.
func (rc *MemcacheCache) Del(key string) error {
if rc.c == nil {
rc.c = rc.connectInit()
if rc.c == nil {
return nil
}
}
_, err := rc.c.Delete(Md5(key))
if err != nil {
fmt.Println("[Memcache]DelErr: ", err)
rc.c.Close()
rc.c = nil
}
if rc.Debug {
fmt.Println("[Memcache]Del: ", key)
}
return err
}
// [Not Support]
// increase counter.
func (rc *MemcacheCache) Incr(key string) error {
return errors.New("not support in memcache")
}
// [Not Support]
// decrease counter.
func (rc *MemcacheCache) Decr(key string) error {
return errors.New("not support in memcache")
}
// check value exists in memcache.
func (rc *MemcacheCache) IsExist(key string) bool {
if rc.c == nil {
rc.c = rc.connectInit()
}
v, err := rc.c.Get(key)
if err != nil {
rc.c.Close()
rc.c = nil
return false
}
if len(v) == 0 {
return false
}
return true
}
// clear all cached in memcache.
func (rc *MemcacheCache) ClearAll() error {
if rc.c == nil {
rc.c = rc.connectInit()
if rc.c == nil {
return nil
}
}
err := rc.c.FlushAll()
return err
}
// start memcache adapter.
// config string is like {"conn":"connection info"}.
// if connecting error, return.
func (rc *MemcacheCache) Connect(config string) error {
var cf map[string]string
json.Unmarshal([]byte(config), &cf)
if _, ok := cf["conn"]; !ok {
return errors.New("config has no conn key")
}
rc.conninfo = cf["conn"]
rc.c = rc.connectInit()
if rc.c == nil {
return errors.New("dial tcp conn error")
}
return nil
}
// connect to memcache and keep the connection.
func (rc *MemcacheCache) connectInit() *memcache.Connection {
c, err := memcache.Connect(rc.conninfo)
if err != nil {
fmt.Println("[Memcahe]Connect failure:", err)
return nil
}
if rc.Debug {
fmt.Println("[Memcahe]Connect success:", rc.conninfo)
}
return c
}

280
memcache/memcache.go Normal file
View File

@@ -0,0 +1,280 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package memcache
import (
"bufio"
"fmt"
"io"
"net"
"strconv"
"strings"
)
type Connection struct {
conn net.Conn
buffered bufio.ReadWriter
}
type Result struct {
Key string
Value []byte
Flags uint16
Cas uint64
}
func Connect(address string) (conn *Connection, err error) {
var network string
if strings.Contains(address, "/") {
network = "unix"
} else {
network = "tcp"
}
var nc net.Conn
nc, err = net.Dial(network, address)
if err != nil {
return nil, err
}
return newConnection(nc), nil
}
func newConnection(nc net.Conn) *Connection {
return &Connection{
conn: nc,
buffered: *bufio.NewReadWriter(
bufio.NewReader(nc),
bufio.NewWriter(nc),
),
}
}
func (mc *Connection) Close() {
mc.conn.Close()
mc.conn = nil
}
func (mc *Connection) IsClosed() bool {
return mc.conn == nil
}
func (mc *Connection) Get(keys ...string) (results []Result, err error) {
defer handleError(&err)
results = mc.get("get", keys)
return
}
func (mc *Connection) Gets(keys ...string) (results []Result, err error) {
defer handleError(&err)
results = mc.get("gets", keys)
return
}
func (mc *Connection) Set(key string, flags uint16, timeout uint64, value []byte) (stored bool, err error) {
defer handleError(&err)
return mc.store("set", key, flags, timeout, value, 0), nil
}
func (mc *Connection) Add(key string, flags uint16, timeout uint64, value []byte) (stored bool, err error) {
defer handleError(&err)
return mc.store("add", key, flags, timeout, value, 0), nil
}
func (mc *Connection) Replace(key string, flags uint16, timeout uint64, value []byte) (stored bool, err error) {
defer handleError(&err)
return mc.store("replace", key, flags, timeout, value, 0), nil
}
func (mc *Connection) Append(key string, flags uint16, timeout uint64, value []byte) (stored bool, err error) {
defer handleError(&err)
return mc.store("append", key, flags, timeout, value, 0), nil
}
func (mc *Connection) Prepend(key string, flags uint16, timeout uint64, value []byte) (stored bool, err error) {
defer handleError(&err)
return mc.store("prepend", key, flags, timeout, value, 0), nil
}
func (mc *Connection) Cas(key string, flags uint16, timeout uint64, value []byte, cas uint64) (stored bool, err error) {
defer handleError(&err)
return mc.store("cas", key, flags, timeout, value, cas), nil
}
func (mc *Connection) Delete(key string) (deleted bool, err error) {
defer handleError(&err)
// delete <key> [<time>] [noreply]\r\n
mc.writestrings("delete ", key, "\r\n")
reply := mc.readline()
if strings.Contains(reply, "ERROR") {
panic(NewMemcacheError("Server error"))
}
return strings.HasPrefix(reply, "DELETED"), nil
}
//This purges the entire cache.
func (mc *Connection) FlushAll() (err error) {
defer handleError(&err)
// flush_all [delay] [noreply]\r\n
mc.writestrings("flush_all\r\n")
response := mc.readline()
if !strings.Contains(response, "OK") {
panic(NewMemcacheError(fmt.Sprintf("Error in FlushAll %v", response)))
}
return nil
}
func (mc *Connection) Stats(argument string) (result []byte, err error) {
defer handleError(&err)
if argument == "" {
mc.writestrings("stats\r\n")
} else {
mc.writestrings("stats ", argument, "\r\n")
}
mc.flush()
for {
l := mc.readline()
if strings.HasPrefix(l, "END") {
break
}
if strings.Contains(l, "ERROR") {
return nil, NewMemcacheError(l)
}
result = append(result, l...)
result = append(result, '\n')
}
return result, err
}
func (mc *Connection) get(command string, keys []string) (results []Result) {
results = make([]Result, 0, len(keys))
if len(keys) == 0 {
return
}
// get(s) <key>*\r\n
mc.writestrings(command)
for _, key := range keys {
mc.writestrings(" ", key)
}
mc.writestrings("\r\n")
header := mc.readline()
var result Result
for strings.HasPrefix(header, "VALUE") {
// VALUE <key> <flags> <bytes> [<cas unique>]\r\n
chunks := strings.Split(header, " ")
if len(chunks) < 4 {
panic(NewMemcacheError("Malformed response: %s", string(header)))
}
result.Key = chunks[1]
flags64, err := strconv.ParseUint(chunks[2], 10, 16)
if err != nil {
panic(NewMemcacheError("%v", err))
}
result.Flags = uint16(flags64)
size, err := strconv.ParseUint(chunks[3], 10, 64)
if err != nil {
panic(NewMemcacheError("%v", err))
}
if len(chunks) == 5 {
result.Cas, err = strconv.ParseUint(chunks[4], 10, 64)
if err != nil {
panic(NewMemcacheError("%v", err))
}
}
// <data block>\r\n
result.Value = mc.read(int(size) + 2)[:size]
results = append(results, result)
header = mc.readline()
}
if !strings.HasPrefix(header, "END") {
panic(NewMemcacheError("Malformed response: %s", string(header)))
}
return
}
func (mc *Connection) store(command, key string, flags uint16, timeout uint64, value []byte, cas uint64) (stored bool) {
if len(value) > 1000000 {
return false
}
// <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n
mc.writestrings(command, " ", key, " ")
mc.write(strconv.AppendUint(nil, uint64(flags), 10))
mc.writestring(" ")
mc.write(strconv.AppendUint(nil, timeout, 10))
mc.writestring(" ")
mc.write(strconv.AppendInt(nil, int64(len(value)), 10))
if cas != 0 {
mc.writestring(" ")
mc.write(strconv.AppendUint(nil, cas, 10))
}
mc.writestring("\r\n")
// <data block>\r\n
mc.write(value)
mc.writestring("\r\n")
reply := mc.readline()
if strings.Contains(reply, "ERROR") {
panic(NewMemcacheError("Server error"))
}
return strings.HasPrefix(reply, "STORED")
}
func (mc *Connection) writestrings(strs ...string) {
for _, s := range strs {
mc.writestring(s)
}
}
func (mc *Connection) writestring(s string) {
if _, err := mc.buffered.WriteString(s); err != nil {
panic(NewMemcacheError("%s", err))
}
}
func (mc *Connection) write(b []byte) {
if _, err := mc.buffered.Write(b); err != nil {
panic(NewMemcacheError("%s", err))
}
}
func (mc *Connection) flush() {
if err := mc.buffered.Flush(); err != nil {
panic(NewMemcacheError("%s", err))
}
}
func (mc *Connection) readline() string {
mc.flush()
l, isPrefix, err := mc.buffered.ReadLine()
if isPrefix || err != nil {
panic(NewMemcacheError("Prefix: %v, %s", isPrefix, err))
}
return string(l)
}
func (mc *Connection) read(count int) []byte {
mc.flush()
b := make([]byte, count)
if _, err := io.ReadFull(mc.buffered, b); err != nil {
panic(NewMemcacheError("%s", err))
}
return b
}
type MemcacheError struct {
Message string
}
func NewMemcacheError(format string, args ...interface{}) MemcacheError {
return MemcacheError{fmt.Sprintf(format, args...)}
}
func (merr MemcacheError) Error() string {
return merr.Message
}
func handleError(err *error) {
if x := recover(); x != nil {
*err = x.(MemcacheError)
}
}

256
memcache/memcache_test.go Normal file
View File

@@ -0,0 +1,256 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package memcache
import (
"os/exec"
"testing"
"time"
)
func TestMemcache(t *testing.T) {
cmd := exec.Command("memcached", "-s", "/tmp/vtocc_cache.sock")
if err := cmd.Start(); err != nil {
t.Errorf("Memcache start: %v", err)
return
}
defer cmd.Process.Kill()
time.Sleep(time.Second)
c, err := Connect("/tmp/vtocc_cache.sock")
if err != nil {
t.Errorf("Connect: %v", err)
return
}
// Set
stored, err := c.Set("Hello", 0, 0, []byte("world"))
if err != nil {
t.Errorf("Set: %v", err)
return
}
if !stored {
t.Errorf("want true, got %v", stored)
}
expect(t, c, "Hello", "world")
// Add
stored, err = c.Add("Hello", 0, 0, []byte("Jupiter"))
if err != nil {
t.Errorf("Add: %v", err)
}
if stored {
t.Errorf("want false, got %v", stored)
}
expect(t, c, "Hello", "world")
// Replace
stored, err = c.Replace("Hello", 0, 0, []byte("World"))
if err != nil {
t.Errorf("Replace: %v", err)
}
if !stored {
t.Errorf("want true, got %v", stored)
}
expect(t, c, "Hello", "World")
// Append
stored, err = c.Append("Hello", 0, 0, []byte("!"))
if err != nil {
t.Errorf("Append: %v", err)
}
if !stored {
t.Errorf("want true, got %v", stored)
}
expect(t, c, "Hello", "World!")
// Prepend
stored, err = c.Prepend("Hello", 0, 0, []byte("Hello, "))
if err != nil {
t.Errorf("Prepend: %v", err)
}
if !stored {
t.Errorf("want true, got %v", stored)
}
expect(t, c, "Hello", "Hello, World!")
// Delete
deleted, err := c.Delete("Hello")
if err != nil {
t.Errorf("Delete: %v", err)
}
if !deleted {
t.Errorf("want true, got %v", deleted)
}
expect(t, c, "Hello", "")
// Flags
stored, err = c.Set("Hello", 0xFFFF, 0, []byte("world"))
if err != nil {
t.Errorf("Set: %v", err)
return
}
if !stored {
t.Errorf("want true, got %v", stored)
}
results, err := c.Get("Hello")
if err != nil {
t.Errorf("Get: %v", err)
return
}
if results[0].Flags != 0xFFFF {
t.Errorf("want 0xFFFF, got %x", results[0].Flags)
}
if string(results[0].Value) != "world" {
t.Errorf("want world, got %s", results[0].Value)
}
// timeout
stored, err = c.Set("Lost", 0, 1, []byte("World"))
if err != nil {
t.Errorf("Set: %v", err)
return
}
if !stored {
t.Errorf("want true, got %v", stored)
}
expect(t, c, "Lost", "World")
time.Sleep(2 * time.Second)
expect(t, c, "Lost", "")
// cas
stored, err = c.Set("Data", 0, 0, []byte("Set"))
if err != nil {
t.Errorf("Set: %v", err)
return
}
if !stored {
t.Errorf("want true, got %v", stored)
}
expect(t, c, "Data", "Set")
results, err = c.Gets("Data")
if err != nil {
t.Errorf("Gets: %v", err)
return
}
cas := results[0].Cas
if cas == 0 {
t.Errorf("want non-zero for cas")
}
stored, err = c.Cas("Data", 0, 0, []byte("not set"), 12345)
if err != nil {
t.Errorf("Set: %v", err)
return
}
if stored {
t.Errorf("want false, got %v", stored)
}
expect(t, c, "Data", "Set")
stored, err = c.Cas("Data", 0, 0, []byte("Changed"), cas)
if err != nil {
t.Errorf("Set: %v", err)
return
}
expect(t, c, "Data", "Changed")
stored, err = c.Set("Data", 0, 0, []byte("Overwritten"))
if err != nil {
t.Errorf("Set: %v", err)
return
}
if !stored {
t.Errorf("want true, got %v", stored)
}
expect(t, c, "Data", "Overwritten")
// stats
_, err = c.Stats("")
if err != nil {
t.Errorf("Stats: %v", err)
return
}
_, err = c.Stats("slabs")
if err != nil {
t.Errorf("Stats: %v", err)
return
}
// FlushAll
// Set
stored, err = c.Set("Flush", 0, 0, []byte("Test"))
if err != nil {
t.Errorf("Set: %v", err)
}
expect(t, c, "Flush", "Test")
err = c.FlushAll()
if err != nil {
t.Errorf("FlushAll: err %v", err)
return
}
results, err = c.Get("Flush")
if err != nil {
t.Errorf("Get: %v", err)
return
}
if len(results) != 0 {
t.Errorf("FlushAll failed")
return
}
// Multi
stored, _ = c.Set("key1", 0, 0, []byte("val1"))
stored, _ = c.Set("key2", 0, 0, []byte("val2"))
results, _ = c.Get("key1", "key2")
if len(results) != 2 {
t.Fatalf("want 2, gto %d", len(results))
}
if results[0].Key != "key1" {
t.Errorf("want key1, got %s", results[0].Key)
}
if string(results[0].Value) != "val1" {
t.Errorf("want val1, got %s", string(results[0].Value))
}
if results[1].Key != "key2" {
t.Errorf("want key2, got %s", results[0].Key)
}
if string(results[1].Value) != "val2" {
t.Errorf("want val2, got %s", string(results[1].Value))
}
results, _ = c.Gets("key1", "key3", "key2")
if len(results) != 2 {
t.Fatalf("want 2, gto %d", len(results))
}
if results[0].Key != "key1" {
t.Errorf("want key1, got %s", results[0].Key)
}
if string(results[0].Value) != "val1" {
t.Errorf("want val1, got %s", string(results[0].Value))
}
if results[1].Key != "key2" {
t.Errorf("want key2, got %s", results[0].Key)
}
if string(results[1].Value) != "val2" {
t.Errorf("want val2, got %s", string(results[1].Value))
}
}
func expect(t *testing.T, c *Connection, key, value string) {
results, err := c.Get(key)
if err != nil {
t.Errorf("Get: %v", err)
return
}
var got string
if len(results) != 0 {
got = string(results[0].Value)
}
if got != value {
t.Errorf("want %s, got %s", value, results[0].Value)
}
}

44
redigo/README.markdown Normal file
View File

@@ -0,0 +1,44 @@
Redigo
======
Redigo is a [Go](http://golang.org/) client for the [Redis](http://redis.io/) database.
Features
-------
* A [Print-like](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Executing_Commands) API with support for all Redis commands.
* [Pipelining](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Pipelining), including pipelined transactions.
* [Publish/Subscribe](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Publish_and_Subscribe).
* [Connection pooling](http://godoc.org/github.com/garyburd/redigo/redis#Pool).
* [Script helper type](http://godoc.org/github.com/garyburd/redigo/redis#Script) with optimistic use of EVALSHA.
* [Helper functions](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Reply_Helpers) for working with command replies.
Documentation
-------------
- [API Reference](http://godoc.org/github.com/garyburd/redigo/redis)
- [FAQ](https://github.com/garyburd/redigo/wiki/FAQ)
Installation
------------
Install Redigo using the "go get" command:
go get github.com/garyburd/redigo/redis
The Go distribution is Redigo's only dependency.
Contributing
------------
Contributions are welcome.
Before writing code, send mail to gary@beagledreams.com to discuss what you
plan to do. This gives me a chance to validate the design, avoid duplication of
effort and ensure that the changes fit the goals of the project. Do not start
the discussion with a pull request.
License
-------
Redigo is available under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html).

414
redigo/redis/conn.go Normal file
View File

@@ -0,0 +1,414 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"net"
"strconv"
"sync"
"time"
)
// conn is the low-level implementation of Conn
type conn struct {
// Shared
mu sync.Mutex
pending int
err error
conn net.Conn
// Read
readTimeout time.Duration
br *bufio.Reader
// Write
writeTimeout time.Duration
bw *bufio.Writer
// Scratch space for formatting argument length.
// '*' or '$', length, "\r\n"
lenScratch [32]byte
// Scratch space for formatting integers and floats.
numScratch [40]byte
}
// Dial connects to the Redis server at the given network and address.
func Dial(network, address string) (Conn, error) {
c, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewConn(c, 0, 0), nil
}
// DialTimeout acts like Dial but takes timeouts for establishing the
// connection to the server, writing a command and reading a reply.
func DialTimeout(network, address string, connectTimeout, readTimeout, writeTimeout time.Duration) (Conn, error) {
var c net.Conn
var err error
if connectTimeout > 0 {
c, err = net.DialTimeout(network, address, connectTimeout)
} else {
c, err = net.Dial(network, address)
}
if err != nil {
return nil, err
}
return NewConn(c, readTimeout, writeTimeout), nil
}
// NewConn returns a new Redigo connection for the given net connection.
func NewConn(netConn net.Conn, readTimeout, writeTimeout time.Duration) Conn {
return &conn{
conn: netConn,
bw: bufio.NewWriter(netConn),
br: bufio.NewReader(netConn),
readTimeout: readTimeout,
writeTimeout: writeTimeout,
}
}
func (c *conn) Close() error {
c.mu.Lock()
err := c.err
if c.err == nil {
c.err = errors.New("redigo: closed")
err = c.conn.Close()
}
c.mu.Unlock()
return err
}
func (c *conn) fatal(err error) error {
c.mu.Lock()
if c.err == nil {
c.err = err
// Close connection to force errors on subsequent calls and to unblock
// other reader or writer.
c.conn.Close()
}
c.mu.Unlock()
return err
}
func (c *conn) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
func (c *conn) writeLen(prefix byte, n int) error {
c.lenScratch[len(c.lenScratch)-1] = '\n'
c.lenScratch[len(c.lenScratch)-2] = '\r'
i := len(c.lenScratch) - 3
for {
c.lenScratch[i] = byte('0' + n%10)
i -= 1
n = n / 10
if n == 0 {
break
}
}
c.lenScratch[i] = prefix
_, err := c.bw.Write(c.lenScratch[i:])
return err
}
func (c *conn) writeString(s string) error {
c.writeLen('$', len(s))
c.bw.WriteString(s)
_, err := c.bw.WriteString("\r\n")
return err
}
func (c *conn) writeBytes(p []byte) error {
c.writeLen('$', len(p))
c.bw.Write(p)
_, err := c.bw.WriteString("\r\n")
return err
}
func (c *conn) writeInt64(n int64) error {
return c.writeBytes(strconv.AppendInt(c.numScratch[:0], n, 10))
}
func (c *conn) writeFloat64(n float64) error {
return c.writeBytes(strconv.AppendFloat(c.numScratch[:0], n, 'g', -1, 64))
}
func (c *conn) writeCommand(cmd string, args []interface{}) (err error) {
c.writeLen('*', 1+len(args))
err = c.writeString(cmd)
for _, arg := range args {
if err != nil {
break
}
switch arg := arg.(type) {
case string:
err = c.writeString(arg)
case []byte:
err = c.writeBytes(arg)
case int:
err = c.writeInt64(int64(arg))
case int64:
err = c.writeInt64(arg)
case float64:
err = c.writeFloat64(arg)
case bool:
if arg {
err = c.writeString("1")
} else {
err = c.writeString("0")
}
case nil:
err = c.writeString("")
default:
var buf bytes.Buffer
fmt.Fprint(&buf, arg)
err = c.writeBytes(buf.Bytes())
}
}
return err
}
func (c *conn) readLine() ([]byte, error) {
p, err := c.br.ReadSlice('\n')
if err == bufio.ErrBufferFull {
return nil, errors.New("redigo: long response line")
}
if err != nil {
return nil, err
}
i := len(p) - 2
if i < 0 || p[i] != '\r' {
return nil, errors.New("redigo: bad response line terminator")
}
return p[:i], nil
}
// parseLen parses bulk and multi-bulk lengths.
func parseLen(p []byte) (int, error) {
if len(p) == 0 {
return -1, errors.New("redigo: malformed length")
}
if p[0] == '-' && len(p) == 2 && p[1] == '1' {
// handle $-1 and $-1 null replies.
return -1, nil
}
var n int
for _, b := range p {
n *= 10
if b < '0' || b > '9' {
return -1, errors.New("redigo: illegal bytes in length")
}
n += int(b - '0')
}
return n, nil
}
// parseInt parses an integer reply.
func parseInt(p []byte) (interface{}, error) {
if len(p) == 0 {
return 0, errors.New("redigo: malformed integer")
}
var negate bool
if p[0] == '-' {
negate = true
p = p[1:]
if len(p) == 0 {
return 0, errors.New("redigo: malformed integer")
}
}
var n int64
for _, b := range p {
n *= 10
if b < '0' || b > '9' {
return 0, errors.New("redigo: illegal bytes in length")
}
n += int64(b - '0')
}
if negate {
n = -n
}
return n, nil
}
var (
okReply interface{} = "OK"
pongReply interface{} = "PONG"
)
func (c *conn) readReply() (interface{}, error) {
line, err := c.readLine()
if err != nil {
return nil, err
}
if len(line) == 0 {
return nil, errors.New("redigo: short response line")
}
switch line[0] {
case '+':
switch {
case len(line) == 3 && line[1] == 'O' && line[2] == 'K':
// Avoid allocation for frequent "+OK" response.
return okReply, nil
case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G':
// Avoid allocation in PING command benchmarks :)
return pongReply, nil
default:
return string(line[1:]), nil
}
case '-':
return Error(string(line[1:])), nil
case ':':
return parseInt(line[1:])
case '$':
n, err := parseLen(line[1:])
if n < 0 {
return nil, err
}
p := make([]byte, n)
_, err = io.ReadFull(c.br, p)
if err != nil {
return nil, err
}
if line, err := c.readLine(); err != nil {
return nil, err
} else if len(line) != 0 {
return nil, errors.New("redigo: bad bulk format")
}
return p, nil
case '*':
n, err := parseLen(line[1:])
if n < 0 {
return nil, err
}
r := make([]interface{}, n)
for i := range r {
r[i], err = c.readReply()
if err != nil {
return nil, err
}
}
return r, nil
}
return nil, errors.New("redigo: unexpected response line")
}
func (c *conn) Send(cmd string, args ...interface{}) error {
c.mu.Lock()
c.pending += 1
c.mu.Unlock()
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if err := c.writeCommand(cmd, args); err != nil {
return c.fatal(err)
}
return nil
}
func (c *conn) Flush() error {
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if err := c.bw.Flush(); err != nil {
return c.fatal(err)
}
return nil
}
func (c *conn) Receive() (reply interface{}, err error) {
c.mu.Lock()
// There can be more receives than sends when using pub/sub. To allow
// normal use of the connection after unsubscribe from all channels, do not
// decrement pending to a negative value.
if c.pending > 0 {
c.pending -= 1
}
c.mu.Unlock()
if c.readTimeout != 0 {
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
}
if reply, err = c.readReply(); err != nil {
return nil, c.fatal(err)
}
if err, ok := reply.(Error); ok {
return nil, err
}
return
}
func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if cmd != "" {
c.writeCommand(cmd, args)
}
if err := c.bw.Flush(); err != nil {
return nil, c.fatal(err)
}
c.mu.Lock()
pending := c.pending
c.pending = 0
c.mu.Unlock()
if c.readTimeout != 0 {
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
}
if cmd == "" {
reply := make([]interface{}, pending)
for i := range reply {
r, e := c.readReply()
if e != nil {
return nil, c.fatal(e)
}
reply[i] = r
}
return reply, nil
}
var err error
var reply interface{}
for i := 0; i <= pending; i++ {
var e error
if reply, e = c.readReply(); e != nil {
return nil, c.fatal(e)
}
if e, ok := reply.(Error); ok && err == nil {
err = e
}
}
return reply, err
}

411
redigo/redis/conn_test.go Normal file
View File

@@ -0,0 +1,411 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis_test
import (
"bufio"
"bytes"
"errors"
"math"
"net"
"reflect"
"strings"
"testing"
"time"
"github.com/garyburd/redigo/redis"
)
var writeTests = []struct {
args []interface{}
expected string
}{
{
[]interface{}{"SET", "foo", "bar"},
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n",
},
{
[]interface{}{"SET", "foo", "bar"},
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n",
},
{
[]interface{}{"SET", "foo", byte(100)},
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n100\r\n",
},
{
[]interface{}{"SET", "foo", 100},
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\n100\r\n",
},
{
[]interface{}{"SET", "foo", int64(math.MinInt64)},
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$20\r\n-9223372036854775808\r\n",
},
{
[]interface{}{"SET", "foo", float64(1349673917.939762)},
"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$21\r\n1.349673917939762e+09\r\n",
},
{
[]interface{}{"SET", "", []byte("foo")},
"*3\r\n$3\r\nSET\r\n$0\r\n\r\n$3\r\nfoo\r\n",
},
{
[]interface{}{"SET", nil, []byte("foo")},
"*3\r\n$3\r\nSET\r\n$0\r\n\r\n$3\r\nfoo\r\n",
},
}
func TestWrite(t *testing.T) {
for _, tt := range writeTests {
var buf bytes.Buffer
rw := bufio.ReadWriter{Writer: bufio.NewWriter(&buf)}
c := redis.NewConnBufio(rw)
err := c.Send(tt.args[0].(string), tt.args[1:]...)
if err != nil {
t.Errorf("Send(%v) returned error %v", tt.args, err)
continue
}
rw.Flush()
actual := buf.String()
if actual != tt.expected {
t.Errorf("Send(%v) = %q, want %q", tt.args, actual, tt.expected)
}
}
}
var errorSentinel = &struct{}{}
var readTests = []struct {
reply string
expected interface{}
}{
{
"+OK\r\n",
"OK",
},
{
"+PONG\r\n",
"PONG",
},
{
"@OK\r\n",
errorSentinel,
},
{
"$6\r\nfoobar\r\n",
[]byte("foobar"),
},
{
"$-1\r\n",
nil,
},
{
":1\r\n",
int64(1),
},
{
":-2\r\n",
int64(-2),
},
{
"*0\r\n",
[]interface{}{},
},
{
"*-1\r\n",
nil,
},
{
"*4\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$5\r\nHello\r\n$5\r\nWorld\r\n",
[]interface{}{[]byte("foo"), []byte("bar"), []byte("Hello"), []byte("World")},
},
{
"*3\r\n$3\r\nfoo\r\n$-1\r\n$3\r\nbar\r\n",
[]interface{}{[]byte("foo"), nil, []byte("bar")},
},
}
func TestRead(t *testing.T) {
for _, tt := range readTests {
rw := bufio.ReadWriter{
Reader: bufio.NewReader(strings.NewReader(tt.reply)),
Writer: bufio.NewWriter(nil), // writer need to support Flush
}
c := redis.NewConnBufio(rw)
actual, err := c.Receive()
if tt.expected == errorSentinel {
if err == nil {
t.Errorf("Receive(%q) did not return expected error", tt.reply)
}
} else {
if err != nil {
t.Errorf("Receive(%q) returned error %v", tt.reply, err)
continue
}
if !reflect.DeepEqual(actual, tt.expected) {
t.Errorf("Receive(%q) = %v, want %v", tt.reply, actual, tt.expected)
}
}
}
}
type testConn struct {
redis.Conn
}
func (t testConn) Close() error {
_, err := t.Conn.Do("SELECT", "9")
if err != nil {
return nil
}
_, err = t.Conn.Do("FLUSHDB")
if err != nil {
return err
}
return t.Conn.Close()
}
func dial() (redis.Conn, error) {
c, err := redis.DialTimeout("tcp", ":6379", 0, 1*time.Second, 1*time.Second)
if err != nil {
return nil, err
}
_, err = c.Do("SELECT", "9")
if err != nil {
return nil, err
}
n, err := redis.Int(c.Do("DBSIZE"))
if err != nil {
return nil, err
}
if n != 0 {
return nil, errors.New("database #9 is not empty, test can not continue")
}
return testConn{c}, nil
}
func dialt(t *testing.T) redis.Conn {
c, err := dial()
if err != nil {
t.Fatalf("error connection to database, %v", err)
}
return c
}
var testCommands = []struct {
args []interface{}
expected interface{}
}{
{
[]interface{}{"PING"},
"PONG",
},
{
[]interface{}{"SET", "foo", "bar"},
"OK",
},
{
[]interface{}{"GET", "foo"},
[]byte("bar"),
},
{
[]interface{}{"GET", "nokey"},
nil,
},
{
[]interface{}{"MGET", "nokey", "foo"},
[]interface{}{nil, []byte("bar")},
},
{
[]interface{}{"INCR", "mycounter"},
int64(1),
},
{
[]interface{}{"LPUSH", "mylist", "foo"},
int64(1),
},
{
[]interface{}{"LPUSH", "mylist", "bar"},
int64(2),
},
{
[]interface{}{"LRANGE", "mylist", 0, -1},
[]interface{}{[]byte("bar"), []byte("foo")},
},
{
[]interface{}{"MULTI"},
"OK",
},
{
[]interface{}{"LRANGE", "mylist", 0, -1},
"QUEUED",
},
{
[]interface{}{"PING"},
"QUEUED",
},
{
[]interface{}{"EXEC"},
[]interface{}{
[]interface{}{[]byte("bar"), []byte("foo")},
"PONG",
},
},
}
func TestDoCommands(t *testing.T) {
c := dialt(t)
defer c.Close()
for _, cmd := range testCommands {
actual, err := c.Do(cmd.args[0].(string), cmd.args[1:]...)
if err != nil {
t.Errorf("Do(%v) returned error %v", cmd.args, err)
continue
}
if !reflect.DeepEqual(actual, cmd.expected) {
t.Errorf("Do(%v) = %v, want %v", cmd.args, actual, cmd.expected)
}
}
}
func TestPipelineCommands(t *testing.T) {
c := dialt(t)
defer c.Close()
for _, cmd := range testCommands {
if err := c.Send(cmd.args[0].(string), cmd.args[1:]...); err != nil {
t.Fatalf("Send(%v) returned error %v", cmd.args, err)
}
}
if err := c.Flush(); err != nil {
t.Errorf("Flush() returned error %v", err)
}
for _, cmd := range testCommands {
actual, err := c.Receive()
if err != nil {
t.Fatalf("Receive(%v) returned error %v", cmd.args, err)
}
if !reflect.DeepEqual(actual, cmd.expected) {
t.Errorf("Receive(%v) = %v, want %v", cmd.args, actual, cmd.expected)
}
}
}
func TestBlankCommmand(t *testing.T) {
c := dialt(t)
defer c.Close()
for _, cmd := range testCommands {
if err := c.Send(cmd.args[0].(string), cmd.args[1:]...); err != nil {
t.Fatalf("Send(%v) returned error %v", cmd.args, err)
}
}
reply, err := redis.Values(c.Do(""))
if err != nil {
t.Fatalf("Do() returned error %v", err)
}
if len(reply) != len(testCommands) {
t.Fatalf("len(reply)=%d, want %d", len(reply), len(testCommands))
}
for i, cmd := range testCommands {
actual := reply[i]
if !reflect.DeepEqual(actual, cmd.expected) {
t.Errorf("Receive(%v) = %v, want %v", cmd.args, actual, cmd.expected)
}
}
}
func TestError(t *testing.T) {
c := dialt(t)
defer c.Close()
c.Do("SET", "key", "val")
_, err := c.Do("HSET", "key", "fld", "val")
if err == nil {
t.Errorf("Expected err for HSET on string key.")
}
if c.Err() != nil {
t.Errorf("Conn has Err()=%v, expect nil", c.Err())
}
_, err = c.Do("SET", "key", "val")
if err != nil {
t.Errorf("Do(SET, key, val) returned error %v, expected nil.", err)
}
}
func TestReadDeadline(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("net.Listen returned %v", err)
}
defer l.Close()
go func() {
for {
c, err := l.Accept()
if err != nil {
return
}
go func() {
time.Sleep(time.Second)
c.Write([]byte("+OK\r\n"))
c.Close()
}()
}
}()
c1, err := redis.DialTimeout(l.Addr().Network(), l.Addr().String(), 0, time.Millisecond, 0)
if err != nil {
t.Fatalf("redis.Dial returned %v", err)
}
defer c1.Close()
_, err = c1.Do("PING")
if err == nil {
t.Fatalf("c1.Do() returned nil, expect error")
}
if c1.Err() == nil {
t.Fatalf("c1.Err() = nil, expect error")
}
c2, err := redis.DialTimeout(l.Addr().Network(), l.Addr().String(), 0, time.Millisecond, 0)
if err != nil {
t.Fatalf("redis.Dial returned %v", err)
}
defer c2.Close()
c2.Send("PING")
c2.Flush()
_, err = c2.Receive()
if err == nil {
t.Fatalf("c2.Receive() returned nil, expect error")
}
if c2.Err() == nil {
t.Fatalf("c2.Err() = nil, expect error")
}
}
// Connect to local instance of Redis running on the default port.
func ExampleDial(x int) {
c, err := redis.Dial("tcp", ":6379")
if err != nil {
// handle error
}
defer c.Close()
}

167
redigo/redis/doc.go Normal file
View File

@@ -0,0 +1,167 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
// Package redis is a client for the Redis database.
//
// The Redigo FAQ (https://github.com/garyburd/redigo/wiki/FAQ) contains more
// documentation about this package.
//
// Connections
//
// The Conn interface is the primary interface for working with Redis.
// Applications create connections by calling the Dial, DialWithTimeout or
// NewConn functions. In the future, functions will be added for creating
// sharded and other types of connections.
//
// The application must call the connection Close method when the application
// is done with the connection.
//
// Executing Commands
//
// The Conn interface has a generic method for executing Redis commands:
//
// Do(commandName string, args ...interface{}) (reply interface{}, err error)
//
// The Redis command reference (http://redis.io/commands) lists the available
// commands. An example of using the Redis APPEND command is:
//
// n, err := conn.Do("APPEND", "key", "value")
//
// The Do method converts command arguments to binary strings for transmission
// to the server as follows:
//
// Go Type Conversion
// []byte Sent as is
// string Sent as is
// int, int64 strconv.FormatInt(v)
// float64 strconv.FormatFloat(v, 'g', -1, 64)
// bool true -> "1", false -> "0"
// nil ""
// all other types fmt.Print(v)
//
// Redis command reply types are represented using the following Go types:
//
// Redis type Go type
// error redis.Error
// integer int64
// status string
// bulk []byte or nil if value not present.
// multi-bulk []interface{} or nil if value not present.
//
// Use type assertions or the reply helper functions to convert from
// interface{} to the specific Go type for the command result.
//
// Pipelining
//
// Connections support pipelining using the Send, Flush and Receive methods.
//
// Send(commandName string, args ...interface{}) error
// Flush() error
// Receive() (reply interface{}, err error)
//
// Send writes the command to the connection's output buffer. Flush flushes the
// connection's output buffer to the server. Receive reads a single reply from
// the server. The following example shows a simple pipeline.
//
// c.Send("SET", "foo", "bar")
// c.Send("GET", "foo")
// c.Flush()
// c.Receive() // reply from SET
// v, err = c.Receive() // reply from GET
//
// The Do method combines the functionality of the Send, Flush and Receive
// methods. The Do method starts by writing the command and flushing the output
// buffer. Next, the Do method receives all pending replies including the reply
// for the command just sent by Do. If any of the received replies is an error,
// then Do returns the error. If there are no errors, then Do returns the last
// reply. If the command argument to the Do method is "", then the Do method
// will flush the output buffer and receive pending replies without sending a
// command.
//
// Use the Send and Do methods to implement pipelined transactions.
//
// c.Send("MULTI")
// c.Send("INCR", "foo")
// c.Send("INCR", "bar")
// r, err := c.Do("EXEC")
// fmt.Println(r) // prints [1, 1]
//
// Concurrency
//
// Connections support a single concurrent caller to the write methods (Send,
// Flush) and a single concurrent caller to the read method (Receive). Because
// Do method combines the functionality of Send, Flush and Receive, the Do
// method cannot be called concurrently with the other methods.
//
// For full concurrent access to Redis, use the thread-safe Pool to get and
// release connections from within a goroutine.
//
// Publish and Subscribe
//
// Use the Send, Flush and Receive methods to implement Pub/Sub subscribers.
//
// c.Send("SUBSCRIBE", "example")
// c.Flush()
// for {
// reply, err := c.Receive()
// if err != nil {
// return err
// }
// // process pushed message
// }
//
// The PubSubConn type wraps a Conn with convenience methods for implementing
// subscribers. The Subscribe, PSubscribe, Unsubscribe and PUnsubscribe methods
// send and flush a subscription management command. The receive method
// converts a pushed message to convenient types for use in a type switch.
//
// psc := PubSubConn{c}
// psc.Subscribe("example")
// for {
// switch v := psc.Receive().(type) {
// case redis.Message:
// fmt.Printf("%s: message: %s\n", v.Channel, v.Data)
// case redis.Subscription:
// fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
// case error:
// return v
// }
// }
//
// Reply Helpers
//
// The Bool, Int, Bytes, String, Strings and Values functions convert a reply
// to a value of a specific type. To allow convenient wrapping of calls to the
// connection Do and Receive methods, the functions take a second argument of
// type error. If the error is non-nil, then the helper function returns the
// error. If the error is nil, the function converts the reply to the specified
// type:
//
// exists, err := redis.Bool(c.Do("EXISTS", "foo"))
// if err != nil {
// // handle error return from c.Do or type conversion error.
// }
//
// The Scan function converts elements of a multi-bulk reply to Go types:
//
// var value1 int
// var value2 string
// reply, err := redis.Values(c.Do("MGET", "key1", "key2"))
// if err != nil {
// // handle error
// }
// if _, err := redis.Scan(reply, &value1, &value2); err != nil {
// // handle error
// }
package redis

117
redigo/redis/log.go Normal file
View File

@@ -0,0 +1,117 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"bytes"
"fmt"
"log"
)
// NewLoggingConn returns a logging wrapper around a connection.
func NewLoggingConn(conn Conn, logger *log.Logger, prefix string) Conn {
if prefix != "" {
prefix = prefix + "."
}
return &loggingConn{conn, logger, prefix}
}
type loggingConn struct {
Conn
logger *log.Logger
prefix string
}
func (c *loggingConn) Close() error {
err := c.Conn.Close()
var buf bytes.Buffer
fmt.Fprintf(&buf, "%sClose() -> (%v)", c.prefix, err)
c.logger.Output(2, buf.String())
return err
}
func (c *loggingConn) printValue(buf *bytes.Buffer, v interface{}) {
const chop = 32
switch v := v.(type) {
case []byte:
if len(v) > chop {
fmt.Fprintf(buf, "%q...", v[:chop])
} else {
fmt.Fprintf(buf, "%q", v)
}
case string:
if len(v) > chop {
fmt.Fprintf(buf, "%q...", v[:chop])
} else {
fmt.Fprintf(buf, "%q", v)
}
case []interface{}:
if len(v) == 0 {
buf.WriteString("[]")
} else {
sep := "["
fin := "]"
if len(v) > chop {
v = v[:chop]
fin = "...]"
}
for _, vv := range v {
buf.WriteString(sep)
c.printValue(buf, vv)
sep = ", "
}
buf.WriteString(fin)
}
default:
fmt.Fprint(buf, v)
}
}
func (c *loggingConn) print(method, commandName string, args []interface{}, reply interface{}, err error) {
var buf bytes.Buffer
fmt.Fprintf(&buf, "%s%s(", c.prefix, method)
if method != "Receive" {
buf.WriteString(commandName)
for _, arg := range args {
buf.WriteString(", ")
c.printValue(&buf, arg)
}
}
buf.WriteString(") -> (")
if method != "Send" {
c.printValue(&buf, reply)
buf.WriteString(", ")
}
fmt.Fprintf(&buf, "%v)", err)
c.logger.Output(3, buf.String())
}
func (c *loggingConn) Do(commandName string, args ...interface{}) (interface{}, error) {
reply, err := c.Conn.Do(commandName, args...)
c.print("Do", commandName, args, reply, err)
return reply, err
}
func (c *loggingConn) Send(commandName string, args ...interface{}) error {
err := c.Conn.Send(commandName, args...)
c.print("Send", commandName, args, nil, err)
return err
}
func (c *loggingConn) Receive() (interface{}, error) {
reply, err := c.Conn.Receive()
c.print("Receive", "", nil, reply, err)
return reply, err
}

291
redigo/redis/pool.go Normal file
View File

@@ -0,0 +1,291 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"container/list"
"errors"
"sync"
"time"
)
var nowFunc = time.Now // for testing
// ErrPoolExhausted is returned from pool connection methods when the maximum
// number of database connections in the pool has been reached.
var ErrPoolExhausted = errors.New("redigo: connection pool exhausted")
var errPoolClosed = errors.New("redigo: connection pool closed")
// Pool maintains a pool of connections. The application calls the Get method
// to get a connection from the pool and the connection's Close method to
// return the connection's resources to the pool.
//
// The following example shows how to use a pool in a web application. The
// application creates a pool at application startup and makes it available to
// request handlers, possibly using a global variable:
//
// var server string // host:port of server
// var password string
// ...
//
// pool = &redis.Pool{
// MaxIdle: 3,
// IdleTimeout: 240 * time.Second,
// Dial: func () (redis.Conn, error) {
// c, err := redis.Dial("tcp", server)
// if err != nil {
// return nil, err
// }
// if _, err := c.Do("AUTH", password); err != nil {
// c.Close()
// return nil, err
// }
// return c, err
// },
// TestOnBorrow: func(c redis.Conn, t time.Time) error {
// _, err := c.Do("PING")
// return err
// },
// }
//
// This pool has a maximum of three connections to the server specified by the
// variable "server". Each connection is authenticated using a password.
//
// A request handler gets a connection from the pool and closes the connection
// when the handler is done:
//
// conn := pool.Get()
// defer conn.Close()
// // do something with the connection
type Pool struct {
// Dial is an application supplied function for creating new connections.
Dial func() (Conn, error)
// TestOnBorrow is an optional application supplied function for checking
// the health of an idle connection before the connection is used again by
// the application. Argument t is the time that the connection was returned
// to the pool. If the function returns an error, then the connection is
// closed.
TestOnBorrow func(c Conn, t time.Time) error
// Maximum number of idle connections in the pool.
MaxIdle int
// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
MaxActive int
// Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set
// the timeout to a value less than the server's timeout.
IdleTimeout time.Duration
// mu protects fields defined below.
mu sync.Mutex
closed bool
active int
// Stack of idleConn with most recently used at the front.
idle list.List
}
type idleConn struct {
c Conn
t time.Time
}
// NewPool returns a pool that uses newPool to create connections as needed.
// The pool keeps a maximum of maxIdle idle connections.
func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
return &Pool{Dial: newFn, MaxIdle: maxIdle}
}
// Get gets a connection from the pool.
func (p *Pool) Get() Conn {
return &pooledConnection{p: p}
}
// ActiveCount returns the number of active connections in the pool.
func (p *Pool) ActiveCount() int {
p.mu.Lock()
active := p.active
p.mu.Unlock()
return active
}
// Close releases the resources used by the pool.
func (p *Pool) Close() error {
p.mu.Lock()
idle := p.idle
p.idle.Init()
p.closed = true
p.active -= idle.Len()
p.mu.Unlock()
for e := idle.Front(); e != nil; e = e.Next() {
e.Value.(idleConn).c.Close()
}
return nil
}
// get prunes stale connections and returns a connection from the idle list or
// creates a new connection.
func (p *Pool) get() (Conn, error) {
p.mu.Lock()
if p.closed {
p.mu.Unlock()
return nil, errors.New("redigo: get on closed pool")
}
// Prune stale connections.
if timeout := p.IdleTimeout; timeout > 0 {
for i, n := 0, p.idle.Len(); i < n; i++ {
e := p.idle.Back()
if e == nil {
break
}
ic := e.Value.(idleConn)
if ic.t.Add(timeout).After(nowFunc()) {
break
}
p.idle.Remove(e)
p.active -= 1
p.mu.Unlock()
ic.c.Close()
p.mu.Lock()
}
}
// Get idle connection.
for i, n := 0, p.idle.Len(); i < n; i++ {
e := p.idle.Front()
if e == nil {
break
}
ic := e.Value.(idleConn)
p.idle.Remove(e)
test := p.TestOnBorrow
p.mu.Unlock()
if test == nil || test(ic.c, ic.t) == nil {
return ic.c, nil
}
ic.c.Close()
p.mu.Lock()
p.active -= 1
}
if p.MaxActive > 0 && p.active >= p.MaxActive {
p.mu.Unlock()
return nil, ErrPoolExhausted
}
// No idle connection, create new.
dial := p.Dial
p.active += 1
p.mu.Unlock()
c, err := dial()
if err != nil {
p.mu.Lock()
p.active -= 1
p.mu.Unlock()
c = nil
}
return c, err
}
func (p *Pool) put(c Conn) error {
if c.Err() == nil {
p.mu.Lock()
if !p.closed {
p.idle.PushFront(idleConn{t: nowFunc(), c: c})
if p.idle.Len() > p.MaxIdle {
c = p.idle.Remove(p.idle.Back()).(idleConn).c
} else {
c = nil
}
}
p.mu.Unlock()
}
if c != nil {
p.mu.Lock()
p.active -= 1
p.mu.Unlock()
return c.Close()
}
return nil
}
type pooledConnection struct {
c Conn
err error
p *Pool
}
func (c *pooledConnection) get() error {
if c.err == nil && c.c == nil {
c.c, c.err = c.p.get()
}
return c.err
}
func (c *pooledConnection) Close() (err error) {
if c.c != nil {
c.c.Do("")
c.p.put(c.c)
c.c = nil
c.err = errPoolClosed
}
return err
}
func (c *pooledConnection) Err() error {
if err := c.get(); err != nil {
return err
}
return c.c.Err()
}
func (c *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
if err := c.get(); err != nil {
return nil, err
}
return c.c.Do(commandName, args...)
}
func (c *pooledConnection) Send(commandName string, args ...interface{}) error {
if err := c.get(); err != nil {
return err
}
return c.c.Send(commandName, args...)
}
func (c *pooledConnection) Flush() error {
if err := c.get(); err != nil {
return err
}
return c.c.Flush()
}
func (c *pooledConnection) Receive() (reply interface{}, err error) {
if err := c.get(); err != nil {
return nil, err
}
return c.c.Receive()
}

250
redigo/redis/pool_test.go Normal file
View File

@@ -0,0 +1,250 @@
// Copyright 2011 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"io"
"testing"
"time"
)
type fakeConn struct {
open *int
err error
}
func (c *fakeConn) Close() error { *c.open -= 1; return nil }
func (c *fakeConn) Err() error { return c.err }
func (c *fakeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
if commandName == "ERR" {
c.err = args[0].(error)
}
return nil, nil
}
func (c *fakeConn) Send(commandName string, args ...interface{}) error {
return nil
}
func (c *fakeConn) Flush() error {
return nil
}
func (c *fakeConn) Receive() (reply interface{}, err error) {
return nil, nil
}
type dialer struct {
t *testing.T
dialed, open int
}
func (d *dialer) dial() (Conn, error) {
d.open += 1
d.dialed += 1
return &fakeConn{open: &d.open}, nil
}
func (d *dialer) check(message string, p *Pool, dialed, open int) {
if d.dialed != dialed {
d.t.Errorf("%s: dialed=%d, want %d", message, d.dialed, dialed)
}
if d.open != open {
d.t.Errorf("%s: open=%d, want %d", message, d.open, open)
}
if active := p.ActiveCount(); active != open {
d.t.Errorf("%s: active=%d, want %d", message, active, open)
}
}
func TestPoolReuse(t *testing.T) {
d := dialer{t: t}
p := &Pool{
MaxIdle: 2,
Dial: d.dial,
}
for i := 0; i < 10; i++ {
c1 := p.Get()
c1.Do("PING")
c2 := p.Get()
c2.Do("PING")
c1.Close()
c2.Close()
}
d.check("before close", p, 2, 2)
p.Close()
d.check("after close", p, 2, 0)
}
func TestPoolMaxIdle(t *testing.T) {
d := dialer{t: t}
p := &Pool{
MaxIdle: 2,
Dial: d.dial,
}
for i := 0; i < 10; i++ {
c1 := p.Get()
c1.Do("PING")
c2 := p.Get()
c2.Do("PING")
c3 := p.Get()
c3.Do("PING")
c1.Close()
c2.Close()
c3.Close()
}
d.check("before close", p, 12, 2)
p.Close()
d.check("after close", p, 12, 0)
}
func TestPoolError(t *testing.T) {
d := dialer{t: t}
p := &Pool{
MaxIdle: 2,
Dial: d.dial,
}
c := p.Get()
c.Do("ERR", io.EOF)
if c.Err() == nil {
t.Errorf("expected c.Err() != nil")
}
c.Close()
c = p.Get()
c.Do("ERR", io.EOF)
c.Close()
d.check(".", p, 2, 0)
}
func TestPoolClose(t *testing.T) {
d := dialer{t: t}
p := &Pool{
MaxIdle: 2,
Dial: d.dial,
}
c1 := p.Get()
c1.Do("PING")
c2 := p.Get()
c2.Do("PING")
c3 := p.Get()
c3.Do("PING")
c1.Close()
if _, err := c1.Do("PING"); err == nil {
t.Errorf("expected error after connection closed")
}
c2.Close()
p.Close()
d.check("after pool close", p, 3, 1)
if _, err := c1.Do("PING"); err == nil {
t.Errorf("expected error after connection and pool closed")
}
c3.Close()
d.check("after channel close", p, 3, 0)
c1 = p.Get()
if _, err := c1.Do("PING"); err == nil {
t.Errorf("expected error after pool closed")
}
}
func TestPoolTimeout(t *testing.T) {
d := dialer{t: t}
p := &Pool{
MaxIdle: 2,
IdleTimeout: 300 * time.Second,
Dial: d.dial,
}
now := time.Now()
nowFunc = func() time.Time { return now }
defer func() { nowFunc = time.Now }()
c := p.Get()
c.Do("PING")
c.Close()
d.check("1", p, 1, 1)
now = now.Add(p.IdleTimeout)
c = p.Get()
c.Do("PING")
c.Close()
d.check("2", p, 2, 1)
}
func TestBorrowCheck(t *testing.T) {
d := dialer{t: t}
p := &Pool{
MaxIdle: 2,
Dial: d.dial,
TestOnBorrow: func(Conn, time.Time) error { return Error("BLAH") },
}
for i := 0; i < 10; i++ {
c := p.Get()
c.Do("PING")
c.Close()
}
d.check("1", p, 10, 1)
}
func TestMaxActive(t *testing.T) {
d := dialer{t: t}
p := &Pool{
MaxIdle: 2,
MaxActive: 2,
Dial: d.dial,
}
c1 := p.Get()
c1.Do("PING")
c2 := p.Get()
c2.Do("PING")
d.check("1", p, 2, 2)
c3 := p.Get()
if _, err := c3.Do("PING"); err != ErrPoolExhausted {
t.Errorf("expected pool exhausted")
}
c3.Close()
d.check("2", p, 2, 2)
c2.Close()
d.check("2", p, 2, 2)
c3 = p.Get()
if _, err := c3.Do("PING"); err != nil {
t.Errorf("expected good channel, err=%v", err)
}
c3.Close()
d.check("2", p, 2, 2)
}

129
redigo/redis/pubsub.go Normal file
View File

@@ -0,0 +1,129 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"errors"
)
// Subscription represents a subscribe or unsubscribe notification.
type Subscription struct {
// Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe"
Kind string
// The channel that was changed.
Channel string
// The current number of subscriptions for connection.
Count int
}
// Message represents a message notification.
type Message struct {
// The originating channel.
Channel string
// The message data.
Data []byte
}
// PMessage represents a pmessage notification.
type PMessage struct {
// The matched pattern.
Pattern string
// The originating channel.
Channel string
// The message data.
Data []byte
}
// PubSubConn wraps a Conn with convenience methods for subscribers.
type PubSubConn struct {
Conn Conn
}
// Close closes the connection.
func (c PubSubConn) Close() error {
return c.Conn.Close()
}
// Subscribe subscribes the connection to the specified channels.
func (c PubSubConn) Subscribe(channel ...interface{}) error {
c.Conn.Send("SUBSCRIBE", channel...)
return c.Conn.Flush()
}
// PSubscribe subscribes the connection to the given patterns.
func (c PubSubConn) PSubscribe(channel ...interface{}) error {
c.Conn.Send("PSUBSCRIBE", channel...)
return c.Conn.Flush()
}
// Unsubscribe unsubscribes the connection from the given channels, or from all
// of them if none is given.
func (c PubSubConn) Unsubscribe(channel ...interface{}) error {
c.Conn.Send("UNSUBSCRIBE", channel...)
return c.Conn.Flush()
}
// PUnsubscribe unsubscribes the connection from the given patterns, or from all
// of them if none is given.
func (c PubSubConn) PUnsubscribe(channel ...interface{}) error {
c.Conn.Send("PUNSUBSCRIBE", channel...)
return c.Conn.Flush()
}
// Receive returns a pushed message as a Subscription, Message, PMessage or
// error. The return value is intended to be used directly in a type switch as
// illustrated in the PubSubConn example.
func (c PubSubConn) Receive() interface{} {
reply, err := Values(c.Conn.Receive())
if err != nil {
return err
}
var kind string
reply, err = Scan(reply, &kind)
if err != nil {
return err
}
switch kind {
case "message":
var m Message
if _, err := Scan(reply, &m.Channel, &m.Data); err != nil {
return err
}
return m
case "pmessage":
var pm PMessage
if _, err := Scan(reply, &pm.Pattern, &pm.Channel, &pm.Data); err != nil {
return err
}
return pm
case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
s := Subscription{Kind: kind}
if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
return err
}
return s
}
return errors.New("redigo: unknown pubsub notification")
}

138
redigo/redis/pubsub_test.go Normal file
View File

@@ -0,0 +1,138 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis_test
import (
"fmt"
"github.com/garyburd/redigo/redis"
"net"
"reflect"
"sync"
"testing"
"time"
)
func publish(channel, value interface{}) {
c, err := dial()
if err != nil {
panic(err)
}
defer c.Close()
c.Do("PUBLISH", channel, value)
}
// Applications can receive pushed messages from one goroutine and manage subscriptions from another goroutine.
func ExamplePubSubConn() {
c, err := dial()
if err != nil {
panic(err)
}
defer c.Close()
var wg sync.WaitGroup
wg.Add(2)
psc := redis.PubSubConn{Conn: c}
// This goroutine receives and prints pushed notifications from the server.
// The goroutine exits when the connection is unsubscribed from all
// channels or there is an error.
go func() {
defer wg.Done()
for {
switch n := psc.Receive().(type) {
case redis.Message:
fmt.Printf("Message: %s %s\n", n.Channel, n.Data)
case redis.PMessage:
fmt.Printf("PMessage: %s %s %s\n", n.Pattern, n.Channel, n.Data)
case redis.Subscription:
fmt.Printf("Subscription: %s %s %d\n", n.Kind, n.Channel, n.Count)
if n.Count == 0 {
return
}
case error:
fmt.Printf("error: %v\n", n)
return
}
}
}()
// This goroutine manages subscriptions for the connection.
go func() {
defer wg.Done()
psc.Subscribe("example")
psc.PSubscribe("p*")
// The following function calls publish a message using another
// connection to the Redis server.
publish("example", "hello")
publish("example", "world")
publish("pexample", "foo")
publish("pexample", "bar")
// Unsubscribe from all connections. This will cause the receiving
// goroutine to exit.
psc.Unsubscribe()
psc.PUnsubscribe()
}()
wg.Wait()
// Output:
// Subscription: subscribe example 1
// Subscription: psubscribe p* 2
// Message: example hello
// Message: example world
// PMessage: p* pexample foo
// PMessage: p* pexample bar
// Subscription: unsubscribe example 1
// Subscription: punsubscribe p* 0
}
func expectPushed(t *testing.T, c redis.PubSubConn, message string, expected interface{}) {
actual := c.Receive()
if !reflect.DeepEqual(actual, expected) {
t.Errorf("%s = %v, want %v", message, actual, expected)
}
}
func TestPushed(t *testing.T) {
pc := dialt(t)
defer pc.Close()
nc, err := net.Dial("tcp", ":6379")
if err != nil {
t.Fatal(err)
}
defer nc.Close()
nc.SetReadDeadline(time.Now().Add(4 * time.Second))
c := redis.PubSubConn{Conn: redis.NewConn(nc, 0, 0)}
c.Subscribe("c1")
expectPushed(t, c, "Subscribe(c1)", redis.Subscription{Kind: "subscribe", Channel: "c1", Count: 1})
c.Subscribe("c2")
expectPushed(t, c, "Subscribe(c2)", redis.Subscription{Kind: "subscribe", Channel: "c2", Count: 2})
c.PSubscribe("p1")
expectPushed(t, c, "PSubscribe(p1)", redis.Subscription{Kind: "psubscribe", Channel: "p1", Count: 3})
c.PSubscribe("p2")
expectPushed(t, c, "PSubscribe(p2)", redis.Subscription{Kind: "psubscribe", Channel: "p2", Count: 4})
c.PUnsubscribe()
expectPushed(t, c, "Punsubscribe(p1)", redis.Subscription{Kind: "punsubscribe", Channel: "p1", Count: 3})
expectPushed(t, c, "Punsubscribe()", redis.Subscription{Kind: "punsubscribe", Channel: "p2", Count: 2})
pc.Do("PUBLISH", "c1", "hello")
expectPushed(t, c, "PUBLISH c1 hello", redis.Message{Channel: "c1", Data: []byte("hello")})
}

44
redigo/redis/redis.go Normal file
View File

@@ -0,0 +1,44 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
// Error represents an error returned in a command reply.
type Error string
func (err Error) Error() string { return string(err) }
// Conn represents a connection to a Redis server.
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value if the connection is broken. The returned
// value is either the first non-nil value returned from the underlying
// network connection or a protocol parsing error. Applications should
// close broken connections.
Err() error
// Do sends a command to the server and returns the received reply.
Do(commandName string, args ...interface{}) (reply interface{}, err error)
// Send writes the command to the client's output buffer.
Send(commandName string, args ...interface{}) error
// Flush flushes the output buffer to the Redis server.
Flush() error
// Receive receives a single reply from the Redis server
Receive() (reply interface{}, err error)
}

239
redigo/redis/reply.go Normal file
View File

@@ -0,0 +1,239 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"errors"
"fmt"
"strconv"
)
// ErrNil indicates that a reply value is nil.
var ErrNil = errors.New("redigo: nil returned")
// Int is a helper that converts a command reply to an integer. If err is not
// equal to nil, then Int returns 0, err. Otherwise, Int converts the
// reply to an int as follows:
//
// Reply type Result
// integer int(reply), nil
// bulk parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Int(reply interface{}, err error) (int, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int64:
x := int(reply)
if int64(x) != reply {
return 0, strconv.ErrRange
}
return x, nil
case []byte:
n, err := strconv.ParseInt(string(reply), 10, 0)
return int(n), err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, fmt.Errorf("redigo: unexpected type for Int, got type %T", reply)
}
// Int64 is a helper that converts a command reply to 64 bit integer. If err is
// not equal to nil, then Int returns 0, err. Otherwise, Int64 converts the
// reply to an int64 as follows:
//
// Reply type Result
// integer reply, nil
// bulk parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Int64(reply interface{}, err error) (int64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int64:
return reply, nil
case []byte:
n, err := strconv.ParseInt(string(reply), 10, 64)
return n, err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, fmt.Errorf("redigo: unexpected type for Int64, got type %T", reply)
}
// Float64 is a helper that converts a command reply to 64 bit float. If err is
// not equal to nil, then Float64 returns 0, err. Otherwise, Float64 converts
// the reply to an int as follows:
//
// Reply type Result
// bulk parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Float64(reply interface{}, err error) (float64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case []byte:
n, err := strconv.ParseFloat(string(reply), 64)
return n, err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, fmt.Errorf("redigo: unexpected type for Float64, got type %T", reply)
}
// String is a helper that converts a command reply to a string. If err is not
// equal to nil, then String returns "", err. Otherwise String converts the
// reply to a string as follows:
//
// Reply type Result
// bulk string(reply), nil
// status reply, nil
// nil "", ErrNil
// other "", error
func String(reply interface{}, err error) (string, error) {
if err != nil {
return "", err
}
switch reply := reply.(type) {
case []byte:
return string(reply), nil
case string:
return reply, nil
case nil:
return "", ErrNil
case Error:
return "", reply
}
return "", fmt.Errorf("redigo: unexpected type for String, got type %T", reply)
}
// Bytes is a helper that converts a command reply to a slice of bytes. If err
// is not equal to nil, then Bytes returns nil, err. Otherwise Bytes converts
// the reply to a slice of bytes as follows:
//
// Reply type Result
// bulk reply, nil
// status []byte(reply), nil
// nil nil, ErrNil
// other nil, error
func Bytes(reply interface{}, err error) ([]byte, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []byte:
return reply, nil
case string:
return []byte(reply), nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, fmt.Errorf("redigo: unexpected type for Bytes, got type %T", reply)
}
// Bool is a helper that converts a command reply to a boolean. If err is not
// equal to nil, then Bool returns false, err. Otherwise Bool converts the
// reply to boolean as follows:
//
// Reply type Result
// integer value != 0, nil
// bulk strconv.ParseBool(reply)
// nil false, ErrNil
// other false, error
func Bool(reply interface{}, err error) (bool, error) {
if err != nil {
return false, err
}
switch reply := reply.(type) {
case int64:
return reply != 0, nil
case []byte:
return strconv.ParseBool(string(reply))
case nil:
return false, ErrNil
case Error:
return false, reply
}
return false, fmt.Errorf("redigo: unexpected type for Bool, got type %T", reply)
}
// MultiBulk is deprecated. Use Values.
func MultiBulk(reply interface{}, err error) ([]interface{}, error) { return Values(reply, err) }
// Values is a helper that converts a multi-bulk command reply to a
// []interface{}. If err is not equal to nil, then Values returns nil, err.
// Otherwise, Multi converts the reply as follows:
//
// Reply type Result
// multi-bulk reply, nil
// nil nil, ErrNil
// other nil, error
func Values(reply interface{}, err error) ([]interface{}, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
return reply, nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, fmt.Errorf("redigo: unexpected type for Values, got type %T", reply)
}
// Strings is a helper that converts a multi-bulk command reply to a []string.
// If err is not equal to nil, then Strings returns nil, err. If one if the
// multi-bulk items is not a bulk value or nil, then Strings returns an error.
func Strings(reply interface{}, err error) ([]string, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
result := make([]string, len(reply))
for i := range reply {
if reply[i] == nil {
continue
}
p, ok := reply[i].([]byte)
if !ok {
return nil, fmt.Errorf("redigo: unexpected element type for Strings, got type %T", reply[i])
}
result[i] = string(p)
}
return result, nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, fmt.Errorf("redigo: unexpected type for Strings, got type %T", reply)
}

126
redigo/redis/reply_test.go Normal file
View File

@@ -0,0 +1,126 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis_test
import (
"fmt"
"reflect"
"testing"
"github.com/garyburd/redigo/redis"
)
type valueError struct {
v interface{}
err error
}
func ve(v interface{}, err error) valueError {
return valueError{v, err}
}
var replyTests = []struct {
name interface{}
actual valueError
expected valueError
}{
{
"strings([v1, v2])",
ve(redis.Strings([]interface{}{[]byte("v1"), []byte("v2")}, nil)),
ve([]string{"v1", "v2"}, nil),
},
{
"strings(nil)",
ve(redis.Strings(nil, nil)),
ve([]string(nil), redis.ErrNil),
},
{
"values([v1, v2])",
ve(redis.Values([]interface{}{[]byte("v1"), []byte("v2")}, nil)),
ve([]interface{}{[]byte("v1"), []byte("v2")}, nil),
},
{
"values(nil)",
ve(redis.Values(nil, nil)),
ve([]interface{}(nil), redis.ErrNil),
},
{
"float64(1.0)",
ve(redis.Float64([]byte("1.0"), nil)),
ve(float64(1.0), nil),
},
{
"float64(nil)",
ve(redis.Float64(nil, nil)),
ve(float64(0.0), redis.ErrNil),
},
}
func TestReply(t *testing.T) {
for _, rt := range replyTests {
if rt.actual.err != rt.expected.err {
t.Errorf("%s returned err %v, want %v", rt.name, rt.actual.err, rt.expected.err)
continue
}
if !reflect.DeepEqual(rt.actual.v, rt.expected.v) {
t.Errorf("%s=%+v, want %+v", rt.name, rt.actual.v, rt.expected.v)
}
}
}
func ExampleBool() {
c, err := dial()
if err != nil {
panic(err)
}
defer c.Close()
c.Do("SET", "foo", 1)
exists, _ := redis.Bool(c.Do("EXISTS", "foo"))
fmt.Printf("%#v\n", exists)
// Output:
// true
}
func ExampleInt() {
c, err := dial()
if err != nil {
panic(err)
}
defer c.Close()
c.Do("SET", "k1", 1)
n, _ := redis.Int(c.Do("GET", "k1"))
fmt.Printf("%#v\n", n)
n, _ = redis.Int(c.Do("INCR", "k1"))
fmt.Printf("%#v\n", n)
// Output:
// 1
// 2
}
func ExampleString() {
c, err := dial()
if err != nil {
panic(err)
}
defer c.Close()
c.Do("SET", "hello", "world")
s, err := redis.String(c.Do("GET", "hello"))
fmt.Printf("%#v\n", s)
// Output:
// "world"
}

529
redigo/redis/scan.go Normal file
View File

@@ -0,0 +1,529 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
)
func ensureLen(d reflect.Value, n int) {
if n > d.Cap() {
d.Set(reflect.MakeSlice(d.Type(), n, n))
} else {
d.SetLen(n)
}
}
func cannotConvert(d reflect.Value, s interface{}) error {
return fmt.Errorf("redigo: Scan cannot convert from %s to %s",
reflect.TypeOf(s), d.Type())
}
func convertAssignBytes(d reflect.Value, s []byte) (err error) {
switch d.Type().Kind() {
case reflect.Float32, reflect.Float64:
var x float64
x, err = strconv.ParseFloat(string(s), d.Type().Bits())
d.SetFloat(x)
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
var x int64
x, err = strconv.ParseInt(string(s), 10, d.Type().Bits())
d.SetInt(x)
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
var x uint64
x, err = strconv.ParseUint(string(s), 10, d.Type().Bits())
d.SetUint(x)
case reflect.Bool:
var x bool
x, err = strconv.ParseBool(string(s))
d.SetBool(x)
case reflect.String:
d.SetString(string(s))
case reflect.Slice:
if d.Type().Elem().Kind() != reflect.Uint8 {
err = cannotConvert(d, s)
} else {
d.SetBytes(s)
}
default:
err = cannotConvert(d, s)
}
return
}
func convertAssignInt(d reflect.Value, s int64) (err error) {
switch d.Type().Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
d.SetInt(s)
if d.Int() != s {
err = strconv.ErrRange
d.SetInt(0)
}
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
if s < 0 {
err = strconv.ErrRange
} else {
x := uint64(s)
d.SetUint(x)
if d.Uint() != x {
err = strconv.ErrRange
d.SetUint(0)
}
}
case reflect.Bool:
d.SetBool(s != 0)
default:
err = cannotConvert(d, s)
}
return
}
func convertAssignValues(d reflect.Value, s []interface{}) (err error) {
if d.Type().Kind() != reflect.Slice {
return cannotConvert(d, s)
}
ensureLen(d, len(s))
for i := 0; i < len(s); i++ {
switch s := s[i].(type) {
case []byte:
err = convertAssignBytes(d.Index(i), s)
case int64:
err = convertAssignInt(d.Index(i), s)
default:
err = cannotConvert(d, s)
}
if err != nil {
break
}
}
return
}
func convertAssign(d interface{}, s interface{}) (err error) {
// Handle the most common destination types using type switches and
// fall back to reflection for all other types.
switch s := s.(type) {
case nil:
// ingore
case []byte:
switch d := d.(type) {
case *string:
*d = string(s)
case *int:
*d, err = strconv.Atoi(string(s))
case *bool:
*d, err = strconv.ParseBool(string(s))
case *[]byte:
*d = s
case *interface{}:
*d = s
case nil:
// skip value
default:
if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr {
err = cannotConvert(d, s)
} else {
err = convertAssignBytes(d.Elem(), s)
}
}
case int64:
switch d := d.(type) {
case *int:
x := int(s)
if int64(x) != s {
err = strconv.ErrRange
x = 0
}
*d = x
case *bool:
*d = s != 0
case *interface{}:
*d = s
case nil:
// skip value
default:
if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr {
err = cannotConvert(d, s)
} else {
err = convertAssignInt(d.Elem(), s)
}
}
case []interface{}:
switch d := d.(type) {
case *[]interface{}:
*d = s
case *interface{}:
*d = s
case nil:
// skip value
default:
if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr {
err = cannotConvert(d, s)
} else {
err = convertAssignValues(d.Elem(), s)
}
}
case Error:
err = s
default:
err = cannotConvert(reflect.ValueOf(d), s)
}
return
}
// Scan copies from the multi-bulk src to the values pointed at by dest.
//
// The values pointed at by dest must be an integer, float, boolean, string,
// []byte, interface{} or slices of these types. Scan uses the standard strconv
// package to convert bulk values to numeric and boolean types.
//
// If a dest value is nil, then the corresponding src value is skipped.
//
// If the multi-bulk value is nil, then the corresponding dest value is not
// modified.
//
// To enable easy use of Scan in a loop, Scan returns the slice of src
// following the copied values.
func Scan(src []interface{}, dest ...interface{}) ([]interface{}, error) {
if len(src) < len(dest) {
return nil, errors.New("redigo: Scan multibulk short")
}
var err error
for i, d := range dest {
err = convertAssign(d, src[i])
if err != nil {
break
}
}
return src[len(dest):], err
}
type fieldSpec struct {
name string
index []int
//omitEmpty bool
}
type structSpec struct {
m map[string]*fieldSpec
l []*fieldSpec
}
func (ss *structSpec) fieldSpec(name []byte) *fieldSpec {
return ss.m[string(name)]
}
func compileStructSpec(t reflect.Type, depth map[string]int, index []int, ss *structSpec) {
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
switch {
case f.PkgPath != "":
// Ignore unexported fields.
case f.Anonymous:
// TODO: Handle pointers. Requires change to decoder and
// protection against infinite recursion.
if f.Type.Kind() == reflect.Struct {
compileStructSpec(f.Type, depth, append(index, i), ss)
}
default:
fs := &fieldSpec{name: f.Name}
tag := f.Tag.Get("redis")
p := strings.Split(tag, ",")
if len(p) > 0 {
if p[0] == "-" {
continue
}
if len(p[0]) > 0 {
fs.name = p[0]
}
for _, s := range p[1:] {
switch s {
//case "omitempty":
// fs.omitempty = true
default:
panic(errors.New("redigo: unknown field flag " + s + " for type " + t.Name()))
}
}
}
d, found := depth[fs.name]
if !found {
d = 1 << 30
}
switch {
case len(index) == d:
// At same depth, remove from result.
delete(ss.m, fs.name)
j := 0
for i := 0; i < len(ss.l); i++ {
if fs.name != ss.l[i].name {
ss.l[j] = ss.l[i]
j += 1
}
}
ss.l = ss.l[:j]
case len(index) < d:
fs.index = make([]int, len(index)+1)
copy(fs.index, index)
fs.index[len(index)] = i
depth[fs.name] = len(index)
ss.m[fs.name] = fs
ss.l = append(ss.l, fs)
}
}
}
}
var (
structSpecMutex sync.RWMutex
structSpecCache = make(map[reflect.Type]*structSpec)
defaultFieldSpec = &fieldSpec{}
)
func structSpecForType(t reflect.Type) *structSpec {
structSpecMutex.RLock()
ss, found := structSpecCache[t]
structSpecMutex.RUnlock()
if found {
return ss
}
structSpecMutex.Lock()
defer structSpecMutex.Unlock()
ss, found = structSpecCache[t]
if found {
return ss
}
ss = &structSpec{m: make(map[string]*fieldSpec)}
compileStructSpec(t, make(map[string]int), nil, ss)
structSpecCache[t] = ss
return ss
}
var errScanStructValue = errors.New("redigo: ScanStruct value must be non-nil pointer to a struct")
// ScanStruct scans a multi-bulk src containing alternating names and values to
// a struct. The HGETALL and CONFIG GET commands return replies in this format.
//
// ScanStruct uses exported field names to match values in the response. Use
// 'redis' field tag to override the name:
//
// Field int `redis:"myName"`
//
// Fields with the tag redis:"-" are ignored.
//
// Integer, float boolean string and []byte fields are supported. Scan uses
// the standard strconv package to convert bulk values to numeric and boolean
// types.
//
// If the multi-bulk value is nil, then the corresponding field is not
// modified.
func ScanStruct(src []interface{}, dest interface{}) error {
d := reflect.ValueOf(dest)
if d.Kind() != reflect.Ptr || d.IsNil() {
return errScanStructValue
}
d = d.Elem()
if d.Kind() != reflect.Struct {
return errScanStructValue
}
ss := structSpecForType(d.Type())
if len(src)%2 != 0 {
return errors.New("redigo: ScanStruct expects even number of values in values")
}
for i := 0; i < len(src); i += 2 {
name, ok := src[i].([]byte)
if !ok {
return errors.New("redigo: ScanStruct key not a bulk value")
}
fs := ss.fieldSpec(name)
if fs == nil {
continue
}
f := d.FieldByIndex(fs.index)
var err error
switch s := src[i+1].(type) {
case nil:
// ignore
case []byte:
err = convertAssignBytes(f, s)
case int64:
err = convertAssignInt(f, s)
default:
err = cannotConvert(f, s)
}
if err != nil {
return err
}
}
return nil
}
var (
errScanSliceValue = errors.New("redigo: ScanSlice dest must be non-nil pointer to a struct")
errScanSliceSrc = errors.New("redigo: ScanSlice src element must be bulk or nil")
)
// ScanSlice scans multi-bulk src to the slice pointed to by dest. The elements
// the dest slice must be integer, float, boolean, string, struct or pointer to
// struct values.
//
// Struct fields must be integer, float, boolean or string values. All struct
// fields are used unless a subset is specified using fieldNames.
func ScanSlice(src []interface{}, dest interface{}, fieldNames ...string) error {
d := reflect.ValueOf(dest)
if d.Kind() != reflect.Ptr || d.IsNil() {
return errScanSliceValue
}
d = d.Elem()
if d.Kind() != reflect.Slice {
return errScanSliceValue
}
isPtr := false
t := d.Type().Elem()
if t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct {
isPtr = true
t = t.Elem()
}
if t.Kind() != reflect.Struct {
ensureLen(d, len(src))
for i, s := range src {
if s == nil {
continue
}
s, ok := s.([]byte)
if !ok {
return errScanSliceSrc
}
if err := convertAssignBytes(d.Index(i), s); err != nil {
return err
}
}
return nil
}
ss := structSpecForType(t)
fss := ss.l
if len(fieldNames) > 0 {
fss = make([]*fieldSpec, len(fieldNames))
for i, name := range fieldNames {
fss[i] = ss.m[name]
if fss[i] == nil {
return errors.New("redigo: ScanSlice bad field name " + name)
}
}
}
if len(fss) == 0 {
return errors.New("redigo: ScanSlice no struct fields")
}
n := len(src) / len(fss)
if n*len(fss) != len(src) {
return errors.New("redigo: ScanSlice length not a multiple of struct field count")
}
ensureLen(d, n)
for i := 0; i < n; i++ {
d := d.Index(i)
if isPtr {
if d.IsNil() {
d.Set(reflect.New(t))
}
d = d.Elem()
}
for j, fs := range fss {
s := src[i*len(fss)+j]
if s == nil {
continue
}
sb, ok := s.([]byte)
if !ok {
return errScanSliceSrc
}
d := d.FieldByIndex(fs.index)
if err := convertAssignBytes(d, sb); err != nil {
return err
}
}
}
return nil
}
// Args is a helper for constructing command arguments from structured values.
type Args []interface{}
// Add returns the result of appending value to args.
func (args Args) Add(value interface{}) Args {
return append(args, value)
}
// AddFlat returns the result of appending the flattened value of v to args.
//
// Maps are flattened by appending the alternating keys and map values to args.
//
// Slices are flattened by appending the slice elements to args.
//
// Structs are flattened by appending the alternating names and values of
// exported fields to args. If v is a nil struct pointer, then nothing is
// appended. The 'redis' field tag overrides struct field names. See ScanStruct
// for more information on the use of the 'redis' field tag.
//
// Other types are appended to args as is.
func (args Args) AddFlat(v interface{}) Args {
rv := reflect.ValueOf(v)
switch rv.Kind() {
case reflect.Struct:
args = flattenStruct(args, rv)
case reflect.Slice:
for i := 0; i < rv.Len(); i++ {
args = append(args, rv.Index(i).Interface())
}
case reflect.Map:
for _, k := range rv.MapKeys() {
args = append(args, k.Interface(), rv.MapIndex(k).Interface())
}
case reflect.Ptr:
if rv.Type().Elem().Kind() == reflect.Struct {
if !rv.IsNil() {
args = flattenStruct(args, rv.Elem())
}
} else {
args = append(args, v)
}
default:
args = append(args, v)
}
return args
}
func flattenStruct(args Args, v reflect.Value) Args {
ss := structSpecForType(v.Type())
for _, fs := range ss.l {
fv := v.FieldByIndex(fs.index)
args = append(args, fs.name, fv.Interface())
}
return args
}

412
redigo/redis/scan_test.go Normal file
View File

@@ -0,0 +1,412 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis_test
import (
"fmt"
"github.com/garyburd/redigo/redis"
"math"
"reflect"
"testing"
)
var scanConversionTests = []struct {
src interface{}
dest interface{}
}{
{[]byte("-inf"), math.Inf(-1)},
{[]byte("+inf"), math.Inf(1)},
{[]byte("0"), float64(0)},
{[]byte("3.14159"), float64(3.14159)},
{[]byte("3.14"), float32(3.14)},
{[]byte("-100"), int(-100)},
{[]byte("101"), int(101)},
{int64(102), int(102)},
{[]byte("103"), uint(103)},
{int64(104), uint(104)},
{[]byte("105"), int8(105)},
{int64(106), int8(106)},
{[]byte("107"), uint8(107)},
{int64(108), uint8(108)},
{[]byte("0"), false},
{int64(0), false},
{[]byte("f"), false},
{[]byte("1"), true},
{int64(1), true},
{[]byte("t"), true},
{[]byte("hello"), "hello"},
{[]byte("world"), []byte("world")},
{[]interface{}{[]byte("foo")}, []interface{}{[]byte("foo")}},
{[]interface{}{[]byte("foo")}, []string{"foo"}},
{[]interface{}{[]byte("hello"), []byte("world")}, []string{"hello", "world"}},
{[]interface{}{[]byte("bar")}, [][]byte{[]byte("bar")}},
{[]interface{}{[]byte("1")}, []int{1}},
{[]interface{}{[]byte("1"), []byte("2")}, []int{1, 2}},
{[]interface{}{[]byte("1"), []byte("2")}, []float64{1, 2}},
{[]interface{}{[]byte("1")}, []byte{1}},
{[]interface{}{[]byte("1")}, []bool{true}},
}
func TestScanConversion(t *testing.T) {
for _, tt := range scanConversionTests {
values := []interface{}{tt.src}
dest := reflect.New(reflect.TypeOf(tt.dest))
values, err := redis.Scan(values, dest.Interface())
if err != nil {
t.Errorf("Scan(%v) returned error %v", tt, err)
continue
}
if !reflect.DeepEqual(tt.dest, dest.Elem().Interface()) {
t.Errorf("Scan(%v) returned %v, want %v", tt, dest.Elem().Interface(), tt.dest)
}
}
}
var scanConversionErrorTests = []struct {
src interface{}
dest interface{}
}{
{[]byte("1234"), byte(0)},
{int64(1234), byte(0)},
{[]byte("-1"), byte(0)},
{int64(-1), byte(0)},
{[]byte("junk"), false},
{redis.Error("blah"), false},
}
func TestScanConversionError(t *testing.T) {
for _, tt := range scanConversionErrorTests {
values := []interface{}{tt.src}
dest := reflect.New(reflect.TypeOf(tt.dest))
values, err := redis.Scan(values, dest.Interface())
if err == nil {
t.Errorf("Scan(%v) did not return error", tt)
}
}
}
func ExampleScan() {
c, err := dial()
if err != nil {
panic(err)
}
defer c.Close()
c.Send("HMSET", "album:1", "title", "Red", "rating", 5)
c.Send("HMSET", "album:2", "title", "Earthbound", "rating", 1)
c.Send("HMSET", "album:3", "title", "Beat")
c.Send("LPUSH", "albums", "1")
c.Send("LPUSH", "albums", "2")
c.Send("LPUSH", "albums", "3")
values, err := redis.Values(c.Do("SORT", "albums",
"BY", "album:*->rating",
"GET", "album:*->title",
"GET", "album:*->rating"))
if err != nil {
panic(err)
}
for len(values) > 0 {
var title string
rating := -1 // initialize to illegal value to detect nil.
values, err = redis.Scan(values, &title, &rating)
if err != nil {
panic(err)
}
if rating == -1 {
fmt.Println(title, "not-rated")
} else {
fmt.Println(title, rating)
}
}
// Output:
// Beat not-rated
// Earthbound 1
// Red 5
}
type s0 struct {
X int
Y int `redis:"y"`
Bt bool
}
type s1 struct {
X int `redis:"-"`
I int `redis:"i"`
U uint `redis:"u"`
S string `redis:"s"`
P []byte `redis:"p"`
B bool `redis:"b"`
Bt bool
Bf bool
s0
}
var scanStructTests = []struct {
title string
reply []string
value interface{}
}{
{"basic",
[]string{"i", "-1234", "u", "5678", "s", "hello", "p", "world", "b", "t", "Bt", "1", "Bf", "0", "X", "123", "y", "456"},
&s1{I: -1234, U: 5678, S: "hello", P: []byte("world"), B: true, Bt: true, Bf: false, s0: s0{X: 123, Y: 456}},
},
}
func TestScanStruct(t *testing.T) {
for _, tt := range scanStructTests {
var reply []interface{}
for _, v := range tt.reply {
reply = append(reply, []byte(v))
}
value := reflect.New(reflect.ValueOf(tt.value).Type().Elem())
if err := redis.ScanStruct(reply, value.Interface()); err != nil {
t.Fatalf("ScanStruct(%s) returned error %v", tt.title, err)
}
if !reflect.DeepEqual(value.Interface(), tt.value) {
t.Fatalf("ScanStruct(%s) returned %v, want %v", tt.title, value.Interface(), tt.value)
}
}
}
func TestBadScanStructArgs(t *testing.T) {
x := []interface{}{"A", "b"}
test := func(v interface{}) {
if err := redis.ScanStruct(x, v); err == nil {
t.Errorf("Expect error for ScanStruct(%T, %T)", x, v)
}
}
test(nil)
var v0 *struct{}
test(v0)
var v1 int
test(&v1)
x = x[:1]
v2 := struct{ A string }{}
test(&v2)
}
var scanSliceTests = []struct {
src []interface{}
fieldNames []string
ok bool
dest interface{}
}{
{
[]interface{}{[]byte("1"), nil, []byte("-1")},
nil,
true,
[]int{1, 0, -1},
},
{
[]interface{}{[]byte("1"), nil, []byte("2")},
nil,
true,
[]uint{1, 0, 2},
},
{
[]interface{}{[]byte("-1")},
nil,
false,
[]uint{1},
},
{
[]interface{}{[]byte("hello"), nil, []byte("world")},
nil,
true,
[][]byte{[]byte("hello"), nil, []byte("world")},
},
{
[]interface{}{[]byte("hello"), nil, []byte("world")},
nil,
true,
[]string{"hello", "", "world"},
},
{
[]interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")},
nil,
true,
[]struct{ A, B string }{{"a1", "b1"}, {"a2", "b2"}},
},
{
[]interface{}{[]byte("a1"), []byte("b1")},
nil,
false,
[]struct{ A, B, C string }{{"a1", "b1", ""}},
},
{
[]interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")},
nil,
true,
[]*struct{ A, B string }{{"a1", "b1"}, {"a2", "b2"}},
},
{
[]interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")},
[]string{"A", "B"},
true,
[]struct{ A, C, B string }{{"a1", "", "b1"}, {"a2", "", "b2"}},
},
{
[]interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")},
nil,
false,
[]struct{}{},
},
}
func TestScanSlice(t *testing.T) {
for _, tt := range scanSliceTests {
typ := reflect.ValueOf(tt.dest).Type()
dest := reflect.New(typ)
err := redis.ScanSlice(tt.src, dest.Interface(), tt.fieldNames...)
if tt.ok != (err == nil) {
t.Errorf("ScanSlice(%v, []%s, %v) returned error %v", tt.src, typ, tt.fieldNames, err)
continue
}
if tt.ok && !reflect.DeepEqual(dest.Elem().Interface(), tt.dest) {
t.Errorf("ScanSlice(src, []%s) returned %#v, want %#v", typ, dest.Elem().Interface(), tt.dest)
}
}
}
func ExampleScanSlice() {
c, err := dial()
if err != nil {
panic(err)
}
defer c.Close()
c.Send("HMSET", "album:1", "title", "Red", "rating", 5)
c.Send("HMSET", "album:2", "title", "Earthbound", "rating", 1)
c.Send("HMSET", "album:3", "title", "Beat", "rating", 4)
c.Send("LPUSH", "albums", "1")
c.Send("LPUSH", "albums", "2")
c.Send("LPUSH", "albums", "3")
values, err := redis.Values(c.Do("SORT", "albums",
"BY", "album:*->rating",
"GET", "album:*->title",
"GET", "album:*->rating"))
if err != nil {
panic(err)
}
var albums []struct {
Title string
Rating int
}
if err := redis.ScanSlice(values, &albums); err != nil {
panic(err)
}
fmt.Printf("%v\n", albums)
// Output:
// [{Earthbound 1} {Beat 4} {Red 5}]
}
var argsTests = []struct {
title string
actual redis.Args
expected redis.Args
}{
{"struct ptr",
redis.Args{}.AddFlat(&struct {
I int `redis:"i"`
U uint `redis:"u"`
S string `redis:"s"`
P []byte `redis:"p"`
Bt bool
Bf bool
}{
-1234, 5678, "hello", []byte("world"), true, false,
}),
redis.Args{"i", int(-1234), "u", uint(5678), "s", "hello", "p", []byte("world"), "Bt", true, "Bf", false},
},
{"struct",
redis.Args{}.AddFlat(struct{ I int }{123}),
redis.Args{"I", 123},
},
{"slice",
redis.Args{}.Add(1).AddFlat([]string{"a", "b", "c"}).Add(2),
redis.Args{1, "a", "b", "c", 2},
},
}
func TestArgs(t *testing.T) {
for _, tt := range argsTests {
if !reflect.DeepEqual(tt.actual, tt.expected) {
t.Fatalf("%s is %v, want %v", tt.title, tt.actual, tt.expected)
}
}
}
func ExampleArgs() {
c, err := dial()
if err != nil {
panic(err)
}
defer c.Close()
var p1, p2 struct {
Title string `redis:"title"`
Author string `redis:"author"`
Body string `redis:"body"`
}
p1.Title = "Example"
p1.Author = "Gary"
p1.Body = "Hello"
if _, err := c.Do("HMSET", redis.Args{}.Add("id1").AddFlat(&p1)...); err != nil {
panic(err)
}
m := map[string]string{
"title": "Example2",
"author": "Steve",
"body": "Map",
}
if _, err := c.Do("HMSET", redis.Args{}.Add("id2").AddFlat(m)...); err != nil {
panic(err)
}
for _, id := range []string{"id1", "id2"} {
v, err := redis.Values(c.Do("HGETALL", id))
if err != nil {
panic(err)
}
if err := redis.ScanStruct(v, &p2); err != nil {
panic(err)
}
fmt.Printf("%+v\n", p2)
}
// Output:
// {Title:Example Author:Gary Body:Hello}
// {Title:Example2 Author:Steve Body:Map}
}

86
redigo/redis/script.go Normal file
View File

@@ -0,0 +1,86 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"crypto/sha1"
"encoding/hex"
"io"
"strings"
)
// Script encapsulates the source, hash and key count for a Lua script. See
// http://redis.io/commands/eval for information on scripts in Redis.
type Script struct {
keyCount int
src string
hash string
}
// NewScript returns a new script object. If keyCount is greater than or equal
// to zero, then the count is automatically inserted in the EVAL command
// argument list. If keyCount is less than zero, then the application supplies
// the count as the first value in the keysAndArgs argument to the Do, Send and
// SendHash methods.
func NewScript(keyCount int, src string) *Script {
h := sha1.New()
io.WriteString(h, src)
return &Script{keyCount, src, hex.EncodeToString(h.Sum(nil))}
}
func (s *Script) args(spec string, keysAndArgs []interface{}) []interface{} {
var args []interface{}
if s.keyCount < 0 {
args = make([]interface{}, 1+len(keysAndArgs))
args[0] = spec
copy(args[1:], keysAndArgs)
} else {
args = make([]interface{}, 2+len(keysAndArgs))
args[0] = spec
args[1] = s.keyCount
copy(args[2:], keysAndArgs)
}
return args
}
// Do evalutes the script. Under the covers, Do optimistically evaluates the
// script using the EVALSHA command. If the command fails because the script is
// not loaded, then Do evaluates the script using the EVAL command (thus
// causing the script to load).
func (s *Script) Do(c Conn, keysAndArgs ...interface{}) (interface{}, error) {
v, err := c.Do("EVALSHA", s.args(s.hash, keysAndArgs)...)
if e, ok := err.(Error); ok && strings.HasPrefix(string(e), "NOSCRIPT ") {
v, err = c.Do("EVAL", s.args(s.src, keysAndArgs)...)
}
return v, err
}
// SendHash evaluates the script without waiting for the reply. The script is
// evaluated with the EVALSHA command. The application must ensure that the
// script is loaded by a previous call to Send, Do or Load methods.
func (s *Script) SendHash(c Conn, keysAndArgs ...interface{}) error {
return c.Send("EVALSHA", s.args(s.hash, keysAndArgs)...)
}
// Send evaluates the script without waiting for the reply.
func (s *Script) Send(c Conn, keysAndArgs ...interface{}) error {
return c.Send("EVAL", s.args(s.src, keysAndArgs)...)
}
// Load loads the script without evaluating it.
func (s *Script) Load(c Conn) error {
_, err := c.Do("SCRIPT", "LOAD", s.src)
return err
}

View File

@@ -0,0 +1,88 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis_test
import (
"fmt"
"github.com/garyburd/redigo/redis"
"reflect"
"testing"
"time"
)
func ExampleScript(c redis.Conn, reply interface{}, err error) {
// Initialize a package-level variable with a script.
var getScript = redis.NewScript(1, `return redis.call('get', KEYS[1])`)
// In a function, use the script Do method to evaluate the script. The Do
// method optimistically uses the EVALSHA command. If the script is not
// loaded, then the Do method falls back to the EVAL command.
reply, err = getScript.Do(c, "foo")
}
func TestScript(t *testing.T) {
c := dialt(t)
defer c.Close()
// To test fall back in Do, we make script unique by adding comment with current time.
script := fmt.Sprintf("--%d\nreturn {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}", time.Now().UnixNano())
s := redis.NewScript(2, script)
reply := []interface{}{[]byte("key1"), []byte("key2"), []byte("arg1"), []byte("arg2")}
v, err := s.Do(c, "key1", "key2", "arg1", "arg2")
if err != nil {
t.Errorf("s.Do(c, ...) returned %v", err)
}
if !reflect.DeepEqual(v, reply) {
t.Errorf("s.Do(c, ..); = %v, want %v", v, reply)
}
err = s.Load(c)
if err != nil {
t.Errorf("s.Load(c) returned %v", err)
}
err = s.SendHash(c, "key1", "key2", "arg1", "arg2")
if err != nil {
t.Errorf("s.SendHash(c, ...) returned %v", err)
}
err = c.Flush()
if err != nil {
t.Errorf("c.Flush() returned %v", err)
}
v, err = c.Receive()
if !reflect.DeepEqual(v, reply) {
t.Errorf("s.SendHash(c, ..); c.Receive() = %v, want %v", v, reply)
}
err = s.Send(c, "key1", "key2", "arg1", "arg2")
if err != nil {
t.Errorf("s.Send(c, ...) returned %v", err)
}
err = c.Flush()
if err != nil {
t.Errorf("c.Flush() returned %v", err)
}
v, err = c.Receive()
if !reflect.DeepEqual(v, reply) {
t.Errorf("s.Send(c, ..); c.Receive() = %v, want %v", v, reply)
}
}

29
redigo/redis/test_test.go Normal file
View File

@@ -0,0 +1,29 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"bufio"
"net"
)
type dummyClose struct{ net.Conn }
func (dummyClose) Close() error { return nil }
// NewConnBufio is a hook for tests.
func NewConnBufio(rw bufio.ReadWriter) Conn {
return &conn{br: rw.Reader, bw: rw.Writer, conn: dummyClose{}}
}

View File

@@ -0,0 +1,113 @@
// Copyright 2013 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis_test
import (
"fmt"
"github.com/garyburd/redigo/redis"
)
// zpop pops a value from the ZSET key using WATCH/MULTI/EXEC commands.
func zpop(c redis.Conn, key string) (result string, err error) {
defer func() {
// Return connection to normal state on error.
if err != nil {
c.Do("DISCARD")
}
}()
// Loop until transaction is successful.
for {
if _, err := c.Do("WATCH", key); err != nil {
return "", err
}
members, err := redis.Strings(c.Do("ZRANGE", key, 0, 0))
if err != nil {
return "", err
}
if len(members) != 1 {
return "", redis.ErrNil
}
c.Send("MULTI")
c.Send("ZREM", key, members[0])
queued, err := c.Do("EXEC")
if err != nil {
return "", err
}
if queued != nil {
result = members[0]
break
}
}
return result, nil
}
// zpopScript pops a value from a ZSET.
var zpopScript = redis.NewScript(1, `
local r = redis.call('ZRANGE', KEYS[1], 0, 0)
if r ~= nil then
r = r[1]
redis.call('ZREM', KEYS[1], r)
end
return r
`)
// This example implements ZPOP as described at
// http://redis.io/topics/transactions using WATCH/MULTI/EXEC and scripting.
func Example_zpop() {
c, err := dial()
if err != nil {
fmt.Println(err)
return
}
defer c.Close()
// Add test data using a pipeline.
for i, member := range []string{"red", "blue", "green"} {
c.Send("ZADD", "zset", i, member)
}
if _, err := c.Do(""); err != nil {
fmt.Println(err)
return
}
// Pop using WATCH/MULTI/EXEC
v, err := zpop(c, "zset")
if err != nil {
fmt.Println(err)
return
}
fmt.Println(v)
// Pop using a script.
v, err = redis.String(zpopScript.Do(c, "zset"))
if err != nil {
fmt.Println(err)
return
}
fmt.Println(v)
// Output:
// red
// blue
}

17
redigo/redisx/doc.go Normal file
View File

@@ -0,0 +1,17 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
// Package redisx contains experimental features for Redigo. Features in this
// package may be modified or deleted at any time.
package redisx

55
redigo/redisx/example.go Normal file
View File

@@ -0,0 +1,55 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
// +build ignore
package main
import (
"github.com/garyburd/redigo/redis"
"github.com/garyburd/redigo/redisx"
"log"
)
type MyStruct struct {
A int
B string
}
func main() {
c, err := redis.Dial("tcp", ":6379")
if err != nil {
log.Fatal(err)
}
v0 := &MyStruct{1, "hello"}
_, err = c.Do("HMSET", redisx.AppendStruct([]interface{}{"key"}, v0)...)
if err != nil {
log.Fatal(err)
}
reply, err := c.Do("HGETALL", "key")
if err != nil {
log.Fatal(err)
}
v1 := &MyStruct{}
err = redisx.ScanStruct(reply, v1)
if err != nil {
log.Fatal(err)
}
log.Printf("v1=%v", v1)
}

120
redigo/redisx/struct.go Normal file
View File

@@ -0,0 +1,120 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redisx
import (
"errors"
"reflect"
"strconv"
)
// ScanStruct is deprecated. Use redis.ScanStruct instead.
//
// ScanStruct scans a reply containing alternating names and values to a
// struct. The HGETALL and CONFIG GET commands return replies in this format.
//
// ScanStruct uses the struct field name to match values in the response. Use
// 'redis' field tag to override the name:
//
// Field int `redis:"myName"`
//
// Fields with the tag redis:"-" are ignored.
func ScanStruct(reply interface{}, dst interface{}) error {
v := reflect.ValueOf(dst)
if v.Kind() != reflect.Ptr || v.IsNil() {
return errors.New("redigo: ScanStruct value must be non-nil pointer")
}
v = v.Elem()
ss := structSpecForType(v.Type())
p, ok := reply.([]interface{})
if !ok {
return errors.New("redigo: ScanStruct expectes multibulk reply")
}
if len(p)%2 != 0 {
return errors.New("redigo: ScanStruct expects even number of values in reply")
}
for i := 0; i < len(p); i += 2 {
name, ok := p[i].([]byte)
if !ok {
return errors.New("redigo: ScanStruct key not a bulk value")
}
value, ok := p[i+1].([]byte)
if !ok {
return errors.New("redigo: ScanStruct value not a bulk value")
}
fs := ss.fieldSpec(name)
if fs == nil {
continue
}
fv := v.FieldByIndex(fs.index)
switch fv.Type().Kind() {
case reflect.String:
fv.SetString(string(value))
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
x, err := strconv.ParseInt(string(value), 10, fv.Type().Bits())
if err != nil {
return err
}
fv.SetInt(x)
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
x, err := strconv.ParseUint(string(value), 10, fv.Type().Bits())
if err != nil {
return err
}
fv.SetUint(x)
case reflect.Float32, reflect.Float64:
x, err := strconv.ParseFloat(string(value), fv.Type().Bits())
if err != nil {
return err
}
fv.SetFloat(x)
case reflect.Bool:
x := len(value) != 0 && (len(value) != 1 || value[0] != '0')
fv.SetBool(x)
case reflect.Slice:
if fv.Type().Elem().Kind() != reflect.Uint8 {
// TODO: check field types in structSpec
panic("redigo: unsuported type for field " + string(name))
}
fv.SetBytes(value)
default:
// TODO: check field types in structSpec
panic("redigo: unsuported type for field " + string(name))
}
}
return nil
}
// AppendStruct is deprecated. Use redis.Args{}.AddFlat() instead.
func AppendStruct(args []interface{}, src interface{}) []interface{} {
v := reflect.ValueOf(src)
if v.Kind() == reflect.Ptr {
if v.IsNil() {
panic("redigo: FormatStruct argument must not be nil")
}
v = v.Elem()
}
if v.Kind() != reflect.Struct {
panic("redigo: FormatStruct argument must be a struct or pointer to a struct")
}
ss := structSpecForType(v.Type())
for _, fs := range ss.l {
fv := v.FieldByIndex(fs.index)
args = append(args, fs.name, fv.Interface())
}
return args
}

View File

@@ -0,0 +1,91 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redisx_test
import (
"github.com/garyburd/redigo/redisx"
"reflect"
"testing"
)
var scanStructTests = []struct {
title string
reply []string
value interface{}
}{
{"basic",
[]string{"i", "-1234", "u", "5678", "s", "hello", "p", "world", "b", "", "Bt", "1", "Bf", "0"},
&struct {
I int `redis:"i"`
U uint `redis:"u"`
S string `redis:"s"`
P []byte `redis:"p"`
B bool `redis:"b"`
Bt bool
Bf bool
}{
-1234, 5678, "hello", []byte("world"), false, true, false,
},
},
}
func TestScanStruct(t *testing.T) {
for _, tt := range scanStructTests {
var reply []interface{}
for _, v := range tt.reply {
reply = append(reply, []byte(v))
}
value := reflect.New(reflect.ValueOf(tt.value).Type().Elem())
if err := redisx.ScanStruct(reply, value.Interface()); err != nil {
t.Fatalf("ScanStruct(%s) returned error %v", tt.title, err)
}
if !reflect.DeepEqual(value.Interface(), tt.value) {
t.Fatalf("ScanStruct(%s) returned %v, want %v", tt.title, value.Interface(), tt.value)
}
}
}
var formatStructTests = []struct {
title string
args []interface{}
value interface{}
}{
{"basic",
[]interface{}{"i", int(-1234), "u", uint(5678), "s", "hello", "p", []byte("world"), "Bt", true, "Bf", false},
&struct {
I int `redis:"i"`
U uint `redis:"u"`
S string `redis:"s"`
P []byte `redis:"p"`
Bt bool
Bf bool
}{
-1234, 5678, "hello", []byte("world"), true, false,
},
},
}
func TestFormatStruct(t *testing.T) {
for _, tt := range formatStructTests {
args := redisx.AppendStruct(nil, tt.value)
if !reflect.DeepEqual(args, tt.args) {
t.Fatalf("FormatStruct(%s) returned %v, want %v", tt.title, args, tt.args)
}
}
}

125
redigo/redisx/util.go Normal file
View File

@@ -0,0 +1,125 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redisx
import (
"errors"
"reflect"
"strings"
"sync"
)
type fieldSpec struct {
name string
index []int
omitEmpty bool
}
type structSpec struct {
m map[string]*fieldSpec
l []*fieldSpec
}
func (ss *structSpec) fieldSpec(name []byte) *fieldSpec {
return ss.m[string(name)]
}
func compileStructSpec(t reflect.Type, depth map[string]int, index []int, ss *structSpec) {
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
switch {
case f.PkgPath != "":
// Ignore unexported fields.
case f.Anonymous:
// TODO: Handle pointers. Requires change to decoder and
// protection against infinite recursion.
if f.Type.Kind() == reflect.Struct {
compileStructSpec(f.Type, depth, append(index, i), ss)
}
default:
fs := &fieldSpec{name: f.Name}
tag := f.Tag.Get("redis")
p := strings.Split(tag, ",")
if len(p) > 0 {
if p[0] == "-" {
continue
}
if len(p[0]) > 0 {
fs.name = p[0]
}
for _, s := range p[1:] {
switch s {
case "omitempty":
fs.omitEmpty = true
default:
panic(errors.New("redigo: unknown field flag " + s + " for type " + t.Name()))
}
}
}
d, found := depth[fs.name]
if !found {
d = 1 << 30
}
switch {
case len(index) == d:
// At same depth, remove from result.
delete(ss.m, fs.name)
j := 0
for i := 0; i < len(ss.l); i++ {
if fs.name != ss.l[i].name {
ss.l[j] = ss.l[i]
j += 1
}
}
ss.l = ss.l[:j]
case len(index) < d:
fs.index = make([]int, len(index)+1)
copy(fs.index, index)
fs.index[len(index)] = i
depth[fs.name] = len(index)
ss.m[fs.name] = fs
ss.l = append(ss.l, fs)
}
}
}
}
var (
structSpecMutex sync.RWMutex
structSpecCache = make(map[reflect.Type]*structSpec)
defaultFieldSpec = &fieldSpec{}
)
func structSpecForType(t reflect.Type) *structSpec {
structSpecMutex.RLock()
ss, found := structSpecCache[t]
structSpecMutex.RUnlock()
if found {
return ss
}
structSpecMutex.Lock()
defer structSpecMutex.Unlock()
ss, found = structSpecCache[t]
if found {
return ss
}
ss = &structSpec{m: make(map[string]*fieldSpec)}
compileStructSpec(t, make(map[string]int), nil, ss)
structSpecCache[t] = ss
return ss
}

202
redis.go Normal file
View File

@@ -0,0 +1,202 @@
package cachestore
import (
"coscms/app/base/lib/cachestore/redigo/redis"
"encoding/json"
"errors"
"fmt"
)
var (
// the collection name of redis for cache adapter.
DefaultKey string = "comcmsRedis"
)
// Redis cache adapter.
type RedisCache struct {
c redis.Conn
conninfo string
key string
Debug bool
}
// create new redis cache with default collection name.
func NewRedisCache(cf map[string]string) *RedisCache {
rc := &RedisCache{key: DefaultKey}
if _, ok := cf["key"]; !ok {
cf["key"] = DefaultKey
}
rc.key = cf["key"]
rc.conninfo = cf["conn"]
var err error
rc.c, err = rc.connectInit()
if err != nil {
rc.c = nil
}
return rc
}
// Get cache from redis.
func (rc *RedisCache) Get(key string) (interface{}, error) {
if rc.c == nil {
var err error
rc.c, err = rc.connectInit()
if err != nil {
return nil, err
}
}
val, err := rc.c.Do("HGET", rc.key, key)
if err != nil {
fmt.Println("[Redis]GetErr: ", err)
return nil, err
}
var v interface{}
err = Decode(val.([]byte), &v)
if err != nil {
fmt.Println("[Redis]DecodeErr: ", err)
}
if rc.Debug {
fmt.Println("[Redis]Get: ", key)
}
return v, err
}
// put cache to redis.
// timeout is ignored.
func (rc *RedisCache) Put(key string, value interface{}) error {
if rc.c == nil {
var err error
rc.c, err = rc.connectInit()
if err != nil {
return err
}
}
val, err := Encode(value)
if err != nil {
fmt.Println("[Redis]EncodeErr: ", err)
return err
}
_, err = rc.c.Do("HSET", rc.key, key, val)
if err != nil {
fmt.Println("[Redis]PutErr: ", err)
}
if rc.Debug {
fmt.Println("[Redis]Put: ", key)
}
return err
}
// delete cache in redis.
func (rc *RedisCache) Del(key string) error {
if rc.c == nil {
var err error
rc.c, err = rc.connectInit()
if err != nil {
return err
}
}
_, err := rc.c.Do("HDEL", rc.key, key)
if err != nil {
fmt.Println("[Redis]DelErr: ", err)
}
if rc.Debug {
fmt.Println("[Redis]Del: ", key)
}
return err
}
// check cache exist in redis.
func (rc *RedisCache) IsExist(key string) bool {
if rc.c == nil {
var err error
rc.c, err = rc.connectInit()
if err != nil {
return false
}
}
v, err := redis.Bool(rc.c.Do("HEXISTS", rc.key, key))
if err != nil {
return false
}
return v
}
// increase counter in redis.
func (rc *RedisCache) Incr(key string) error {
if rc.c == nil {
var err error
rc.c, err = rc.connectInit()
if err != nil {
return err
}
}
_, err := redis.Bool(rc.c.Do("HINCRBY", rc.key, key, 1))
if err != nil {
return err
}
return nil
}
// decrease counter in redis.
func (rc *RedisCache) Decr(key string) error {
if rc.c == nil {
var err error
rc.c, err = rc.connectInit()
if err != nil {
return err
}
}
_, err := redis.Bool(rc.c.Do("HINCRBY", rc.key, key, -1))
if err != nil {
return err
}
return nil
}
// clean all cache in redis. delete this redis collection.
func (rc *RedisCache) ClearAll() error {
if rc.c == nil {
var err error
rc.c, err = rc.connectInit()
if err != nil {
return err
}
}
_, err := rc.c.Do("DEL", rc.key)
return err
}
// start redis cache adapter.
// config is like {"key":"collection key","conn":"connection info"}
// the cache item in redis are stored forever,
// so no gc operation.
func (rc *RedisCache) Connect(config string) error {
var cf map[string]string
json.Unmarshal([]byte(config), &cf)
if _, ok := cf["key"]; !ok {
cf["key"] = DefaultKey
}
if _, ok := cf["conn"]; !ok {
return errors.New("config has no conn key")
}
rc.key = cf["key"]
rc.conninfo = cf["conn"]
var err error
rc.c, err = rc.connectInit()
if err != nil {
return err
}
if rc.c == nil {
return errors.New("dial tcp conn error")
}
return nil
}
// connect to redis.
func (rc *RedisCache) connectInit() (redis.Conn, error) {
c, err := redis.Dial("tcp", rc.conninfo)
if err != nil {
return nil, err
}
return c, nil
}