pipeline.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package redis
  2. import (
  3. "errors"
  4. "sync"
  5. "gopkg.in/redis.v5/internal/pool"
  6. )
  7. type pipelineExecer func([]Cmder) error
  8. // Pipeline implements pipelining as described in
  9. // http://redis.io/topics/pipelining. It's safe for concurrent use
  10. // by multiple goroutines.
  11. type Pipeline struct {
  12. cmdable
  13. statefulCmdable
  14. exec pipelineExecer
  15. mu sync.Mutex
  16. cmds []Cmder
  17. closed bool
  18. }
  19. func (c *Pipeline) Process(cmd Cmder) error {
  20. c.mu.Lock()
  21. c.cmds = append(c.cmds, cmd)
  22. c.mu.Unlock()
  23. return nil
  24. }
  25. // Close closes the pipeline, releasing any open resources.
  26. func (c *Pipeline) Close() error {
  27. c.mu.Lock()
  28. c.discard()
  29. c.closed = true
  30. c.mu.Unlock()
  31. return nil
  32. }
  33. // Discard resets the pipeline and discards queued commands.
  34. func (c *Pipeline) Discard() error {
  35. c.mu.Lock()
  36. err := c.discard()
  37. c.mu.Unlock()
  38. return err
  39. }
  40. func (c *Pipeline) discard() error {
  41. if c.closed {
  42. return pool.ErrClosed
  43. }
  44. c.cmds = c.cmds[:0]
  45. return nil
  46. }
  47. // Exec executes all previously queued commands using one
  48. // client-server roundtrip.
  49. //
  50. // Exec always returns list of commands and error of the first failed
  51. // command if any.
  52. func (c *Pipeline) Exec() ([]Cmder, error) {
  53. c.mu.Lock()
  54. defer c.mu.Unlock()
  55. if c.closed {
  56. return nil, pool.ErrClosed
  57. }
  58. if len(c.cmds) == 0 {
  59. return nil, errors.New("redis: pipeline is empty")
  60. }
  61. cmds := c.cmds
  62. c.cmds = nil
  63. return cmds, c.exec(cmds)
  64. }
  65. func (c *Pipeline) pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
  66. if err := fn(c); err != nil {
  67. return nil, err
  68. }
  69. cmds, err := c.Exec()
  70. _ = c.Close()
  71. return cmds, err
  72. }