tx_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package redis_test
  2. import (
  3. "strconv"
  4. "sync"
  5. . "github.com/onsi/ginkgo"
  6. . "github.com/onsi/gomega"
  7. "gopkg.in/redis.v5"
  8. )
  9. var _ = Describe("Tx", func() {
  10. var client *redis.Client
  11. BeforeEach(func() {
  12. client = redis.NewClient(redisOptions())
  13. Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
  14. })
  15. AfterEach(func() {
  16. Expect(client.Close()).NotTo(HaveOccurred())
  17. })
  18. It("should Watch", func() {
  19. var incr func(string) error
  20. // Transactionally increments key using GET and SET commands.
  21. incr = func(key string) error {
  22. err := client.Watch(func(tx *redis.Tx) error {
  23. n, err := tx.Get(key).Int64()
  24. if err != nil && err != redis.Nil {
  25. return err
  26. }
  27. _, err = tx.Pipelined(func(pipe *redis.Pipeline) error {
  28. pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
  29. return nil
  30. })
  31. return err
  32. }, key)
  33. if err == redis.TxFailedErr {
  34. return incr(key)
  35. }
  36. return err
  37. }
  38. var wg sync.WaitGroup
  39. for i := 0; i < 100; i++ {
  40. wg.Add(1)
  41. go func() {
  42. defer GinkgoRecover()
  43. defer wg.Done()
  44. err := incr("key")
  45. Expect(err).NotTo(HaveOccurred())
  46. }()
  47. }
  48. wg.Wait()
  49. n, err := client.Get("key").Int64()
  50. Expect(err).NotTo(HaveOccurred())
  51. Expect(n).To(Equal(int64(100)))
  52. })
  53. It("should discard", func() {
  54. err := client.Watch(func(tx *redis.Tx) error {
  55. cmds, err := tx.Pipelined(func(pipe *redis.Pipeline) error {
  56. pipe.Set("key1", "hello1", 0)
  57. pipe.Discard()
  58. pipe.Set("key2", "hello2", 0)
  59. return nil
  60. })
  61. Expect(err).NotTo(HaveOccurred())
  62. Expect(cmds).To(HaveLen(1))
  63. return err
  64. }, "key1", "key2")
  65. Expect(err).NotTo(HaveOccurred())
  66. get := client.Get("key1")
  67. Expect(get.Err()).To(Equal(redis.Nil))
  68. Expect(get.Val()).To(Equal(""))
  69. get = client.Get("key2")
  70. Expect(get.Err()).NotTo(HaveOccurred())
  71. Expect(get.Val()).To(Equal("hello2"))
  72. })
  73. It("returns an error when there are no commands", func() {
  74. err := client.Watch(func(tx *redis.Tx) error {
  75. _, err := tx.Pipelined(func(*redis.Pipeline) error { return nil })
  76. return err
  77. })
  78. Expect(err).To(MatchError("redis: pipeline is empty"))
  79. v, err := client.Ping().Result()
  80. Expect(err).NotTo(HaveOccurred())
  81. Expect(v).To(Equal("PONG"))
  82. })
  83. It("should exec bulks", func() {
  84. const N = 20000
  85. err := client.Watch(func(tx *redis.Tx) error {
  86. cmds, err := tx.Pipelined(func(pipe *redis.Pipeline) error {
  87. for i := 0; i < N; i++ {
  88. pipe.Incr("key")
  89. }
  90. return nil
  91. })
  92. Expect(err).NotTo(HaveOccurred())
  93. Expect(len(cmds)).To(Equal(N))
  94. for _, cmd := range cmds {
  95. Expect(cmd.Err()).NotTo(HaveOccurred())
  96. }
  97. return err
  98. })
  99. Expect(err).NotTo(HaveOccurred())
  100. num, err := client.Get("key").Int64()
  101. Expect(err).NotTo(HaveOccurred())
  102. Expect(num).To(Equal(int64(N)))
  103. })
  104. It("should recover from bad connection", func() {
  105. // Put bad connection in the pool.
  106. cn, _, err := client.Pool().Get()
  107. Expect(err).NotTo(HaveOccurred())
  108. cn.SetNetConn(&badConn{})
  109. err = client.Pool().Put(cn)
  110. Expect(err).NotTo(HaveOccurred())
  111. do := func() error {
  112. err := client.Watch(func(tx *redis.Tx) error {
  113. _, err := tx.Pipelined(func(pipe *redis.Pipeline) error {
  114. pipe.Ping()
  115. return nil
  116. })
  117. return err
  118. })
  119. return err
  120. }
  121. err = do()
  122. Expect(err).To(MatchError("bad connection"))
  123. err = do()
  124. Expect(err).NotTo(HaveOccurred())
  125. })
  126. })