tx.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package redis
  2. import (
  3. "gopkg.in/redis.v5/internal"
  4. "gopkg.in/redis.v5/internal/pool"
  5. )
  6. // Redis transaction failed.
  7. const TxFailedErr = internal.RedisError("redis: transaction failed")
  8. // Tx implements Redis transactions as described in
  9. // http://redis.io/topics/transactions. It's NOT safe for concurrent use
  10. // by multiple goroutines, because Exec resets list of watched keys.
  11. // If you don't need WATCH it is better to use Pipeline.
  12. type Tx struct {
  13. cmdable
  14. statefulCmdable
  15. baseClient
  16. }
  17. func (c *Client) newTx() *Tx {
  18. tx := Tx{
  19. baseClient: baseClient{
  20. opt: c.opt,
  21. connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true),
  22. },
  23. }
  24. tx.cmdable.process = tx.Process
  25. tx.statefulCmdable.process = tx.Process
  26. return &tx
  27. }
  28. func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
  29. tx := c.newTx()
  30. if len(keys) > 0 {
  31. if err := tx.Watch(keys...).Err(); err != nil {
  32. _ = tx.Close()
  33. return err
  34. }
  35. }
  36. firstErr := fn(tx)
  37. if err := tx.Close(); err != nil && firstErr == nil {
  38. firstErr = err
  39. }
  40. return firstErr
  41. }
  42. // close closes the transaction, releasing any open resources.
  43. func (c *Tx) Close() error {
  44. _ = c.Unwatch().Err()
  45. return c.baseClient.Close()
  46. }
  47. // Watch marks the keys to be watched for conditional execution
  48. // of a transaction.
  49. func (c *Tx) Watch(keys ...string) *StatusCmd {
  50. args := make([]interface{}, 1+len(keys))
  51. args[0] = "WATCH"
  52. for i, key := range keys {
  53. args[1+i] = key
  54. }
  55. cmd := NewStatusCmd(args...)
  56. c.Process(cmd)
  57. return cmd
  58. }
  59. // Unwatch flushes all the previously watched keys for a transaction.
  60. func (c *Tx) Unwatch(keys ...string) *StatusCmd {
  61. args := make([]interface{}, 1+len(keys))
  62. args[0] = "UNWATCH"
  63. for i, key := range keys {
  64. args[1+i] = key
  65. }
  66. cmd := NewStatusCmd(args...)
  67. c.Process(cmd)
  68. return cmd
  69. }
  70. func (c *Tx) Pipeline() *Pipeline {
  71. pipe := Pipeline{
  72. exec: c.pipelineExecer(c.txPipelineProcessCmds),
  73. }
  74. pipe.cmdable.process = pipe.Process
  75. pipe.statefulCmdable.process = pipe.Process
  76. return &pipe
  77. }
  78. // Pipelined executes commands queued in the fn in a transaction
  79. // and restores the connection state to normal.
  80. //
  81. // When using WATCH, EXEC will execute commands only if the watched keys
  82. // were not modified, allowing for a check-and-set mechanism.
  83. //
  84. // Exec always returns list of commands. If transaction fails
  85. // TxFailedErr is returned. Otherwise Exec returns error of the first
  86. // failed command or nil.
  87. func (c *Tx) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
  88. return c.Pipeline().pipelined(fn)
  89. }