api.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671
  1. // Copyright 2011 Google Inc. All rights reserved.
  2. // Use of this source code is governed by the Apache 2.0
  3. // license that can be found in the LICENSE file.
  4. // +build !appengine
  5. package internal
  6. import (
  7. "bytes"
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "log"
  12. "net"
  13. "net/http"
  14. "net/url"
  15. "os"
  16. "runtime"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "github.com/golang/protobuf/proto"
  23. netcontext "golang.org/x/net/context"
  24. basepb "google.golang.org/appengine/internal/base"
  25. logpb "google.golang.org/appengine/internal/log"
  26. remotepb "google.golang.org/appengine/internal/remote_api"
  27. )
  28. const (
  29. apiPath = "/rpc_http"
  30. defaultTicketSuffix = "/default.20150612t184001.0"
  31. )
  32. var (
  33. // Incoming headers.
  34. ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
  35. dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
  36. traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
  37. curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
  38. userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP")
  39. remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
  40. // Outgoing headers.
  41. apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
  42. apiEndpointHeaderValue = []string{"app-engine-apis"}
  43. apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
  44. apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"}
  45. apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
  46. apiContentType = http.CanonicalHeaderKey("Content-Type")
  47. apiContentTypeValue = []string{"application/octet-stream"}
  48. logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
  49. apiHTTPClient = &http.Client{
  50. Transport: &http.Transport{
  51. Proxy: http.ProxyFromEnvironment,
  52. Dial: limitDial,
  53. },
  54. }
  55. defaultTicketOnce sync.Once
  56. defaultTicket string
  57. backgroundContextOnce sync.Once
  58. backgroundContext netcontext.Context
  59. )
  60. func apiURL() *url.URL {
  61. host, port := "appengine.googleapis.internal", "10001"
  62. if h := os.Getenv("API_HOST"); h != "" {
  63. host = h
  64. }
  65. if p := os.Getenv("API_PORT"); p != "" {
  66. port = p
  67. }
  68. return &url.URL{
  69. Scheme: "http",
  70. Host: host + ":" + port,
  71. Path: apiPath,
  72. }
  73. }
// handleHTTP serves a single incoming request.
//
// It wraps the request in a *context that buffers the response (status code
// and body), normalises r.RemoteAddr, runs the registered handler through
// executeRequestSafely while a background goroutine flushes logs, and finally
// writes the buffered response to w together with the log flush count header.
// It does not return until the final log flush has completed.
func handleHTTP(w http.ResponseWriter, r *http.Request) {
	c := &context{
		req:       r,
		outHeader: w.Header(),
		apiURL:    apiURL(),
	}
	// Attach c to the request's context and make c refer to the updated request.
	r = r.WithContext(withContext(r.Context(), c))
	c.req = r

	stopFlushing := make(chan int)

	// Patch up RemoteAddr so it looks reasonable.
	// The user IP header takes precedence over the remote address header.
	if addr := r.Header.Get(userIPHeader); addr != "" {
		r.RemoteAddr = addr
	} else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
		r.RemoteAddr = addr
	} else {
		// Should not normally reach here, but pick a sensible default anyway.
		r.RemoteAddr = "127.0.0.1"
	}
	// The address in the headers will most likely be of these forms:
	//	123.123.123.123
	//	2001:db8::1
	// net/http.Request.RemoteAddr is specified to be in "IP:port" form.
	if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
		// Assume the remote address is only a host; add a default port.
		r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
	}

	// Start the goroutine responsible for flushing app logs while the
	// handler runs. It is stopped through stopFlushing once the handler
	// has returned; flushing logs requires making an API call.
	go c.logFlusher(stopFlushing)

	executeRequestSafely(c, r)
	c.outHeader = nil // make sure header changes aren't respected any more

	// This send blocks until logFlusher receives it.
	stopFlushing <- 1 // any logging beyond this point will be dropped

	// Flush any pending logs asynchronously.
	// The reported flush count includes the final flush started below
	// when there are still lines pending.
	c.pendingLogs.Lock()
	flushes := c.pendingLogs.flushes
	if len(c.pendingLogs.lines) > 0 {
		flushes++
	}
	c.pendingLogs.Unlock()
	flushed := make(chan struct{})
	go func() {
		defer close(flushed)
		// Force a log flush, because with very short requests we
		// may not ever flush logs.
		c.flushLog(true)
	}()
	w.Header().Set(logFlushHeader, strconv.Itoa(flushes))

	// Avoid nil Write call if c.Write is never called.
	if c.outCode != 0 {
		w.WriteHeader(c.outCode)
	}
	if c.outBody != nil {
		w.Write(c.outBody)
	}
	// Wait for the last flush to complete before returning,
	// otherwise the security ticket will not be valid.
	<-flushed
}
  133. func executeRequestSafely(c *context, r *http.Request) {
  134. defer func() {
  135. if x := recover(); x != nil {
  136. logf(c, 4, "%s", renderPanic(x)) // 4 == critical
  137. c.outCode = 500
  138. }
  139. }()
  140. http.DefaultServeMux.ServeHTTP(c, r)
  141. }
  142. func renderPanic(x interface{}) string {
  143. buf := make([]byte, 16<<10) // 16 KB should be plenty
  144. buf = buf[:runtime.Stack(buf, false)]
  145. // Remove the first few stack frames:
  146. // this func
  147. // the recover closure in the caller
  148. // That will root the stack trace at the site of the panic.
  149. const (
  150. skipStart = "internal.renderPanic"
  151. skipFrames = 2
  152. )
  153. start := bytes.Index(buf, []byte(skipStart))
  154. p := start
  155. for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
  156. p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
  157. if p < 0 {
  158. break
  159. }
  160. }
  161. if p >= 0 {
  162. // buf[start:p+1] is the block to remove.
  163. // Copy buf[p+1:] over buf[start:] and shrink buf.
  164. copy(buf[start:], buf[p+1:])
  165. buf = buf[:len(buf)-(p+1-start)]
  166. }
  167. // Add panic heading.
  168. head := fmt.Sprintf("panic: %v\n\n", x)
  169. if len(head) > len(buf) {
  170. // Extremely unlikely to happen.
  171. return head
  172. }
  173. copy(buf[len(head):], buf)
  174. copy(buf, head)
  175. return string(buf)
  176. }
