client.go 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "bytes"
  7. "context"
  8. "encoding/json"
  9. "fmt"
  10. "log"
  11. "net/http"
  12. "net/http/httputil"
  13. "net/url"
  14. "os"
  15. "regexp"
  16. "strings"
  17. "sync"
  18. "time"
  19. "github.com/pkg/errors"
  20. "gopkg.in/olivere/elastic.v5/config"
  21. )
  // Package-wide defaults. NewClient starts from these values; most of them
  // can be overridden with the corresponding Set... option.
  const (
  	// Version is the current version of Elastic.
  	Version = "5.0.85"

  	// DefaultURL is the endpoint of an Elasticsearch node on the local
  	// machine. It is used when a client is created without any URL.
  	DefaultURL = "http://127.0.0.1:9200"

  	// DefaultScheme is the protocol scheme used when sniffing the
  	// Elasticsearch cluster.
  	DefaultScheme = "http"

  	// DefaultHealthcheckEnabled reports whether healthchecks are on by default.
  	DefaultHealthcheckEnabled = true

  	// DefaultHealthcheckTimeoutStartup is how long the healthcheck waits for
  	// a response from Elasticsearch while a client is being created. Once
  	// the client is started the shorter DefaultHealthcheckTimeout applies.
  	DefaultHealthcheckTimeoutStartup = 5 * time.Second

  	// DefaultHealthcheckTimeout is how long a running client waits for a
  	// healthcheck response. The timeout used during client creation is
  	// larger by default (see DefaultHealthcheckTimeoutStartup).
  	DefaultHealthcheckTimeout = time.Second

  	// DefaultHealthcheckInterval is the default pause between two health
  	// checks of the nodes in the cluster.
  	DefaultHealthcheckInterval = time.Minute

  	// DefaultSnifferEnabled reports whether the sniffer is on by default.
  	DefaultSnifferEnabled = true

  	// DefaultSnifferInterval is the pause between two sniffing runs, i.e.
  	// between lookups of all cluster nodes and their addition to/removal
  	// from the list of actual connections.
  	DefaultSnifferInterval = 15 * time.Minute

  	// DefaultSnifferTimeoutStartup is the timeout of the sniffing run that
  	// happens while a new client is created. Later runs use
  	// DefaultSnifferTimeout (by default).
  	DefaultSnifferTimeoutStartup = 5 * time.Second

  	// DefaultSnifferTimeout is the timeout of a regular sniffing run. The
  	// initial run uses DefaultSnifferTimeoutStartup instead.
  	DefaultSnifferTimeout = 2 * time.Second

  	// DefaultSendGetBodyAs is the HTTP method used when elastic sends a
  	// GET request that has a body.
  	DefaultSendGetBodyAs = "GET"

  	// DefaultGzipEnabled reports whether gzip compression is on by default.
  	DefaultGzipEnabled = false

  	// off is the sentinel duration used to disable timeouts.
  	off = -time.Second
  )
  67. var (
  68. // ErrNoClient is raised when no Elasticsearch node is available.
  69. ErrNoClient = errors.New("no Elasticsearch node available")
  70. // ErrRetry is raised when a request cannot be executed after the configured
  71. // number of retries.
  72. ErrRetry = errors.New("cannot connect after several retries")
  73. // ErrTimeout is raised when a request timed out, e.g. when WaitForStatus
  74. // didn't return in time.
  75. ErrTimeout = errors.New("timeout")
  76. // noRetries is a retrier that does not retry.
  77. noRetries = NewStopRetrier()
  78. // noDeprecationLog is a no-op for logging deprecations.
  79. noDeprecationLog = func(*http.Request, *http.Response) {}
  80. )
  81. // ClientOptionFunc is a function that configures a Client.
  82. // It is used in NewClient.
  83. type ClientOptionFunc func(*Client) error
  84. // Client is an Elasticsearch client. Create one by calling NewClient.
  85. type Client struct {
  86. c *http.Client // net/http Client to use for requests
  87. connsMu sync.RWMutex // connsMu guards the next block
  88. conns []*conn // all connections
  89. cindex int // index into conns
  90. mu sync.RWMutex // guards the next block
  91. urls []string // set of URLs passed initially to the client
  92. running bool // true if the client's background processes are running
  93. errorlog Logger // error log for critical messages
  94. infolog Logger // information log for e.g. response times
  95. tracelog Logger // trace log for debugging
  96. deprecationlog func(*http.Request, *http.Response)
  97. scheme string // http or https
  98. healthcheckEnabled bool // healthchecks enabled or disabled
  99. healthcheckTimeoutStartup time.Duration // time the healthcheck waits for a response from Elasticsearch on startup
  100. healthcheckTimeout time.Duration // time the healthcheck waits for a response from Elasticsearch
  101. healthcheckInterval time.Duration // interval between healthchecks
  102. healthcheckStop chan bool // notify healthchecker to stop, and notify back
  103. snifferEnabled bool // sniffer enabled or disabled
  104. snifferTimeoutStartup time.Duration // time the sniffer waits for a response from nodes info API on startup
  105. snifferTimeout time.Duration // time the sniffer waits for a response from nodes info API
  106. snifferInterval time.Duration // interval between sniffing
  107. snifferCallback SnifferCallback // callback to modify the sniffing decision
  108. snifferStop chan bool // notify sniffer to stop, and notify back
  109. decoder Decoder // used to decode data sent from Elasticsearch
  110. basicAuth bool // indicates whether to send HTTP Basic Auth credentials
  111. basicAuthUsername string // username for HTTP Basic Auth
  112. basicAuthPassword string // password for HTTP Basic Auth
  113. sendGetBodyAs string // override for when sending a GET with a body
  114. requiredPlugins []string // list of required plugins
  115. gzipEnabled bool // gzip compression enabled or disabled (default)
  116. retrier Retrier // strategy for retries
  117. headers http.Header // a list of default headers to add to each request
  118. }
  119. // NewClient creates a new client to work with Elasticsearch.
  120. //
  121. // NewClient, by default, is meant to be long-lived and shared across
  122. // your application. If you need a short-lived client, e.g. for request-scope,
  123. // consider using NewSimpleClient instead.
  124. //
  125. // The caller can configure the new client by passing configuration options
  126. // to the func.
  127. //
  128. // Example:
  129. //
  130. // client, err := elastic.NewClient(
  131. // elastic.SetURL("http://127.0.0.1:9200", "http://127.0.0.1:9201"),
  132. // elastic.SetBasicAuth("user", "secret"))
  133. //
  134. // If no URL is configured, Elastic uses DefaultURL by default.
  135. //
  136. // If the sniffer is enabled (the default), the new client then sniffes
  137. // the cluster via the Nodes Info API
  138. // (see https://www.elastic.co/guide/en/elasticsearch/reference/5.2/cluster-nodes-info.html#cluster-nodes-info).
  139. // It uses the URLs specified by the caller. The caller is responsible
  140. // to only pass a list of URLs of nodes that belong to the same cluster.
  141. // This sniffing process is run on startup and periodically.
  142. // Use SnifferInterval to set the interval between two sniffs (default is
  143. // 15 minutes). In other words: By default, the client will find new nodes
  144. // in the cluster and remove those that are no longer available every
  145. // 15 minutes. Disable the sniffer by passing SetSniff(false) to NewClient.
  146. //
  147. // The list of nodes found in the sniffing process will be used to make
  148. // connections to the REST API of Elasticsearch. These nodes are also
  149. // periodically checked in a shorter time frame. This process is called
  150. // a health check. By default, a health check is done every 60 seconds.
  151. // You can set a shorter or longer interval by SetHealthcheckInterval.
  152. // Disabling health checks is not recommended, but can be done by
  153. // SetHealthcheck(false).
  154. //
  155. // Connections are automatically marked as dead or healthy while
  156. // making requests to Elasticsearch. When a request fails, Elastic will
  157. // call into the Retry strategy which can be specified with SetRetry.
  158. // The Retry strategy is also responsible for handling backoff i.e. the time
  159. // to wait before starting the next request. There are various standard
  160. // backoff implementations, e.g. ExponentialBackoff or SimpleBackoff.
  161. // Retries are disabled by default.
  162. //
  163. // If no HttpClient is configured, then http.DefaultClient is used.
  164. // You can use your own http.Client with some http.Transport for
  165. // advanced scenarios.
  166. //
  167. // An error is also returned when some configuration option is invalid or
  168. // the new client cannot sniff the cluster (if enabled).
  169. func NewClient(options ...ClientOptionFunc) (*Client, error) {
  170. // Set up the client
  171. c := &Client{
  172. c: http.DefaultClient,
  173. conns: make([]*conn, 0),
  174. cindex: -1,
  175. scheme: DefaultScheme,
  176. decoder: &DefaultDecoder{},
  177. healthcheckEnabled: DefaultHealthcheckEnabled,
  178. healthcheckTimeoutStartup: DefaultHealthcheckTimeoutStartup,
  179. healthcheckTimeout: DefaultHealthcheckTimeout,
  180. healthcheckInterval: DefaultHealthcheckInterval,
  181. healthcheckStop: make(chan bool),
  182. snifferEnabled: DefaultSnifferEnabled,
  183. snifferTimeoutStartup: DefaultSnifferTimeoutStartup,
  184. snifferTimeout: DefaultSnifferTimeout,
  185. snifferInterval: DefaultSnifferInterval,
  186. snifferCallback: nopSnifferCallback,
  187. snifferStop: make(chan bool),
  188. sendGetBodyAs: DefaultSendGetBodyAs,
  189. gzipEnabled: DefaultGzipEnabled,
  190. retrier: noRetries, // no retries by default
  191. deprecationlog: noDeprecationLog,
  192. }
  193. // Run the options on it
  194. for _, option := range options {
  195. if err := option(c); err != nil {
  196. return nil, err
  197. }
  198. }
  199. // Use a default URL and normalize them
  200. if len(c.urls) == 0 {
  201. c.urls = []string{DefaultURL}
  202. }
  203. c.urls = canonicalize(c.urls...)
  204. // If the URLs have auth info, use them here as an alternative to SetBasicAuth
  205. if !c.basicAuth {
  206. for _, urlStr := range c.urls {
  207. u, err := url.Parse(urlStr)
  208. if err == nil && u.User != nil {
  209. c.basicAuth = true
  210. c.basicAuthUsername = u.User.Username()
  211. c.basicAuthPassword, _ = u.User.Password()
  212. break
  213. }
  214. }
  215. }
  216. // Check if we can make a request to any of the specified URLs
  217. if c.healthcheckEnabled {
  218. if err := c.startupHealthcheck(c.healthcheckTimeoutStartup); err != nil {
  219. return nil, err
  220. }
  221. }
  222. if c.snifferEnabled {
  223. // Sniff the cluster initially
  224. if err := c.sniff(c.snifferTimeoutStartup); err != nil {
  225. return nil, err
  226. }
  227. } else {
  228. // Do not sniff the cluster initially. Use the provided URLs instead.
  229. for _, url := range c.urls {
  230. c.conns = append(c.conns, newConn(url, url))
  231. }
  232. }
  233. if c.healthcheckEnabled {
  234. // Perform an initial health check
  235. c.healthcheck(c.healthcheckTimeoutStartup, true)
  236. }
  237. // Ensure that we have at least one connection available
  238. if err := c.mustActiveConn(); err != nil {
  239. return nil, err
  240. }
  241. // Check the required plugins
  242. for _, plugin := range c.requiredPlugins {
  243. found, err := c.HasPlugin(plugin)
  244. if err != nil {
  245. return nil, err
  246. }
  247. if !found {
  248. return nil, fmt.Errorf("elastic: plugin %s not found", plugin)
  249. }
  250. }
  251. if c.snifferEnabled {
  252. go c.sniffer() // periodically update cluster information
  253. }
  254. if c.healthcheckEnabled {
  255. go c.healthchecker() // start goroutine periodically ping all nodes of the cluster
  256. }
  257. c.mu.Lock()
  258. c.running = true
  259. c.mu.Unlock()
  260. return c, nil
  261. }
  262. // NewClientFromConfig initializes a client from a configuration.
  263. func NewClientFromConfig(cfg *config.Config) (*Client, error) {
  264. var options []ClientOptionFunc
  265. if cfg != nil {
  266. if cfg.URL != "" {
  267. options = append(options, SetURL(cfg.URL))
  268. }
  269. if cfg.Errorlog != "" {
  270. f, err := os.OpenFile(cfg.Errorlog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  271. if err != nil {
  272. return nil, errors.Wrap(err, "unable to initialize error log")
  273. }
  274. l := log.New(f, "", 0)
  275. options = append(options, SetErrorLog(l))
  276. }
  277. if cfg.Tracelog != "" {
  278. f, err := os.OpenFile(cfg.Tracelog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  279. if err != nil {
  280. return nil, errors.Wrap(err, "unable to initialize trace log")
  281. }
  282. l := log.New(f, "", 0)
  283. options = append(options, SetTraceLog(l))
  284. }
  285. if cfg.Infolog != "" {
  286. f, err := os.OpenFile(cfg.Infolog, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  287. if err != nil {
  288. return nil, errors.Wrap(err, "unable to initialize info log")
  289. }
  290. l := log.New(f, "", 0)
  291. options = append(options, SetInfoLog(l))
  292. }
  293. if cfg.Username != "" || cfg.Password != "" {
  294. options = append(options, SetBasicAuth(cfg.Username, cfg.Password))
  295. }
  296. if cfg.Sniff != nil {
  297. options = append(options, SetSniff(*cfg.Sniff))
  298. }
  299. if cfg.Healthcheck != nil {
  300. options = append(options, SetHealthcheck(*cfg.Healthcheck))
  301. }
  302. }
  303. return NewClient(options...)
  304. }
  305. // NewSimpleClient creates a new short-lived Client that can be used in
  306. // use cases where you need e.g. one client per request.
  307. //
  308. // While NewClient by default sets up e.g. periodic health checks
  309. // and sniffing for new nodes in separate goroutines, NewSimpleClient does
  310. // not and is meant as a simple replacement where you don't need all the
  311. // heavy lifting of NewClient.
  312. //
  313. // NewSimpleClient does the following by default: First, all health checks
  314. // are disabled, including timeouts and periodic checks. Second, sniffing
  315. // is disabled, including timeouts and periodic checks. The number of retries
  316. // is set to 1. NewSimpleClient also does not start any goroutines.
  317. //
  318. // Notice that you can still override settings by passing additional options,
  319. // just like with NewClient.
  320. func NewSimpleClient(options ...ClientOptionFunc) (*Client, error) {
  321. c := &Client{
  322. c: http.DefaultClient,
  323. conns: make([]*conn, 0),
  324. cindex: -1,
  325. scheme: DefaultScheme,
  326. decoder: &DefaultDecoder{},
  327. healthcheckEnabled: false,
  328. healthcheckTimeoutStartup: off,
  329. healthcheckTimeout: off,
  330. healthcheckInterval: off,
  331. healthcheckStop: make(chan bool),
  332. snifferEnabled: false,
  333. snifferTimeoutStartup: off,
  334. snifferTimeout: off,
  335. snifferInterval: off,
  336. snifferCallback: nopSnifferCallback,
  337. snifferStop: make(chan bool),
  338. sendGetBodyAs: DefaultSendGetBodyAs,
  339. gzipEnabled: DefaultGzipEnabled,
  340. retrier: noRetries, // no retries by default
  341. deprecationlog: noDeprecationLog,
  342. }
  343. // Run the options on it
  344. for _, option := range options {
  345. if err := option(c); err != nil {
  346. return nil, err
  347. }
  348. }
  349. // Use a default URL and normalize them
  350. if len(c.urls) == 0 {
  351. c.urls = []string{DefaultURL}
  352. }
  353. c.urls = canonicalize(c.urls...)
  354. // If the URLs have auth info, use them here as an alternative to SetBasicAuth
  355. if !c.basicAuth {
  356. for _, urlStr := range c.urls {
  357. u, err := url.Parse(urlStr)
  358. if err == nil && u.User != nil {
  359. c.basicAuth = true
  360. c.basicAuthUsername = u.User.Username()
  361. c.basicAuthPassword, _ = u.User.Password()
  362. break
  363. }
  364. }
  365. }
  366. for _, url := range c.urls {
  367. c.conns = append(c.conns, newConn(url, url))
  368. }
  369. // Ensure that we have at least one connection available
  370. if err := c.mustActiveConn(); err != nil {
  371. return nil, err
  372. }
  373. // Check the required plugins
  374. for _, plugin := range c.requiredPlugins {
  375. found, err := c.HasPlugin(plugin)
  376. if err != nil {
  377. return nil, err
  378. }
  379. if !found {
  380. return nil, fmt.Errorf("elastic: plugin %s not found", plugin)
  381. }
  382. }
  383. c.mu.Lock()
  384. c.running = true
  385. c.mu.Unlock()
  386. return c, nil
  387. }
  388. // SetHttpClient can be used to specify the http.Client to use when making
  389. // HTTP requests to Elasticsearch.
  390. func SetHttpClient(httpClient *http.Client) ClientOptionFunc {
  391. return func(c *Client) error {
  392. if httpClient != nil {
  393. c.c = httpClient
  394. } else {
  395. c.c = http.DefaultClient
  396. }
  397. return nil
  398. }
  399. }
  400. // SetBasicAuth can be used to specify the HTTP Basic Auth credentials to
  401. // use when making HTTP requests to Elasticsearch.
  402. func SetBasicAuth(username, password string) ClientOptionFunc {
  403. return func(c *Client) error {
  404. c.basicAuthUsername = username
  405. c.basicAuthPassword = password
  406. c.basicAuth = c.basicAuthUsername != "" || c.basicAuthPassword != ""
  407. return nil
  408. }
  409. }
  410. // SetURL defines the URL endpoints of the Elasticsearch nodes. Notice that
  411. // when sniffing is enabled, these URLs are used to initially sniff the
  412. // cluster on startup.
  413. func SetURL(urls ...string) ClientOptionFunc {
  414. return func(c *Client) error {
  415. switch len(urls) {
  416. case 0:
  417. c.urls = []string{DefaultURL}
  418. default:
  419. c.urls = urls
  420. }
  421. return nil
  422. }
  423. }
  424. // SetScheme sets the HTTP scheme to look for when sniffing (http or https).
  425. // This is http by default.
  426. func SetScheme(scheme string) ClientOptionFunc {
  427. return func(c *Client) error {
  428. c.scheme = scheme
  429. return nil
  430. }
  431. }
  432. // SetSniff enables or disables the sniffer (enabled by default).
  433. func SetSniff(enabled bool) ClientOptionFunc {
  434. return func(c *Client) error {
  435. c.snifferEnabled = enabled
  436. return nil
  437. }
  438. }
  439. // SetSnifferTimeoutStartup sets the timeout for the sniffer that is used
  440. // when creating a new client. The default is 5 seconds. Notice that the
  441. // timeout being used for subsequent sniffing processes is set with
  442. // SetSnifferTimeout.
  443. func SetSnifferTimeoutStartup(timeout time.Duration) ClientOptionFunc {
  444. return func(c *Client) error {
  445. c.snifferTimeoutStartup = timeout
  446. return nil
  447. }
  448. }
  449. // SetSnifferTimeout sets the timeout for the sniffer that finds the
  450. // nodes in a cluster. The default is 2 seconds. Notice that the timeout
  451. // used when creating a new client on startup is usually greater and can
  452. // be set with SetSnifferTimeoutStartup.
  453. func SetSnifferTimeout(timeout time.Duration) ClientOptionFunc {
  454. return func(c *Client) error {
  455. c.snifferTimeout = timeout
  456. return nil
  457. }
  458. }
  459. // SetSnifferInterval sets the interval between two sniffing processes.
  460. // The default interval is 15 minutes.
  461. func SetSnifferInterval(interval time.Duration) ClientOptionFunc {
  462. return func(c *Client) error {
  463. c.snifferInterval = interval
  464. return nil
  465. }
  466. }
  467. // SnifferCallback defines the protocol for sniffing decisions.
  468. type SnifferCallback func(*NodesInfoNode) bool
  469. // nopSnifferCallback is the default sniffer callback: It accepts
  470. // all nodes the sniffer finds.
  471. var nopSnifferCallback = func(*NodesInfoNode) bool { return true }
  472. // SetSnifferCallback allows the caller to modify sniffer decisions.
  473. // When setting the callback, the given SnifferCallback is called for
  474. // each (healthy) node found during the sniffing process.
  475. // If the callback returns false, the node is ignored: No requests
  476. // are routed to it.
  477. func SetSnifferCallback(f SnifferCallback) ClientOptionFunc {
  478. return func(c *Client) error {
  479. if f != nil {
  480. c.snifferCallback = f
  481. }
  482. return nil
  483. }
  484. }
  485. // SetHealthcheck enables or disables healthchecks (enabled by default).
  486. func SetHealthcheck(enabled bool) ClientOptionFunc {
  487. return func(c *Client) error {
  488. c.healthcheckEnabled = enabled
  489. return nil
  490. }
  491. }
  492. // SetHealthcheckTimeoutStartup sets the timeout for the initial health check.
  493. // The default timeout is 5 seconds (see DefaultHealthcheckTimeoutStartup).
  494. // Notice that timeouts for subsequent health checks can be modified with
  495. // SetHealthcheckTimeout.
  496. func SetHealthcheckTimeoutStartup(timeout time.Duration) ClientOptionFunc {
  497. return func(c *Client) error {
  498. c.healthcheckTimeoutStartup = timeout
  499. return nil
  500. }
  501. }
  502. // SetHealthcheckTimeout sets the timeout for periodic health checks.
  503. // The default timeout is 1 second (see DefaultHealthcheckTimeout).
  504. // Notice that a different (usually larger) timeout is used for the initial
  505. // healthcheck, which is initiated while creating a new client.
  506. // The startup timeout can be modified with SetHealthcheckTimeoutStartup.
  507. func SetHealthcheckTimeout(timeout time.Duration) ClientOptionFunc {
  508. return func(c *Client) error {
  509. c.healthcheckTimeout = timeout
  510. return nil
  511. }
  512. }
  513. // SetHealthcheckInterval sets the interval between two health checks.
  514. // The default interval is 60 seconds.
  515. func SetHealthcheckInterval(interval time.Duration) ClientOptionFunc {
  516. return func(c *Client) error {
  517. c.healthcheckInterval = interval
  518. return nil
  519. }
  520. }
  521. // SetMaxRetries sets the maximum number of retries before giving up when
  522. // performing a HTTP request to Elasticsearch.
  523. //
  524. // Deprecated: Replace with a Retry implementation.
  525. func SetMaxRetries(maxRetries int) ClientOptionFunc {
  526. return func(c *Client) error {
  527. if maxRetries < 0 {
  528. return errors.New("MaxRetries must be greater than or equal to 0")
  529. } else if maxRetries == 0 {
  530. c.retrier = noRetries
  531. } else {
  532. // Create a Retrier that will wait for 100ms (+/- jitter) between requests.
  533. // This resembles the old behavior with maxRetries.
  534. ticks := make([]int, maxRetries)
  535. for i := 0; i < len(ticks); i++ {
  536. ticks[i] = 100
  537. }
  538. backoff := NewSimpleBackoff(ticks...)
  539. c.retrier = NewBackoffRetrier(backoff)
  540. }
  541. return nil
  542. }
  543. }
  544. // SetGzip enables or disables gzip compression (disabled by default).
  545. func SetGzip(enabled bool) ClientOptionFunc {
  546. return func(c *Client) error {
  547. c.gzipEnabled = enabled
  548. return nil
  549. }
  550. }
  551. // SetDecoder sets the Decoder to use when decoding data from Elasticsearch.
  552. // DefaultDecoder is used by default.
  553. func SetDecoder(decoder Decoder) ClientOptionFunc {
  554. return func(c *Client) error {
  555. if decoder != nil {
  556. c.decoder = decoder
  557. } else {
  558. c.decoder = &DefaultDecoder{}
  559. }
  560. return nil
  561. }
  562. }
  563. // SetRequiredPlugins can be used to indicate that some plugins are required
  564. // before a Client will be created.
  565. func SetRequiredPlugins(plugins ...string) ClientOptionFunc {
  566. return func(c *Client) error {
  567. if c.requiredPlugins == nil {
  568. c.requiredPlugins = make([]string, 0)
  569. }
  570. c.requiredPlugins = append(c.requiredPlugins, plugins...)
  571. return nil
  572. }
  573. }
  574. // SetErrorLog sets the logger for critical messages like nodes joining
  575. // or leaving the cluster or failing requests. It is nil by default.
  576. func SetErrorLog(logger Logger) ClientOptionFunc {
  577. return func(c *Client) error {
  578. c.errorlog = logger
  579. return nil
  580. }
  581. }
  582. // SetInfoLog sets the logger for informational messages, e.g. requests
  583. // and their response times. It is nil by default.
  584. func SetInfoLog(logger Logger) ClientOptionFunc {
  585. return func(c *Client) error {
  586. c.infolog = logger
  587. return nil
  588. }
  589. }
  590. // SetTraceLog specifies the log.Logger to use for output of HTTP requests
  591. // and responses which is helpful during debugging. It is nil by default.
  592. func SetTraceLog(logger Logger) ClientOptionFunc {
  593. return func(c *Client) error {
  594. c.tracelog = logger
  595. return nil
  596. }
  597. }
  598. // SetSendGetBodyAs specifies the HTTP method to use when sending a GET request
  599. // with a body. It is GET by default.
  600. func SetSendGetBodyAs(httpMethod string) ClientOptionFunc {
  601. return func(c *Client) error {
  602. c.sendGetBodyAs = httpMethod
  603. return nil
  604. }
  605. }
  606. // SetRetrier specifies the retry strategy that handles errors during
  607. // HTTP request/response with Elasticsearch.
  608. func SetRetrier(retrier Retrier) ClientOptionFunc {
  609. return func(c *Client) error {
  610. if retrier == nil {
  611. retrier = noRetries // no retries by default
  612. }
  613. c.retrier = retrier
  614. return nil
  615. }
  616. }
  617. // SetHeaders adds a list of default HTTP headers that will be added to
  618. // each requests executed by PerformRequest.
  619. func SetHeaders(headers http.Header) ClientOptionFunc {
  620. return func(c *Client) error {
  621. c.headers = headers
  622. return nil
  623. }
  624. }
  625. // String returns a string representation of the client status.
  626. func (c *Client) String() string {
  627. c.connsMu.Lock()
  628. conns := c.conns
  629. c.connsMu.Unlock()
  630. var buf bytes.Buffer
  631. for i, conn := range conns {
  632. if i > 0 {
  633. buf.WriteString(", ")
  634. }
  635. buf.WriteString(conn.String())
  636. }
  637. return buf.String()
  638. }
  639. // IsRunning returns true if the background processes of the client are
  640. // running, false otherwise.
  641. func (c *Client) IsRunning() bool {
  642. c.mu.RLock()
  643. defer c.mu.RUnlock()
  644. return c.running
  645. }
  646. // Start starts the background processes like sniffing the cluster and
  647. // periodic health checks. You don't need to run Start when creating a
  648. // client with NewClient; the background processes are run by default.
  649. //
  650. // If the background processes are already running, this is a no-op.
  651. func (c *Client) Start() {
  652. c.mu.RLock()
  653. if c.running {
  654. c.mu.RUnlock()
  655. return
  656. }
  657. c.mu.RUnlock()
  658. if c.snifferEnabled {
  659. go c.sniffer()
  660. }
  661. if c.healthcheckEnabled {
  662. go c.healthchecker()
  663. }
  664. c.mu.Lock()
  665. c.running = true
  666. c.mu.Unlock()
  667. c.infof("elastic: client started")
  668. }
  669. // Stop stops the background processes that the client is running,
  670. // i.e. sniffing the cluster periodically and running health checks
  671. // on the nodes.
  672. //
  673. // If the background processes are not running, this is a no-op.
  674. func (c *Client) Stop() {
  675. c.mu.RLock()
  676. if !c.running {
  677. c.mu.RUnlock()
  678. return
  679. }
  680. c.mu.RUnlock()
  681. if c.healthcheckEnabled {
  682. c.healthcheckStop <- true
  683. <-c.healthcheckStop
  684. }
  685. if c.snifferEnabled {
  686. c.snifferStop <- true
  687. <-c.snifferStop
  688. }
  689. c.mu.Lock()
  690. c.running = false
  691. c.mu.Unlock()
  692. c.infof("elastic: client stopped")
  693. }
  694. // errorf logs to the error log.
  695. func (c *Client) errorf(format string, args ...interface{}) {
  696. if c.errorlog != nil {
  697. c.errorlog.Printf(format, args...)
  698. }
  699. }
  700. // infof logs informational messages.
  701. func (c *Client) infof(format string, args ...interface{}) {
  702. if c.infolog != nil {
  703. c.infolog.Printf(format, args...)
  704. }
  705. }
  706. // tracef logs to the trace log.
  707. func (c *Client) tracef(format string, args ...interface{}) {
  708. if c.tracelog != nil {
  709. c.tracelog.Printf(format, args...)
  710. }
  711. }
  712. // dumpRequest dumps the given HTTP request to the trace log.
  713. func (c *Client) dumpRequest(r *http.Request) {
  714. if c.tracelog != nil {
  715. out, err := httputil.DumpRequestOut(r, true)
  716. if err == nil {
  717. c.tracef("%s\n", string(out))
  718. }
  719. }
  720. }
  721. // dumpResponse dumps the given HTTP response to the trace log.
  722. func (c *Client) dumpResponse(resp *http.Response) {
  723. if c.tracelog != nil {
  724. out, err := httputil.DumpResponse(resp, true)
  725. if err == nil {
  726. c.tracef("%s\n", string(out))
  727. }
  728. }
  729. }
  730. // sniffer periodically runs sniff.
  731. func (c *Client) sniffer() {
  732. c.mu.RLock()
  733. timeout := c.snifferTimeout
  734. interval := c.snifferInterval
  735. c.mu.RUnlock()
  736. ticker := time.NewTicker(interval)
  737. defer ticker.Stop()
  738. for {
  739. select {
  740. case <-c.snifferStop:
  741. // we are asked to stop, so we signal back that we're stopping now
  742. c.snifferStop <- true
  743. return
  744. case <-ticker.C:
  745. c.sniff(timeout)
  746. }
  747. }
  748. }
  749. // sniff uses the Node Info API to return the list of nodes in the cluster.
  750. // It uses the list of URLs passed on startup plus the list of URLs found
  751. // by the preceding sniffing process (if sniffing is enabled).
  752. //
  753. // If sniffing is disabled, this is a no-op.
  754. func (c *Client) sniff(timeout time.Duration) error {
  755. c.mu.RLock()
  756. if !c.snifferEnabled {
  757. c.mu.RUnlock()
  758. return nil
  759. }
  760. // Use all available URLs provided to sniff the cluster.
  761. var urls []string
  762. urlsMap := make(map[string]bool)
  763. // Add all URLs provided on startup
  764. for _, url := range c.urls {
  765. urlsMap[url] = true
  766. urls = append(urls, url)
  767. }
  768. c.mu.RUnlock()
  769. // Add all URLs found by sniffing
  770. c.connsMu.RLock()
  771. for _, conn := range c.conns {
  772. if !conn.IsDead() {
  773. url := conn.URL()
  774. if _, found := urlsMap[url]; !found {
  775. urls = append(urls, url)
  776. }
  777. }
  778. }
  779. c.connsMu.RUnlock()
  780. if len(urls) == 0 {
  781. return errors.Wrap(ErrNoClient, "no URLs found")
  782. }
  783. // Start sniffing on all found URLs
  784. ch := make(chan []*conn, len(urls))
  785. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  786. defer cancel()
  787. for _, url := range urls {
  788. go func(url string) { ch <- c.sniffNode(ctx, url) }(url)
  789. }
  790. // Wait for the results to come back, or the process times out.
  791. for {
  792. select {
  793. case conns := <-ch:
  794. if len(conns) > 0 {
  795. c.updateConns(conns)
  796. return nil
  797. }
  798. case <-ctx.Done():
  799. // We get here if no cluster responds in time
  800. return errors.Wrap(ErrNoClient, "sniff timeout")
  801. }
  802. }
  803. }
  804. // sniffNode sniffs a single node. This method is run as a goroutine
  805. // in sniff. If successful, it returns the list of node URLs extracted
  806. // from the result of calling Nodes Info API. Otherwise, an empty array
  807. // is returned.
  808. func (c *Client) sniffNode(ctx context.Context, url string) []*conn {
  809. var nodes []*conn
  810. // Call the Nodes Info API at /_nodes/http
  811. req, err := NewRequest("GET", url+"/_nodes/http")
  812. if err != nil {
  813. return nodes
  814. }
  815. c.mu.RLock()
  816. if c.basicAuth {
  817. req.SetBasicAuth(c.basicAuthUsername, c.basicAuthPassword)
  818. }
  819. c.mu.RUnlock()
  820. res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
  821. if err != nil {
  822. return nodes
  823. }
  824. if res == nil {
  825. return nodes
  826. }
  827. if res.Body != nil {
  828. defer res.Body.Close()
  829. }
  830. var info NodesInfoResponse
  831. if err := json.NewDecoder(res.Body).Decode(&info); err == nil {
  832. if len(info.Nodes) > 0 {
  833. for nodeID, node := range info.Nodes {
  834. if c.snifferCallback(node) {
  835. if node.HTTP != nil && len(node.HTTP.PublishAddress) > 0 {
  836. url := c.extractHostname(c.scheme, node.HTTP.PublishAddress)
  837. if url != "" {
  838. nodes = append(nodes, newConn(nodeID, url))
  839. }
  840. }
  841. }
  842. }
  843. }
  844. }
  845. return nodes
  846. }
  847. // reSniffHostAndPort is used to extract hostname and port from a result
  848. // from a Nodes Info API (example: "inet[/127.0.0.1:9200]").
  849. var reSniffHostAndPort = regexp.MustCompile(`\/([^:]*):([0-9]+)\]`)
  850. func (c *Client) extractHostname(scheme, address string) string {
  851. if strings.HasPrefix(address, "inet") {
  852. m := reSniffHostAndPort.FindStringSubmatch(address)
  853. if len(m) == 3 {
  854. return fmt.Sprintf("%s://%s:%s", scheme, m[1], m[2])
  855. }
  856. }
  857. s := address
  858. if idx := strings.Index(s, "/"); idx >= 0 {
  859. s = s[idx+1:]
  860. }
  861. if strings.Index(s, ":") < 0 {
  862. return ""
  863. }
  864. return fmt.Sprintf("%s://%s", scheme, s)
  865. }
  866. // updateConns updates the clients' connections with new information
  867. // gather by a sniff operation.
  868. func (c *Client) updateConns(conns []*conn) {
  869. c.connsMu.Lock()
  870. // Build up new connections:
  871. // If we find an existing connection, use that (including no. of failures etc.).
  872. // If we find a new connection, add it.
  873. var newConns []*conn
  874. for _, conn := range conns {
  875. var found bool
  876. for _, oldConn := range c.conns {
  877. if oldConn.NodeID() == conn.NodeID() {
  878. // Take over the old connection
  879. newConns = append(newConns, oldConn)
  880. found = true
  881. break
  882. }
  883. }
  884. if !found {
  885. // New connection didn't exist, so add it to our list of new conns.
  886. c.infof("elastic: %s joined the cluster", conn.URL())
  887. newConns = append(newConns, conn)
  888. }
  889. }
  890. c.conns = newConns
  891. c.cindex = -1
  892. c.connsMu.Unlock()
  893. }
  894. // healthchecker periodically runs healthcheck.
  895. func (c *Client) healthchecker() {
  896. c.mu.RLock()
  897. timeout := c.healthcheckTimeout
  898. interval := c.healthcheckInterval
  899. c.mu.RUnlock()
  900. ticker := time.NewTicker(interval)
  901. defer ticker.Stop()
  902. for {
  903. select {
  904. case <-c.healthcheckStop:
  905. // we are asked to stop, so we signal back that we're stopping now
  906. c.healthcheckStop <- true
  907. return
  908. case <-ticker.C:
  909. c.healthcheck(timeout, false)
  910. }
  911. }
  912. }
  913. // healthcheck does a health check on all nodes in the cluster. Depending on
  914. // the node state, it marks connections as dead, sets them alive etc.
  915. // If healthchecks are disabled and force is false, this is a no-op.
  916. // The timeout specifies how long to wait for a response from Elasticsearch.
  917. func (c *Client) healthcheck(timeout time.Duration, force bool) {
  918. c.mu.RLock()
  919. if !c.healthcheckEnabled && !force {
  920. c.mu.RUnlock()
  921. return
  922. }
  923. basicAuth := c.basicAuth
  924. basicAuthUsername := c.basicAuthUsername
  925. basicAuthPassword := c.basicAuthPassword
  926. c.mu.RUnlock()
  927. c.connsMu.RLock()
  928. conns := c.conns
  929. c.connsMu.RUnlock()
  930. for _, conn := range conns {
  931. // Run the HEAD request against ES with a timeout
  932. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  933. defer cancel()
  934. // Goroutine executes the HTTP request, returns an error and sets status
  935. var status int
  936. errc := make(chan error, 1)
  937. go func(url string) {
  938. req, err := NewRequest("HEAD", url)
  939. if err != nil {
  940. errc <- err
  941. return
  942. }
  943. if basicAuth {
  944. req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
  945. }
  946. res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
  947. if res != nil {
  948. status = res.StatusCode
  949. if res.Body != nil {
  950. res.Body.Close()
  951. }
  952. }
  953. errc <- err
  954. }(conn.URL())
  955. // Wait for the Goroutine (or its timeout)
  956. select {
  957. case <-ctx.Done(): // timeout
  958. c.errorf("elastic: %s is dead", conn.URL())
  959. conn.MarkAsDead()
  960. case err := <-errc:
  961. if err != nil {
  962. c.errorf("elastic: %s is dead", conn.URL())
  963. conn.MarkAsDead()
  964. break
  965. }
  966. if status >= 200 && status < 300 {
  967. conn.MarkAsAlive()
  968. } else {
  969. conn.MarkAsDead()
  970. c.errorf("elastic: %s is dead [status=%d]", conn.URL(), status)
  971. }
  972. }
  973. }
  974. }
  975. // startupHealthcheck is used at startup to check if the server is available
  976. // at all.
  977. func (c *Client) startupHealthcheck(timeout time.Duration) error {
  978. c.mu.Lock()
  979. urls := c.urls
  980. basicAuth := c.basicAuth
  981. basicAuthUsername := c.basicAuthUsername
  982. basicAuthPassword := c.basicAuthPassword
  983. c.mu.Unlock()
  984. // If we don't get a connection after "timeout", we bail.
  985. var lastErr error
  986. start := time.Now()
  987. for {
  988. for _, url := range urls {
  989. req, err := http.NewRequest("HEAD", url, nil)
  990. if err != nil {
  991. return err
  992. }
  993. if basicAuth {
  994. req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
  995. }
  996. ctx, cancel := context.WithTimeout(req.Context(), timeout)
  997. defer cancel()
  998. req = req.WithContext(ctx)
  999. res, err := c.c.Do(req)
  1000. if err == nil && res != nil && res.StatusCode >= 200 && res.StatusCode < 300 {
  1001. return nil
  1002. } else if err != nil {
  1003. lastErr = err
  1004. }
  1005. }
  1006. time.Sleep(1 * time.Second)
  1007. if time.Since(start) > timeout {
  1008. break
  1009. }
  1010. }
  1011. if lastErr != nil {
  1012. return errors.Wrapf(ErrNoClient, "health check timeout: %v", lastErr)
  1013. }
  1014. return errors.Wrap(ErrNoClient, "health check timeout")
  1015. }
  1016. // next returns the next available connection, or ErrNoClient.
  1017. func (c *Client) next() (*conn, error) {
  1018. // We do round-robin here.
  1019. // TODO(oe) This should be a pluggable strategy, like the Selector in the official clients.
  1020. c.connsMu.Lock()
  1021. defer c.connsMu.Unlock()
  1022. i := 0
  1023. numConns := len(c.conns)
  1024. for {
  1025. i++
  1026. if i > numConns {
  1027. break // we visited all conns: they all seem to be dead
  1028. }
  1029. c.cindex++
  1030. if c.cindex >= numConns {
  1031. c.cindex = 0
  1032. }
  1033. conn := c.conns[c.cindex]
  1034. if !conn.IsDead() {
  1035. return conn, nil
  1036. }
  1037. }
  1038. // We have a deadlock here: All nodes are marked as dead.
  1039. // If sniffing is disabled, connections will never be marked alive again.
  1040. // So we are marking them as alive--if sniffing is disabled.
  1041. // They'll then be picked up in the next call to PerformRequest.
  1042. if !c.snifferEnabled {
  1043. c.errorf("elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
  1044. for _, conn := range c.conns {
  1045. conn.MarkAsAlive()
  1046. }
  1047. }
  1048. // We tried hard, but there is no node available
  1049. return nil, errors.Wrap(ErrNoClient, "no available connection")
  1050. }
  1051. // mustActiveConn returns nil if there is an active connection,
  1052. // otherwise ErrNoClient is returned.
  1053. func (c *Client) mustActiveConn() error {
  1054. c.connsMu.Lock()
  1055. defer c.connsMu.Unlock()
  1056. for _, c := range c.conns {
  1057. if !c.IsDead() {
  1058. return nil
  1059. }
  1060. }
  1061. return errors.Wrap(ErrNoClient, "no active connection found")
  1062. }
  1063. // -- PerformRequest --
  1064. // PerformRequestOptions must be passed into PerformRequest.
  1065. type PerformRequestOptions struct {
  1066. Method string
  1067. Path string
  1068. Params url.Values
  1069. Body interface{}
  1070. ContentType string
  1071. IgnoreErrors []int
  1072. Retrier Retrier
  1073. Headers http.Header
  1074. MaxResponseSize int64
  1075. }
  1076. // PerformRequest does a HTTP request to Elasticsearch.
  1077. // See PerformRequestWithContentType for details.
  1078. func (c *Client) PerformRequest(ctx context.Context, method, path string, params url.Values, body interface{}, ignoreErrors ...int) (*Response, error) {
  1079. return c.PerformRequestWithOptions(ctx, PerformRequestOptions{
  1080. Method: method,
  1081. Path: path,
  1082. Params: params,
  1083. Body: body,
  1084. ContentType: "application/json",
  1085. IgnoreErrors: ignoreErrors,
  1086. })
  1087. }
  1088. // PerformRequestWithContentType executes a HTTP request with a specific content type.
  1089. // It returns a response (which might be nil) and an error on failure.
  1090. //
  1091. // Optionally, a list of HTTP error codes to ignore can be passed.
  1092. // This is necessary for services that expect e.g. HTTP status 404 as a
  1093. // valid outcome (Exists, IndicesExists, IndicesTypeExists).
  1094. func (c *Client) PerformRequestWithContentType(ctx context.Context, method, path string, params url.Values, body interface{}, contentType string, ignoreErrors ...int) (*Response, error) {
  1095. return c.PerformRequestWithOptions(ctx, PerformRequestOptions{
  1096. Method: method,
  1097. Path: path,
  1098. Params: params,
  1099. Body: body,
  1100. ContentType: contentType,
  1101. IgnoreErrors: ignoreErrors,
  1102. })
  1103. }
  1104. // PerformRequestWithOptions executes a HTTP request with the specified options.
  1105. // It returns a response (which might be nil) and an error on failure.
  1106. func (c *Client) PerformRequestWithOptions(ctx context.Context, opt PerformRequestOptions) (*Response, error) {
  1107. start := time.Now().UTC()
  1108. c.mu.RLock()
  1109. timeout := c.healthcheckTimeout
  1110. basicAuth := c.basicAuth
  1111. basicAuthUsername := c.basicAuthUsername
  1112. basicAuthPassword := c.basicAuthPassword
  1113. sendGetBodyAs := c.sendGetBodyAs
  1114. gzipEnabled := c.gzipEnabled
  1115. healthcheckEnabled := c.healthcheckEnabled
  1116. retrier := c.retrier
  1117. if opt.Retrier != nil {
  1118. retrier = opt.Retrier
  1119. }
  1120. defaultHeaders := c.headers
  1121. c.mu.RUnlock()
  1122. var err error
  1123. var conn *conn
  1124. var req *Request
  1125. var resp *Response
  1126. var retried bool
  1127. var n int
  1128. // Change method if sendGetBodyAs is specified.
  1129. if opt.Method == "GET" && opt.Body != nil && sendGetBodyAs != "GET" {
  1130. opt.Method = sendGetBodyAs
  1131. }
  1132. for {
  1133. pathWithParams := opt.Path
  1134. if len(opt.Params) > 0 {
  1135. pathWithParams += "?" + opt.Params.Encode()
  1136. }
  1137. // Get a connection
  1138. conn, err = c.next()
  1139. if errors.Cause(err) == ErrNoClient {
  1140. n++
  1141. if !retried {
  1142. // Force a healtcheck as all connections seem to be dead.
  1143. c.healthcheck(timeout, false)
  1144. if healthcheckEnabled {
  1145. retried = true
  1146. continue
  1147. }
  1148. }
  1149. wait, ok, rerr := retrier.Retry(ctx, n, nil, nil, err)
  1150. if rerr != nil {
  1151. return nil, rerr
  1152. }
  1153. if !ok {
  1154. return nil, err
  1155. }
  1156. retried = true
  1157. time.Sleep(wait)
  1158. continue // try again
  1159. }
  1160. if err != nil {
  1161. c.errorf("elastic: cannot get connection from pool")
  1162. return nil, err
  1163. }
  1164. req, err = NewRequest(opt.Method, conn.URL()+pathWithParams)
  1165. if err != nil {
  1166. c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err)
  1167. return nil, err
  1168. }
  1169. if basicAuth {
  1170. req.SetBasicAuth(basicAuthUsername, basicAuthPassword)
  1171. }
  1172. if opt.ContentType != "" {
  1173. req.Header.Set("Content-Type", opt.ContentType)
  1174. }
  1175. if len(opt.Headers) > 0 {
  1176. for key, value := range opt.Headers {
  1177. for _, v := range value {
  1178. req.Header.Add(key, v)
  1179. }
  1180. }
  1181. }
  1182. if len(defaultHeaders) > 0 {
  1183. for key, value := range defaultHeaders {
  1184. for _, v := range value {
  1185. req.Header.Add(key, v)
  1186. }
  1187. }
  1188. }
  1189. // Set body
  1190. if opt.Body != nil {
  1191. err = req.SetBody(opt.Body, gzipEnabled)
  1192. if err != nil {
  1193. c.errorf("elastic: couldn't set body %+v for request: %v", opt.Body, err)
  1194. return nil, err
  1195. }
  1196. }
  1197. // Tracing
  1198. c.dumpRequest((*http.Request)(req))
  1199. // Get response
  1200. res, err := c.c.Do((*http.Request)(req).WithContext(ctx))
  1201. if IsContextErr(err) {
  1202. // Proceed, but don't mark the node as dead
  1203. return nil, err
  1204. }
  1205. if err != nil {
  1206. n++
  1207. wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
  1208. if rerr != nil {
  1209. c.errorf("elastic: %s is dead", conn.URL())
  1210. conn.MarkAsDead()
  1211. return nil, rerr
  1212. }
  1213. if !ok {
  1214. c.errorf("elastic: %s is dead", conn.URL())
  1215. conn.MarkAsDead()
  1216. return nil, err
  1217. }
  1218. retried = true
  1219. time.Sleep(wait)
  1220. continue // try again
  1221. }
  1222. if res.Body != nil {
  1223. defer res.Body.Close()
  1224. }
  1225. // Tracing
  1226. c.dumpResponse(res)
  1227. // Log deprecation warnings as errors
  1228. if len(res.Header["Warning"]) > 0 {
  1229. c.deprecationlog((*http.Request)(req), res)
  1230. for _, warning := range res.Header["Warning"] {
  1231. c.errorf("Deprecation warning: %s", warning)
  1232. }
  1233. }
  1234. // Check for errors
  1235. if err := checkResponse((*http.Request)(req), res, opt.IgnoreErrors...); err != nil {
  1236. // No retry if request succeeded
  1237. // We still try to return a response.
  1238. resp, _ = c.newResponse(res, opt.MaxResponseSize)
  1239. return resp, err
  1240. }
  1241. // We successfully made a request with this connection
  1242. conn.MarkAsHealthy()
  1243. resp, err = c.newResponse(res, opt.MaxResponseSize)
  1244. if err != nil {
  1245. return nil, err
  1246. }
  1247. break
  1248. }
  1249. duration := time.Now().UTC().Sub(start)
  1250. c.infof("%s %s [status:%d, request:%.3fs]",
  1251. strings.ToUpper(opt.Method),
  1252. req.URL,
  1253. resp.StatusCode,
  1254. float64(int64(duration/time.Millisecond))/1000)
  1255. return resp, nil
  1256. }
  1257. // -- Document APIs --
  1258. // Index a document.
  1259. func (c *Client) Index() *IndexService {
  1260. return NewIndexService(c)
  1261. }
  1262. // Get a document.
  1263. func (c *Client) Get() *GetService {
  1264. return NewGetService(c)
  1265. }
  1266. // MultiGet retrieves multiple documents in one roundtrip.
  1267. func (c *Client) MultiGet() *MgetService {
  1268. return NewMgetService(c)
  1269. }
  1270. // Mget retrieves multiple documents in one roundtrip.
  1271. func (c *Client) Mget() *MgetService {
  1272. return NewMgetService(c)
  1273. }
  1274. // Delete a document.
  1275. func (c *Client) Delete() *DeleteService {
  1276. return NewDeleteService(c)
  1277. }
  1278. // DeleteByQuery deletes documents as found by a query.
  1279. func (c *Client) DeleteByQuery(indices ...string) *DeleteByQueryService {
  1280. return NewDeleteByQueryService(c).Index(indices...)
  1281. }
  1282. // Update a document.
  1283. func (c *Client) Update() *UpdateService {
  1284. return NewUpdateService(c)
  1285. }
  1286. // UpdateByQuery performs an update on a set of documents.
  1287. func (c *Client) UpdateByQuery(indices ...string) *UpdateByQueryService {
  1288. return NewUpdateByQueryService(c).Index(indices...)
  1289. }
  1290. // Bulk is the entry point to mass insert/update/delete documents.
  1291. func (c *Client) Bulk() *BulkService {
  1292. return NewBulkService(c)
  1293. }
  1294. // BulkProcessor allows setting up a concurrent processor of bulk requests.
  1295. func (c *Client) BulkProcessor() *BulkProcessorService {
  1296. return NewBulkProcessorService(c)
  1297. }
  1298. // Reindex copies data from a source index into a destination index.
  1299. //
  1300. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-reindex.html
  1301. // for details on the Reindex API.
  1302. func (c *Client) Reindex() *ReindexService {
  1303. return NewReindexService(c)
  1304. }
  1305. // TermVectors returns information and statistics on terms in the fields
  1306. // of a particular document.
  1307. func (c *Client) TermVectors(index, typ string) *TermvectorsService {
  1308. builder := NewTermvectorsService(c)
  1309. builder = builder.Index(index).Type(typ)
  1310. return builder
  1311. }
  1312. // MultiTermVectors returns information and statistics on terms in the fields
  1313. // of multiple documents.
  1314. func (c *Client) MultiTermVectors() *MultiTermvectorService {
  1315. return NewMultiTermvectorService(c)
  1316. }
  1317. // -- Search APIs --
  1318. // Search is the entry point for searches.
  1319. func (c *Client) Search(indices ...string) *SearchService {
  1320. return NewSearchService(c).Index(indices...)
  1321. }
  1322. // Suggest returns a service to return suggestions.
  1323. func (c *Client) Suggest(indices ...string) *SuggestService {
  1324. return NewSuggestService(c).Index(indices...)
  1325. }
  1326. // MultiSearch is the entry point for multi searches.
  1327. func (c *Client) MultiSearch() *MultiSearchService {
  1328. return NewMultiSearchService(c)
  1329. }
  1330. // Count documents.
  1331. func (c *Client) Count(indices ...string) *CountService {
  1332. return NewCountService(c).Index(indices...)
  1333. }
  1334. // Explain computes a score explanation for a query and a specific document.
  1335. func (c *Client) Explain(index, typ, id string) *ExplainService {
  1336. return NewExplainService(c).Index(index).Type(typ).Id(id)
  1337. }
  1338. // TODO Search Template
  1339. // TODO Search Exists API
  1340. // Validate allows a user to validate a potentially expensive query without executing it.
  1341. func (c *Client) Validate(indices ...string) *ValidateService {
  1342. return NewValidateService(c).Index(indices...)
  1343. }
  1344. // SearchShards returns statistical information about nodes and shards.
  1345. func (c *Client) SearchShards(indices ...string) *SearchShardsService {
  1346. return NewSearchShardsService(c).Index(indices...)
  1347. }
  1348. // FieldCaps returns statistical information about fields in indices.
  1349. func (c *Client) FieldCaps(indices ...string) *FieldCapsService {
  1350. return NewFieldCapsService(c).Index(indices...)
  1351. }
  1352. // FieldStats returns statistical information about fields in indices.
  1353. func (c *Client) FieldStats(indices ...string) *FieldStatsService {
  1354. return NewFieldStatsService(c).Index(indices...)
  1355. }
  1356. // Exists checks if a document exists.
  1357. func (c *Client) Exists() *ExistsService {
  1358. return NewExistsService(c)
  1359. }
  1360. // Scroll through documents. Use this to efficiently scroll through results
  1361. // while returning the results to a client.
  1362. func (c *Client) Scroll(indices ...string) *ScrollService {
  1363. return NewScrollService(c).Index(indices...)
  1364. }
  1365. // ClearScroll can be used to clear search contexts manually.
  1366. func (c *Client) ClearScroll(scrollIds ...string) *ClearScrollService {
  1367. return NewClearScrollService(c).ScrollId(scrollIds...)
  1368. }
  1369. // -- Indices APIs --
  1370. // CreateIndex returns a service to create a new index.
  1371. func (c *Client) CreateIndex(name string) *IndicesCreateService {
  1372. return NewIndicesCreateService(c).Index(name)
  1373. }
  1374. // DeleteIndex returns a service to delete an index.
  1375. func (c *Client) DeleteIndex(indices ...string) *IndicesDeleteService {
  1376. return NewIndicesDeleteService(c).Index(indices)
  1377. }
  1378. // IndexExists allows to check if an index exists.
  1379. func (c *Client) IndexExists(indices ...string) *IndicesExistsService {
  1380. return NewIndicesExistsService(c).Index(indices)
  1381. }
  1382. // ShrinkIndex returns a service to shrink one index into another.
  1383. func (c *Client) ShrinkIndex(source, target string) *IndicesShrinkService {
  1384. return NewIndicesShrinkService(c).Source(source).Target(target)
  1385. }
  1386. // RolloverIndex rolls an alias over to a new index when the existing index
  1387. // is considered to be too large or too old.
  1388. func (c *Client) RolloverIndex(alias string) *IndicesRolloverService {
  1389. return NewIndicesRolloverService(c).Alias(alias)
  1390. }
  1391. // TypeExists allows to check if one or more types exist in one or more indices.
  1392. func (c *Client) TypeExists() *IndicesExistsTypeService {
  1393. return NewIndicesExistsTypeService(c)
  1394. }
  1395. // IndexStats provides statistics on different operations happining
  1396. // in one or more indices.
  1397. func (c *Client) IndexStats(indices ...string) *IndicesStatsService {
  1398. return NewIndicesStatsService(c).Index(indices...)
  1399. }
  1400. // OpenIndex opens an index.
  1401. func (c *Client) OpenIndex(name string) *IndicesOpenService {
  1402. return NewIndicesOpenService(c).Index(name)
  1403. }
  1404. // CloseIndex closes an index.
  1405. func (c *Client) CloseIndex(name string) *IndicesCloseService {
  1406. return NewIndicesCloseService(c).Index(name)
  1407. }
  1408. // IndexGet retrieves information about one or more indices.
  1409. // IndexGet is only available for Elasticsearch 1.4 or later.
  1410. func (c *Client) IndexGet(indices ...string) *IndicesGetService {
  1411. return NewIndicesGetService(c).Index(indices...)
  1412. }
  1413. // IndexGetSettings retrieves settings of all, one or more indices.
  1414. func (c *Client) IndexGetSettings(indices ...string) *IndicesGetSettingsService {
  1415. return NewIndicesGetSettingsService(c).Index(indices...)
  1416. }
  1417. // IndexPutSettings sets settings for all, one or more indices.
  1418. func (c *Client) IndexPutSettings(indices ...string) *IndicesPutSettingsService {
  1419. return NewIndicesPutSettingsService(c).Index(indices...)
  1420. }
  1421. // IndexSegments retrieves low level segment information for all, one or more indices.
  1422. func (c *Client) IndexSegments(indices ...string) *IndicesSegmentsService {
  1423. return NewIndicesSegmentsService(c).Index(indices...)
  1424. }
  1425. // IndexAnalyze performs the analysis process on a text and returns the
  1426. // token breakdown of the text.
  1427. func (c *Client) IndexAnalyze() *IndicesAnalyzeService {
  1428. return NewIndicesAnalyzeService(c)
  1429. }
  1430. // Forcemerge optimizes one or more indices.
  1431. // It replaces the deprecated Optimize API.
  1432. func (c *Client) Forcemerge(indices ...string) *IndicesForcemergeService {
  1433. return NewIndicesForcemergeService(c).Index(indices...)
  1434. }
  1435. // Refresh asks Elasticsearch to refresh one or more indices.
  1436. func (c *Client) Refresh(indices ...string) *RefreshService {
  1437. return NewRefreshService(c).Index(indices...)
  1438. }
  1439. // Flush asks Elasticsearch to free memory from the index and
  1440. // flush data to disk.
  1441. func (c *Client) Flush(indices ...string) *IndicesFlushService {
  1442. return NewIndicesFlushService(c).Index(indices...)
  1443. }
  1444. // Alias enables the caller to add and/or remove aliases.
  1445. func (c *Client) Alias() *AliasService {
  1446. return NewAliasService(c)
  1447. }
  1448. // Aliases returns aliases by index name(s).
  1449. func (c *Client) Aliases() *AliasesService {
  1450. return NewAliasesService(c)
  1451. }
  1452. // GetTemplate gets a search template.
  1453. // Use IndexXXXTemplate funcs to manage index templates.
  1454. func (c *Client) GetTemplate() *GetTemplateService {
  1455. return NewGetTemplateService(c)
  1456. }
  1457. // PutTemplate creates or updates a search template.
  1458. // Use IndexXXXTemplate funcs to manage index templates.
  1459. func (c *Client) PutTemplate() *PutTemplateService {
  1460. return NewPutTemplateService(c)
  1461. }
  1462. // DeleteTemplate deletes a search template.
  1463. // Use IndexXXXTemplate funcs to manage index templates.
  1464. func (c *Client) DeleteTemplate() *DeleteTemplateService {
  1465. return NewDeleteTemplateService(c)
  1466. }
  1467. // IndexGetTemplate gets an index template.
  1468. // Use XXXTemplate funcs to manage search templates.
  1469. func (c *Client) IndexGetTemplate(names ...string) *IndicesGetTemplateService {
  1470. return NewIndicesGetTemplateService(c).Name(names...)
  1471. }
  1472. // IndexTemplateExists gets check if an index template exists.
  1473. // Use XXXTemplate funcs to manage search templates.
  1474. func (c *Client) IndexTemplateExists(name string) *IndicesExistsTemplateService {
  1475. return NewIndicesExistsTemplateService(c).Name(name)
  1476. }
  1477. // IndexPutTemplate creates or updates an index template.
  1478. // Use XXXTemplate funcs to manage search templates.
  1479. func (c *Client) IndexPutTemplate(name string) *IndicesPutTemplateService {
  1480. return NewIndicesPutTemplateService(c).Name(name)
  1481. }
  1482. // IndexDeleteTemplate deletes an index template.
  1483. // Use XXXTemplate funcs to manage search templates.
  1484. func (c *Client) IndexDeleteTemplate(name string) *IndicesDeleteTemplateService {
  1485. return NewIndicesDeleteTemplateService(c).Name(name)
  1486. }
  1487. // GetMapping gets a mapping.
  1488. func (c *Client) GetMapping() *IndicesGetMappingService {
  1489. return NewIndicesGetMappingService(c)
  1490. }
  1491. // PutMapping registers a mapping.
  1492. func (c *Client) PutMapping() *IndicesPutMappingService {
  1493. return NewIndicesPutMappingService(c)
  1494. }
  1495. // GetFieldMapping gets mapping for fields.
  1496. func (c *Client) GetFieldMapping() *IndicesGetFieldMappingService {
  1497. return NewIndicesGetFieldMappingService(c)
  1498. }
  1499. // -- cat APIs --
  1503. // TODO cat fielddata
  1506. // TODO cat master
  1507. // TODO cat nodes
  1508. // TODO cat pending tasks
  1509. // TODO cat plugins
  1510. // TODO cat recovery
  1511. // TODO cat thread pool
  1513. // TODO cat segments
  1514. // CatAliases returns information about aliases.
  1515. func (c *Client) CatAliases() *CatAliasesService {
  1516. return NewCatAliasesService(c)
  1517. }
  1518. // CatAllocation returns information about the allocation across nodes.
  1519. func (c *Client) CatAllocation() *CatAllocationService {
  1520. return NewCatAllocationService(c)
  1521. }
  1522. // CatCount returns document counts for indices.
  1523. func (c *Client) CatCount() *CatCountService {
  1524. return NewCatCountService(c)
  1525. }
  1526. // CatHealth returns information about cluster health.
  1527. func (c *Client) CatHealth() *CatHealthService {
  1528. return NewCatHealthService(c)
  1529. }
  1530. // CatIndices returns information about indices.
  1531. func (c *Client) CatIndices() *CatIndicesService {
  1532. return NewCatIndicesService(c)
  1533. }
  1534. // CatShards returns information about shards.
  1535. func (c *Client) CatShards() *CatShardsService {
  1536. return NewCatShardsService(c)
  1537. }
  1538. // -- Ingest APIs --
  1539. // IngestPutPipeline adds pipelines and updates existing pipelines in
  1540. // the cluster.
  1541. func (c *Client) IngestPutPipeline(id string) *IngestPutPipelineService {
  1542. return NewIngestPutPipelineService(c).Id(id)
  1543. }
  1544. // IngestGetPipeline returns pipelines based on ID.
  1545. func (c *Client) IngestGetPipeline(ids ...string) *IngestGetPipelineService {
  1546. return NewIngestGetPipelineService(c).Id(ids...)
  1547. }
  1548. // IngestDeletePipeline deletes a pipeline by ID.
  1549. func (c *Client) IngestDeletePipeline(id string) *IngestDeletePipelineService {
  1550. return NewIngestDeletePipelineService(c).Id(id)
  1551. }
  1552. // IngestSimulatePipeline executes a specific pipeline against the set of
  1553. // documents provided in the body of the request.
  1554. func (c *Client) IngestSimulatePipeline() *IngestSimulatePipelineService {
  1555. return NewIngestSimulatePipelineService(c)
  1556. }
  1557. // -- Cluster APIs --
  1558. // ClusterHealth retrieves the health of the cluster.
  1559. func (c *Client) ClusterHealth() *ClusterHealthService {
  1560. return NewClusterHealthService(c)
  1561. }
  1562. // ClusterState retrieves the state of the cluster.
  1563. func (c *Client) ClusterState() *ClusterStateService {
  1564. return NewClusterStateService(c)
  1565. }
  1566. // ClusterStats retrieves cluster statistics.
  1567. func (c *Client) ClusterStats() *ClusterStatsService {
  1568. return NewClusterStatsService(c)
  1569. }
  1570. // NodesInfo retrieves one or more or all of the cluster nodes information.
  1571. func (c *Client) NodesInfo() *NodesInfoService {
  1572. return NewNodesInfoService(c)
  1573. }
  1574. // NodesStats retrieves one or more or all of the cluster nodes statistics.
  1575. func (c *Client) NodesStats() *NodesStatsService {
  1576. return NewNodesStatsService(c)
  1577. }
  1578. // TasksCancel cancels tasks running on the specified nodes.
  1579. func (c *Client) TasksCancel() *TasksCancelService {
  1580. return NewTasksCancelService(c)
  1581. }
  1582. // TasksList retrieves the list of tasks running on the specified nodes.
  1583. func (c *Client) TasksList() *TasksListService {
  1584. return NewTasksListService(c)
  1585. }
  1586. // TasksGetTask retrieves a task running on the cluster.
  1587. func (c *Client) TasksGetTask() *TasksGetTaskService {
  1588. return NewTasksGetTaskService(c)
  1589. }
  1590. // TODO Pending cluster tasks
  1591. // TODO Cluster Reroute
  1592. // TODO Cluster Update Settings
  1594. // TODO Nodes hot_threads
  1595. // -- Snapshot and Restore --
  1596. // TODO Snapshot Delete
  1597. // TODO Snapshot Get
  1598. // TODO Snapshot Restore
  1599. // TODO Snapshot Status
  1600. // SnapshotCreate creates a snapshot.
  1601. func (c *Client) SnapshotCreate(repository string, snapshot string) *SnapshotCreateService {
  1602. return NewSnapshotCreateService(c).Repository(repository).Snapshot(snapshot)
  1603. }
  1604. // SnapshotCreateRepository creates or updates a snapshot repository.
  1605. func (c *Client) SnapshotCreateRepository(repository string) *SnapshotCreateRepositoryService {
  1606. return NewSnapshotCreateRepositoryService(c).Repository(repository)
  1607. }
  1608. // SnapshotDeleteRepository deletes a snapshot repository.
  1609. func (c *Client) SnapshotDeleteRepository(repositories ...string) *SnapshotDeleteRepositoryService {
  1610. return NewSnapshotDeleteRepositoryService(c).Repository(repositories...)
  1611. }
  1612. // SnapshotGetRepository gets a snapshot repository.
  1613. func (c *Client) SnapshotGetRepository(repositories ...string) *SnapshotGetRepositoryService {
  1614. return NewSnapshotGetRepositoryService(c).Repository(repositories...)
  1615. }
  1616. // SnapshotVerifyRepository verifies a snapshot repository.
  1617. func (c *Client) SnapshotVerifyRepository(repository string) *SnapshotVerifyRepositoryService {
  1618. return NewSnapshotVerifyRepositoryService(c).Repository(repository)
  1619. }
  1620. // -- Helpers and shortcuts --
  1621. // ElasticsearchVersion returns the version number of Elasticsearch
  1622. // running on the given URL.
  1623. func (c *Client) ElasticsearchVersion(url string) (string, error) {
  1624. res, _, err := c.Ping(url).Do(context.Background())
  1625. if err != nil {
  1626. return "", err
  1627. }
  1628. return res.Version.Number, nil
  1629. }
  1630. // IndexNames returns the names of all indices in the cluster.
  1631. func (c *Client) IndexNames() ([]string, error) {
  1632. res, err := c.IndexGetSettings().Index("_all").Do(context.Background())
  1633. if err != nil {
  1634. return nil, err
  1635. }
  1636. var names []string
  1637. for name := range res {
  1638. names = append(names, name)
  1639. }
  1640. return names, nil
  1641. }
  1642. // Ping checks if a given node in a cluster exists and (optionally)
  1643. // returns some basic information about the Elasticsearch server,
  1644. // e.g. the Elasticsearch version number.
  1645. //
  1646. // Notice that you need to specify a URL here explicitly.
  1647. func (c *Client) Ping(url string) *PingService {
  1648. return NewPingService(c).URL(url)
  1649. }
  1650. // WaitForStatus waits for the cluster to have the given status.
  1651. // This is a shortcut method for the ClusterHealth service.
  1652. //
  1653. // WaitForStatus waits for the specified timeout, e.g. "10s".
  1654. // If the cluster will have the given state within the timeout, nil is returned.
  1655. // If the request timed out, ErrTimeout is returned.
  1656. func (c *Client) WaitForStatus(status string, timeout string) error {
  1657. health, err := c.ClusterHealth().WaitForStatus(status).Timeout(timeout).Do(context.Background())
  1658. if err != nil {
  1659. return err
  1660. }
  1661. if health.TimedOut {
  1662. return ErrTimeout
  1663. }
  1664. return nil
  1665. }
  1666. // WaitForGreenStatus waits for the cluster to have the "green" status.
  1667. // See WaitForStatus for more details.
  1668. func (c *Client) WaitForGreenStatus(timeout string) error {
  1669. return c.WaitForStatus("green", timeout)
  1670. }
  1671. // WaitForYellowStatus waits for the cluster to have the "yellow" status.
  1672. // See WaitForStatus for more details.
  1673. func (c *Client) WaitForYellowStatus(timeout string) error {
  1674. return c.WaitForStatus("yellow", timeout)
  1675. }