server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. // mgo - MongoDB driver for Go
  2. //
  3. // Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
  4. //
  5. // All rights reserved.
  6. //
  7. // Redistribution and use in source and binary forms, with or without
  8. // modification, are permitted provided that the following conditions are met:
  9. //
  10. // 1. Redistributions of source code must retain the above copyright notice, this
  11. // list of conditions and the following disclaimer.
  12. // 2. Redistributions in binary form must reproduce the above copyright notice,
  13. // this list of conditions and the following disclaimer in the documentation
  14. // and/or other materials provided with the distribution.
  15. //
  16. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  17. // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
  20. // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. package mgo
  27. import (
  28. "errors"
  29. "net"
  30. "sort"
  31. "sync"
  32. "time"
  33. "gopkg.in/mgo.v2/bson"
  34. )
  35. // ---------------------------------------------------------------------------
  36. // Mongo server encapsulation.
  37. type mongoServer struct {
  38. sync.RWMutex
  39. Addr string
  40. ResolvedAddr string
  41. tcpaddr *net.TCPAddr
  42. unusedSockets []*mongoSocket
  43. liveSockets []*mongoSocket
  44. closed bool
  45. abended bool
  46. sync chan bool
  47. dial dialer
  48. pingValue time.Duration
  49. pingIndex int
  50. pingCount uint32
  51. pingWindow [6]time.Duration
  52. info *mongoServerInfo
  53. }
  54. type dialer struct {
  55. old func(addr net.Addr) (net.Conn, error)
  56. new func(addr *ServerAddr) (net.Conn, error)
  57. }
  58. func (dial dialer) isSet() bool {
  59. return dial.old != nil || dial.new != nil
  60. }
  61. type mongoServerInfo struct {
  62. Master bool
  63. Mongos bool
  64. Tags bson.D
  65. MaxWireVersion int
  66. SetName string
  67. }
  68. var defaultServerInfo mongoServerInfo
  69. func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer {
  70. server := &mongoServer{
  71. Addr: addr,
  72. ResolvedAddr: tcpaddr.String(),
  73. tcpaddr: tcpaddr,
  74. sync: sync,
  75. dial: dial,
  76. info: &defaultServerInfo,
  77. pingValue: time.Hour, // Push it back before an actual ping.
  78. }
  79. go server.pinger(true)
  80. return server
  81. }
  82. var errPoolLimit = errors.New("per-server connection limit reached")
  83. var errServerClosed = errors.New("server was closed")
  84. // AcquireSocket returns a socket for communicating with the server.
  85. // This will attempt to reuse an old connection, if one is available. Otherwise,
  86. // it will establish a new one. The returned socket is owned by the call site,
  87. // and will return to the cache when the socket has its Release method called
  88. // the same number of times as AcquireSocket + Acquire were called for it.
  89. // If the poolLimit argument is greater than zero and the number of sockets in
  90. // use in this server is greater than the provided limit, errPoolLimit is
  91. // returned.
  92. func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
  93. for {
  94. server.Lock()
  95. abended = server.abended
  96. if server.closed {
  97. server.Unlock()
  98. return nil, abended, errServerClosed
  99. }
  100. n := len(server.unusedSockets)
  101. if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit {
  102. server.Unlock()
  103. return nil, false, errPoolLimit
  104. }
  105. if n > 0 {
  106. socket = server.unusedSockets[n-1]
  107. server.unusedSockets[n-1] = nil // Help GC.
  108. server.unusedSockets = server.unusedSockets[:n-1]
  109. info := server.info
  110. server.Unlock()
  111. err = socket.InitialAcquire(info, timeout)
  112. if err != nil {
  113. continue
  114. }
  115. } else {
  116. server.Unlock()
  117. socket, err = server.Connect(timeout)
  118. if err == nil {
  119. server.Lock()
  120. // We've waited for the Connect, see if we got
  121. // closed in the meantime
  122. if server.closed {
  123. server.Unlock()
  124. socket.Release()
  125. socket.Close()
  126. return nil, abended, errServerClosed
  127. }
  128. server.liveSockets = append(server.liveSockets, socket)
  129. server.Unlock()
  130. }
  131. }
  132. return
  133. }
  134. panic("unreachable")
  135. }
  136. // Connect establishes a new connection to the server. This should
  137. // generally be done through server.AcquireSocket().
  138. func (server *mongoServer) Connect(timeout time.Duration) (*mongoSocket, error) {
  139. server.RLock()
  140. master := server.info.Master
  141. dial := server.dial
  142. server.RUnlock()
  143. logf("Establishing new connection to %s (timeout=%s)...", server.Addr, timeout)
  144. var conn net.Conn
  145. var err error
  146. switch {
  147. case !dial.isSet():
  148. // Cannot do this because it lacks timeout support. :-(
  149. //conn, err = net.DialTCP("tcp", nil, server.tcpaddr)
  150. conn, err = net.DialTimeout("tcp", server.ResolvedAddr, timeout)
  151. if tcpconn, ok := conn.(*net.TCPConn); ok {
  152. tcpconn.SetKeepAlive(true)
  153. } else if err == nil {
  154. panic("internal error: obtained TCP connection is not a *net.TCPConn!?")
  155. }
  156. case dial.old != nil:
  157. conn, err = dial.old(server.tcpaddr)
  158. case dial.new != nil:
  159. conn, err = dial.new(&ServerAddr{server.Addr, server.tcpaddr})
  160. default:
  161. panic("dialer is set, but both dial.old and dial.new are nil")
  162. }
  163. if err != nil {
  164. logf("Connection to %s failed: %v", server.Addr, err.Error())
  165. return nil, err
  166. }
  167. logf("Connection to %s established.", server.Addr)
  168. stats.conn(+1, master)
  169. return newSocket(server, conn, timeout), nil
  170. }
  171. // Close forces closing all sockets that are alive, whether
  172. // they're currently in use or not.
  173. func (server *mongoServer) Close() {
  174. server.Lock()
  175. server.closed = true
  176. liveSockets := server.liveSockets
  177. unusedSockets := server.unusedSockets
  178. server.liveSockets = nil
  179. server.unusedSockets = nil
  180. server.Unlock()
  181. logf("Connections to %s closing (%d live sockets).", server.Addr, len(liveSockets))
  182. for i, s := range liveSockets {
  183. s.Close()
  184. liveSockets[i] = nil
  185. }
  186. for i := range unusedSockets {
  187. unusedSockets[i] = nil
  188. }
  189. }
  190. // RecycleSocket puts socket back into the unused cache.
  191. func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
  192. server.Lock()
  193. if !server.closed {
  194. server.unusedSockets = append(server.unusedSockets, socket)
  195. }
  196. server.Unlock()
  197. }
  198. func removeSocket(sockets []*mongoSocket, socket *mongoSocket) []*mongoSocket {
  199. for i, s := range sockets {
  200. if s == socket {
  201. copy(sockets[i:], sockets[i+1:])
  202. n := len(sockets) - 1
  203. sockets[n] = nil
  204. sockets = sockets[:n]
  205. break
  206. }
  207. }
  208. return sockets
  209. }
  210. // AbendSocket notifies the server that the given socket has terminated
  211. // abnormally, and thus should be discarded rather than cached.
  212. func (server *mongoServer) AbendSocket(socket *mongoSocket) {
  213. server.Lock()
  214. server.abended = true
  215. if server.closed {
  216. server.Unlock()
  217. return
  218. }
  219. server.liveSockets = removeSocket(server.liveSockets, socket)
  220. server.unusedSockets = removeSocket(server.unusedSockets, socket)
  221. server.Unlock()
  222. // Maybe just a timeout, but suggest a cluster sync up just in case.
  223. select {
  224. case server.sync <- true:
  225. default:
  226. }
  227. }
  228. func (server *mongoServer) SetInfo(info *mongoServerInfo) {
  229. server.Lock()
  230. server.info = info
  231. server.Unlock()
  232. }
  233. func (server *mongoServer) Info() *mongoServerInfo {
  234. server.Lock()
  235. info := server.info
  236. server.Unlock()
  237. return info
  238. }
  239. func (server *mongoServer) hasTags(serverTags []bson.D) bool {
  240. NextTagSet:
  241. for _, tags := range serverTags {
  242. NextReqTag:
  243. for _, req := range tags {
  244. for _, has := range server.info.Tags {
  245. if req.Name == has.Name {
  246. if req.Value == has.Value {
  247. continue NextReqTag
  248. }
  249. continue NextTagSet
  250. }
  251. }
  252. continue NextTagSet
  253. }
  254. return true
  255. }
  256. return false
  257. }
  258. var pingDelay = 15 * time.Second
  259. func (server *mongoServer) pinger(loop bool) {
  260. var delay time.Duration
  261. if raceDetector {
  262. // This variable is only ever touched by tests.
  263. globalMutex.Lock()
  264. delay = pingDelay
  265. globalMutex.Unlock()
  266. } else {
  267. delay = pingDelay
  268. }
  269. op := queryOp{
  270. collection: "admin.$cmd",
  271. query: bson.D{{"ping", 1}},
  272. flags: flagSlaveOk,
  273. limit: -1,
  274. }
  275. for {
  276. if loop {
  277. time.Sleep(delay)
  278. }
  279. op := op
  280. socket, _, err := server.AcquireSocket(0, delay)
  281. if err == nil {
  282. start := time.Now()
  283. _, _ = socket.SimpleQuery(&op)
  284. delay := time.Now().Sub(start)
  285. server.pingWindow[server.pingIndex] = delay
  286. server.pingIndex = (server.pingIndex + 1) % len(server.pingWindow)
  287. server.pingCount++
  288. var max time.Duration
  289. for i := 0; i < len(server.pingWindow) && uint32(i) < server.pingCount; i++ {
  290. if server.pingWindow[i] > max {
  291. max = server.pingWindow[i]
  292. }
  293. }
  294. socket.Release()
  295. server.Lock()
  296. if server.closed {
  297. loop = false
  298. }
  299. server.pingValue = max
  300. server.Unlock()
  301. logf("Ping for %s is %d ms", server.Addr, max/time.Millisecond)
  302. } else if err == errServerClosed {
  303. return
  304. }
  305. if !loop {
  306. return
  307. }
  308. }
  309. }
  310. type mongoServerSlice []*mongoServer
  311. func (s mongoServerSlice) Len() int {
  312. return len(s)
  313. }
  314. func (s mongoServerSlice) Less(i, j int) bool {
  315. return s[i].ResolvedAddr < s[j].ResolvedAddr
  316. }
  317. func (s mongoServerSlice) Swap(i, j int) {
  318. s[i], s[j] = s[j], s[i]
  319. }
  320. func (s mongoServerSlice) Sort() {
  321. sort.Sort(s)
  322. }
  323. func (s mongoServerSlice) Search(resolvedAddr string) (i int, ok bool) {
  324. n := len(s)
  325. i = sort.Search(n, func(i int) bool {
  326. return s[i].ResolvedAddr >= resolvedAddr
  327. })
  328. return i, i != n && s[i].ResolvedAddr == resolvedAddr
  329. }
  330. type mongoServers struct {
  331. slice mongoServerSlice
  332. }
  333. func (servers *mongoServers) Search(resolvedAddr string) (server *mongoServer) {
  334. if i, ok := servers.slice.Search(resolvedAddr); ok {
  335. return servers.slice[i]
  336. }
  337. return nil
  338. }
  339. func (servers *mongoServers) Add(server *mongoServer) {
  340. servers.slice = append(servers.slice, server)
  341. servers.slice.Sort()
  342. }
  343. func (servers *mongoServers) Remove(other *mongoServer) (server *mongoServer) {
  344. if i, found := servers.slice.Search(other.ResolvedAddr); found {
  345. server = servers.slice[i]
  346. copy(servers.slice[i:], servers.slice[i+1:])
  347. n := len(servers.slice) - 1
  348. servers.slice[n] = nil // Help GC.
  349. servers.slice = servers.slice[:n]
  350. }
  351. return
  352. }
  353. func (servers *mongoServers) Slice() []*mongoServer {
  354. return ([]*mongoServer)(servers.slice)
  355. }
  356. func (servers *mongoServers) Get(i int) *mongoServer {
  357. return servers.slice[i]
  358. }
  359. func (servers *mongoServers) Len() int {
  360. return len(servers.slice)
  361. }
  362. func (servers *mongoServers) Empty() bool {
  363. return len(servers.slice) == 0
  364. }
  365. func (servers *mongoServers) HasMongos() bool {
  366. for _, s := range servers.slice {
  367. if s.Info().Mongos {
  368. return true
  369. }
  370. }
  371. return false
  372. }
  373. // BestFit returns the best guess of what would be the most interesting
  374. // server to perform operations on at this point in time.
  375. func (servers *mongoServers) BestFit(mode Mode, serverTags []bson.D) *mongoServer {
  376. var best *mongoServer
  377. for _, next := range servers.slice {
  378. if best == nil {
  379. best = next
  380. best.RLock()
  381. if serverTags != nil && !next.info.Mongos && !best.hasTags(serverTags) {
  382. best.RUnlock()
  383. best = nil
  384. }
  385. continue
  386. }
  387. next.RLock()
  388. swap := false
  389. switch {
  390. case serverTags != nil && !next.info.Mongos && !next.hasTags(serverTags):
  391. // Must have requested tags.
  392. case mode == Secondary && next.info.Master && !next.info.Mongos:
  393. // Must be a secondary or mongos.
  394. case next.info.Master != best.info.Master && mode != Nearest:
  395. // Prefer slaves, unless the mode is PrimaryPreferred.
  396. swap = (mode == PrimaryPreferred) != best.info.Master
  397. case absDuration(next.pingValue-best.pingValue) > 15*time.Millisecond:
  398. // Prefer nearest server.
  399. swap = next.pingValue < best.pingValue
  400. case len(next.liveSockets)-len(next.unusedSockets) < len(best.liveSockets)-len(best.unusedSockets):
  401. // Prefer servers with less connections.
  402. swap = true
  403. }
  404. if swap {
  405. best.RUnlock()
  406. best = next
  407. } else {
  408. next.RUnlock()
  409. }
  410. }
  411. if best != nil {
  412. best.RUnlock()
  413. }
  414. return best
  415. }
  416. func absDuration(d time.Duration) time.Duration {
  417. if d < 0 {
  418. return -d
  419. }
  420. return d
  421. }