// context represents the context of an in-flight HTTP request.
// It implements the appengine.Context and http.ResponseWriter interfaces.
type context struct {
	// req is the request being served. Its headers supply the API ticket
	// and trace information used for API calls.
	req *http.Request

	// Buffered response state. The handler writes into these fields via
	// Header, Write and WriteHeader; handleHTTP copies them to the real
	// ResponseWriter after the handler returns.
	outCode   int
	outHeader http.Header
	outBody   []byte

	// pendingLogs holds log lines that have not been flushed yet.
	// The embedded mutex guards both fields.
	pendingLogs struct {
		sync.Mutex
		lines   []*logpb.UserAppLogLine
		flushes int // incremented by flushLog before each Flush RPC
	}

	// apiURL is the endpoint that API calls are posted to.
	apiURL *url.URL
}
// contextKey's address is the key under which withContext stores the
// *context in a netcontext.Context and under which fromContext looks it up.
// The string value itself is only descriptive.
var contextKey = "holds a *context"
  192. // jointContext joins two contexts in a superficial way.
  193. // It takes values and timeouts from a base context, and only values from another context.
  194. type jointContext struct {
  195. base netcontext.Context
  196. valuesOnly netcontext.Context
  197. }
  198. func (c jointContext) Deadline() (time.Time, bool) {
  199. return c.base.Deadline()
  200. }
  201. func (c jointContext) Done() <-chan struct{} {
  202. return c.base.Done()
  203. }
  204. func (c jointContext) Err() error {
  205. return c.base.Err()
  206. }
  207. func (c jointContext) Value(key interface{}) interface{} {
  208. if val := c.base.Value(key); val != nil {
  209. return val
  210. }
  211. return c.valuesOnly.Value(key)
  212. }
  213. // fromContext returns the App Engine context or nil if ctx is not
  214. // derived from an App Engine context.
  215. func fromContext(ctx netcontext.Context) *context {
  216. c, _ := ctx.Value(&contextKey).(*context)
  217. return c
  218. }
  219. func withContext(parent netcontext.Context, c *context) netcontext.Context {
  220. ctx := netcontext.WithValue(parent, &contextKey, c)
  221. if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
  222. ctx = withNamespace(ctx, ns)
  223. }
  224. return ctx
  225. }
  226. func toContext(c *context) netcontext.Context {
  227. return withContext(netcontext.Background(), c)
  228. }
  229. func IncomingHeaders(ctx netcontext.Context) http.Header {
  230. if c := fromContext(ctx); c != nil {
  231. return c.req.Header
  232. }
  233. return nil
  234. }
  235. func ReqContext(req *http.Request) netcontext.Context {
  236. return req.Context()
  237. }
  238. func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
  239. return jointContext{
  240. base: parent,
  241. valuesOnly: req.Context(),
  242. }
  243. }
  244. // DefaultTicket returns a ticket used for background context or dev_appserver.
  245. func DefaultTicket() string {
  246. defaultTicketOnce.Do(func() {
  247. if IsDevAppServer() {
  248. defaultTicket = "testapp" + defaultTicketSuffix
  249. return
  250. }
  251. appID := partitionlessAppID()
  252. escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
  253. majVersion := VersionID(nil)
  254. if i := strings.Index(majVersion, "."); i > 0 {
  255. majVersion = majVersion[:i]
  256. }
  257. defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
  258. })
  259. return defaultTicket
  260. }
  261. func BackgroundContext() netcontext.Context {
  262. backgroundContextOnce.Do(func() {
  263. // Compute background security ticket.
  264. ticket := DefaultTicket()
  265. c := &context{
  266. req: &http.Request{
  267. Header: http.Header{
  268. ticketHeader: []string{ticket},
  269. },
  270. },
  271. apiURL: apiURL(),
  272. }
  273. backgroundContext = toContext(c)
  274. // TODO(dsymonds): Wire up the shutdown handler to do a final flush.
  275. go c.logFlusher(make(chan int))
  276. })
  277. return backgroundContext
  278. }
  279. // RegisterTestRequest registers the HTTP request req for testing, such that
  280. // any API calls are sent to the provided URL. It returns a closure to delete
  281. // the registration.
  282. // It should only be used by aetest package.
  283. func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
  284. c := &context{
  285. req: req,
  286. apiURL: apiURL,
  287. }
  288. ctx := withContext(decorate(req.Context()), c)
  289. req = req.WithContext(ctx)
  290. c.req = req
  291. return req, func() {}
  292. }
