reindex.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. import (
  6. "context"
  7. "fmt"
  8. "net/url"
  9. )
  10. // ReindexService is a method to copy documents from one index to another.
  11. // It is documented at https://www.elastic.co/guide/en/elasticsearch/reference/5.0/docs-reindex.html.
  12. type ReindexService struct {
  13. client *Client
  14. pretty bool
  15. refresh string
  16. timeout string
  17. waitForActiveShards string
  18. waitForCompletion *bool
  19. requestsPerSecond *int
  20. slices *int
  21. body interface{}
  22. source *ReindexSource
  23. destination *ReindexDestination
  24. conflicts string
  25. size *int
  26. script *Script
  27. }
  28. // NewReindexService creates a new ReindexService.
  29. func NewReindexService(client *Client) *ReindexService {
  30. return &ReindexService{
  31. client: client,
  32. }
  33. }
  34. // WaitForActiveShards sets the number of shard copies that must be active before
  35. // proceeding with the reindex operation. Defaults to 1, meaning the primary shard only.
  36. // Set to `all` for all shard copies, otherwise set to any non-negative value less than or
  37. // equal to the total number of copies for the shard (number of replicas + 1).
  38. func (s *ReindexService) WaitForActiveShards(waitForActiveShards string) *ReindexService {
  39. s.waitForActiveShards = waitForActiveShards
  40. return s
  41. }
  42. // RequestsPerSecond specifies the throttle to set on this request in sub-requests per second.
  43. // -1 means set no throttle as does "unlimited" which is the only non-float this accepts.
  44. func (s *ReindexService) RequestsPerSecond(requestsPerSecond int) *ReindexService {
  45. s.requestsPerSecond = &requestsPerSecond
  46. return s
  47. }
  48. // Slices specifies the number of slices this task should be divided into. Defaults to 1.
  49. func (s *ReindexService) Slices(slices int) *ReindexService {
  50. s.slices = &slices
  51. return s
  52. }
  53. // Refresh indicates whether Elasticsearch should refresh the effected indexes
  54. // immediately.
  55. func (s *ReindexService) Refresh(refresh string) *ReindexService {
  56. s.refresh = refresh
  57. return s
  58. }
  59. // Timeout is the time each individual bulk request should wait for shards
  60. // that are unavailable.
  61. func (s *ReindexService) Timeout(timeout string) *ReindexService {
  62. s.timeout = timeout
  63. return s
  64. }
  65. // WaitForCompletion indicates whether Elasticsearch should block until the
  66. // reindex is complete.
  67. func (s *ReindexService) WaitForCompletion(waitForCompletion bool) *ReindexService {
  68. s.waitForCompletion = &waitForCompletion
  69. return s
  70. }
  71. // Pretty indicates that the JSON response be indented and human readable.
  72. func (s *ReindexService) Pretty(pretty bool) *ReindexService {
  73. s.pretty = pretty
  74. return s
  75. }
  76. // Source specifies the source of the reindexing process.
  77. func (s *ReindexService) Source(source *ReindexSource) *ReindexService {
  78. s.source = source
  79. return s
  80. }
  81. // SourceIndex specifies the source index of the reindexing process.
  82. func (s *ReindexService) SourceIndex(index string) *ReindexService {
  83. if s.source == nil {
  84. s.source = NewReindexSource()
  85. }
  86. s.source = s.source.Index(index)
  87. return s
  88. }
  89. // Destination specifies the destination of the reindexing process.
  90. func (s *ReindexService) Destination(destination *ReindexDestination) *ReindexService {
  91. s.destination = destination
  92. return s
  93. }
  94. // DestinationIndex specifies the destination index of the reindexing process.
  95. func (s *ReindexService) DestinationIndex(index string) *ReindexService {
  96. if s.destination == nil {
  97. s.destination = NewReindexDestination()
  98. }
  99. s.destination = s.destination.Index(index)
  100. return s
  101. }
  102. // DestinationIndexAndType specifies both the destination index and type
  103. // of the reindexing process.
  104. func (s *ReindexService) DestinationIndexAndType(index, typ string) *ReindexService {
  105. if s.destination == nil {
  106. s.destination = NewReindexDestination()
  107. }
  108. s.destination = s.destination.Index(index)
  109. s.destination = s.destination.Type(typ)
  110. return s
  111. }
  112. // Conflicts indicates what to do when the process detects version conflicts.
  113. // Possible values are "proceed" and "abort".
  114. func (s *ReindexService) Conflicts(conflicts string) *ReindexService {
  115. s.conflicts = conflicts
  116. return s
  117. }
  118. // AbortOnVersionConflict aborts the request on version conflicts.
  119. // It is an alias to setting Conflicts("abort").
  120. func (s *ReindexService) AbortOnVersionConflict() *ReindexService {
  121. s.conflicts = "abort"
  122. return s
  123. }
  124. // ProceedOnVersionConflict aborts the request on version conflicts.
  125. // It is an alias to setting Conflicts("proceed").
  126. func (s *ReindexService) ProceedOnVersionConflict() *ReindexService {
  127. s.conflicts = "proceed"
  128. return s
  129. }
  130. // Size sets an upper limit for the number of processed documents.
  131. func (s *ReindexService) Size(size int) *ReindexService {
  132. s.size = &size
  133. return s
  134. }
  135. // Script allows for modification of the documents as they are reindexed
  136. // from source to destination.
  137. func (s *ReindexService) Script(script *Script) *ReindexService {
  138. s.script = script
  139. return s
  140. }
  141. // Body specifies the body of the request to send to Elasticsearch.
  142. // It overrides settings specified with other setters, e.g. Query.
  143. func (s *ReindexService) Body(body interface{}) *ReindexService {
  144. s.body = body
  145. return s
  146. }
  147. // buildURL builds the URL for the operation.
  148. func (s *ReindexService) buildURL() (string, url.Values, error) {
  149. // Build URL path
  150. path := "/_reindex"
  151. // Add query string parameters
  152. params := url.Values{}
  153. if s.pretty {
  154. params.Set("pretty", "1")
  155. }
  156. if s.refresh != "" {
  157. params.Set("refresh", s.refresh)
  158. }
  159. if s.timeout != "" {
  160. params.Set("timeout", s.timeout)
  161. }
  162. if s.requestsPerSecond != nil {
  163. params.Set("requests_per_second", fmt.Sprintf("%v", *s.requestsPerSecond))
  164. }
  165. if s.slices != nil {
  166. params.Set("slices", fmt.Sprintf("%v", *s.slices))
  167. }
  168. if s.waitForActiveShards != "" {
  169. params.Set("wait_for_active_shards", s.waitForActiveShards)
  170. }
  171. if s.waitForCompletion != nil {
  172. params.Set("wait_for_completion", fmt.Sprintf("%v", *s.waitForCompletion))
  173. }
  174. return path, params, nil
  175. }
  176. // Validate checks if the operation is valid.
  177. func (s *ReindexService) Validate() error {
  178. var invalid []string
  179. if s.body != nil {
  180. return nil
  181. }
  182. if s.source == nil {
  183. invalid = append(invalid, "Source")
  184. } else {
  185. if len(s.source.indices) == 0 {
  186. invalid = append(invalid, "Source.Index")
  187. }
  188. }
  189. if s.destination == nil {
  190. invalid = append(invalid, "Destination")
  191. }
  192. if len(invalid) > 0 {
  193. return fmt.Errorf("missing required fields: %v", invalid)
  194. }
  195. return nil
  196. }
  197. // getBody returns the body part of the document request.
  198. func (s *ReindexService) getBody() (interface{}, error) {
  199. if s.body != nil {
  200. return s.body, nil
  201. }
  202. body := make(map[string]interface{})
  203. if s.conflicts != "" {
  204. body["conflicts"] = s.conflicts
  205. }
  206. if s.size != nil {
  207. body["size"] = *s.size
  208. }
  209. if s.script != nil {
  210. out, err := s.script.Source()
  211. if err != nil {
  212. return nil, err
  213. }
  214. body["script"] = out
  215. }
  216. src, err := s.source.Source()
  217. if err != nil {
  218. return nil, err
  219. }
  220. body["source"] = src
  221. dst, err := s.destination.Source()
  222. if err != nil {
  223. return nil, err
  224. }
  225. body["dest"] = dst
  226. return body, nil
  227. }
  228. // Do executes the operation.
  229. func (s *ReindexService) Do(ctx context.Context) (*BulkIndexByScrollResponse, error) {
  230. // Check pre-conditions
  231. if err := s.Validate(); err != nil {
  232. return nil, err
  233. }
  234. // Get URL for request
  235. path, params, err := s.buildURL()
  236. if err != nil {
  237. return nil, err
  238. }
  239. // Setup HTTP request body
  240. body, err := s.getBody()
  241. if err != nil {
  242. return nil, err
  243. }
  244. // Get HTTP response
  245. res, err := s.client.PerformRequest(ctx, "POST", path, params, body)
  246. if err != nil {
  247. return nil, err
  248. }
  249. // Return operation response
  250. ret := new(BulkIndexByScrollResponse)
  251. if err := s.client.decoder.Decode(res.Body, ret); err != nil {
  252. return nil, err
  253. }
  254. return ret, nil
  255. }
  256. // DoAsync executes the reindexing operation asynchronously by starting a new task.
  257. // Callers need to use the Task Management API to watch the outcome of the reindexing
  258. // operation.
  259. func (s *ReindexService) DoAsync(ctx context.Context) (*StartTaskResult, error) {
  260. // Check pre-conditions
  261. if err := s.Validate(); err != nil {
  262. return nil, err
  263. }
  264. // DoAsync only makes sense with WaitForCompletion set to true
  265. if s.waitForCompletion != nil && *s.waitForCompletion {
  266. return nil, fmt.Errorf("cannot start a task with WaitForCompletion set to true")
  267. }
  268. f := false
  269. s.waitForCompletion = &f
  270. // Get URL for request
  271. path, params, err := s.buildURL()
  272. if err != nil {
  273. return nil, err
  274. }
  275. // Setup HTTP request body
  276. body, err := s.getBody()
  277. if err != nil {
  278. return nil, err
  279. }
  280. // Get HTTP response
  281. res, err := s.client.PerformRequest(ctx, "POST", path, params, body)
  282. if err != nil {
  283. return nil, err
  284. }
  285. // Return operation response
  286. ret := new(StartTaskResult)
  287. if err := s.client.decoder.Decode(res.Body, ret); err != nil {
  288. return nil, err
  289. }
  290. return ret, nil
  291. }
  292. // -- Source of Reindex --
  293. // ReindexSource specifies the source of a Reindex process.
  294. type ReindexSource struct {
  295. searchType string
  296. indices []string
  297. types []string
  298. routing *string
  299. preference *string
  300. requestCache *bool
  301. scroll string
  302. query Query
  303. sorts []SortInfo
  304. sorters []Sorter
  305. searchSource *SearchSource
  306. remoteInfo *ReindexRemoteInfo
  307. }
  308. // NewReindexSource creates a new ReindexSource.
  309. func NewReindexSource() *ReindexSource {
  310. return &ReindexSource{}
  311. }
  312. // SearchType is the search operation type. Possible values are
  313. // "query_then_fetch" and "dfs_query_then_fetch".
  314. func (r *ReindexSource) SearchType(searchType string) *ReindexSource {
  315. r.searchType = searchType
  316. return r
  317. }
  318. func (r *ReindexSource) SearchTypeDfsQueryThenFetch() *ReindexSource {
  319. return r.SearchType("dfs_query_then_fetch")
  320. }
  321. func (r *ReindexSource) SearchTypeQueryThenFetch() *ReindexSource {
  322. return r.SearchType("query_then_fetch")
  323. }
  324. func (r *ReindexSource) Index(indices ...string) *ReindexSource {
  325. r.indices = append(r.indices, indices...)
  326. return r
  327. }
  328. func (r *ReindexSource) Type(types ...string) *ReindexSource {
  329. r.types = append(r.types, types...)
  330. return r
  331. }
  332. func (r *ReindexSource) Preference(preference string) *ReindexSource {
  333. r.preference = &preference
  334. return r
  335. }
  336. func (r *ReindexSource) RequestCache(requestCache bool) *ReindexSource {
  337. r.requestCache = &requestCache
  338. return r
  339. }
  340. func (r *ReindexSource) Scroll(scroll string) *ReindexSource {
  341. r.scroll = scroll
  342. return r
  343. }
  344. func (r *ReindexSource) Query(query Query) *ReindexSource {
  345. r.query = query
  346. return r
  347. }
  348. // Sort adds a sort order.
  349. func (s *ReindexSource) Sort(field string, ascending bool) *ReindexSource {
  350. s.sorts = append(s.sorts, SortInfo{Field: field, Ascending: ascending})
  351. return s
  352. }
  353. // SortWithInfo adds a sort order.
  354. func (s *ReindexSource) SortWithInfo(info SortInfo) *ReindexSource {
  355. s.sorts = append(s.sorts, info)
  356. return s
  357. }
  358. // SortBy adds a sort order.
  359. func (s *ReindexSource) SortBy(sorter ...Sorter) *ReindexSource {
  360. s.sorters = append(s.sorters, sorter...)
  361. return s
  362. }
  363. // RemoteInfo sets up reindexing from a remote cluster.
  364. func (s *ReindexSource) RemoteInfo(ri *ReindexRemoteInfo) *ReindexSource {
  365. s.remoteInfo = ri
  366. return s
  367. }
  368. // Source returns a serializable JSON request for the request.
  369. func (r *ReindexSource) Source() (interface{}, error) {
  370. source := make(map[string]interface{})
  371. if r.query != nil {
  372. src, err := r.query.Source()
  373. if err != nil {
  374. return nil, err
  375. }
  376. source["query"] = src
  377. } else if r.searchSource != nil {
  378. src, err := r.searchSource.Source()
  379. if err != nil {
  380. return nil, err
  381. }
  382. source["source"] = src
  383. }
  384. if r.searchType != "" {
  385. source["search_type"] = r.searchType
  386. }
  387. switch len(r.indices) {
  388. case 0:
  389. case 1:
  390. source["index"] = r.indices[0]
  391. default:
  392. source["index"] = r.indices
  393. }
  394. switch len(r.types) {
  395. case 0:
  396. case 1:
  397. source["type"] = r.types[0]
  398. default:
  399. source["type"] = r.types
  400. }
  401. if r.preference != nil && *r.preference != "" {
  402. source["preference"] = *r.preference
  403. }
  404. if r.requestCache != nil {
  405. source["request_cache"] = fmt.Sprintf("%v", *r.requestCache)
  406. }
  407. if r.scroll != "" {
  408. source["scroll"] = r.scroll
  409. }
  410. if r.remoteInfo != nil {
  411. src, err := r.remoteInfo.Source()
  412. if err != nil {
  413. return nil, err
  414. }
  415. source["remote"] = src
  416. }
  417. if len(r.sorters) > 0 {
  418. var sortarr []interface{}
  419. for _, sorter := range r.sorters {
  420. src, err := sorter.Source()
  421. if err != nil {
  422. return nil, err
  423. }
  424. sortarr = append(sortarr, src)
  425. }
  426. source["sort"] = sortarr
  427. } else if len(r.sorts) > 0 {
  428. var sortarr []interface{}
  429. for _, sort := range r.sorts {
  430. src, err := sort.Source()
  431. if err != nil {
  432. return nil, err
  433. }
  434. sortarr = append(sortarr, src)
  435. }
  436. source["sort"] = sortarr
  437. }
  438. return source, nil
  439. }
  440. // ReindexRemoteInfo contains information for reindexing from a remote cluster.
  441. type ReindexRemoteInfo struct {
  442. host string
  443. username string
  444. password string
  445. socketTimeout string // e.g. "1m" or "30s"
  446. connectTimeout string // e.g. "1m" or "30s"
  447. }
  448. // NewReindexRemoteInfo creates a new ReindexRemoteInfo.
  449. func NewReindexRemoteInfo() *ReindexRemoteInfo {
  450. return &ReindexRemoteInfo{}
  451. }
  452. // Host sets the host information of the remote cluster.
  453. // It must be of the form "http(s)://<hostname>:<port>"
  454. func (ri *ReindexRemoteInfo) Host(host string) *ReindexRemoteInfo {
  455. ri.host = host
  456. return ri
  457. }
  458. // Username sets the username to authenticate with the remote cluster.
  459. func (ri *ReindexRemoteInfo) Username(username string) *ReindexRemoteInfo {
  460. ri.username = username
  461. return ri
  462. }
  463. // Password sets the password to authenticate with the remote cluster.
  464. func (ri *ReindexRemoteInfo) Password(password string) *ReindexRemoteInfo {
  465. ri.password = password
  466. return ri
  467. }
  468. // SocketTimeout sets the socket timeout to connect with the remote cluster.
  469. // Use ES compatible values like e.g. "30s" or "1m".
  470. func (ri *ReindexRemoteInfo) SocketTimeout(timeout string) *ReindexRemoteInfo {
  471. ri.socketTimeout = timeout
  472. return ri
  473. }
  474. // ConnectTimeout sets the connection timeout to connect with the remote cluster.
  475. // Use ES compatible values like e.g. "30s" or "1m".
  476. func (ri *ReindexRemoteInfo) ConnectTimeout(timeout string) *ReindexRemoteInfo {
  477. ri.connectTimeout = timeout
  478. return ri
  479. }
  480. // Source returns the serializable JSON data for the request.
  481. func (ri *ReindexRemoteInfo) Source() (interface{}, error) {
  482. res := make(map[string]interface{})
  483. res["host"] = ri.host
  484. if len(ri.username) > 0 {
  485. res["username"] = ri.username
  486. }
  487. if len(ri.password) > 0 {
  488. res["password"] = ri.password
  489. }
  490. if len(ri.socketTimeout) > 0 {
  491. res["socket_timeout"] = ri.socketTimeout
  492. }
  493. if len(ri.connectTimeout) > 0 {
  494. res["connect_timeout"] = ri.connectTimeout
  495. }
  496. return res, nil
  497. }
