delay.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. // Copyright 2011 Google Inc. All rights reserved.
  2. // Use of this source code is governed by the Apache 2.0
  3. // license that can be found in the LICENSE file.
  4. /*
  5. Package delay provides a way to execute code outside the scope of a
  6. user request by using the taskqueue API.
  7. To declare a function that may be executed later, call Func
  8. in a top-level assignment context, passing it an arbitrary string key
  9. and a function whose first argument is of type context.Context.
  10. The key is used to look up the function so it can be called later.
  11. var laterFunc = delay.Func("key", myFunc)
  12. It is also possible to use a function literal.
  13. var laterFunc = delay.Func("key", func(c context.Context, x string) {
  14. // ...
  15. })
  16. To call a function, invoke its Call method.
  17. laterFunc.Call(c, "something")
  18. A function may be called any number of times. If the function has any
  19. return arguments, and the last one is of type error, the function may
  20. return a non-nil error to signal that the function should be retried.
  21. The arguments to functions may be of any type that is encodable by the gob
  22. package. If an argument is of interface type, it is the client's responsibility
  23. to register with the gob package whatever concrete type may be passed for that
  24. argument; see http://golang.org/pkg/gob/#Register for details.
  25. Any errors during initialization or execution of a function will be
  26. logged to the application logs. Error logs that occur during initialization will
  27. be associated with the request that invoked the Call method.
  28. The state of a function invocation that has not yet successfully
  29. executed is preserved by combining the file name in which it is declared
  30. with the string key that was passed to the Func function. Updating an app
  31. with pending function invocations should be safe as long as the relevant
  32. functions have the (filename, key) combination preserved. The filename is
  33. parsed according to these rules:
  34. * Paths in package main are shortened to just the file name (github.com/foo/foo.go -> foo.go)
  35. * Paths are stripped to just package paths (/go/src/github.com/foo/bar.go -> github.com/foo/bar.go)
  36. * Module versions are stripped (/go/pkg/mod/github.com/foo/bar@v0.0.0-20181026220418-f595d03440dc/baz.go -> github.com/foo/bar/baz.go)
  37. There is some inherent risk of pending function invocations being lost during
  38. an update that contains large changes. For example, switching from using GOPATH
  39. to go.mod is a large change that may inadvertently cause file paths to change.
  40. The delay package uses the Task Queue API to create tasks that call the
  41. reserved application path "/_ah/queue/go/delay".
  42. This path must not be marked as "login: required" in app.yaml;
  43. it must be marked as "login: admin" or have no access restriction.
  44. */
  45. package delay // import "google.golang.org/appengine/delay"
  46. import (
  47. "bytes"
  48. stdctx "context"
  49. "encoding/gob"
  50. "errors"
  51. "fmt"
  52. "go/build"
  53. stdlog "log"
  54. "net/http"
  55. "path/filepath"
  56. "reflect"
  57. "regexp"
  58. "runtime"
  59. "strings"
  60. "golang.org/x/net/context"
  61. "google.golang.org/appengine"
  62. "google.golang.org/appengine/internal"
  63. "google.golang.org/appengine/log"
  64. "google.golang.org/appengine/taskqueue"
  65. )
  66. // Function represents a function that may have a delayed invocation.
  67. type Function struct {
  68. fv reflect.Value // Kind() == reflect.Func
  69. key string
  70. err error // any error during initialization
  71. }
  72. const (
  73. // The HTTP path for invocations.
  74. path = "/_ah/queue/go/delay"
  75. // Use the default queue.
  76. queue = ""
  77. )
  78. type contextKey int
  79. var (
  80. // registry of all delayed functions
  81. funcs = make(map[string]*Function)
  82. // precomputed types
  83. errorType = reflect.TypeOf((*error)(nil)).Elem()
  84. // errors
  85. errFirstArg = errors.New("first argument must be context.Context")
  86. errOutsideDelayFunc = errors.New("request headers are only available inside a delay.Func")
  87. // context keys
  88. headersContextKey contextKey = 0
  89. stdContextType = reflect.TypeOf((*stdctx.Context)(nil)).Elem()
  90. netContextType = reflect.TypeOf((*context.Context)(nil)).Elem()
  91. )
  92. func isContext(t reflect.Type) bool {
  93. return t == stdContextType || t == netContextType
  94. }
  95. var modVersionPat = regexp.MustCompile("@v[^/]+")
  96. // fileKey finds a stable representation of the caller's file path.
  97. // For calls from package main: strip all leading path entries, leaving just the filename.
  98. // For calls from anywhere else, strip $GOPATH/src, leaving just the package path and file path.
  99. func fileKey(file string) (string, error) {
  100. if !internal.IsSecondGen() || internal.MainPath == "" {
  101. return file, nil
  102. }
  103. // If the caller is in the same Dir as mainPath, then strip everything but the file name.
  104. if filepath.Dir(file) == internal.MainPath {
  105. return filepath.Base(file), nil
  106. }
  107. // If the path contains "_gopath/src/", which is what the builder uses for
  108. // apps which don't use go modules, strip everything up to and including src.
  109. // Or, if the path starts with /tmp/staging, then we're importing a package
  110. // from the app's module (and we must be using go modules), and we have a
  111. // path like /tmp/staging1234/srv/... so strip everything up to and
  112. // including the first /srv/.
  113. // And be sure to look at the GOPATH, for local development.
  114. s := string(filepath.Separator)
  115. for _, s := range []string{filepath.Join("_gopath", "src") + s, s + "srv" + s, filepath.Join(build.Default.GOPATH, "src") + s} {
  116. if idx := strings.Index(file, s); idx > 0 {
  117. return file[idx+len(s):], nil
  118. }
  119. }
  120. // Finally, if that all fails then we must be using go modules, and the file is a module,
  121. // so the path looks like /go/pkg/mod/github.com/foo/bar@v0.0.0-20181026220418-f595d03440dc/baz.go
  122. // So... remove everything up to and including mod, plus the @.... version string.
  123. m := "/mod/"
  124. if idx := strings.Index(file, m); idx > 0 {
  125. file = file[idx+len(m):]
  126. } else {
  127. return file, fmt.Errorf("fileKey: unknown file path format for %q", file)
  128. }
  129. return modVersionPat.ReplaceAllString(file, ""), nil
  130. }
  131. // Func declares a new Function. The second argument must be a function with a
  132. // first argument of type context.Context.
  133. // This function must be called at program initialization time. That means it
  134. // must be called in a global variable declaration or from an init function.
  135. // This restriction is necessary because the instance that delays a function
  136. // call may not be the one that executes it. Only the code executed at program
  137. // initialization time is guaranteed to have been run by an instance before it
  138. // receives a request.
  139. func Func(key string, i interface{}) *Function {
  140. f := &Function{fv: reflect.ValueOf(i)}
  141. // Derive unique, somewhat stable key for this func.
  142. _, file, _, _ := runtime.Caller(1)
  143. fk, err := fileKey(file)
  144. if err != nil {
  145. // Not fatal, but log the error
  146. stdlog.Printf("delay: %v", err)
  147. }
  148. f.key = fk + ":" + key
  149. t := f.fv.Type()
  150. if t.Kind() != reflect.Func {
  151. f.err = errors.New("not a function")
  152. return f
  153. }
  154. if t.NumIn() == 0 || !isContext(t.In(0)) {
  155. f.err = errFirstArg
  156. return f
  157. }
  158. // Register the function's arguments with the gob package.
  159. // This is required because they are marshaled inside a []interface{}.
  160. // gob.Register only expects to be called during initialization;
  161. // that's fine because this function expects the same.
  162. for i := 0; i < t.NumIn(); i++ {
  163. // Only concrete types may be registered. If the argument has
  164. // interface type, the client is resposible for registering the
  165. // concrete types it will hold.
  166. if t.In(i).Kind() == reflect.Interface {
  167. continue
  168. }
  169. gob.Register(reflect.Zero(t.In(i)).Interface())
  170. }
  171. if old := funcs[f.key]; old != nil {
  172. old.err = fmt.Errorf("multiple functions registered for %s in %s", key, file)
  173. }
  174. funcs[f.key] = f
  175. return f
  176. }
  177. type invocation struct {
  178. Key string
  179. Args []interface{}
  180. }
  181. // Call invokes a delayed function.
  182. // err := f.Call(c, ...)
  183. // is equivalent to
  184. // t, _ := f.Task(...)
  185. // _, err := taskqueue.Add(c, t, "")
  186. func (f *Function) Call(c context.Context, args ...interface{}) error {
  187. t, err := f.Task(args...)
  188. if err != nil {
  189. return err
  190. }
  191. _, err = taskqueueAdder(c, t, queue)
  192. return err
  193. }
  194. // Task creates a Task that will invoke the function.
  195. // Its parameters may be tweaked before adding it to a queue.
  196. // Users should not modify the Path or Payload fields of the returned Task.
  197. func (f *Function) Task(args ...interface{}) (*taskqueue.Task, error) {
  198. if f.err != nil {
  199. return nil, fmt.Errorf("delay: func is invalid: %v", f.err)
  200. }
  201. nArgs := len(args) + 1 // +1 for the context.Context
  202. ft := f.fv.Type()
  203. minArgs := ft.NumIn()
  204. if ft.IsVariadic() {
  205. minArgs--
  206. }
  207. if nArgs < minArgs {
  208. return nil, fmt.Errorf("delay: too few arguments to func: %d < %d", nArgs, minArgs)
  209. }
  210. if !ft.IsVariadic() && nArgs > minArgs {
  211. return nil, fmt.Errorf("delay: too many arguments to func: %d > %d", nArgs, minArgs)
  212. }
  213. // Check arg types.
  214. for i := 1; i < nArgs; i++ {
  215. at := reflect.TypeOf(args[i-1])
  216. var dt reflect.Type
  217. if i < minArgs {
  218. // not a variadic arg
  219. dt = ft.In(i)
  220. } else {
  221. // a variadic arg
  222. dt = ft.In(minArgs).Elem()
  223. }
  224. // nil arguments won't have a type, so they need special handling.
  225. if at == nil {
  226. // nil interface
  227. switch dt.Kind() {
  228. case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
  229. continue // may be nil
  230. }
  231. return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not nilable", i, dt)
  232. }
  233. switch at.Kind() {
  234. case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
  235. av := reflect.ValueOf(args[i-1])
  236. if av.IsNil() {
  237. // nil value in interface; not supported by gob, so we replace it
  238. // with a nil interface value
  239. args[i-1] = nil
  240. }
  241. }
  242. if !at.AssignableTo(dt) {
  243. return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not assignable to %v", i, at, dt)
  244. }
  245. }
  246. inv := invocation{
  247. Key: f.key,
  248. Args: args,
  249. }
  250. buf := new(bytes.Buffer)
  251. if err := gob.NewEncoder(buf).Encode(inv); err != nil {
  252. return nil, fmt.Errorf("delay: gob encoding failed: %v", err)
  253. }
  254. return &taskqueue.Task{
  255. Path: path,
  256. Payload: buf.Bytes(),
  257. }, nil
  258. }
  259. // Request returns the special task-queue HTTP request headers for the current
  260. // task queue handler. Returns an error if called from outside a delay.Func.
  261. func RequestHeaders(c context.Context) (*taskqueue.RequestHeaders, error) {
  262. if ret, ok := c.Value(headersContextKey).(*taskqueue.RequestHeaders); ok {
  263. return ret, nil
  264. }
  265. return nil, errOutsideDelayFunc
  266. }
  267. var taskqueueAdder = taskqueue.Add // for testing
  268. func init() {
  269. http.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
  270. runFunc(appengine.NewContext(req), w, req)
  271. })
  272. }
  273. func runFunc(c context.Context, w http.ResponseWriter, req *http.Request) {
  274. defer req.Body.Close()
  275. c = context.WithValue(c, headersContextKey, taskqueue.ParseRequestHeaders(req.Header))
  276. var inv invocation
  277. if err := gob.NewDecoder(req.Body).Decode(&inv); err != nil {
  278. log.Errorf(c, "delay: failed decoding task payload: %v", err)
  279. log.Warningf(c, "delay: dropping task")
  280. return
  281. }
  282. f := funcs[inv.Key]
  283. if f == nil {
  284. log.Errorf(c, "delay: no func with key %q found", inv.Key)
  285. log.Warningf(c, "delay: dropping task")
  286. return
  287. }
  288. ft := f.fv.Type()
  289. in := []reflect.Value{reflect.ValueOf(c)}
  290. for _, arg := range inv.Args {
  291. var v reflect.Value
  292. if arg != nil {
  293. v = reflect.ValueOf(arg)
  294. } else {
  295. // Task was passed a nil argument, so we must construct
  296. // the zero value for the argument here.
  297. n := len(in) // we're constructing the nth argument
  298. var at reflect.Type
  299. if !ft.IsVariadic() || n < ft.NumIn()-1 {
  300. at = ft.In(n)
  301. } else {
  302. at = ft.In(ft.NumIn() - 1).Elem()
  303. }
  304. v = reflect.Zero(at)
  305. }
  306. in = append(in, v)
  307. }
  308. out := f.fv.Call(in)
  309. if n := ft.NumOut(); n > 0 && ft.Out(n-1) == errorType {
  310. if errv := out[n-1]; !errv.IsNil() {
  311. log.Errorf(c, "delay: func failed (will retry): %v", errv.Interface())
  312. w.WriteHeader(http.StatusInternalServerError)
  313. return
  314. }
  315. }
  316. }