// errTimeout is the error returned by (*context).post when the API request
// was cancelled because its timeout elapsed.
var errTimeout = &CallError{
	Detail:  "Deadline exceeded",
	Code:    int32(remotepb.RpcError_CANCELLED),
	Timeout: true,
}
  298. func (c *context) Header() http.Header { return c.outHeader }
  299. // Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
  300. // codes do not permit a response body (nor response entity headers such as
  301. // Content-Length, Content-Type, etc).
  302. func bodyAllowedForStatus(status int) bool {
  303. switch {
  304. case status >= 100 && status <= 199:
  305. return false
  306. case status == 204:
  307. return false
  308. case status == 304:
  309. return false
  310. }
  311. return true
  312. }
  313. func (c *context) Write(b []byte) (int, error) {
  314. if c.outCode == 0 {
  315. c.WriteHeader(http.StatusOK)
  316. }
  317. if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
  318. return 0, http.ErrBodyNotAllowed
  319. }
  320. c.outBody = append(c.outBody, b...)
  321. return len(b), nil
  322. }
  323. func (c *context) WriteHeader(code int) {
  324. if c.outCode != 0 {
  325. logf(c, 3, "WriteHeader called multiple times on request.") // error level
  326. return
  327. }
  328. c.outCode = code
  329. }