// -- Destination of Reindex --
  499. // ReindexDestination is the destination of a Reindex API call.
  500. // It is basically the meta data of a BulkIndexRequest.
  501. //
  502. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-reindex.html
  503. // fsourcer details.
  504. type ReindexDestination struct {
  505. index string
  506. typ string
  507. routing string
  508. parent string
  509. opType string
  510. version int64 // default is MATCH_ANY
  511. versionType string // default is "internal"
  512. }
  513. // NewReindexDestination returns a new ReindexDestination.
  514. func NewReindexDestination() *ReindexDestination {
  515. return &ReindexDestination{}
  516. }
  517. // Index specifies name of the Elasticsearch index to use as the destination
  518. // of a reindexing process.
  519. func (r *ReindexDestination) Index(index string) *ReindexDestination {
  520. r.index = index
  521. return r
  522. }
  523. // Type specifies the Elasticsearch type to use for reindexing.
  524. func (r *ReindexDestination) Type(typ string) *ReindexDestination {
  525. r.typ = typ
  526. return r
  527. }
  528. // Routing specifies a routing value for the reindexing request.
  529. // It can be "keep", "discard", or start with "=". The latter specifies
  530. // the routing on the bulk request.
  531. func (r *ReindexDestination) Routing(routing string) *ReindexDestination {
  532. r.routing = routing
  533. return r
  534. }
  535. // Keep sets the routing on the bulk request sent for each match to the routing
  536. // of the match (the default).
  537. func (r *ReindexDestination) Keep() *ReindexDestination {
  538. r.routing = "keep"
  539. return r
  540. }
  541. // Discard sets the routing on the bulk request sent for each match to null.
  542. func (r *ReindexDestination) Discard() *ReindexDestination {
  543. r.routing = "discard"
  544. return r
  545. }
  546. // Parent specifies the identifier of the parent document (if available).
  547. func (r *ReindexDestination) Parent(parent string) *ReindexDestination {
  548. r.parent = parent
  549. return r
  550. }
  551. // OpType specifies if this request should follow create-only or upsert
  552. // behavior. This follows the OpType of the standard document index API.
  553. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-index_.html#operation-type
  554. // for details.
  555. func (r *ReindexDestination) OpType(opType string) *ReindexDestination {
  556. r.opType = opType
  557. return r
  558. }
  559. // Version indicates the version of the document as part of an optimistic
  560. // concurrency model.
  561. func (r *ReindexDestination) Version(version int64) *ReindexDestination {
  562. r.version = version
  563. return r
  564. }
  565. // VersionType specifies how versions are created.
  566. func (r *ReindexDestination) VersionType(versionType string) *ReindexDestination {
  567. r.versionType = versionType
  568. return r
  569. }
  570. // Source returns a serializable JSON request for the request.
  571. func (r *ReindexDestination) Source() (interface{}, error) {
  572. source := make(map[string]interface{})
  573. if r.index != "" {
  574. source["index"] = r.index
  575. }
  576. if r.typ != "" {
  577. source["type"] = r.typ
  578. }
  579. if r.routing != "" {
  580. source["routing"] = r.routing
  581. }
  582. if r.opType != "" {
  583. source["op_type"] = r.opType
  584. }
  585. if r.parent != "" {
  586. source["parent"] = r.parent
  587. }
  588. if r.version > 0 {
  589. source["version"] = r.version
  590. }
  591. if r.versionType != "" {
  592. source["version_type"] = r.versionType
  593. }
  594. return source, nil
  595. }