You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

224 lines
6.2 KiB

  1. /*
  2. The remote package provides the pieces to allow Ginkgo test suites to report to remote listeners.
  3. This is used, primarily, to enable streaming parallel test output but has, in principle, broader applications (e.g. streaming test output to a browser).
  4. */
  5. package remote
  6. import (
  7. "encoding/json"
  8. "io/ioutil"
  9. "net"
  10. "net/http"
  11. "sync"
  12. "github.com/onsi/ginkgo/internal/spec_iterator"
  13. "github.com/onsi/ginkgo/config"
  14. "github.com/onsi/ginkgo/reporters"
  15. "github.com/onsi/ginkgo/types"
  16. )
  17. /*
  18. Server spins up on an automatically selected port and listens for communication from the forwarding reporter.
  19. It then forwards that communication to attached reporters.
  20. */
  21. type Server struct {
  22. listener net.Listener
  23. reporters []reporters.Reporter
  24. alives []func() bool
  25. lock *sync.Mutex
  26. beforeSuiteData types.RemoteBeforeSuiteData
  27. parallelTotal int
  28. counter int
  29. }
  30. //Create a new server, automatically selecting a port
  31. func NewServer(parallelTotal int) (*Server, error) {
  32. listener, err := net.Listen("tcp", "127.0.0.1:0")
  33. if err != nil {
  34. return nil, err
  35. }
  36. return &Server{
  37. listener: listener,
  38. lock: &sync.Mutex{},
  39. alives: make([]func() bool, parallelTotal),
  40. beforeSuiteData: types.RemoteBeforeSuiteData{Data: nil, State: types.RemoteBeforeSuiteStatePending},
  41. parallelTotal: parallelTotal,
  42. }, nil
  43. }
  44. //Start the server. You don't need to `go s.Start()`, just `s.Start()`
  45. func (server *Server) Start() {
  46. httpServer := &http.Server{}
  47. mux := http.NewServeMux()
  48. httpServer.Handler = mux
  49. //streaming endpoints
  50. mux.HandleFunc("/SpecSuiteWillBegin", server.specSuiteWillBegin)
  51. mux.HandleFunc("/BeforeSuiteDidRun", server.beforeSuiteDidRun)
  52. mux.HandleFunc("/AfterSuiteDidRun", server.afterSuiteDidRun)
  53. mux.HandleFunc("/SpecWillRun", server.specWillRun)
  54. mux.HandleFunc("/SpecDidComplete", server.specDidComplete)
  55. mux.HandleFunc("/SpecSuiteDidEnd", server.specSuiteDidEnd)
  56. //synchronization endpoints
  57. mux.HandleFunc("/BeforeSuiteState", server.handleBeforeSuiteState)
  58. mux.HandleFunc("/RemoteAfterSuiteData", server.handleRemoteAfterSuiteData)
  59. mux.HandleFunc("/counter", server.handleCounter)
  60. mux.HandleFunc("/has-counter", server.handleHasCounter) //for backward compatibility
  61. go httpServer.Serve(server.listener)
  62. }
  63. //Stop the server
  64. func (server *Server) Close() {
  65. server.listener.Close()
  66. }
  67. //The address the server can be reached it. Pass this into the `ForwardingReporter`.
  68. func (server *Server) Address() string {
  69. return "http://" + server.listener.Addr().String()
  70. }
  71. //
  72. // Streaming Endpoints
  73. //
  74. //The server will forward all received messages to Ginkgo reporters registered with `RegisterReporters`
  75. func (server *Server) readAll(request *http.Request) []byte {
  76. defer request.Body.Close()
  77. body, _ := ioutil.ReadAll(request.Body)
  78. return body
  79. }
  80. func (server *Server) RegisterReporters(reporters ...reporters.Reporter) {
  81. server.reporters = reporters
  82. }
  83. func (server *Server) specSuiteWillBegin(writer http.ResponseWriter, request *http.Request) {
  84. body := server.readAll(request)
  85. var data struct {
  86. Config config.GinkgoConfigType `json:"config"`
  87. Summary *types.SuiteSummary `json:"suite-summary"`
  88. }
  89. json.Unmarshal(body, &data)
  90. for _, reporter := range server.reporters {
  91. reporter.SpecSuiteWillBegin(data.Config, data.Summary)
  92. }
  93. }
  94. func (server *Server) beforeSuiteDidRun(writer http.ResponseWriter, request *http.Request) {
  95. body := server.readAll(request)
  96. var setupSummary *types.SetupSummary
  97. json.Unmarshal(body, &setupSummary)
  98. for _, reporter := range server.reporters {
  99. reporter.BeforeSuiteDidRun(setupSummary)
  100. }
  101. }
  102. func (server *Server) afterSuiteDidRun(writer http.ResponseWriter, request *http.Request) {
  103. body := server.readAll(request)
  104. var setupSummary *types.SetupSummary
  105. json.Unmarshal(body, &setupSummary)
  106. for _, reporter := range server.reporters {
  107. reporter.AfterSuiteDidRun(setupSummary)
  108. }
  109. }
  110. func (server *Server) specWillRun(writer http.ResponseWriter, request *http.Request) {
  111. body := server.readAll(request)
  112. var specSummary *types.SpecSummary
  113. json.Unmarshal(body, &specSummary)
  114. for _, reporter := range server.reporters {
  115. reporter.SpecWillRun(specSummary)
  116. }
  117. }
  118. func (server *Server) specDidComplete(writer http.ResponseWriter, request *http.Request) {
  119. body := server.readAll(request)
  120. var specSummary *types.SpecSummary
  121. json.Unmarshal(body, &specSummary)
  122. for _, reporter := range server.reporters {
  123. reporter.SpecDidComplete(specSummary)
  124. }
  125. }
  126. func (server *Server) specSuiteDidEnd(writer http.ResponseWriter, request *http.Request) {
  127. body := server.readAll(request)
  128. var suiteSummary *types.SuiteSummary
  129. json.Unmarshal(body, &suiteSummary)
  130. for _, reporter := range server.reporters {
  131. reporter.SpecSuiteDidEnd(suiteSummary)
  132. }
  133. }
  134. //
  135. // Synchronization Endpoints
  136. //
  137. func (server *Server) RegisterAlive(node int, alive func() bool) {
  138. server.lock.Lock()
  139. defer server.lock.Unlock()
  140. server.alives[node-1] = alive
  141. }
  142. func (server *Server) nodeIsAlive(node int) bool {
  143. server.lock.Lock()
  144. defer server.lock.Unlock()
  145. alive := server.alives[node-1]
  146. if alive == nil {
  147. return true
  148. }
  149. return alive()
  150. }
  151. func (server *Server) handleBeforeSuiteState(writer http.ResponseWriter, request *http.Request) {
  152. if request.Method == "POST" {
  153. dec := json.NewDecoder(request.Body)
  154. dec.Decode(&(server.beforeSuiteData))
  155. } else {
  156. beforeSuiteData := server.beforeSuiteData
  157. if beforeSuiteData.State == types.RemoteBeforeSuiteStatePending && !server.nodeIsAlive(1) {
  158. beforeSuiteData.State = types.RemoteBeforeSuiteStateDisappeared
  159. }
  160. enc := json.NewEncoder(writer)
  161. enc.Encode(beforeSuiteData)
  162. }
  163. }
  164. func (server *Server) handleRemoteAfterSuiteData(writer http.ResponseWriter, request *http.Request) {
  165. afterSuiteData := types.RemoteAfterSuiteData{
  166. CanRun: true,
  167. }
  168. for i := 2; i <= server.parallelTotal; i++ {
  169. afterSuiteData.CanRun = afterSuiteData.CanRun && !server.nodeIsAlive(i)
  170. }
  171. enc := json.NewEncoder(writer)
  172. enc.Encode(afterSuiteData)
  173. }
  174. func (server *Server) handleCounter(writer http.ResponseWriter, request *http.Request) {
  175. c := spec_iterator.Counter{}
  176. server.lock.Lock()
  177. c.Index = server.counter
  178. server.counter++
  179. server.lock.Unlock()
  180. json.NewEncoder(writer).Encode(c)
  181. }
  182. func (server *Server) handleHasCounter(writer http.ResponseWriter, request *http.Request) {
  183. writer.Write([]byte(""))
  184. }