channelz_test.go 70 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package test
  19. import (
  20. "context"
  21. "crypto/tls"
  22. "fmt"
  23. "net"
  24. "reflect"
  25. "strings"
  26. "sync"
  27. "testing"
  28. "time"
  29. "golang.org/x/net/http2"
  30. "google.golang.org/grpc"
  31. _ "google.golang.org/grpc/balancer/grpclb"
  32. "google.golang.org/grpc/balancer/roundrobin"
  33. "google.golang.org/grpc/codes"
  34. "google.golang.org/grpc/connectivity"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/internal"
  37. "google.golang.org/grpc/internal/channelz"
  38. "google.golang.org/grpc/keepalive"
  39. "google.golang.org/grpc/resolver"
  40. "google.golang.org/grpc/resolver/manual"
  41. "google.golang.org/grpc/status"
  42. testpb "google.golang.org/grpc/test/grpc_testing"
  43. "google.golang.org/grpc/testdata"
  44. )
  45. func (te *test) startServers(ts testpb.TestServiceServer, num int) {
  46. for i := 0; i < num; i++ {
  47. te.startServer(ts)
  48. te.srvs = append(te.srvs, te.srv.(*grpc.Server))
  49. te.srvAddrs = append(te.srvAddrs, te.srvAddr)
  50. te.srv = nil
  51. te.srvAddr = ""
  52. }
  53. }
  // verifyResultWithDelay polls f up to 1000 times, 10ms apart (roughly 10s in
  // total), and returns nil as soon as f reports ok. If f never reports ok, it
  // returns the error from the last attempt, or a timeout error when that attempt
  // returned a nil error. A timeout is never reported as success.
  func verifyResultWithDelay(f func() (bool, error)) error {
  	return verifyResultWithRetries(f, 1000, 10*time.Millisecond)
  }

  // verifyResultWithRetries calls f up to attempts times, sleeping delay between
  // consecutive calls (not after the last one). It returns nil on the first call
  // that reports ok. Otherwise it returns the last non-nil error from f, or a
  // descriptive error if f's final attempt returned (false, nil).
  func verifyResultWithRetries(f func() (bool, error), attempts int, delay time.Duration) error {
  	var err error
  	for i := 0; i < attempts; i++ {
  		var ok bool
  		if ok, err = f(); ok {
  			return nil
  		}
  		if i < attempts-1 {
  			time.Sleep(delay)
  		}
  	}
  	if err == nil {
  		// f never succeeded but also never explained why; the caller must still see a failure.
  		err = fmt.Errorf("condition not satisfied after %d attempts", attempts)
  	}
  	return err
  }
  65. func (s) TestCZServerRegistrationAndDeletion(t *testing.T) {
  66. testcases := []struct {
  67. total int
  68. start int64
  69. max int64
  70. length int64
  71. end bool
  72. }{
  73. {total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
  74. {total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
  75. {total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
  76. {total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true},
  77. {total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
  78. {total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
  79. }
  80. for _, c := range testcases {
  81. channelz.NewChannelzStorage()
  82. e := tcpClearRREnv
  83. te := newTest(t, e)
  84. te.startServers(&testServer{security: e.security}, c.total)
  85. ss, end := channelz.GetServers(c.start, c.max)
  86. if int64(len(ss)) != c.length || end != c.end {
  87. t.Fatalf("GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", c.start, ss, len(ss), end, c.start, c.length, c.end)
  88. }
  89. te.tearDown()
  90. ss, end = channelz.GetServers(c.start, c.max)
  91. if len(ss) != 0 || !end {
  92. t.Fatalf("GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", ss, len(ss), end)
  93. }
  94. }
  95. }
  96. func (s) TestCZGetServer(t *testing.T) {
  97. channelz.NewChannelzStorage()
  98. e := tcpClearRREnv
  99. te := newTest(t, e)
  100. te.startServer(&testServer{security: e.security})
  101. defer te.tearDown()
  102. ss, _ := channelz.GetServers(0, 0)
  103. if len(ss) != 1 {
  104. t.Fatalf("there should only be one server, not %d", len(ss))
  105. }
  106. serverID := ss[0].ID
  107. srv := channelz.GetServer(serverID)
  108. if srv == nil {
  109. t.Fatalf("server %d does not exist", serverID)
  110. }
  111. if srv.ID != serverID {
  112. t.Fatalf("server want id %d, but got %d", serverID, srv.ID)
  113. }
  114. te.tearDown()
  115. if err := verifyResultWithDelay(func() (bool, error) {
  116. srv := channelz.GetServer(serverID)
  117. if srv != nil {
  118. return false, fmt.Errorf("server %d should not exist", serverID)
  119. }
  120. return true, nil
  121. }); err != nil {
  122. t.Fatal(err)
  123. }
  124. }
  125. func (s) TestCZTopChannelRegistrationAndDeletion(t *testing.T) {
  126. testcases := []struct {
  127. total int
  128. start int64
  129. max int64
  130. length int64
  131. end bool
  132. }{
  133. {total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
  134. {total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
  135. {total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
  136. {total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true},
  137. {total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
  138. {total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
  139. }
  140. for _, c := range testcases {
  141. channelz.NewChannelzStorage()
  142. e := tcpClearRREnv
  143. te := newTest(t, e)
  144. var ccs []*grpc.ClientConn
  145. for i := 0; i < c.total; i++ {
  146. cc := te.clientConn()
  147. te.cc = nil
  148. // avoid making next dial blocking
  149. te.srvAddr = ""
  150. ccs = append(ccs, cc)
  151. }
  152. if err := verifyResultWithDelay(func() (bool, error) {
  153. if tcs, end := channelz.GetTopChannels(c.start, c.max); int64(len(tcs)) != c.length || end != c.end {
  154. return false, fmt.Errorf("getTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end)
  155. }
  156. return true, nil
  157. }); err != nil {
  158. t.Fatal(err)
  159. }
  160. for _, cc := range ccs {
  161. cc.Close()
  162. }
  163. if err := verifyResultWithDelay(func() (bool, error) {
  164. if tcs, end := channelz.GetTopChannels(c.start, c.max); len(tcs) != 0 || !end {
  165. return false, fmt.Errorf("getTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end)
  166. }
  167. return true, nil
  168. }); err != nil {
  169. t.Fatal(err)
  170. }
  171. te.tearDown()
  172. }
  173. }
  174. func (s) TestCZTopChannelRegistrationAndDeletionWhenDialFail(t *testing.T) {
  175. channelz.NewChannelzStorage()
  176. // Make dial fails (due to no transport security specified)
  177. _, err := grpc.Dial("fake.addr")
  178. if err == nil {
  179. t.Fatal("expecting dial to fail")
  180. }
  181. if tcs, end := channelz.GetTopChannels(0, 0); tcs != nil || !end {
  182. t.Fatalf("GetTopChannels(0, 0) = %v, %v, want <nil>, true", tcs, end)
  183. }
  184. }
  185. func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
  186. channelz.NewChannelzStorage()
  187. e := tcpClearRREnv
  188. // avoid calling API to set balancer type, which will void service config's change of balancer.
  189. e.balancer = ""
  190. te := newTest(t, e)
  191. r, cleanup := manual.GenerateAndRegisterManualResolver()
  192. defer cleanup()
  193. resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
  194. r.InitialState(resolver.State{Addresses: resolvedAddrs})
  195. te.resolverScheme = r.Scheme()
  196. te.clientConn()
  197. defer te.tearDown()
  198. if err := verifyResultWithDelay(func() (bool, error) {
  199. tcs, _ := channelz.GetTopChannels(0, 0)
  200. if len(tcs) != 1 {
  201. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  202. }
  203. if len(tcs[0].NestedChans) != 1 {
  204. return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  205. }
  206. return true, nil
  207. }); err != nil {
  208. t.Fatal(err)
  209. }
  210. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`})
  211. // wait for the shutdown of grpclb balancer
  212. if err := verifyResultWithDelay(func() (bool, error) {
  213. tcs, _ := channelz.GetTopChannels(0, 0)
  214. if len(tcs) != 1 {
  215. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  216. }
  217. if len(tcs[0].NestedChans) != 0 {
  218. return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  219. }
  220. return true, nil
  221. }); err != nil {
  222. t.Fatal(err)
  223. }
  224. }
  225. func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
  226. channelz.NewChannelzStorage()
  227. e := tcpClearRREnv
  228. num := 3 // number of backends
  229. te := newTest(t, e)
  230. var svrAddrs []resolver.Address
  231. te.startServers(&testServer{security: e.security}, num)
  232. r, cleanup := manual.GenerateAndRegisterManualResolver()
  233. defer cleanup()
  234. for _, a := range te.srvAddrs {
  235. svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
  236. }
  237. r.InitialState(resolver.State{Addresses: svrAddrs})
  238. te.resolverScheme = r.Scheme()
  239. te.clientConn()
  240. defer te.tearDown()
  241. // Here, we just wait for all sockets to be up. In the future, if we implement
  242. // IDLE, we may need to make several rpc calls to create the sockets.
  243. if err := verifyResultWithDelay(func() (bool, error) {
  244. tcs, _ := channelz.GetTopChannels(0, 0)
  245. if len(tcs) != 1 {
  246. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  247. }
  248. if len(tcs[0].SubChans) != num {
  249. return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans))
  250. }
  251. count := 0
  252. for k := range tcs[0].SubChans {
  253. sc := channelz.GetSubChannel(k)
  254. if sc == nil {
  255. return false, fmt.Errorf("got <nil> subchannel")
  256. }
  257. count += len(sc.Sockets)
  258. }
  259. if count != num {
  260. return false, fmt.Errorf("there should be %d sockets not %d", num, count)
  261. }
  262. return true, nil
  263. }); err != nil {
  264. t.Fatal(err)
  265. }
  266. r.UpdateState(resolver.State{Addresses: svrAddrs[:len(svrAddrs)-1]})
  267. if err := verifyResultWithDelay(func() (bool, error) {
  268. tcs, _ := channelz.GetTopChannels(0, 0)
  269. if len(tcs) != 1 {
  270. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  271. }
  272. if len(tcs[0].SubChans) != num-1 {
  273. return false, fmt.Errorf("there should be %d subchannel not %d", num-1, len(tcs[0].SubChans))
  274. }
  275. count := 0
  276. for k := range tcs[0].SubChans {
  277. sc := channelz.GetSubChannel(k)
  278. if sc == nil {
  279. return false, fmt.Errorf("got <nil> subchannel")
  280. }
  281. count += len(sc.Sockets)
  282. }
  283. if count != num-1 {
  284. return false, fmt.Errorf("there should be %d sockets not %d", num-1, count)
  285. }
  286. return true, nil
  287. }); err != nil {
  288. t.Fatal(err)
  289. }
  290. }
  291. func (s) TestCZServerSocketRegistrationAndDeletion(t *testing.T) {
  292. testcases := []struct {
  293. total int
  294. start int64
  295. max int64
  296. length int64
  297. end bool
  298. }{
  299. {total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
  300. {total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
  301. {total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
  302. {total: int(channelz.EntryPerPage), start: 1, max: 0, length: channelz.EntryPerPage - 1, end: true},
  303. {total: int(channelz.EntryPerPage) + 1, start: channelz.EntryPerPage + 1, max: 0, length: 0, end: true},
  304. {total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
  305. {total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
  306. }
  307. for _, c := range testcases {
  308. channelz.NewChannelzStorage()
  309. e := tcpClearRREnv
  310. te := newTest(t, e)
  311. te.startServer(&testServer{security: e.security})
  312. defer te.tearDown()
  313. var ccs []*grpc.ClientConn
  314. for i := 0; i < c.total; i++ {
  315. cc := te.clientConn()
  316. te.cc = nil
  317. ccs = append(ccs, cc)
  318. }
  319. var svrID int64
  320. if err := verifyResultWithDelay(func() (bool, error) {
  321. ss, _ := channelz.GetServers(0, 0)
  322. if len(ss) != 1 {
  323. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  324. }
  325. if len(ss[0].ListenSockets) != 1 {
  326. return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
  327. }
  328. startID := c.start
  329. if startID != 0 {
  330. ns, _ := channelz.GetServerSockets(ss[0].ID, 0, int64(c.total))
  331. if int64(len(ns)) < c.start {
  332. return false, fmt.Errorf("there should more than %d sockets, not %d", len(ns), c.start)
  333. }
  334. startID = ns[c.start-1].ID + 1
  335. }
  336. ns, end := channelz.GetServerSockets(ss[0].ID, startID, c.max)
  337. if int64(len(ns)) != c.length || end != c.end {
  338. return false, fmt.Errorf("GetServerSockets(%d) = %+v (len of which: %d), end: %+v, want len(GetServerSockets(%d)) = %d, end: %+v", c.start, ns, len(ns), end, c.start, c.length, c.end)
  339. }
  340. svrID = ss[0].ID
  341. return true, nil
  342. }); err != nil {
  343. t.Fatal(err)
  344. }
  345. for _, cc := range ccs {
  346. cc.Close()
  347. }
  348. if err := verifyResultWithDelay(func() (bool, error) {
  349. ns, _ := channelz.GetServerSockets(svrID, c.start, c.max)
  350. if len(ns) != 0 {
  351. return false, fmt.Errorf("there should be %d normal sockets not %d", 0, len(ns))
  352. }
  353. return true, nil
  354. }); err != nil {
  355. t.Fatal(err)
  356. }
  357. }
  358. }
  359. func (s) TestCZServerListenSocketDeletion(t *testing.T) {
  360. channelz.NewChannelzStorage()
  361. s := grpc.NewServer()
  362. lis, err := net.Listen("tcp", "localhost:0")
  363. if err != nil {
  364. t.Fatalf("failed to listen: %v", err)
  365. }
  366. go s.Serve(lis)
  367. if err := verifyResultWithDelay(func() (bool, error) {
  368. ss, _ := channelz.GetServers(0, 0)
  369. if len(ss) != 1 {
  370. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  371. }
  372. if len(ss[0].ListenSockets) != 1 {
  373. return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
  374. }
  375. return true, nil
  376. }); err != nil {
  377. t.Fatal(err)
  378. }
  379. lis.Close()
  380. if err := verifyResultWithDelay(func() (bool, error) {
  381. ss, _ := channelz.GetServers(0, 0)
  382. if len(ss) != 1 {
  383. return false, fmt.Errorf("there should be 1 server, not %d", len(ss))
  384. }
  385. if len(ss[0].ListenSockets) != 0 {
  386. return false, fmt.Errorf("there should only be %d server listen socket, not %d", 0, len(ss[0].ListenSockets))
  387. }
  388. return true, nil
  389. }); err != nil {
  390. t.Fatal(err)
  391. }
  392. s.Stop()
  393. }
  394. type dummyChannel struct{}
  395. func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
  396. return &channelz.ChannelInternalMetric{}
  397. }
  398. type dummySocket struct{}
  399. func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric {
  400. return &channelz.SocketInternalMetric{}
  401. }
  402. func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) {
  403. // +--+TopChan+---+
  404. // | |
  405. // v v
  406. // +-+SubChan1+--+ SubChan2
  407. // | |
  408. // v v
  409. // Socket1 Socket2
  410. channelz.NewChannelzStorage()
  411. topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "")
  412. subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
  413. subChanID2 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
  414. sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
  415. sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
  416. tcs, _ := channelz.GetTopChannels(0, 0)
  417. if tcs == nil || len(tcs) != 1 {
  418. t.Fatalf("There should be one TopChannel entry")
  419. }
  420. if len(tcs[0].SubChans) != 2 {
  421. t.Fatalf("There should be two SubChannel entries")
  422. }
  423. sc := channelz.GetSubChannel(subChanID1)
  424. if sc == nil || len(sc.Sockets) != 2 {
  425. t.Fatalf("There should be two Socket entries")
  426. }
  427. channelz.RemoveEntry(topChanID)
  428. tcs, _ = channelz.GetTopChannels(0, 0)
  429. if tcs == nil || len(tcs) != 1 {
  430. t.Fatalf("There should be one TopChannel entry")
  431. }
  432. channelz.RemoveEntry(subChanID1)
  433. channelz.RemoveEntry(subChanID2)
  434. tcs, _ = channelz.GetTopChannels(0, 0)
  435. if tcs == nil || len(tcs) != 1 {
  436. t.Fatalf("There should be one TopChannel entry")
  437. }
  438. if len(tcs[0].SubChans) != 1 {
  439. t.Fatalf("There should be one SubChannel entry")
  440. }
  441. channelz.RemoveEntry(sktID1)
  442. channelz.RemoveEntry(sktID2)
  443. tcs, _ = channelz.GetTopChannels(0, 0)
  444. if tcs != nil {
  445. t.Fatalf("There should be no TopChannel entry")
  446. }
  447. }
  448. func (s) TestCZChannelMetrics(t *testing.T) {
  449. channelz.NewChannelzStorage()
  450. e := tcpClearRREnv
  451. num := 3 // number of backends
  452. te := newTest(t, e)
  453. te.maxClientSendMsgSize = newInt(8)
  454. var svrAddrs []resolver.Address
  455. te.startServers(&testServer{security: e.security}, num)
  456. r, cleanup := manual.GenerateAndRegisterManualResolver()
  457. defer cleanup()
  458. for _, a := range te.srvAddrs {
  459. svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
  460. }
  461. r.InitialState(resolver.State{Addresses: svrAddrs})
  462. te.resolverScheme = r.Scheme()
  463. cc := te.clientConn()
  464. defer te.tearDown()
  465. tc := testpb.NewTestServiceClient(cc)
  466. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  467. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  468. }
  469. const smallSize = 1
  470. const largeSize = 8
  471. largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
  472. if err != nil {
  473. t.Fatal(err)
  474. }
  475. req := &testpb.SimpleRequest{
  476. ResponseType: testpb.PayloadType_COMPRESSABLE,
  477. ResponseSize: int32(smallSize),
  478. Payload: largePayload,
  479. }
  480. if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
  481. t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  482. }
  483. stream, err := tc.FullDuplexCall(context.Background())
  484. if err != nil {
  485. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  486. }
  487. defer stream.CloseSend()
  488. // Here, we just wait for all sockets to be up. In the future, if we implement
  489. // IDLE, we may need to make several rpc calls to create the sockets.
  490. if err := verifyResultWithDelay(func() (bool, error) {
  491. tcs, _ := channelz.GetTopChannels(0, 0)
  492. if len(tcs) != 1 {
  493. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  494. }
  495. if len(tcs[0].SubChans) != num {
  496. return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans))
  497. }
  498. var cst, csu, cf int64
  499. for k := range tcs[0].SubChans {
  500. sc := channelz.GetSubChannel(k)
  501. if sc == nil {
  502. return false, fmt.Errorf("got <nil> subchannel")
  503. }
  504. cst += sc.ChannelData.CallsStarted
  505. csu += sc.ChannelData.CallsSucceeded
  506. cf += sc.ChannelData.CallsFailed
  507. }
  508. if cst != 3 {
  509. return false, fmt.Errorf("there should be 3 CallsStarted not %d", cst)
  510. }
  511. if csu != 1 {
  512. return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", csu)
  513. }
  514. if cf != 1 {
  515. return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf)
  516. }
  517. if tcs[0].ChannelData.CallsStarted != 3 {
  518. return false, fmt.Errorf("there should be 3 CallsStarted not %d", tcs[0].ChannelData.CallsStarted)
  519. }
  520. if tcs[0].ChannelData.CallsSucceeded != 1 {
  521. return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", tcs[0].ChannelData.CallsSucceeded)
  522. }
  523. if tcs[0].ChannelData.CallsFailed != 1 {
  524. return false, fmt.Errorf("there should be 1 CallsFailed not %d", tcs[0].ChannelData.CallsFailed)
  525. }
  526. return true, nil
  527. }); err != nil {
  528. t.Fatal(err)
  529. }
  530. }
  531. func (s) TestCZServerMetrics(t *testing.T) {
  532. channelz.NewChannelzStorage()
  533. e := tcpClearRREnv
  534. te := newTest(t, e)
  535. te.maxServerReceiveMsgSize = newInt(8)
  536. te.startServer(&testServer{security: e.security})
  537. defer te.tearDown()
  538. cc := te.clientConn()
  539. tc := testpb.NewTestServiceClient(cc)
  540. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  541. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  542. }
  543. const smallSize = 1
  544. const largeSize = 8
  545. largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
  546. if err != nil {
  547. t.Fatal(err)
  548. }
  549. req := &testpb.SimpleRequest{
  550. ResponseType: testpb.PayloadType_COMPRESSABLE,
  551. ResponseSize: int32(smallSize),
  552. Payload: largePayload,
  553. }
  554. if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
  555. t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  556. }
  557. stream, err := tc.FullDuplexCall(context.Background())
  558. if err != nil {
  559. t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  560. }
  561. defer stream.CloseSend()
  562. if err := verifyResultWithDelay(func() (bool, error) {
  563. ss, _ := channelz.GetServers(0, 0)
  564. if len(ss) != 1 {
  565. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  566. }
  567. if ss[0].ServerData.CallsStarted != 3 {
  568. return false, fmt.Errorf("there should be 3 CallsStarted not %d", ss[0].ServerData.CallsStarted)
  569. }
  570. if ss[0].ServerData.CallsSucceeded != 1 {
  571. return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", ss[0].ServerData.CallsSucceeded)
  572. }
  573. if ss[0].ServerData.CallsFailed != 1 {
  574. return false, fmt.Errorf("there should be 1 CallsFailed not %d", ss[0].ServerData.CallsFailed)
  575. }
  576. return true, nil
  577. }); err != nil {
  578. t.Fatal(err)
  579. }
  580. }
  581. type testServiceClientWrapper struct {
  582. testpb.TestServiceClient
  583. mu sync.RWMutex
  584. streamsCreated int
  585. }
  586. func (t *testServiceClientWrapper) getCurrentStreamID() uint32 {
  587. t.mu.RLock()
  588. defer t.mu.RUnlock()
  589. return uint32(2*t.streamsCreated - 1)
  590. }
  591. func (t *testServiceClientWrapper) EmptyCall(ctx context.Context, in *testpb.Empty, opts ...grpc.CallOption) (*testpb.Empty, error) {
  592. t.mu.Lock()
  593. defer t.mu.Unlock()
  594. t.streamsCreated++
  595. return t.TestServiceClient.EmptyCall(ctx, in, opts...)
  596. }
  597. func (t *testServiceClientWrapper) UnaryCall(ctx context.Context, in *testpb.SimpleRequest, opts ...grpc.CallOption) (*testpb.SimpleResponse, error) {
  598. t.mu.Lock()
  599. defer t.mu.Unlock()
  600. t.streamsCreated++
  601. return t.TestServiceClient.UnaryCall(ctx, in, opts...)
  602. }
  603. func (t *testServiceClientWrapper) StreamingOutputCall(ctx context.Context, in *testpb.StreamingOutputCallRequest, opts ...grpc.CallOption) (testpb.TestService_StreamingOutputCallClient, error) {
  604. t.mu.Lock()
  605. defer t.mu.Unlock()
  606. t.streamsCreated++
  607. return t.TestServiceClient.StreamingOutputCall(ctx, in, opts...)
  608. }
  609. func (t *testServiceClientWrapper) StreamingInputCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_StreamingInputCallClient, error) {
  610. t.mu.Lock()
  611. defer t.mu.Unlock()
  612. t.streamsCreated++
  613. return t.TestServiceClient.StreamingInputCall(ctx, opts...)
  614. }
  615. func (t *testServiceClientWrapper) FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_FullDuplexCallClient, error) {
  616. t.mu.Lock()
  617. defer t.mu.Unlock()
  618. t.streamsCreated++
  619. return t.TestServiceClient.FullDuplexCall(ctx, opts...)
  620. }
  621. func (t *testServiceClientWrapper) HalfDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testpb.TestService_HalfDuplexCallClient, error) {
  622. t.mu.Lock()
  623. defer t.mu.Unlock()
  624. t.streamsCreated++
  625. return t.TestServiceClient.HalfDuplexCall(ctx, opts...)
  626. }
  627. func doSuccessfulUnaryCall(tc testpb.TestServiceClient, t *testing.T) {
  628. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
  629. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  630. }
  631. }
  632. func doStreamingInputCallWithLargePayload(tc testpb.TestServiceClient, t *testing.T) {
  633. s, err := tc.StreamingInputCall(context.Background())
  634. if err != nil {
  635. t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want <nil>", err)
  636. }
  637. payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10000)
  638. if err != nil {
  639. t.Fatal(err)
  640. }
  641. s.Send(&testpb.StreamingInputCallRequest{Payload: payload})
  642. }
  643. func doServerSideFailedUnaryCall(tc testpb.TestServiceClient, t *testing.T) {
  644. const smallSize = 1
  645. const largeSize = 2000
  646. largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
  647. if err != nil {
  648. t.Fatal(err)
  649. }
  650. req := &testpb.SimpleRequest{
  651. ResponseType: testpb.PayloadType_COMPRESSABLE,
  652. ResponseSize: int32(smallSize),
  653. Payload: largePayload,
  654. }
  655. if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
  656. t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  657. }
  658. }
  659. func doClientSideInitiatedFailedStream(tc testpb.TestServiceClient, t *testing.T) {
  660. ctx, cancel := context.WithCancel(context.Background())
  661. stream, err := tc.FullDuplexCall(ctx)
  662. if err != nil {
  663. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  664. }
  665. const smallSize = 1
  666. smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
  667. if err != nil {
  668. t.Fatal(err)
  669. }
  670. sreq := &testpb.StreamingOutputCallRequest{
  671. ResponseType: testpb.PayloadType_COMPRESSABLE,
  672. ResponseParameters: []*testpb.ResponseParameters{
  673. {Size: smallSize},
  674. },
  675. Payload: smallPayload,
  676. }
  677. if err := stream.Send(sreq); err != nil {
  678. t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  679. }
  680. if _, err := stream.Recv(); err != nil {
  681. t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
  682. }
  683. // By canceling the call, the client will send rst_stream to end the call, and
  684. // the stream will failed as a result.
  685. cancel()
  686. }
  687. // This func is to be used to test client side counting of failed streams.
  688. func doServerSideInitiatedFailedStreamWithRSTStream(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) {
  689. stream, err := tc.FullDuplexCall(context.Background())
  690. if err != nil {
  691. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  692. }
  693. const smallSize = 1
  694. smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
  695. if err != nil {
  696. t.Fatal(err)
  697. }
  698. sreq := &testpb.StreamingOutputCallRequest{
  699. ResponseType: testpb.PayloadType_COMPRESSABLE,
  700. ResponseParameters: []*testpb.ResponseParameters{
  701. {Size: smallSize},
  702. },
  703. Payload: smallPayload,
  704. }
  705. if err := stream.Send(sreq); err != nil {
  706. t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  707. }
  708. if _, err := stream.Recv(); err != nil {
  709. t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
  710. }
  711. rcw := l.getLastConn()
  712. if rcw != nil {
  713. rcw.writeRSTStream(tc.(*testServiceClientWrapper).getCurrentStreamID(), http2.ErrCodeCancel)
  714. }
  715. if _, err := stream.Recv(); err == nil {
  716. t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
  717. }
  718. }
  719. // this func is to be used to test client side counting of failed streams.
  720. func doServerSideInitiatedFailedStreamWithGoAway(tc testpb.TestServiceClient, t *testing.T, l *listenerWrapper) {
  721. // This call is just to keep the transport from shutting down (socket will be deleted
  722. // in this case, and we will not be able to get metrics).
  723. s, err := tc.FullDuplexCall(context.Background())
  724. if err != nil {
  725. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  726. }
  727. if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
  728. {
  729. Size: 1,
  730. },
  731. }}); err != nil {
  732. t.Fatalf("s.Send() failed with error: %v", err)
  733. }
  734. if _, err := s.Recv(); err != nil {
  735. t.Fatalf("s.Recv() failed with error: %v", err)
  736. }
  737. s, err = tc.FullDuplexCall(context.Background())
  738. if err != nil {
  739. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  740. }
  741. if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
  742. {
  743. Size: 1,
  744. },
  745. }}); err != nil {
  746. t.Fatalf("s.Send() failed with error: %v", err)
  747. }
  748. if _, err := s.Recv(); err != nil {
  749. t.Fatalf("s.Recv() failed with error: %v", err)
  750. }
  751. rcw := l.getLastConn()
  752. if rcw != nil {
  753. rcw.writeGoAway(tc.(*testServiceClientWrapper).getCurrentStreamID()-2, http2.ErrCodeCancel, []byte{})
  754. }
  755. if _, err := s.Recv(); err == nil {
  756. t.Fatalf("%v.Recv() = %v, want <non-nil>", s, err)
  757. }
  758. }
  759. // this func is to be used to test client side counting of failed streams.
  760. func doServerSideInitiatedFailedStreamWithClientBreakFlowControl(tc testpb.TestServiceClient, t *testing.T, dw *dialerWrapper) {
  761. stream, err := tc.FullDuplexCall(context.Background())
  762. if err != nil {
  763. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  764. }
  765. // sleep here to make sure header frame being sent before the data frame we write directly below.
  766. time.Sleep(10 * time.Millisecond)
  767. payload := make([]byte, 65537)
  768. dw.getRawConnWrapper().writeRawFrame(http2.FrameData, 0, tc.(*testServiceClientWrapper).getCurrentStreamID(), payload)
  769. if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  770. t.Fatalf("%v.Recv() = %v, want error code: %v", stream, err, codes.ResourceExhausted)
  771. }
  772. }
  773. func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) {
  774. ctx, cancel := context.WithCancel(context.Background())
  775. _, err := tc.FullDuplexCall(ctx)
  776. if err != nil {
  777. t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  778. }
  779. // Allow for at least 2 keepalives (1s per ping interval)
  780. time.Sleep(4 * time.Second)
  781. cancel()
  782. }
  783. func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
  784. channelz.NewChannelzStorage()
  785. e := tcpClearRREnv
  786. te := newTest(t, e)
  787. te.maxServerReceiveMsgSize = newInt(20)
  788. te.maxClientReceiveMsgSize = newInt(20)
  789. rcw := te.startServerWithConnControl(&testServer{security: e.security})
  790. defer te.tearDown()
  791. cc := te.clientConn()
  792. tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
  793. doSuccessfulUnaryCall(tc, t)
  794. var scID, skID int64
  795. if err := verifyResultWithDelay(func() (bool, error) {
  796. tchan, _ := channelz.GetTopChannels(0, 0)
  797. if len(tchan) != 1 {
  798. return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  799. }
  800. if len(tchan[0].SubChans) != 1 {
  801. return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
  802. }
  803. for scID = range tchan[0].SubChans {
  804. break
  805. }
  806. sc := channelz.GetSubChannel(scID)
  807. if sc == nil {
  808. return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", scID)
  809. }
  810. if len(sc.Sockets) != 1 {
  811. return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
  812. }
  813. for skID = range sc.Sockets {
  814. break
  815. }
  816. skt := channelz.GetSocket(skID)
  817. sktData := skt.SocketData
  818. if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
  819. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived)
  820. }
  821. return true, nil
  822. }); err != nil {
  823. t.Fatal(err)
  824. }
  825. doServerSideFailedUnaryCall(tc, t)
  826. if err := verifyResultWithDelay(func() (bool, error) {
  827. skt := channelz.GetSocket(skID)
  828. sktData := skt.SocketData
  829. if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 1 {
  830. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (2, 2, 2, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.MessagesSent, sktData.MessagesReceived)
  831. }
  832. return true, nil
  833. }); err != nil {
  834. t.Fatal(err)
  835. }
  836. doClientSideInitiatedFailedStream(tc, t)
  837. if err := verifyResultWithDelay(func() (bool, error) {
  838. skt := channelz.GetSocket(skID)
  839. sktData := skt.SocketData
  840. if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 3 || sktData.MessagesReceived != 2 {
  841. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 3, 2), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  842. }
  843. return true, nil
  844. }); err != nil {
  845. t.Fatal(err)
  846. }
  847. doServerSideInitiatedFailedStreamWithRSTStream(tc, t, rcw)
  848. if err := verifyResultWithDelay(func() (bool, error) {
  849. skt := channelz.GetSocket(skID)
  850. sktData := skt.SocketData
  851. if sktData.StreamsStarted != 4 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 2 || sktData.MessagesSent != 4 || sktData.MessagesReceived != 3 {
  852. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (4, 2, 2, 4, 3), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  853. }
  854. return true, nil
  855. }); err != nil {
  856. t.Fatal(err)
  857. }
  858. doServerSideInitiatedFailedStreamWithGoAway(tc, t, rcw)
  859. if err := verifyResultWithDelay(func() (bool, error) {
  860. skt := channelz.GetSocket(skID)
  861. sktData := skt.SocketData
  862. if sktData.StreamsStarted != 6 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 3 || sktData.MessagesSent != 6 || sktData.MessagesReceived != 5 {
  863. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (6, 2, 3, 6, 5), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  864. }
  865. return true, nil
  866. }); err != nil {
  867. t.Fatal(err)
  868. }
  869. }
  870. // This test is to complete TestCZClientSocketMetricsStreamsAndMessagesCount and
  871. // TestCZServerSocketMetricsStreamsAndMessagesCount by adding the test case of
  872. // server sending RST_STREAM to client due to client side flow control violation.
  873. // It is separated from other cases due to setup incompatibly, i.e. max receive
  874. // size violation will mask flow control violation.
  875. func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) {
  876. channelz.NewChannelzStorage()
  877. e := tcpClearRREnv
  878. te := newTest(t, e)
  879. te.serverInitialWindowSize = 65536
  880. // Avoid overflowing connection level flow control window, which will lead to
  881. // transport being closed.
  882. te.serverInitialConnWindowSize = 65536 * 2
  883. te.startServer(&testServer{security: e.security})
  884. defer te.tearDown()
  885. cc, dw := te.clientConnWithConnControl()
  886. tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
  887. doServerSideInitiatedFailedStreamWithClientBreakFlowControl(tc, t, dw)
  888. if err := verifyResultWithDelay(func() (bool, error) {
  889. tchan, _ := channelz.GetTopChannels(0, 0)
  890. if len(tchan) != 1 {
  891. return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  892. }
  893. if len(tchan[0].SubChans) != 1 {
  894. return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
  895. }
  896. var id int64
  897. for id = range tchan[0].SubChans {
  898. break
  899. }
  900. sc := channelz.GetSubChannel(id)
  901. if sc == nil {
  902. return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  903. }
  904. if len(sc.Sockets) != 1 {
  905. return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
  906. }
  907. for id = range sc.Sockets {
  908. break
  909. }
  910. skt := channelz.GetSocket(id)
  911. sktData := skt.SocketData
  912. if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 {
  913. return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed)
  914. }
  915. ss, _ := channelz.GetServers(0, 0)
  916. if len(ss) != 1 {
  917. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  918. }
  919. ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
  920. if len(ns) != 1 {
  921. return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
  922. }
  923. sktData = ns[0].SocketData
  924. if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 {
  925. return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed)
  926. }
  927. return true, nil
  928. }); err != nil {
  929. t.Fatal(err)
  930. }
  931. }
  932. func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
  933. channelz.NewChannelzStorage()
  934. e := tcpClearRREnv
  935. te := newTest(t, e)
  936. // disable BDP
  937. te.serverInitialWindowSize = 65536
  938. te.serverInitialConnWindowSize = 65536
  939. te.clientInitialWindowSize = 65536
  940. te.clientInitialConnWindowSize = 65536
  941. te.startServer(&testServer{security: e.security})
  942. defer te.tearDown()
  943. cc := te.clientConn()
  944. tc := testpb.NewTestServiceClient(cc)
  945. for i := 0; i < 10; i++ {
  946. doSuccessfulUnaryCall(tc, t)
  947. }
  948. var cliSktID, svrSktID int64
  949. if err := verifyResultWithDelay(func() (bool, error) {
  950. tchan, _ := channelz.GetTopChannels(0, 0)
  951. if len(tchan) != 1 {
  952. return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  953. }
  954. if len(tchan[0].SubChans) != 1 {
  955. return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
  956. }
  957. var id int64
  958. for id = range tchan[0].SubChans {
  959. break
  960. }
  961. sc := channelz.GetSubChannel(id)
  962. if sc == nil {
  963. return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  964. }
  965. if len(sc.Sockets) != 1 {
  966. return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
  967. }
  968. for id = range sc.Sockets {
  969. break
  970. }
  971. skt := channelz.GetSocket(id)
  972. sktData := skt.SocketData
  973. // 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
  974. if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
  975. return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  976. }
  977. ss, _ := channelz.GetServers(0, 0)
  978. if len(ss) != 1 {
  979. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  980. }
  981. ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
  982. sktData = ns[0].SocketData
  983. if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
  984. return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  985. }
  986. cliSktID, svrSktID = id, ss[0].ID
  987. return true, nil
  988. }); err != nil {
  989. t.Fatal(err)
  990. }
  991. doStreamingInputCallWithLargePayload(tc, t)
  992. if err := verifyResultWithDelay(func() (bool, error) {
  993. skt := channelz.GetSocket(cliSktID)
  994. sktData := skt.SocketData
  995. // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
  996. // Remote: 65536 - 5 (Length-Prefixed-Message size) * 10 - 10011 = 55475
  997. if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 55475 {
  998. return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  999. }
  1000. ss, _ := channelz.GetServers(0, 0)
  1001. if len(ss) != 1 {
  1002. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  1003. }
  1004. ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
  1005. sktData = ns[0].SocketData
  1006. if sktData.LocalFlowControlWindow != 55475 || sktData.RemoteFlowControlWindow != 65486 {
  1007. return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  1008. }
  1009. return true, nil
  1010. }); err != nil {
  1011. t.Fatal(err)
  1012. }
  1013. // triggers transport flow control window update on server side, since unacked
  1014. // bytes should be larger than limit now. i.e. 50 + 20022 > 65536/4.
  1015. doStreamingInputCallWithLargePayload(tc, t)
  1016. if err := verifyResultWithDelay(func() (bool, error) {
  1017. skt := channelz.GetSocket(cliSktID)
  1018. sktData := skt.SocketData
  1019. // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486
  1020. // Remote: 65536
  1021. if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65536 {
  1022. return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  1023. }
  1024. ss, _ := channelz.GetServers(0, 0)
  1025. if len(ss) != 1 {
  1026. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  1027. }
  1028. ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
  1029. sktData = ns[0].SocketData
  1030. if sktData.LocalFlowControlWindow != 65536 || sktData.RemoteFlowControlWindow != 65486 {
  1031. return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
  1032. }
  1033. return true, nil
  1034. }); err != nil {
  1035. t.Fatal(err)
  1036. }
  1037. }
  1038. func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
  1039. channelz.NewChannelzStorage()
  1040. defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime)
  1041. internal.KeepaliveMinPingTime = time.Second
  1042. e := tcpClearRREnv
  1043. te := newTest(t, e)
  1044. te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams(
  1045. keepalive.ClientParameters{
  1046. Time: time.Second,
  1047. Timeout: 500 * time.Millisecond,
  1048. PermitWithoutStream: true,
  1049. }))
  1050. te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy(
  1051. keepalive.EnforcementPolicy{
  1052. MinTime: 500 * time.Millisecond,
  1053. PermitWithoutStream: true,
  1054. }))
  1055. te.startServer(&testServer{security: e.security})
  1056. te.clientConn() // Dial the server
  1057. defer te.tearDown()
  1058. if err := verifyResultWithDelay(func() (bool, error) {
  1059. tchan, _ := channelz.GetTopChannels(0, 0)
  1060. if len(tchan) != 1 {
  1061. return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  1062. }
  1063. if len(tchan[0].SubChans) != 1 {
  1064. return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
  1065. }
  1066. var id int64
  1067. for id = range tchan[0].SubChans {
  1068. break
  1069. }
  1070. sc := channelz.GetSubChannel(id)
  1071. if sc == nil {
  1072. return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  1073. }
  1074. if len(sc.Sockets) != 1 {
  1075. return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
  1076. }
  1077. for id = range sc.Sockets {
  1078. break
  1079. }
  1080. skt := channelz.GetSocket(id)
  1081. if skt.SocketData.KeepAlivesSent != 2 {
  1082. return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent)
  1083. }
  1084. return true, nil
  1085. }); err != nil {
  1086. t.Fatal(err)
  1087. }
  1088. }
  1089. func (s) TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
  1090. channelz.NewChannelzStorage()
  1091. e := tcpClearRREnv
  1092. te := newTest(t, e)
  1093. te.maxServerReceiveMsgSize = newInt(20)
  1094. te.maxClientReceiveMsgSize = newInt(20)
  1095. te.startServer(&testServer{security: e.security})
  1096. defer te.tearDown()
  1097. cc, _ := te.clientConnWithConnControl()
  1098. tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
  1099. var svrID int64
  1100. if err := verifyResultWithDelay(func() (bool, error) {
  1101. ss, _ := channelz.GetServers(0, 0)
  1102. if len(ss) != 1 {
  1103. return false, fmt.Errorf("there should only be one server, not %d", len(ss))
  1104. }
  1105. svrID = ss[0].ID
  1106. return true, nil
  1107. }); err != nil {
  1108. t.Fatal(err)
  1109. }
  1110. doSuccessfulUnaryCall(tc, t)
  1111. if err := verifyResultWithDelay(func() (bool, error) {
  1112. ns, _ := channelz.GetServerSockets(svrID, 0, 0)
  1113. sktData := ns[0].SocketData
  1114. if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
  1115. return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  1116. }
  1117. return true, nil
  1118. }); err != nil {
  1119. t.Fatal(err)
  1120. }
  1121. doServerSideFailedUnaryCall(tc, t)
  1122. if err := verifyResultWithDelay(func() (bool, error) {
  1123. ns, _ := channelz.GetServerSockets(svrID, 0, 0)
  1124. sktData := ns[0].SocketData
  1125. if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
  1126. return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  1127. }
  1128. return true, nil
  1129. }); err != nil {
  1130. t.Fatal(err)
  1131. }
  1132. doClientSideInitiatedFailedStream(tc, t)
  1133. if err := verifyResultWithDelay(func() (bool, error) {
  1134. ns, _ := channelz.GetServerSockets(svrID, 0, 0)
  1135. sktData := ns[0].SocketData
  1136. if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 2 {
  1137. return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
  1138. }
  1139. return true, nil
  1140. }); err != nil {
  1141. t.Fatal(err)
  1142. }
  1143. }
  1144. func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) {
  1145. channelz.NewChannelzStorage()
  1146. e := tcpClearRREnv
  1147. te := newTest(t, e)
  1148. te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{Time: time.Second, Timeout: 500 * time.Millisecond}))
  1149. te.startServer(&testServer{security: e.security})
  1150. defer te.tearDown()
  1151. cc := te.clientConn()
  1152. tc := testpb.NewTestServiceClient(cc)
  1153. doIdleCallToInvokeKeepAlive(tc, t)
  1154. if err := verifyResultWithDelay(func() (bool, error) {
  1155. ss, _ := channelz.GetServers(0, 0)
  1156. if len(ss) != 1 {
  1157. return false, fmt.Errorf("there should be one server, not %d", len(ss))
  1158. }
  1159. ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
  1160. if len(ns) != 1 {
  1161. return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
  1162. }
  1163. if ns[0].SocketData.KeepAlivesSent != 2 { // doIdleCallToInvokeKeepAlive func is set up to send 2 KeepAlives.
  1164. return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", ns[0].SocketData.KeepAlivesSent)
  1165. }
  1166. return true, nil
  1167. }); err != nil {
  1168. t.Fatal(err)
  1169. }
  1170. }
  // cipherSuites lists the cipher suite names that
// TestCZSocketGetSecurityValueTLS accepts as a socket's
// TLSChannelzSecurityValue.StandardName.
var cipherSuites = []string{
	// TLS 1.0-1.2 suites (plus the TLS_FALLBACK_SCSV signaling value).
	"TLS_RSA_WITH_RC4_128_SHA",
	"TLS_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_RSA_WITH_AES_128_CBC_SHA",
	"TLS_RSA_WITH_AES_256_CBC_SHA",
	"TLS_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_RSA_WITH_AES_256_GCM_SHA384",
	"TLS_ECDHE_ECDSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
	"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
	"TLS_FALLBACK_SCSV",
	"TLS_RSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
	"TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
	// TLS 1.3 suites.
	"TLS_AES_128_GCM_SHA256",
	"TLS_AES_256_GCM_SHA384",
	"TLS_CHACHA20_POLY1305_SHA256",
}
  1199. func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) {
  1200. channelz.NewChannelzStorage()
  1201. e := tcpTLSRREnv
  1202. te := newTest(t, e)
  1203. te.startServer(&testServer{security: e.security})
  1204. defer te.tearDown()
  1205. te.clientConn()
  1206. if err := verifyResultWithDelay(func() (bool, error) {
  1207. tchan, _ := channelz.GetTopChannels(0, 0)
  1208. if len(tchan) != 1 {
  1209. return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
  1210. }
  1211. if len(tchan[0].SubChans) != 1 {
  1212. return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans))
  1213. }
  1214. var id int64
  1215. for id = range tchan[0].SubChans {
  1216. break
  1217. }
  1218. sc := channelz.GetSubChannel(id)
  1219. if sc == nil {
  1220. return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
  1221. }
  1222. if len(sc.Sockets) != 1 {
  1223. return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets))
  1224. }
  1225. for id = range sc.Sockets {
  1226. break
  1227. }
  1228. skt := channelz.GetSocket(id)
  1229. cert, _ := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key"))
  1230. securityVal, ok := skt.SocketData.Security.(*credentials.TLSChannelzSecurityValue)
  1231. if !ok {
  1232. return false, fmt.Errorf("the SocketData.Security is of type: %T, want: *credentials.TLSChannelzSecurityValue", skt.SocketData.Security)
  1233. }
  1234. if !reflect.DeepEqual(securityVal.RemoteCertificate, cert.Certificate[0]) {
  1235. return false, fmt.Errorf("SocketData.Security.RemoteCertificate got: %v, want: %v", securityVal.RemoteCertificate, cert.Certificate[0])
  1236. }
  1237. for _, v := range cipherSuites {
  1238. if v == securityVal.StandardName {
  1239. return true, nil
  1240. }
  1241. }
  1242. return false, fmt.Errorf("SocketData.Security.StandardName got: %v, want it to be one of %v", securityVal.StandardName, cipherSuites)
  1243. }); err != nil {
  1244. t.Fatal(err)
  1245. }
  1246. }
  1247. func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
  1248. channelz.NewChannelzStorage()
  1249. e := tcpClearRREnv
  1250. // avoid calling API to set balancer type, which will void service config's change of balancer.
  1251. e.balancer = ""
  1252. te := newTest(t, e)
  1253. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1254. defer cleanup()
  1255. resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
  1256. r.InitialState(resolver.State{Addresses: resolvedAddrs})
  1257. te.resolverScheme = r.Scheme()
  1258. te.clientConn()
  1259. defer te.tearDown()
  1260. var nestedConn int64
  1261. if err := verifyResultWithDelay(func() (bool, error) {
  1262. tcs, _ := channelz.GetTopChannels(0, 0)
  1263. if len(tcs) != 1 {
  1264. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1265. }
  1266. if len(tcs[0].NestedChans) != 1 {
  1267. return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  1268. }
  1269. for k := range tcs[0].NestedChans {
  1270. nestedConn = k
  1271. }
  1272. for _, e := range tcs[0].Trace.Events {
  1273. if e.RefID == nestedConn && e.RefType != channelz.RefChannel {
  1274. return false, fmt.Errorf("nested channel trace event shoud have RefChannel as RefType")
  1275. }
  1276. }
  1277. ncm := channelz.GetChannel(nestedConn)
  1278. if ncm.Trace == nil {
  1279. return false, fmt.Errorf("trace for nested channel should not be empty")
  1280. }
  1281. if len(ncm.Trace.Events) == 0 {
  1282. return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
  1283. }
  1284. if ncm.Trace.Events[0].Desc != "Channel Created" {
  1285. return false, fmt.Errorf("the first trace event should be \"Channel Created\", not %q", ncm.Trace.Events[0].Desc)
  1286. }
  1287. return true, nil
  1288. }); err != nil {
  1289. t.Fatal(err)
  1290. }
  1291. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`})
  1292. // wait for the shutdown of grpclb balancer
  1293. if err := verifyResultWithDelay(func() (bool, error) {
  1294. tcs, _ := channelz.GetTopChannels(0, 0)
  1295. if len(tcs) != 1 {
  1296. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1297. }
  1298. if len(tcs[0].NestedChans) != 0 {
  1299. return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  1300. }
  1301. ncm := channelz.GetChannel(nestedConn)
  1302. if ncm == nil {
  1303. return false, fmt.Errorf("nested channel should still exist due to parent's trace reference")
  1304. }
  1305. if ncm.Trace == nil {
  1306. return false, fmt.Errorf("trace for nested channel should not be empty")
  1307. }
  1308. if len(ncm.Trace.Events) == 0 {
  1309. return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
  1310. }
  1311. if ncm.Trace.Events[len(ncm.Trace.Events)-1].Desc != "Channel Deleted" {
  1312. return false, fmt.Errorf("the first trace event should be \"Channel Deleted\", not %q", ncm.Trace.Events[0].Desc)
  1313. }
  1314. return true, nil
  1315. }); err != nil {
  1316. t.Fatal(err)
  1317. }
  1318. }
  1319. func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
  1320. channelz.NewChannelzStorage()
  1321. e := tcpClearRREnv
  1322. te := newTest(t, e)
  1323. te.startServer(&testServer{security: e.security})
  1324. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1325. defer cleanup()
  1326. r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
  1327. te.resolverScheme = r.Scheme()
  1328. te.clientConn()
  1329. defer te.tearDown()
  1330. var subConn int64
  1331. // Here, we just wait for all sockets to be up. In the future, if we implement
  1332. // IDLE, we may need to make several rpc calls to create the sockets.
  1333. if err := verifyResultWithDelay(func() (bool, error) {
  1334. tcs, _ := channelz.GetTopChannels(0, 0)
  1335. if len(tcs) != 1 {
  1336. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1337. }
  1338. if len(tcs[0].SubChans) != 1 {
  1339. return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
  1340. }
  1341. for k := range tcs[0].SubChans {
  1342. subConn = k
  1343. }
  1344. for _, e := range tcs[0].Trace.Events {
  1345. if e.RefID == subConn && e.RefType != channelz.RefSubChannel {
  1346. return false, fmt.Errorf("subchannel trace event shoud have RefType to be RefSubChannel")
  1347. }
  1348. }
  1349. scm := channelz.GetSubChannel(subConn)
  1350. if scm == nil {
  1351. return false, fmt.Errorf("subChannel does not exist")
  1352. }
  1353. if scm.Trace == nil {
  1354. return false, fmt.Errorf("trace for subChannel should not be empty")
  1355. }
  1356. if len(scm.Trace.Events) == 0 {
  1357. return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
  1358. }
  1359. if scm.Trace.Events[0].Desc != "Subchannel Created" {
  1360. return false, fmt.Errorf("the first trace event should be \"Subchannel Created\", not %q", scm.Trace.Events[0].Desc)
  1361. }
  1362. return true, nil
  1363. }); err != nil {
  1364. t.Fatal(err)
  1365. }
  1366. r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
  1367. if err := verifyResultWithDelay(func() (bool, error) {
  1368. tcs, _ := channelz.GetTopChannels(0, 0)
  1369. if len(tcs) != 1 {
  1370. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1371. }
  1372. if len(tcs[0].SubChans) != 0 {
  1373. return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
  1374. }
  1375. scm := channelz.GetSubChannel(subConn)
  1376. if scm == nil {
  1377. return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
  1378. }
  1379. if scm.Trace == nil {
  1380. return false, fmt.Errorf("trace for SubChannel should not be empty")
  1381. }
  1382. if len(scm.Trace.Events) == 0 {
  1383. return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
  1384. }
  1385. if got, want := scm.Trace.Events[len(scm.Trace.Events)-1].Desc, "Subchannel Deleted"; got != want {
  1386. return false, fmt.Errorf("the last trace event should be %q, not %q", want, got)
  1387. }
  1388. return true, nil
  1389. }); err != nil {
  1390. t.Fatal(err)
  1391. }
  1392. }
  1393. func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
  1394. channelz.NewChannelzStorage()
  1395. e := tcpClearRREnv
  1396. e.balancer = ""
  1397. te := newTest(t, e)
  1398. te.startServer(&testServer{security: e.security})
  1399. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1400. defer cleanup()
  1401. addrs := []resolver.Address{{Addr: te.srvAddr}}
  1402. r.InitialState(resolver.State{Addresses: addrs})
  1403. te.resolverScheme = r.Scheme()
  1404. te.clientConn()
  1405. defer te.tearDown()
  1406. var cid int64
  1407. // Here, we just wait for all sockets to be up. In the future, if we implement
  1408. // IDLE, we may need to make several rpc calls to create the sockets.
  1409. if err := verifyResultWithDelay(func() (bool, error) {
  1410. tcs, _ := channelz.GetTopChannels(0, 0)
  1411. if len(tcs) != 1 {
  1412. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1413. }
  1414. cid = tcs[0].ID
  1415. for i := len(tcs[0].Trace.Events) - 1; i >= 0; i-- {
  1416. if strings.Contains(tcs[0].Trace.Events[i].Desc, "resolver returned new addresses") {
  1417. break
  1418. }
  1419. if i == 0 {
  1420. return false, fmt.Errorf("events do not contain expected address resolution from empty address state. Got: %+v", tcs[0].Trace.Events)
  1421. }
  1422. }
  1423. return true, nil
  1424. }); err != nil {
  1425. t.Fatal(err)
  1426. }
  1427. r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`})
  1428. if err := verifyResultWithDelay(func() (bool, error) {
  1429. cm := channelz.GetChannel(cid)
  1430. for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
  1431. if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name) {
  1432. break
  1433. }
  1434. if i == 0 {
  1435. return false, fmt.Errorf("events do not contain expected address resolution change of LB policy")
  1436. }
  1437. }
  1438. return true, nil
  1439. }); err != nil {
  1440. t.Fatal(err)
  1441. }
  1442. newSC := `{
  1443. "methodConfig": [
  1444. {
  1445. "name": [
  1446. {
  1447. "service": "grpc.testing.TestService",
  1448. "method": "EmptyCall"
  1449. }
  1450. ],
  1451. "waitForReady": false,
  1452. "timeout": ".001s"
  1453. }
  1454. ]
  1455. }`
  1456. r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: newSC})
  1457. if err := verifyResultWithDelay(func() (bool, error) {
  1458. cm := channelz.GetChannel(cid)
  1459. for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
  1460. if strings.Contains(cm.Trace.Events[i].Desc, "service config updated") {
  1461. break
  1462. }
  1463. if i == 0 {
  1464. return false, fmt.Errorf("events do not contain expected address resolution of new service config")
  1465. }
  1466. }
  1467. return true, nil
  1468. }); err != nil {
  1469. t.Fatal(err)
  1470. }
  1471. r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: newSC})
  1472. if err := verifyResultWithDelay(func() (bool, error) {
  1473. cm := channelz.GetChannel(cid)
  1474. for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
  1475. if strings.Contains(cm.Trace.Events[i].Desc, "resolver returned an empty address list") {
  1476. break
  1477. }
  1478. if i == 0 {
  1479. return false, fmt.Errorf("events do not contain expected address resolution of empty address")
  1480. }
  1481. }
  1482. return true, nil
  1483. }); err != nil {
  1484. t.Fatal(err)
  1485. }
  1486. }
  1487. func (s) TestCZSubChannelPickedNewAddress(t *testing.T) {
  1488. channelz.NewChannelzStorage()
  1489. e := tcpClearRREnv
  1490. e.balancer = ""
  1491. te := newTest(t, e)
  1492. te.startServers(&testServer{security: e.security}, 3)
  1493. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1494. defer cleanup()
  1495. var svrAddrs []resolver.Address
  1496. for _, a := range te.srvAddrs {
  1497. svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
  1498. }
  1499. r.InitialState(resolver.State{Addresses: svrAddrs})
  1500. te.resolverScheme = r.Scheme()
  1501. cc := te.clientConn()
  1502. defer te.tearDown()
  1503. tc := testpb.NewTestServiceClient(cc)
  1504. // make sure the connection is up
  1505. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1506. defer cancel()
  1507. if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1508. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1509. }
  1510. te.srvs[0].Stop()
  1511. te.srvs[1].Stop()
  1512. // Here, we just wait for all sockets to be up. In the future, if we implement
  1513. // IDLE, we may need to make several rpc calls to create the sockets.
  1514. if err := verifyResultWithDelay(func() (bool, error) {
  1515. tcs, _ := channelz.GetTopChannels(0, 0)
  1516. if len(tcs) != 1 {
  1517. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1518. }
  1519. if len(tcs[0].SubChans) != 1 {
  1520. return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
  1521. }
  1522. var subConn int64
  1523. for k := range tcs[0].SubChans {
  1524. subConn = k
  1525. }
  1526. scm := channelz.GetSubChannel(subConn)
  1527. if scm.Trace == nil {
  1528. return false, fmt.Errorf("trace for SubChannel should not be empty")
  1529. }
  1530. if len(scm.Trace.Events) == 0 {
  1531. return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
  1532. }
  1533. for i := len(scm.Trace.Events) - 1; i >= 0; i-- {
  1534. if scm.Trace.Events[i].Desc == fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2]) {
  1535. break
  1536. }
  1537. if i == 0 {
  1538. return false, fmt.Errorf("events do not contain expected address resolution of subchannel picked new address")
  1539. }
  1540. }
  1541. return true, nil
  1542. }); err != nil {
  1543. t.Fatal(err)
  1544. }
  1545. }
  1546. func (s) TestCZSubChannelConnectivityState(t *testing.T) {
  1547. channelz.NewChannelzStorage()
  1548. e := tcpClearRREnv
  1549. te := newTest(t, e)
  1550. te.startServer(&testServer{security: e.security})
  1551. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1552. defer cleanup()
  1553. r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
  1554. te.resolverScheme = r.Scheme()
  1555. cc := te.clientConn()
  1556. defer te.tearDown()
  1557. tc := testpb.NewTestServiceClient(cc)
  1558. // make sure the connection is up
  1559. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1560. defer cancel()
  1561. if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1562. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1563. }
  1564. var subConn int64
  1565. te.srv.Stop()
  1566. if err := verifyResultWithDelay(func() (bool, error) {
  1567. // we need to obtain the SubChannel id before it gets deleted from Channel's children list (due
  1568. // to effect of r.UpdateState(resolver.State{Addresses:[]resolver.Address{}}))
  1569. if subConn == 0 {
  1570. tcs, _ := channelz.GetTopChannels(0, 0)
  1571. if len(tcs) != 1 {
  1572. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1573. }
  1574. if len(tcs[0].SubChans) != 1 {
  1575. return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
  1576. }
  1577. for k := range tcs[0].SubChans {
  1578. // get the SubChannel id for further trace inquiry.
  1579. subConn = k
  1580. }
  1581. }
  1582. scm := channelz.GetSubChannel(subConn)
  1583. if scm == nil {
  1584. return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
  1585. }
  1586. if scm.Trace == nil {
  1587. return false, fmt.Errorf("trace for SubChannel should not be empty")
  1588. }
  1589. if len(scm.Trace.Events) == 0 {
  1590. return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
  1591. }
  1592. var ready, connecting, transient, shutdown int
  1593. for _, e := range scm.Trace.Events {
  1594. if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
  1595. transient++
  1596. }
  1597. }
  1598. // Make sure the SubChannel has already seen transient failure before shutting it down through
  1599. // r.UpdateState(resolver.State{Addresses:[]resolver.Address{}}).
  1600. if transient == 0 {
  1601. return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
  1602. }
  1603. transient = 0
  1604. r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
  1605. for _, e := range scm.Trace.Events {
  1606. if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) {
  1607. ready++
  1608. }
  1609. if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting) {
  1610. connecting++
  1611. }
  1612. if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
  1613. transient++
  1614. }
  1615. if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown) {
  1616. shutdown++
  1617. }
  1618. }
  1619. // example:
  1620. // Subchannel Created
  1621. // Subchannel's connectivity state changed to CONNECTING
  1622. // Subchannel picked a new address: "localhost:36011"
  1623. // Subchannel's connectivity state changed to READY
  1624. // Subchannel's connectivity state changed to TRANSIENT_FAILURE
  1625. // Subchannel's connectivity state changed to CONNECTING
  1626. // Subchannel picked a new address: "localhost:36011"
  1627. // Subchannel's connectivity state changed to SHUTDOWN
  1628. // Subchannel Deleted
  1629. if ready != 1 || connecting < 1 || transient < 1 || shutdown != 1 {
  1630. return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, shutdown = %d, want: 1, >=1, >=1, 1", ready, connecting, transient, shutdown)
  1631. }
  1632. return true, nil
  1633. }); err != nil {
  1634. t.Fatal(err)
  1635. }
  1636. }
  1637. func (s) TestCZChannelConnectivityState(t *testing.T) {
  1638. channelz.NewChannelzStorage()
  1639. e := tcpClearRREnv
  1640. te := newTest(t, e)
  1641. te.startServer(&testServer{security: e.security})
  1642. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1643. defer cleanup()
  1644. r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
  1645. te.resolverScheme = r.Scheme()
  1646. cc := te.clientConn()
  1647. defer te.tearDown()
  1648. tc := testpb.NewTestServiceClient(cc)
  1649. // make sure the connection is up
  1650. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  1651. defer cancel()
  1652. if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1653. t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1654. }
  1655. te.srv.Stop()
  1656. if err := verifyResultWithDelay(func() (bool, error) {
  1657. tcs, _ := channelz.GetTopChannels(0, 0)
  1658. if len(tcs) != 1 {
  1659. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1660. }
  1661. var ready, connecting, transient int
  1662. for _, e := range tcs[0].Trace.Events {
  1663. if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready) {
  1664. ready++
  1665. }
  1666. if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting) {
  1667. connecting++
  1668. }
  1669. if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure) {
  1670. transient++
  1671. }
  1672. }
  1673. // example:
  1674. // Channel Created
  1675. // Adressses resolved (from empty address state): "localhost:40467"
  1676. // SubChannel (id: 4[]) Created
  1677. // Channel's connectivity state changed to CONNECTING
  1678. // Channel's connectivity state changed to READY
  1679. // Channel's connectivity state changed to TRANSIENT_FAILURE
  1680. // Channel's connectivity state changed to CONNECTING
  1681. // Channel's connectivity state changed to TRANSIENT_FAILURE
  1682. if ready != 1 || connecting < 1 || transient < 1 {
  1683. return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, want: 1, >=1, >=1", ready, connecting, transient)
  1684. }
  1685. return true, nil
  1686. }); err != nil {
  1687. t.Fatal(err)
  1688. }
  1689. }
  1690. func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
  1691. channelz.NewChannelzStorage()
  1692. e := tcpClearRREnv
  1693. // avoid calling API to set balancer type, which will void service config's change of balancer.
  1694. e.balancer = ""
  1695. te := newTest(t, e)
  1696. channelz.SetMaxTraceEntry(1)
  1697. defer channelz.ResetMaxTraceEntryToDefault()
  1698. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1699. defer cleanup()
  1700. resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
  1701. r.InitialState(resolver.State{Addresses: resolvedAddrs})
  1702. te.resolverScheme = r.Scheme()
  1703. te.clientConn()
  1704. defer te.tearDown()
  1705. var nestedConn int64
  1706. if err := verifyResultWithDelay(func() (bool, error) {
  1707. tcs, _ := channelz.GetTopChannels(0, 0)
  1708. if len(tcs) != 1 {
  1709. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1710. }
  1711. if len(tcs[0].NestedChans) != 1 {
  1712. return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  1713. }
  1714. for k := range tcs[0].NestedChans {
  1715. nestedConn = k
  1716. }
  1717. return true, nil
  1718. }); err != nil {
  1719. t.Fatal(err)
  1720. }
  1721. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`})
  1722. // wait for the shutdown of grpclb balancer
  1723. if err := verifyResultWithDelay(func() (bool, error) {
  1724. tcs, _ := channelz.GetTopChannels(0, 0)
  1725. if len(tcs) != 1 {
  1726. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1727. }
  1728. if len(tcs[0].NestedChans) != 0 {
  1729. return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
  1730. }
  1731. return true, nil
  1732. }); err != nil {
  1733. t.Fatal(err)
  1734. }
  1735. // verify that the nested channel no longer exist due to trace referencing it got overwritten.
  1736. if err := verifyResultWithDelay(func() (bool, error) {
  1737. cm := channelz.GetChannel(nestedConn)
  1738. if cm != nil {
  1739. return false, fmt.Errorf("nested channel should have been deleted since its parent's trace should not contain any reference to it anymore")
  1740. }
  1741. return true, nil
  1742. }); err != nil {
  1743. t.Fatal(err)
  1744. }
  1745. }
  1746. func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
  1747. channelz.NewChannelzStorage()
  1748. e := tcpClearRREnv
  1749. te := newTest(t, e)
  1750. channelz.SetMaxTraceEntry(1)
  1751. defer channelz.ResetMaxTraceEntryToDefault()
  1752. te.startServer(&testServer{security: e.security})
  1753. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1754. defer cleanup()
  1755. r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
  1756. te.resolverScheme = r.Scheme()
  1757. te.clientConn()
  1758. defer te.tearDown()
  1759. var subConn int64
  1760. // Here, we just wait for all sockets to be up. In the future, if we implement
  1761. // IDLE, we may need to make several rpc calls to create the sockets.
  1762. if err := verifyResultWithDelay(func() (bool, error) {
  1763. tcs, _ := channelz.GetTopChannels(0, 0)
  1764. if len(tcs) != 1 {
  1765. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1766. }
  1767. if len(tcs[0].SubChans) != 1 {
  1768. return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
  1769. }
  1770. for k := range tcs[0].SubChans {
  1771. subConn = k
  1772. }
  1773. return true, nil
  1774. }); err != nil {
  1775. t.Fatal(err)
  1776. }
  1777. r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
  1778. if err := verifyResultWithDelay(func() (bool, error) {
  1779. tcs, _ := channelz.GetTopChannels(0, 0)
  1780. if len(tcs) != 1 {
  1781. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1782. }
  1783. if len(tcs[0].SubChans) != 0 {
  1784. return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
  1785. }
  1786. return true, nil
  1787. }); err != nil {
  1788. t.Fatal(err)
  1789. }
  1790. // verify that the subchannel no longer exist due to trace referencing it got overwritten.
  1791. if err := verifyResultWithDelay(func() (bool, error) {
  1792. cm := channelz.GetChannel(subConn)
  1793. if cm != nil {
  1794. return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
  1795. }
  1796. return true, nil
  1797. }); err != nil {
  1798. t.Fatal(err)
  1799. }
  1800. }
  1801. func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
  1802. channelz.NewChannelzStorage()
  1803. e := tcpClearRREnv
  1804. te := newTest(t, e)
  1805. te.startServer(&testServer{security: e.security})
  1806. r, cleanup := manual.GenerateAndRegisterManualResolver()
  1807. defer cleanup()
  1808. r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
  1809. te.resolverScheme = r.Scheme()
  1810. te.clientConn()
  1811. var subConn int64
  1812. // Here, we just wait for all sockets to be up. In the future, if we implement
  1813. // IDLE, we may need to make several rpc calls to create the sockets.
  1814. if err := verifyResultWithDelay(func() (bool, error) {
  1815. tcs, _ := channelz.GetTopChannels(0, 0)
  1816. if len(tcs) != 1 {
  1817. return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
  1818. }
  1819. if len(tcs[0].SubChans) != 1 {
  1820. return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
  1821. }
  1822. for k := range tcs[0].SubChans {
  1823. subConn = k
  1824. }
  1825. return true, nil
  1826. }); err != nil {
  1827. t.Fatal(err)
  1828. }
  1829. te.tearDown()
  1830. // verify that the subchannel no longer exist due to parent channel got deleted and its trace cleared.
  1831. if err := verifyResultWithDelay(func() (bool, error) {
  1832. cm := channelz.GetChannel(subConn)
  1833. if cm != nil {
  1834. return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
  1835. }
  1836. return true, nil
  1837. }); err != nil {
  1838. t.Fatal(err)
  1839. }
  1840. }