update_by_query.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "context"
  7. "fmt"
  8. "net/url"
  9. "strings"
  10. "gopkg.in/olivere/elastic.v5/uritemplates"
  11. )
  12. // UpdateByQueryService is documented at https://www.elastic.co/guide/en/elasticsearch/plugins/master/plugins-reindex.html.
  13. type UpdateByQueryService struct {
  14. client *Client
  15. pretty bool
  16. index []string
  17. typ []string
  18. script *Script
  19. query Query
  20. body interface{}
  21. xSource []string
  22. xSourceExclude []string
  23. xSourceInclude []string
  24. allowNoIndices *bool
  25. analyzeWildcard *bool
  26. analyzer string
  27. conflicts string
  28. defaultOperator string
  29. docvalueFields []string
  30. df string
  31. expandWildcards string
  32. explain *bool
  33. fielddataFields []string
  34. from *int
  35. ignoreUnavailable *bool
  36. lenient *bool
  37. lowercaseExpandedTerms *bool
  38. pipeline string
  39. preference string
  40. q string
  41. refresh string
  42. requestCache *bool
  43. requestsPerSecond *int
  44. routing []string
  45. scroll string
  46. scrollSize *int
  47. searchTimeout string
  48. searchType string
  49. size *int
  50. sort []string
  51. stats []string
  52. storedFields []string
  53. suggestField string
  54. suggestMode string
  55. suggestSize *int
  56. suggestText string
  57. terminateAfter *int
  58. timeout string
  59. trackScores *bool
  60. version *bool
  61. versionType *bool
  62. waitForActiveShards string
  63. waitForCompletion *bool
  64. }
  65. // NewUpdateByQueryService creates a new UpdateByQueryService.
  66. func NewUpdateByQueryService(client *Client) *UpdateByQueryService {
  67. return &UpdateByQueryService{
  68. client: client,
  69. }
  70. }
  71. // Index is a list of index names to search; use `_all` or empty string to
  72. // perform the operation on all indices.
  73. func (s *UpdateByQueryService) Index(index ...string) *UpdateByQueryService {
  74. s.index = append(s.index, index...)
  75. return s
  76. }
  77. // Type is a list of document types to search; leave empty to perform
  78. // the operation on all types.
  79. func (s *UpdateByQueryService) Type(typ ...string) *UpdateByQueryService {
  80. s.typ = append(s.typ, typ...)
  81. return s
  82. }
  83. // Pretty indicates that the JSON response be indented and human readable.
  84. func (s *UpdateByQueryService) Pretty(pretty bool) *UpdateByQueryService {
  85. s.pretty = pretty
  86. return s
  87. }
  88. // Script sets an update script.
  89. func (s *UpdateByQueryService) Script(script *Script) *UpdateByQueryService {
  90. s.script = script
  91. return s
  92. }
  93. // Body specifies the body of the request. It overrides data being specified via
  94. // SearchService or Script.
  95. func (s *UpdateByQueryService) Body(body string) *UpdateByQueryService {
  96. s.body = body
  97. return s
  98. }
  99. // XSource is true or false to return the _source field or not,
  100. // or a list of fields to return.
  101. func (s *UpdateByQueryService) XSource(xSource ...string) *UpdateByQueryService {
  102. s.xSource = append(s.xSource, xSource...)
  103. return s
  104. }
  105. // XSourceExclude represents a list of fields to exclude from the returned _source field.
  106. func (s *UpdateByQueryService) XSourceExclude(xSourceExclude ...string) *UpdateByQueryService {
  107. s.xSourceExclude = append(s.xSourceExclude, xSourceExclude...)
  108. return s
  109. }
  110. // XSourceInclude represents a list of fields to extract and return from the _source field.
  111. func (s *UpdateByQueryService) XSourceInclude(xSourceInclude ...string) *UpdateByQueryService {
  112. s.xSourceInclude = append(s.xSourceInclude, xSourceInclude...)
  113. return s
  114. }
  115. // AllowNoIndices indicates whether to ignore if a wildcard indices expression
  116. // resolves into no concrete indices. (This includes `_all` string or when
  117. // no indices have been specified).
  118. func (s *UpdateByQueryService) AllowNoIndices(allowNoIndices bool) *UpdateByQueryService {
  119. s.allowNoIndices = &allowNoIndices
  120. return s
  121. }
  122. // AnalyzeWildcard specifies whether wildcard and prefix queries should be
  123. // analyzed (default: false).
  124. func (s *UpdateByQueryService) AnalyzeWildcard(analyzeWildcard bool) *UpdateByQueryService {
  125. s.analyzeWildcard = &analyzeWildcard
  126. return s
  127. }
  128. // Analyzer specifies the analyzer to use for the query string.
  129. func (s *UpdateByQueryService) Analyzer(analyzer string) *UpdateByQueryService {
  130. s.analyzer = analyzer
  131. return s
  132. }
  133. // Conflicts indicates what to do when the process detects version conflicts.
  134. // Possible values are "proceed" and "abort".
  135. func (s *UpdateByQueryService) Conflicts(conflicts string) *UpdateByQueryService {
  136. s.conflicts = conflicts
  137. return s
  138. }
  139. // AbortOnVersionConflict aborts the request on version conflicts.
  140. // It is an alias to setting Conflicts("abort").
  141. func (s *UpdateByQueryService) AbortOnVersionConflict() *UpdateByQueryService {
  142. s.conflicts = "abort"
  143. return s
  144. }
  145. // ProceedOnVersionConflict aborts the request on version conflicts.
  146. // It is an alias to setting Conflicts("proceed").
  147. func (s *UpdateByQueryService) ProceedOnVersionConflict() *UpdateByQueryService {
  148. s.conflicts = "proceed"
  149. return s
  150. }
  151. // DefaultOperator is the default operator for query string query (AND or OR).
  152. func (s *UpdateByQueryService) DefaultOperator(defaultOperator string) *UpdateByQueryService {
  153. s.defaultOperator = defaultOperator
  154. return s
  155. }
  156. // DF specifies the field to use as default where no field prefix is given in the query string.
  157. func (s *UpdateByQueryService) DF(df string) *UpdateByQueryService {
  158. s.df = df
  159. return s
  160. }
  161. // DocvalueFields specifies the list of fields to return as the docvalue representation of a field for each hit.
  162. func (s *UpdateByQueryService) DocvalueFields(docvalueFields ...string) *UpdateByQueryService {
  163. s.docvalueFields = docvalueFields
  164. return s
  165. }
  166. // ExpandWildcards indicates whether to expand wildcard expression to
  167. // concrete indices that are open, closed or both.
  168. func (s *UpdateByQueryService) ExpandWildcards(expandWildcards string) *UpdateByQueryService {
  169. s.expandWildcards = expandWildcards
  170. return s
  171. }
  172. // Explain specifies whether to return detailed information about score
  173. // computation as part of a hit.
  174. func (s *UpdateByQueryService) Explain(explain bool) *UpdateByQueryService {
  175. s.explain = &explain
  176. return s
  177. }
  178. // FielddataFields is a list of fields to return as the field data
  179. // representation of a field for each hit.
  180. func (s *UpdateByQueryService) FielddataFields(fielddataFields ...string) *UpdateByQueryService {
  181. s.fielddataFields = append(s.fielddataFields, fielddataFields...)
  182. return s
  183. }
  184. // From is the starting offset (default: 0).
  185. func (s *UpdateByQueryService) From(from int) *UpdateByQueryService {
  186. s.from = &from
  187. return s
  188. }
  189. // IgnoreUnavailable indicates whether specified concrete indices should be
  190. // ignored when unavailable (missing or closed).
  191. func (s *UpdateByQueryService) IgnoreUnavailable(ignoreUnavailable bool) *UpdateByQueryService {
  192. s.ignoreUnavailable = &ignoreUnavailable
  193. return s
  194. }
  195. // Lenient specifies whether format-based query failures
  196. // (such as providing text to a numeric field) should be ignored.
  197. func (s *UpdateByQueryService) Lenient(lenient bool) *UpdateByQueryService {
  198. s.lenient = &lenient
  199. return s
  200. }
  201. // LowercaseExpandedTerms specifies whether query terms should be lowercased.
  202. func (s *UpdateByQueryService) LowercaseExpandedTerms(lowercaseExpandedTerms bool) *UpdateByQueryService {
  203. s.lowercaseExpandedTerms = &lowercaseExpandedTerms
  204. return s
  205. }
  206. // Pipeline specifies the ingest pipeline to set on index requests made by this action (default: none).
  207. func (s *UpdateByQueryService) Pipeline(pipeline string) *UpdateByQueryService {
  208. s.pipeline = pipeline
  209. return s
  210. }
  211. // Preference specifies the node or shard the operation should be performed on
  212. // (default: random).
  213. func (s *UpdateByQueryService) Preference(preference string) *UpdateByQueryService {
  214. s.preference = preference
  215. return s
  216. }
  217. // Q specifies the query in the Lucene query string syntax.
  218. func (s *UpdateByQueryService) Q(q string) *UpdateByQueryService {
  219. s.q = q
  220. return s
  221. }
  222. // Query sets a query definition using the Query DSL.
  223. func (s *UpdateByQueryService) Query(query Query) *UpdateByQueryService {
  224. s.query = query
  225. return s
  226. }
  227. // Refresh indicates whether the effected indexes should be refreshed.
  228. func (s *UpdateByQueryService) Refresh(refresh string) *UpdateByQueryService {
  229. s.refresh = refresh
  230. return s
  231. }
  232. // RequestCache specifies if request cache should be used for this request
  233. // or not, defaults to index level setting.
  234. func (s *UpdateByQueryService) RequestCache(requestCache bool) *UpdateByQueryService {
  235. s.requestCache = &requestCache
  236. return s
  237. }
  238. // RequestsPerSecond sets the throttle on this request in sub-requests per second.
  239. // -1 means set no throttle as does "unlimited" which is the only non-float this accepts.
  240. func (s *UpdateByQueryService) RequestsPerSecond(requestsPerSecond int) *UpdateByQueryService {
  241. s.requestsPerSecond = &requestsPerSecond
  242. return s
  243. }
  244. // Routing is a list of specific routing values.
  245. func (s *UpdateByQueryService) Routing(routing ...string) *UpdateByQueryService {
  246. s.routing = append(s.routing, routing...)
  247. return s
  248. }
  249. // Scroll specifies how long a consistent view of the index should be maintained
  250. // for scrolled search.
  251. func (s *UpdateByQueryService) Scroll(scroll string) *UpdateByQueryService {
  252. s.scroll = scroll
  253. return s
  254. }
  255. // ScrollSize is the size on the scroll request powering the update_by_query.
  256. func (s *UpdateByQueryService) ScrollSize(scrollSize int) *UpdateByQueryService {
  257. s.scrollSize = &scrollSize
  258. return s
  259. }
  260. // SearchTimeout defines an explicit timeout for each search request.
  261. // Defaults to no timeout.
  262. func (s *UpdateByQueryService) SearchTimeout(searchTimeout string) *UpdateByQueryService {
  263. s.searchTimeout = searchTimeout
  264. return s
  265. }
  266. // SearchType is the search operation type. Possible values are
  267. // "query_then_fetch" and "dfs_query_then_fetch".
  268. func (s *UpdateByQueryService) SearchType(searchType string) *UpdateByQueryService {
  269. s.searchType = searchType
  270. return s
  271. }
  272. // Size represents the number of hits to return (default: 10).
  273. func (s *UpdateByQueryService) Size(size int) *UpdateByQueryService {
  274. s.size = &size
  275. return s
  276. }
  277. // Sort is a list of <field>:<direction> pairs.
  278. func (s *UpdateByQueryService) Sort(sort ...string) *UpdateByQueryService {
  279. s.sort = append(s.sort, sort...)
  280. return s
  281. }
  282. // SortByField adds a sort order.
  283. func (s *UpdateByQueryService) SortByField(field string, ascending bool) *UpdateByQueryService {
  284. if ascending {
  285. s.sort = append(s.sort, fmt.Sprintf("%s:asc", field))
  286. } else {
  287. s.sort = append(s.sort, fmt.Sprintf("%s:desc", field))
  288. }
  289. return s
  290. }
  291. // Stats specifies specific tag(s) of the request for logging and statistical purposes.
  292. func (s *UpdateByQueryService) Stats(stats ...string) *UpdateByQueryService {
  293. s.stats = append(s.stats, stats...)
  294. return s
  295. }
  296. // StoredFields specifies the list of stored fields to return as part of a hit.
  297. func (s *UpdateByQueryService) StoredFields(storedFields ...string) *UpdateByQueryService {
  298. s.storedFields = storedFields
  299. return s
  300. }
  301. // SuggestField specifies which field to use for suggestions.
  302. func (s *UpdateByQueryService) SuggestField(suggestField string) *UpdateByQueryService {
  303. s.suggestField = suggestField
  304. return s
  305. }
  306. // SuggestMode specifies the suggest mode. Possible values are
  307. // "missing", "popular", and "always".
  308. func (s *UpdateByQueryService) SuggestMode(suggestMode string) *UpdateByQueryService {
  309. s.suggestMode = suggestMode
  310. return s
  311. }
  312. // SuggestSize specifies how many suggestions to return in response.
  313. func (s *UpdateByQueryService) SuggestSize(suggestSize int) *UpdateByQueryService {
  314. s.suggestSize = &suggestSize
  315. return s
  316. }
  317. // SuggestText specifies the source text for which the suggestions should be returned.
  318. func (s *UpdateByQueryService) SuggestText(suggestText string) *UpdateByQueryService {
  319. s.suggestText = suggestText
  320. return s
  321. }
  322. // TerminateAfter indicates the maximum number of documents to collect
  323. // for each shard, upon reaching which the query execution will terminate early.
  324. func (s *UpdateByQueryService) TerminateAfter(terminateAfter int) *UpdateByQueryService {
  325. s.terminateAfter = &terminateAfter
  326. return s
  327. }
  328. // Timeout is the time each individual bulk request should wait for shards
  329. // that are unavailable.
  330. func (s *UpdateByQueryService) Timeout(timeout string) *UpdateByQueryService {
  331. s.timeout = timeout
  332. return s
  333. }
  334. // TimeoutInMillis sets the timeout in milliseconds.
  335. func (s *UpdateByQueryService) TimeoutInMillis(timeoutInMillis int) *UpdateByQueryService {
  336. s.timeout = fmt.Sprintf("%dms", timeoutInMillis)
  337. return s
  338. }
  339. // TrackScores indicates whether to calculate and return scores even if
  340. // they are not used for sorting.
  341. func (s *UpdateByQueryService) TrackScores(trackScores bool) *UpdateByQueryService {
  342. s.trackScores = &trackScores
  343. return s
  344. }
  345. // Version specifies whether to return document version as part of a hit.
  346. func (s *UpdateByQueryService) Version(version bool) *UpdateByQueryService {
  347. s.version = &version
  348. return s
  349. }
  350. // VersionType indicates if the document increment the version number (internal)
  351. // on hit or not (reindex).
  352. func (s *UpdateByQueryService) VersionType(versionType bool) *UpdateByQueryService {
  353. s.versionType = &versionType
  354. return s
  355. }
  356. // WaitForActiveShards sets the number of shard copies that must be active before proceeding
  357. // with the update by query operation. Defaults to 1, meaning the primary shard only.
  358. // Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal
  359. // to the total number of copies for the shard (number of replicas + 1).
  360. func (s *UpdateByQueryService) WaitForActiveShards(waitForActiveShards string) *UpdateByQueryService {
  361. s.waitForActiveShards = waitForActiveShards
  362. return s
  363. }
  364. // WaitForCompletion indicates if the request should block until the reindex is complete.
  365. func (s *UpdateByQueryService) WaitForCompletion(waitForCompletion bool) *UpdateByQueryService {
  366. s.waitForCompletion = &waitForCompletion
  367. return s
  368. }
  369. // buildURL builds the URL for the operation.
  370. func (s *UpdateByQueryService) buildURL() (string, url.Values, error) {
  371. // Build URL
  372. var err error
  373. var path string
  374. if len(s.typ) > 0 {
  375. path, err = uritemplates.Expand("/{index}/{type}/_update_by_query", map[string]string{
  376. "index": strings.Join(s.index, ","),
  377. "type": strings.Join(s.typ, ","),
  378. })
  379. } else {
  380. path, err = uritemplates.Expand("/{index}/_update_by_query", map[string]string{
  381. "index": strings.Join(s.index, ","),
  382. })
  383. }
  384. if err != nil {
  385. return "", url.Values{}, err
  386. }
  387. // Add query string parameters
  388. params := url.Values{}
  389. if s.pretty {
  390. params.Set("pretty", "1")
  391. }
  392. if len(s.xSource) > 0 {
  393. params.Set("_source", strings.Join(s.xSource, ","))
  394. }
  395. if len(s.xSourceExclude) > 0 {
  396. params.Set("_source_exclude", strings.Join(s.xSourceExclude, ","))
  397. }
  398. if len(s.xSourceInclude) > 0 {
  399. params.Set("_source_include", strings.Join(s.xSourceInclude, ","))
  400. }
  401. if s.allowNoIndices != nil {
  402. params.Set("allow_no_indices", fmt.Sprintf("%v", *s.allowNoIndices))
  403. }
  404. if s.analyzer != "" {
  405. params.Set("analyzer", s.analyzer)
  406. }
  407. if s.analyzeWildcard != nil {
  408. params.Set("analyze_wildcard", fmt.Sprintf("%v", *s.analyzeWildcard))
  409. }
  410. if s.conflicts != "" {
  411. params.Set("conflicts", s.conflicts)
  412. }
  413. if s.defaultOperator != "" {
  414. params.Set("default_operator", s.defaultOperator)
  415. }
  416. if s.df != "" {
  417. params.Set("df", s.df)
  418. }
  419. if s.expandWildcards != "" {
  420. params.Set("expand_wildcards", s.expandWildcards)
  421. }
  422. if s.explain != nil {
  423. params.Set("explain", fmt.Sprintf("%v", *s.explain))
  424. }
  425. if len(s.storedFields) > 0 {
  426. params.Set("stored_fields", strings.Join(s.storedFields, ","))
  427. }
  428. if len(s.docvalueFields) > 0 {
  429. params.Set("docvalue_fields", strings.Join(s.docvalueFields, ","))
  430. }
  431. if len(s.fielddataFields) > 0 {
  432. params.Set("fielddata_fields", strings.Join(s.fielddataFields, ","))
  433. }
  434. if s.from != nil {
  435. params.Set("from", fmt.Sprintf("%d", *s.from))
  436. }
  437. if s.ignoreUnavailable != nil {
  438. params.Set("ignore_unavailable", fmt.Sprintf("%v", *s.ignoreUnavailable))
  439. }
  440. if s.lenient != nil {
  441. params.Set("lenient", fmt.Sprintf("%v", *s.lenient))
  442. }
  443. if s.lowercaseExpandedTerms != nil {
  444. params.Set("lowercase_expanded_terms", fmt.Sprintf("%v", *s.lowercaseExpandedTerms))
  445. }
  446. if s.pipeline != "" {
  447. params.Set("pipeline", s.pipeline)
  448. }
  449. if s.preference != "" {
  450. params.Set("preference", s.preference)
  451. }
  452. if s.q != "" {
  453. params.Set("q", s.q)
  454. }
  455. if s.refresh != "" {
  456. params.Set("refresh", s.refresh)
  457. }
  458. if s.requestCache != nil {
  459. params.Set("request_cache", fmt.Sprintf("%v", *s.requestCache))
  460. }
  461. if len(s.routing) > 0 {
  462. params.Set("routing", strings.Join(s.routing, ","))
  463. }
  464. if s.scroll != "" {
  465. params.Set("scroll", s.scroll)
  466. }
  467. if s.scrollSize != nil {
  468. params.Set("scroll_size", fmt.Sprintf("%d", *s.scrollSize))
  469. }
  470. if s.searchTimeout != "" {
  471. params.Set("search_timeout", s.searchTimeout)
  472. }
  473. if s.searchType != "" {
  474. params.Set("search_type", s.searchType)
  475. }
  476. if s.size != nil {
  477. params.Set("size", fmt.Sprintf("%d", *s.size))
  478. }
  479. if len(s.sort) > 0 {
  480. params.Set("sort", strings.Join(s.sort, ","))
  481. }
  482. if len(s.stats) > 0 {
  483. params.Set("stats", strings.Join(s.stats, ","))
  484. }
  485. if s.suggestField != "" {
  486. params.Set("suggest_field", s.suggestField)
  487. }
  488. if s.suggestMode != "" {
  489. params.Set("suggest_mode", s.suggestMode)
  490. }
  491. if s.suggestSize != nil {
  492. params.Set("suggest_size", fmt.Sprintf("%v", *s.suggestSize))
  493. }
  494. if s.suggestText != "" {
  495. params.Set("suggest_text", s.suggestText)
  496. }
  497. if s.terminateAfter != nil {
  498. params.Set("terminate_after", fmt.Sprintf("%v", *s.terminateAfter))
  499. }
  500. if s.timeout != "" {
  501. params.Set("timeout", s.timeout)
  502. }
  503. if s.trackScores != nil {
  504. params.Set("track_scores", fmt.Sprintf("%v", *s.trackScores))
  505. }
  506. if s.version != nil {
  507. params.Set("version", fmt.Sprintf("%v", *s.version))
  508. }
  509. if s.versionType != nil {
  510. params.Set("version_type", fmt.Sprintf("%v", *s.versionType))
  511. }
  512. if s.waitForActiveShards != "" {
  513. params.Set("wait_for_active_shards", s.waitForActiveShards)
  514. }
  515. if s.waitForCompletion != nil {
  516. params.Set("wait_for_completion", fmt.Sprintf("%v", *s.waitForCompletion))
  517. }
  518. if s.requestsPerSecond != nil {
  519. params.Set("requests_per_second", fmt.Sprintf("%v", *s.requestsPerSecond))
  520. }
  521. return path, params, nil
  522. }
  523. // Validate checks if the operation is valid.
  524. func (s *UpdateByQueryService) Validate() error {
  525. var invalid []string
  526. if len(s.index) == 0 {
  527. invalid = append(invalid, "Index")
  528. }
  529. if len(invalid) > 0 {
  530. return fmt.Errorf("missing required fields: %v", invalid)
  531. }
  532. return nil
  533. }
  534. // getBody returns the body part of the document request.
  535. func (s *UpdateByQueryService) getBody() (interface{}, error) {
  536. if s.body != nil {
  537. return s.body, nil
  538. }
  539. source := make(map[string]interface{})
  540. if s.script != nil {
  541. src, err := s.script.Source()
  542. if err != nil {
  543. return nil, err
  544. }
  545. source["script"] = src
  546. }
  547. if s.query != nil {
  548. src, err := s.query.Source()
  549. if err != nil {
  550. return nil, err
  551. }
  552. source["query"] = src
  553. }
  554. return source, nil
  555. }
  556. // Do executes the operation.
  557. func (s *UpdateByQueryService) Do(ctx context.Context) (*BulkIndexByScrollResponse, error) {
  558. // Check pre-conditions
  559. if err := s.Validate(); err != nil {
  560. return nil, err
  561. }
  562. // Get URL for request
  563. path, params, err := s.buildURL()
  564. if err != nil {
  565. return nil, err
  566. }
  567. // Setup HTTP request body
  568. body, err := s.getBody()
  569. if err != nil {
  570. return nil, err
  571. }
  572. // Get HTTP response
  573. res, err := s.client.PerformRequest(ctx, "POST", path, params, body)
  574. if err != nil {
  575. return nil, err
  576. }
  577. // Return operation response (BulkIndexByScrollResponse is defined in DeleteByQuery)
  578. ret := new(BulkIndexByScrollResponse)
  579. if err := s.client.decoder.Decode(res.Body, ret); err != nil {
  580. return nil, err
  581. }
  582. return ret, nil
  583. }
  584. // DoAsync executes the reindexing operation asynchronously by starting a new task.
  585. // Callers need to use the Task Management API to watch the outcome of the reindexing
  586. // operation.
  587. func (s *UpdateByQueryService) DoAsync(ctx context.Context) (*StartTaskResult, error) {
  588. // Check pre-conditions
  589. if err := s.Validate(); err != nil {
  590. return nil, err
  591. }
  592. // DoAsync only makes sense with WaitForCompletion set to true
  593. if s.waitForCompletion != nil && *s.waitForCompletion {
  594. return nil, fmt.Errorf("cannot start a task with WaitForCompletion set to true")
  595. }
  596. f := false
  597. s.waitForCompletion = &f
  598. // Get URL for request
  599. path, params, err := s.buildURL()
  600. if err != nil {
  601. return nil, err
  602. }
  603. // Setup HTTP request body
  604. body, err := s.getBody()
  605. if err != nil {
  606. return nil, err
  607. }
  608. // Get HTTP response
  609. res, err := s.client.PerformRequest(ctx, "POST", path, params, body)
  610. if err != nil {
  611. return nil, err
  612. }
  613. // Return operation response
  614. ret := new(StartTaskResult)
  615. if err := s.client.decoder.Decode(res.Body, ret); err != nil {
  616. return nil, err
  617. }
  618. return ret, nil
  619. }