simulation_test.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package bbolt_test
  2. import (
  3. "bytes"
  4. "fmt"
  5. "math/rand"
  6. "sync"
  7. "testing"
  8. bolt "go.etcd.io/bbolt"
  9. )
  10. func TestSimulate_1op_1p(t *testing.T) { testSimulate(t, nil, 1, 1, 1) }
  11. func TestSimulate_10op_1p(t *testing.T) { testSimulate(t, nil, 1, 10, 1) }
  12. func TestSimulate_100op_1p(t *testing.T) { testSimulate(t, nil, 1, 100, 1) }
  13. func TestSimulate_1000op_1p(t *testing.T) { testSimulate(t, nil, 1, 1000, 1) }
  14. func TestSimulate_10000op_1p(t *testing.T) { testSimulate(t, nil, 1, 10000, 1) }
  15. func TestSimulate_10op_10p(t *testing.T) { testSimulate(t, nil, 1, 10, 10) }
  16. func TestSimulate_100op_10p(t *testing.T) { testSimulate(t, nil, 1, 100, 10) }
  17. func TestSimulate_1000op_10p(t *testing.T) { testSimulate(t, nil, 1, 1000, 10) }
  18. func TestSimulate_10000op_10p(t *testing.T) { testSimulate(t, nil, 1, 10000, 10) }
  19. func TestSimulate_100op_100p(t *testing.T) { testSimulate(t, nil, 1, 100, 100) }
  20. func TestSimulate_1000op_100p(t *testing.T) { testSimulate(t, nil, 1, 1000, 100) }
  21. func TestSimulate_10000op_100p(t *testing.T) { testSimulate(t, nil, 1, 10000, 100) }
  22. func TestSimulate_10000op_1000p(t *testing.T) { testSimulate(t, nil, 1, 10000, 1000) }
  23. // Randomly generate operations on a given database with multiple clients to ensure consistency and thread safety.
  24. func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, parallelism int) {
  25. if testing.Short() {
  26. t.Skip("skipping test in short mode.")
  27. }
  28. rand.Seed(int64(qseed))
  29. // A list of operations that readers and writers can perform.
  30. var readerHandlers = []simulateHandler{simulateGetHandler}
  31. var writerHandlers = []simulateHandler{simulateGetHandler, simulatePutHandler}
  32. var versions = make(map[int]*QuickDB)
  33. versions[1] = NewQuickDB()
  34. db := MustOpenWithOption(openOption)
  35. defer db.MustClose()
  36. var mutex sync.Mutex
  37. // Run n threads in parallel, each with their own operation.
  38. var wg sync.WaitGroup
  39. for n := 0; n < round; n++ {
  40. var threads = make(chan bool, parallelism)
  41. var i int
  42. for {
  43. threads <- true
  44. wg.Add(1)
  45. writable := ((rand.Int() % 100) < 20) // 20% writers
  46. // Choose an operation to execute.
  47. var handler simulateHandler
  48. if writable {
  49. handler = writerHandlers[rand.Intn(len(writerHandlers))]
  50. } else {
  51. handler = readerHandlers[rand.Intn(len(readerHandlers))]
  52. }
  53. // Execute a thread for the given operation.
  54. go func(writable bool, handler simulateHandler) {
  55. defer wg.Done()
  56. // Start transaction.
  57. tx, err := db.Begin(writable)
  58. if err != nil {
  59. t.Fatal("tx begin: ", err)
  60. }
  61. // Obtain current state of the dataset.
  62. mutex.Lock()
  63. var qdb = versions[tx.ID()]
  64. if writable {
  65. qdb = versions[tx.ID()-1].Copy()
  66. }
  67. mutex.Unlock()
  68. // Make sure we commit/rollback the tx at the end and update the state.
  69. if writable {
  70. defer func() {
  71. mutex.Lock()
  72. versions[tx.ID()] = qdb
  73. mutex.Unlock()
  74. if err := tx.Commit(); err != nil {
  75. t.Fatal(err)
  76. }
  77. }()
  78. } else {
  79. defer func() { _ = tx.Rollback() }()
  80. }
  81. // Ignore operation if we don't have data yet.
  82. if qdb == nil {
  83. return
  84. }
  85. // Execute handler.
  86. handler(tx, qdb)
  87. // Release a thread back to the scheduling loop.
  88. <-threads
  89. }(writable, handler)
  90. i++
  91. if i > threadCount {
  92. break
  93. }
  94. }
  95. // Wait until all threads are done.
  96. wg.Wait()
  97. db.MustClose()
  98. db.MustReopen()
  99. }
  100. }
  101. type simulateHandler func(tx *bolt.Tx, qdb *QuickDB)
  102. // Retrieves a key from the database and verifies that it is what is expected.
  103. func simulateGetHandler(tx *bolt.Tx, qdb *QuickDB) {
  104. // Randomly retrieve an existing exist.
  105. keys := qdb.Rand()
  106. if len(keys) == 0 {
  107. return
  108. }
  109. // Retrieve root bucket.
  110. b := tx.Bucket(keys[0])
  111. if b == nil {
  112. panic(fmt.Sprintf("bucket[0] expected: %08x\n", trunc(keys[0], 4)))
  113. }
  114. // Drill into nested buckets.
  115. for _, key := range keys[1 : len(keys)-1] {
  116. b = b.Bucket(key)
  117. if b == nil {
  118. panic(fmt.Sprintf("bucket[n] expected: %v -> %v\n", keys, key))
  119. }
  120. }
  121. // Verify key/value on the final bucket.
  122. expected := qdb.Get(keys)
  123. actual := b.Get(keys[len(keys)-1])
  124. if !bytes.Equal(actual, expected) {
  125. fmt.Println("=== EXPECTED ===")
  126. fmt.Println(expected)
  127. fmt.Println("=== ACTUAL ===")
  128. fmt.Println(actual)
  129. fmt.Println("=== END ===")
  130. panic("value mismatch")
  131. }
  132. }
  133. // Inserts a key into the database.
  134. func simulatePutHandler(tx *bolt.Tx, qdb *QuickDB) {
  135. var err error
  136. keys, value := randKeys(), randValue()
  137. // Retrieve root bucket.
  138. b := tx.Bucket(keys[0])
  139. if b == nil {
  140. b, err = tx.CreateBucket(keys[0])
  141. if err != nil {
  142. panic("create bucket: " + err.Error())
  143. }
  144. }
  145. // Create nested buckets, if necessary.
  146. for _, key := range keys[1 : len(keys)-1] {
  147. child := b.Bucket(key)
  148. if child != nil {
  149. b = child
  150. } else {
  151. b, err = b.CreateBucket(key)
  152. if err != nil {
  153. panic("create bucket: " + err.Error())
  154. }
  155. }
  156. }
  157. // Insert into database.
  158. if err := b.Put(keys[len(keys)-1], value); err != nil {
  159. panic("put: " + err.Error())
  160. }
  161. // Insert into in-memory database.
  162. qdb.Put(keys, value)
  163. }
  164. // QuickDB is an in-memory database that replicates the functionality of the
  165. // Bolt DB type except that it is entirely in-memory. It is meant for testing
  166. // that the Bolt database is consistent.
  167. type QuickDB struct {
  168. sync.RWMutex
  169. m map[string]interface{}
  170. }
  171. // NewQuickDB returns an instance of QuickDB.
  172. func NewQuickDB() *QuickDB {
  173. return &QuickDB{m: make(map[string]interface{})}
  174. }
  175. // Get retrieves the value at a key path.
  176. func (db *QuickDB) Get(keys [][]byte) []byte {
  177. db.RLock()
  178. defer db.RUnlock()
  179. m := db.m
  180. for _, key := range keys[:len(keys)-1] {
  181. value := m[string(key)]
  182. if value == nil {
  183. return nil
  184. }
  185. switch value := value.(type) {
  186. case map[string]interface{}:
  187. m = value
  188. case []byte:
  189. return nil
  190. }
  191. }
  192. // Only return if it's a simple value.
  193. if value, ok := m[string(keys[len(keys)-1])].([]byte); ok {
  194. return value
  195. }
  196. return nil
  197. }
  198. // Put inserts a value into a key path.
  199. func (db *QuickDB) Put(keys [][]byte, value []byte) {
  200. db.Lock()
  201. defer db.Unlock()
  202. // Build buckets all the way down the key path.
  203. m := db.m
  204. for _, key := range keys[:len(keys)-1] {
  205. if _, ok := m[string(key)].([]byte); ok {
  206. return // Keypath intersects with a simple value. Do nothing.
  207. }
  208. if m[string(key)] == nil {
  209. m[string(key)] = make(map[string]interface{})
  210. }
  211. m = m[string(key)].(map[string]interface{})
  212. }
  213. // Insert value into the last key.
  214. m[string(keys[len(keys)-1])] = value
  215. }
  216. // Rand returns a random key path that points to a simple value.
  217. func (db *QuickDB) Rand() [][]byte {
  218. db.RLock()
  219. defer db.RUnlock()
  220. if len(db.m) == 0 {
  221. return nil
  222. }
  223. var keys [][]byte
  224. db.rand(db.m, &keys)
  225. return keys
  226. }
  227. func (db *QuickDB) rand(m map[string]interface{}, keys *[][]byte) {
  228. i, index := 0, rand.Intn(len(m))
  229. for k, v := range m {
  230. if i == index {
  231. *keys = append(*keys, []byte(k))
  232. if v, ok := v.(map[string]interface{}); ok {
  233. db.rand(v, keys)
  234. }
  235. return
  236. }
  237. i++
  238. }
  239. panic("quickdb rand: out-of-range")
  240. }
  241. // Copy copies the entire database.
  242. func (db *QuickDB) Copy() *QuickDB {
  243. db.RLock()
  244. defer db.RUnlock()
  245. return &QuickDB{m: db.copy(db.m)}
  246. }
  247. func (db *QuickDB) copy(m map[string]interface{}) map[string]interface{} {
  248. clone := make(map[string]interface{}, len(m))
  249. for k, v := range m {
  250. switch v := v.(type) {
  251. case map[string]interface{}:
  252. clone[k] = db.copy(v)
  253. default:
  254. clone[k] = v
  255. }
  256. }
  257. return clone
  258. }
  259. func randKey() []byte {
  260. var min, max = 1, 1024
  261. n := rand.Intn(max-min) + min
  262. b := make([]byte, n)
  263. for i := 0; i < n; i++ {
  264. b[i] = byte(rand.Intn(255))
  265. }
  266. return b
  267. }
  268. func randKeys() [][]byte {
  269. var keys [][]byte
  270. var count = rand.Intn(2) + 2
  271. for i := 0; i < count; i++ {
  272. keys = append(keys, randKey())
  273. }
  274. return keys
  275. }
  276. func randValue() []byte {
  277. n := rand.Intn(8192)
  278. b := make([]byte, n)
  279. for i := 0; i < n; i++ {
  280. b[i] = byte(rand.Intn(255))
  281. }
  282. return b
  283. }