|
- // Copyright 2011 Google Inc. All rights reserved.
- // Use of this source code is governed by the Apache 2.0
- // license that can be found in the LICENSE file.
- /*
- Package delay provides a way to execute code outside the scope of a
- user request by using the taskqueue API.
- To declare a function that may be executed later, call Func
- in a top-level assignment context, passing it an arbitrary string key
- and a function whose first argument is of type context.Context.
- The key is used to look up the function so it can be called later.
- var laterFunc = delay.Func("key", myFunc)
- It is also possible to use a function literal.
- var laterFunc = delay.Func("key", func(c context.Context, x string) {
- // ...
- })
- To call a function, invoke its Call method.
- laterFunc.Call(c, "something")
- A function may be called any number of times. If the function has any
- return arguments, and the last one is of type error, the function may
- return a non-nil error to signal that the function should be retried.
- The arguments to functions may be of any type that is encodable by the gob
- package. If an argument is of interface type, it is the client's responsibility
- to register with the gob package whatever concrete type may be passed for that
- argument; see http://golang.org/pkg/gob/#Register for details.
- Any errors during initialization or execution of a function will be
- logged to the application logs. Error logs that occur during initialization will
- be associated with the request that invoked the Call method.
- The state of a function invocation that has not yet successfully
- executed is preserved by combining the file name in which it is declared
- with the string key that was passed to the Func function. Updating an app
- with pending function invocations should safe as long as the relevant
- functions have the (filename, key) combination preserved. The filename is
- parsed according to these rules:
- * Paths in package main are shortened to just the file name (github.com/foo/foo.go -> foo.go)
- * Paths are stripped to just package paths (/go/src/github.com/foo/bar.go -> github.com/foo/bar.go)
- * Module versions are stripped (/go/pkg/mod/github.com/foo/bar@v0.0.0-20181026220418-f595d03440dc/baz.go -> github.com/foo/bar/baz.go)
- There is some inherent risk of pending function invocations being lost during
- an update that contains large changes. For example, switching from using GOPATH
- to go.mod is a large change that may inadvertently cause file paths to change.
- The delay package uses the Task Queue API to create tasks that call the
- reserved application path "/_ah/queue/go/delay".
- This path must not be marked as "login: required" in app.yaml;
- it must be marked as "login: admin" or have no access restriction.
- */
- package delay // import "google.golang.org/appengine/delay"
- import (
- "bytes"
- stdctx "context"
- "encoding/gob"
- "errors"
- "fmt"
- "go/build"
- stdlog "log"
- "net/http"
- "path/filepath"
- "reflect"
- "regexp"
- "runtime"
- "strings"
- "golang.org/x/net/context"
- "google.golang.org/appengine"
- "google.golang.org/appengine/internal"
- "google.golang.org/appengine/log"
- "google.golang.org/appengine/taskqueue"
- )
- // Function represents a function that may have a delayed invocation.
- type Function struct {
- fv reflect.Value // Kind() == reflect.Func
- key string
- err error // any error during initialization
- }
- const (
- // The HTTP path for invocations.
- path = "/_ah/queue/go/delay"
- // Use the default queue.
- queue = ""
- )
- type contextKey int
- var (
- // registry of all delayed functions
- funcs = make(map[string]*Function)
- // precomputed types
- errorType = reflect.TypeOf((*error)(nil)).Elem()
- // errors
- errFirstArg = errors.New("first argument must be context.Context")
- errOutsideDelayFunc = errors.New("request headers are only available inside a delay.Func")
- // context keys
- headersContextKey contextKey = 0
- stdContextType = reflect.TypeOf((*stdctx.Context)(nil)).Elem()
- netContextType = reflect.TypeOf((*context.Context)(nil)).Elem()
- )
- func isContext(t reflect.Type) bool {
- return t == stdContextType || t == netContextType
- }
- var modVersionPat = regexp.MustCompile("@v[^/]+")
- // fileKey finds a stable representation of the caller's file path.
- // For calls from package main: strip all leading path entries, leaving just the filename.
- // For calls from anywhere else, strip $GOPATH/src, leaving just the package path and file path.
- func fileKey(file string) (string, error) {
- if !internal.IsSecondGen() || internal.MainPath == "" {
- return file, nil
- }
- // If the caller is in the same Dir as mainPath, then strip everything but the file name.
- if filepath.Dir(file) == internal.MainPath {
- return filepath.Base(file), nil
- }
- // If the path contains "_gopath/src/", which is what the builder uses for
- // apps which don't use go modules, strip everything up to and including src.
- // Or, if the path starts with /tmp/staging, then we're importing a package
- // from the app's module (and we must be using go modules), and we have a
- // path like /tmp/staging1234/srv/... so strip everything up to and
- // including the first /srv/.
- // And be sure to look at the GOPATH, for local development.
- s := string(filepath.Separator)
- for _, s := range []string{filepath.Join("_gopath", "src") + s, s + "srv" + s, filepath.Join(build.Default.GOPATH, "src") + s} {
- if idx := strings.Index(file, s); idx > 0 {
- return file[idx+len(s):], nil
- }
- }
- // Finally, if that all fails then we must be using go modules, and the file is a module,
- // so the path looks like /go/pkg/mod/github.com/foo/bar@v0.0.0-20181026220418-f595d03440dc/baz.go
- // So... remove everything up to and including mod, plus the @.... version string.
- m := "/mod/"
- if idx := strings.Index(file, m); idx > 0 {
- file = file[idx+len(m):]
- } else {
- return file, fmt.Errorf("fileKey: unknown file path format for %q", file)
- }
- return modVersionPat.ReplaceAllString(file, ""), nil
- }
- // Func declares a new Function. The second argument must be a function with a
- // first argument of type context.Context.
- // This function must be called at program initialization time. That means it
- // must be called in a global variable declaration or from an init function.
- // This restriction is necessary because the instance that delays a function
- // call may not be the one that executes it. Only the code executed at program
- // initialization time is guaranteed to have been run by an instance before it
- // receives a request.
- func Func(key string, i interface{}) *Function {
- f := &Function{fv: reflect.ValueOf(i)}
- // Derive unique, somewhat stable key for this func.
- _, file, _, _ := runtime.Caller(1)
- fk, err := fileKey(file)
- if err != nil {
- // Not fatal, but log the error
- stdlog.Printf("delay: %v", err)
- }
- f.key = fk + ":" + key
- t := f.fv.Type()
- if t.Kind() != reflect.Func {
- f.err = errors.New("not a function")
- return f
- }
- if t.NumIn() == 0 || !isContext(t.In(0)) {
- f.err = errFirstArg
- return f
- }
- // Register the function's arguments with the gob package.
- // This is required because they are marshaled inside a []interface{}.
- // gob.Register only expects to be called during initialization;
- // that's fine because this function expects the same.
- for i := 0; i < t.NumIn(); i++ {
- // Only concrete types may be registered. If the argument has
- // interface type, the client is resposible for registering the
- // concrete types it will hold.
- if t.In(i).Kind() == reflect.Interface {
- continue
- }
- gob.Register(reflect.Zero(t.In(i)).Interface())
- }
- if old := funcs[f.key]; old != nil {
- old.err = fmt.Errorf("multiple functions registered for %s in %s", key, file)
- }
- funcs[f.key] = f
- return f
- }
- type invocation struct {
- Key string
- Args []interface{}
- }
- // Call invokes a delayed function.
- // err := f.Call(c, ...)
- // is equivalent to
- // t, _ := f.Task(...)
- // _, err := taskqueue.Add(c, t, "")
- func (f *Function) Call(c context.Context, args ...interface{}) error {
- t, err := f.Task(args...)
- if err != nil {
- return err
- }
- _, err = taskqueueAdder(c, t, queue)
- return err
- }
- // Task creates a Task that will invoke the function.
- // Its parameters may be tweaked before adding it to a queue.
- // Users should not modify the Path or Payload fields of the returned Task.
- func (f *Function) Task(args ...interface{}) (*taskqueue.Task, error) {
- if f.err != nil {
- return nil, fmt.Errorf("delay: func is invalid: %v", f.err)
- }
- nArgs := len(args) + 1 // +1 for the context.Context
- ft := f.fv.Type()
- minArgs := ft.NumIn()
- if ft.IsVariadic() {
- minArgs--
- }
- if nArgs < minArgs {
- return nil, fmt.Errorf("delay: too few arguments to func: %d < %d", nArgs, minArgs)
- }
- if !ft.IsVariadic() && nArgs > minArgs {
- return nil, fmt.Errorf("delay: too many arguments to func: %d > %d", nArgs, minArgs)
- }
- // Check arg types.
- for i := 1; i < nArgs; i++ {
- at := reflect.TypeOf(args[i-1])
- var dt reflect.Type
- if i < minArgs {
- // not a variadic arg
- dt = ft.In(i)
- } else {
- // a variadic arg
- dt = ft.In(minArgs).Elem()
- }
- // nil arguments won't have a type, so they need special handling.
- if at == nil {
- // nil interface
- switch dt.Kind() {
- case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
- continue // may be nil
- }
- return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not nilable", i, dt)
- }
- switch at.Kind() {
- case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
- av := reflect.ValueOf(args[i-1])
- if av.IsNil() {
- // nil value in interface; not supported by gob, so we replace it
- // with a nil interface value
- args[i-1] = nil
- }
- }
- if !at.AssignableTo(dt) {
- return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not assignable to %v", i, at, dt)
- }
- }
- inv := invocation{
- Key: f.key,
- Args: args,
- }
- buf := new(bytes.Buffer)
- if err := gob.NewEncoder(buf).Encode(inv); err != nil {
- return nil, fmt.Errorf("delay: gob encoding failed: %v", err)
- }
- return &taskqueue.Task{
- Path: path,
- Payload: buf.Bytes(),
- }, nil
- }
- // Request returns the special task-queue HTTP request headers for the current
- // task queue handler. Returns an error if called from outside a delay.Func.
- func RequestHeaders(c context.Context) (*taskqueue.RequestHeaders, error) {
- if ret, ok := c.Value(headersContextKey).(*taskqueue.RequestHeaders); ok {
- return ret, nil
- }
- return nil, errOutsideDelayFunc
- }
- var taskqueueAdder = taskqueue.Add // for testing
- func init() {
- http.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
- runFunc(appengine.NewContext(req), w, req)
- })
- }
- func runFunc(c context.Context, w http.ResponseWriter, req *http.Request) {
- defer req.Body.Close()
- c = context.WithValue(c, headersContextKey, taskqueue.ParseRequestHeaders(req.Header))
- var inv invocation
- if err := gob.NewDecoder(req.Body).Decode(&inv); err != nil {
- log.Errorf(c, "delay: failed decoding task payload: %v", err)
- log.Warningf(c, "delay: dropping task")
- return
- }
- f := funcs[inv.Key]
- if f == nil {
- log.Errorf(c, "delay: no func with key %q found", inv.Key)
- log.Warningf(c, "delay: dropping task")
- return
- }
- ft := f.fv.Type()
- in := []reflect.Value{reflect.ValueOf(c)}
- for _, arg := range inv.Args {
- var v reflect.Value
- if arg != nil {
- v = reflect.ValueOf(arg)
- } else {
- // Task was passed a nil argument, so we must construct
- // the zero value for the argument here.
- n := len(in) // we're constructing the nth argument
- var at reflect.Type
- if !ft.IsVariadic() || n < ft.NumIn()-1 {
- at = ft.In(n)
- } else {
- at = ft.In(ft.NumIn() - 1).Elem()
- }
- v = reflect.Zero(at)
- }
- in = append(in, v)
- }
- out := f.fv.Call(in)
- if n := ft.NumOut(); n > 0 && ft.Out(n-1) == errorType {
- if errv := out[n-1]; !errv.IsNil() {
- log.Errorf(c, "delay: func failed (will retry): %v", errv.Interface())
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- }
- }
|