// post sends body to the API server at c.apiURL as an HTTP POST and returns
// the response body.
//
// The timeout is sent to the server in the deadline header (in seconds) and
// is also enforced locally: when it elapses, the in-flight request is
// cancelled on the transport. If the timer fired, the returned error is
// always errTimeout, whatever else happened. Transport failures, non-200
// responses and body read failures are reported as *CallError with code
// RpcError_UNKNOWN.
func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
	hreq := &http.Request{
		Method: "POST",
		URL:    c.apiURL,
		Header: http.Header{
			apiEndpointHeader: apiEndpointHeaderValue,
			apiMethodHeader:   apiMethodHeaderValue,
			apiContentType:    apiContentTypeValue,
			apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
		},
		Body:          ioutil.NopCloser(bytes.NewReader(body)),
		ContentLength: int64(len(body)),
		Host:          c.apiURL.Host,
	}
	// Propagate trace information from the incoming request, if present.
	if info := c.req.Header.Get(dapperHeader); info != "" {
		hreq.Header.Set(dapperHeader, info)
	}
	if info := c.req.Header.Get(traceHeader); info != "" {
		hreq.Header.Set(traceHeader, info)
	}

	tr := apiHTTPClient.Transport.(*http.Transport)

	var timedOut int32 // atomic; set to 1 if timed out
	t := time.AfterFunc(timeout, func() {
		atomic.StoreInt32(&timedOut, 1)
		tr.CancelRequest(hreq)
	})
	defer t.Stop()
	// Runs after the return values are set, so a fired timer overrides
	// whatever error (or nil) the code below returned.
	defer func() {
		// Check if timeout was exceeded.
		if atomic.LoadInt32(&timedOut) != 0 {
			err = errTimeout
		}
	}()

	hresp, err := apiHTTPClient.Do(hreq)
	if err != nil {
		return nil, &CallError{
			Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	defer hresp.Body.Close()
	hrespBody, err := ioutil.ReadAll(hresp.Body)
	// The status code is checked before the read error so that a non-200
	// response is reported along with whatever part of the body was read.
	if hresp.StatusCode != 200 {
		return nil, &CallError{
			Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	if err != nil {
		return nil, &CallError{
			Detail: fmt.Sprintf("service bridge response bad: %v", err),
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	return hrespBody, nil
}
// Call performs the API call service.method with request message in and
// decodes the reply into out.
//
// If ctx carries a namespace and a modifier is registered for service in
// NamespaceMods, the modifier is applied to in first. If ctx carries a call
// override, the call is delegated to it entirely. Otherwise ctx must be
// derived from an App Engine context (errNotAppEngineContext is returned if
// it is not), and the request is wrapped in a remotepb.Request and posted to
// the API server.
//
// Errors reported by the server are returned as *CallError (RPC-level, with
// Timeout set for cancelled/deadline-exceeded codes) or *APIError
// (application-level).
func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
	if ns := NamespaceFromContext(ctx); ns != "" {
		if fn, ok := NamespaceMods[service]; ok {
			fn(in, ns)
		}
	}

	// Note: ctx is shadowed here by the context returned with the override.
	if f, ctx, ok := callOverrideFromContext(ctx); ok {
		return f(ctx, service, method, in, out)
	}

	// Handle already-done contexts quickly.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	c := fromContext(ctx)
	if c == nil {
		// Give a good error message rather than a panic lower down.
		return errNotAppEngineContext
	}

	// Apply transaction modifications if we're in a transaction.
	if t := transactionFromContext(ctx); t != nil {
		if t.finished {
			return errors.New("transaction context has expired")
		}
		applyTransaction(in, &t.transaction)
	}

	// Default RPC timeout is 60s.
	// A deadline on ctx replaces it with the time remaining until then.
	timeout := 60 * time.Second
	if deadline, ok := ctx.Deadline(); ok {
		timeout = deadline.Sub(time.Now())
	}

	data, err := proto.Marshal(in)
	if err != nil {
		return err
	}

	// Ticket precedence: request header, then app ID override from ctx,
	// then the default ticket.
	ticket := c.req.Header.Get(ticketHeader)
	// Use a test ticket under test environment.
	if ticket == "" {
		if appid := ctx.Value(&appIDOverrideKey); appid != nil {
			ticket = appid.(string) + defaultTicketSuffix
		}
	}
	// Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
	if ticket == "" {
		ticket = DefaultTicket()
	}
	req := &remotepb.Request{
		ServiceName: &service,
		Method:      &method,
		Request:     data,
		RequestId:   &ticket,
	}
	hreqBody, err := proto.Marshal(req)
	if err != nil {
		return err
	}

	hrespBody, err := c.post(hreqBody, timeout)
	if err != nil {
		return err
	}

	res := &remotepb.Response{}
	if err := proto.Unmarshal(hrespBody, res); err != nil {
		return err
	}
	// RPC-level failure reported by the server.
	if res.RpcError != nil {
		ce := &CallError{
			Detail: res.RpcError.GetDetail(),
			Code:   *res.RpcError.Code,
		}
		switch remotepb.RpcError_ErrorCode(ce.Code) {
		case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
			ce.Timeout = true
		}
		return ce
	}
	// Application-level failure reported by the called service.
	if res.ApplicationError != nil {
		return &APIError{
			Service: *req.ServiceName,
			Detail:  res.ApplicationError.GetDetail(),
			Code:    *res.ApplicationError.Code,
		}
	}
	if res.Exception != nil || res.JavaException != nil {
		// This shouldn't happen, but let's be defensive.
		return &CallError{
			Detail: "service bridge returned exception",
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	return proto.Unmarshal(res.Response, out)
}
  478. func (c *context) Request() *http.Request {
  479. return c.req
  480. }
  481. func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
  482. // Truncate long log lines.
  483. // TODO(dsymonds): Check if this is still necessary.
  484. const lim = 8 << 10
  485. if len(*ll.Message) > lim {
  486. suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
  487. ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
  488. }
  489. c.pendingLogs.Lock()
  490. c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
  491. c.pendingLogs.Unlock()
  492. }
// logLevelName maps the numeric log levels accepted by logf to the names
// used as the prefix when a log line is duplicated to the standard logger.
var logLevelName = map[int64]string{
	0: "DEBUG",
	1: "INFO",
	2: "WARNING",
	3: "ERROR",
	4: "CRITICAL",
}
  500. func logf(c *context, level int64, format string, args ...interface{}) {
  501. if c == nil {
  502. panic("not an App Engine context")
  503. }
  504. s := fmt.Sprintf(format, args...)
  505. s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
  506. c.addLogLine(&logpb.UserAppLogLine{
  507. TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
  508. Level: &level,
  509. Message: &s,
  510. })
  511. // Only duplicate log to stderr if not running on App Engine second generation
  512. if !IsSecondGen() {
  513. log.Print(logLevelName[level] + ": " + s)
  514. }
  515. }
// flushLog attempts to flush any pending logs to the appserver.
// It should not be called concurrently.
//
// At most about 30 MB of pending lines are taken per call; the rest stay
// queued. With nothing to send, the RPC is only made when force is true.
// If marshaling or the Flush RPC fails, the taken lines are put back at the
// front of the queue. It reports whether a flush RPC succeeded.
func (c *context) flushLog(force bool) (flushed bool) {
	c.pendingLogs.Lock()
	// Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
	n, rem := 0, 30<<20
	for ; n < len(c.pendingLogs.lines); n++ {
		ll := c.pendingLogs.lines[n]
		// Each log line will require about 3 bytes of overhead.
		nb := proto.Size(ll) + 3
		if nb > rem {
			break
		}
		rem -= nb
	}
	// Take the first n lines and leave the remainder queued.
	lines := c.pendingLogs.lines[:n]
	c.pendingLogs.lines = c.pendingLogs.lines[n:]
	c.pendingLogs.Unlock()

	if len(lines) == 0 && !force {
		// Nothing to flush.
		return false
	}

	// On failure, re-queue the taken lines ahead of any lines logged
	// in the meantime so that ordering is preserved.
	rescueLogs := false
	defer func() {
		if rescueLogs {
			c.pendingLogs.Lock()
			c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
			c.pendingLogs.Unlock()
		}
	}()

	buf, err := proto.Marshal(&logpb.UserAppLogGroup{
		LogLine: lines,
	})
	if err != nil {
		log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
		rescueLogs = true
		return false
	}

	req := &logpb.FlushRequest{
		Logs: buf,
	}
	res := &basepb.VoidProto{}
	// The flush counter is bumped before the RPC, so it counts attempts,
	// including ones that subsequently fail.
	c.pendingLogs.Lock()
	c.pendingLogs.flushes++
	c.pendingLogs.Unlock()
	if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
		log.Printf("internal.flushLog: Flush RPC: %v", err)
		rescueLogs = true
		return false
	}
	return true
}
  568. const (
  569. // Log flushing parameters.
  570. flushInterval = 1 * time.Second
  571. forceFlushInterval = 60 * time.Second
  572. )
  573. func (c *context) logFlusher(stop <-chan int) {
  574. lastFlush := time.Now()
  575. tick := time.NewTicker(flushInterval)
  576. for {
  577. select {
  578. case <-stop:
  579. // Request finished.
  580. tick.Stop()
  581. return
  582. case <-tick.C:
  583. force := time.Now().Sub(lastFlush) > forceFlushInterval
  584. if c.flushLog(force) {
  585. lastFlush = time.Now()
  586. }
  587. }
  588. }
  589. }
  590. func ContextForTesting(req *http.Request) netcontext.Context {
  591. return toContext(&context{req: req})
  592. }