You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

383 lines
12 KiB

  1. package p2p
  2. import "io"
  3. import "net"
  4. import "fmt"
  5. import "time"
  6. import "testing"
  7. import "container/list"
  8. import "encoding/binary"
  9. import "github.com/romana/rlog"
  10. import log "github.com/sirupsen/logrus"
  11. import "github.com/deroproject/derosuite/globals"
  12. // all communications flow in little endian
  13. const LEVIN_SIGNATURE = 0x0101010101012101 //Bender's nightmare
  14. const LEVIN_SIGNATURE_DATA = 0x0102010101011101
  15. const LEVIN_PROTOCOL_VER_0 = 0
  16. const LEVIN_PROTOCOL_VER_1 = 1
  17. const LEVIN_PACKET_REQUEST = 0x00000001
  18. const LEVIN_PACKET_RESPONSE = 0x00000002
  19. // the whole structure should be packed into 33 bytes
  20. type Levin_Header struct {
  21. Signature uint64
  22. CB uint64 // this contains data size appended to buffer
  23. ReturnData bool
  24. Command uint32
  25. ReturnCode int32
  26. Flags uint32
  27. Protocol_Version uint32
  28. }
  29. // all response will have the signature in big endian form
  30. type Levin_Data_Header struct {
  31. Signature uint64 // LEVIN_SIGNATURE_DATA
  32. //Boost_Header byte
  33. Data []byte
  34. }
  35. // sets timeout based on connection state, so as stale connections are cleared quickly
  36. func set_timeout(connection *Connection) {
  37. if connection.State == HANDSHAKE_PENDING {
  38. connection.Conn.SetReadDeadline(time.Now().Add(20 * time.Second)) // new connections have 20 seconds to handshake
  39. } else {
  40. connection.Conn.SetReadDeadline(time.Now().Add(300 * time.Second)) // good connections have 5 mins to communicate
  41. }
  42. }
  43. /* this is the entire connection handler, all incoming/outgoing connections end up here */
  44. func Handle_Connection(conn net.Conn, remote_addr *net.TCPAddr, incoming bool) {
  45. var connection Connection
  46. var levin_header Levin_Header
  47. connection.Incoming = incoming
  48. connection.Conn = conn
  49. var idle int
  50. connection.Addr = remote_addr // since we may be connecting via socks, get target IP
  51. connection.Command_queue = list.New() // init command queue
  52. connection.State = HANDSHAKE_PENDING
  53. if incoming {
  54. connection.logger = logger.WithFields(log.Fields{"RIP": remote_addr.String(), "DIR": "INC"})
  55. } else {
  56. connection.logger = logger.WithFields(log.Fields{"RIP": remote_addr.String(), "DIR": "OUT"})
  57. }
  58. defer func() {
  59. if r := recover(); r != nil {
  60. connection.logger.Fatalf("Recovered while handling connection", r)
  61. }
  62. }()
  63. Connection_Add(&connection) // add connection to pool
  64. if !incoming {
  65. Send_Handshake(&connection) // send handshake
  66. }
  67. // goroutine to exit the connection if signalled
  68. go func() {
  69. ticker := time.NewTicker(1 * time.Second) // 1 second ticker
  70. for {
  71. select {
  72. case <-ticker.C:
  73. idle++
  74. // if idle more than 13 secs, we should send a timed sync
  75. if idle > 13 {
  76. if connection.State != HANDSHAKE_PENDING {
  77. connection.State = IDLE
  78. }
  79. Send_Timed_Sync(&connection)
  80. //connection.logger.Debugf("We should send a timed sync")
  81. idle = 0
  82. }
  83. case <-Exit_Event: // p2p is shutting down, close the connection
  84. connection.Exit = true
  85. ticker.Stop() // release resources of timer
  86. Connection_Delete(&connection)
  87. conn.Close()
  88. return // close the connection and close the routine
  89. }
  90. if connection.Exit { // release resources of timer
  91. ticker.Stop()
  92. Connection_Delete(&connection)
  93. return
  94. }
  95. }
  96. }()
  97. for {
  98. if connection.Exit {
  99. connection.logger.Debugf("Connection exited")
  100. conn.Close()
  101. return
  102. }
  103. // wait and read header
  104. header_data := make([]byte, 33, 33) // size of levin header
  105. idle = 0
  106. rlog.Tracef(10, "waiting to read header bytes from network %s\n", globals.CTXString(connection.logger))
  107. set_timeout(&connection)
  108. read_bytes, err := io.ReadFull(conn, header_data)
  109. if err != nil {
  110. rlog.Tracef(4, "Error while reading levin header exiting err:%s\n", err)
  111. connection.Exit = true
  112. continue
  113. }
  114. rlog.Tracef(10, "Read %d bytes from network\n", read_bytes)
  115. if connection.State != HANDSHAKE_PENDING {
  116. connection.State = ACTIVE
  117. }
  118. err = levin_header.DeSerialize(header_data)
  119. if err != nil {
  120. rlog.Tracef(4, "Error while DeSerializing levin header exiting err:%s\n", err)
  121. connection.Exit = true
  122. continue
  123. }
  124. // read data as per requirement
  125. data := make([]byte, levin_header.CB, levin_header.CB)
  126. set_timeout(&connection)
  127. read_bytes, err = io.ReadFull(conn, data)
  128. rlog.Tracef(10, "Read %d bytes from network for data \n", read_bytes)
  129. if err != nil {
  130. rlog.Tracef(4, "Error while reading levin data exiting err:%s\n", err)
  131. connection.Exit = true
  132. continue
  133. }
  134. name := COMMAND_NAME[levin_header.Command]
  135. if name == "" {
  136. connection.logger.Warnf("No Such command %d exiting\n", levin_header.Command)
  137. connection.Exit = true
  138. continue
  139. }
  140. //connection.logger.WithFields(log.Fields{
  141. // "command": name,
  142. // "flags": levin_header.Flags}).Debugf("Incoming Command")
  143. if levin_header.Flags == LEVIN_PACKET_RESPONSE {
  144. if connection.Command_queue.Len() < 1 {
  145. connection.logger.Warnf("Invalid Response ( we have not queued anything\n")
  146. connection.Exit = true
  147. continue
  148. }
  149. front_command := connection.Command_queue.Front()
  150. if levin_header.Command != front_command.Value.(uint32) {
  151. connection.logger.Warnf("Invalid Response ( we queued some other command\n")
  152. connection.Exit = true
  153. continue
  154. }
  155. connection.Lock()
  156. connection.Command_queue.Remove(front_command)
  157. connection.Unlock()
  158. switch levin_header.Command {
  159. case P2P_COMMAND_HANDSHAKE: // Parse incoming handshake response
  160. Handle_P2P_Handshake_Command_Response(&connection, &levin_header, data)
  161. // if response is OK, mark conncection as good and add it to list
  162. case P2P_COMMAND_TIMED_SYNC: // we never send timed response
  163. // connection.logger.Infof("Response for timed sync arrived")
  164. Handle_P2P_Timed_Sync_Response(&connection, &levin_header, data)
  165. case P2P_COMMAND_PING: // we never send ping packets
  166. case P2P_COMMAND_REQUEST_SUPPORT_FLAGS: // we never send flags packet
  167. }
  168. }
  169. if levin_header.Flags == LEVIN_PACKET_REQUEST {
  170. switch levin_header.Command {
  171. case P2P_COMMAND_HANDSHAKE: // send response
  172. connection.logger.Debugf("Incoming handshake command")
  173. Handle_P2P_Handshake_Command(&connection, &levin_header, data)
  174. case P2P_COMMAND_REQUEST_SUPPORT_FLAGS: // send reponse
  175. Handle_P2P_Support_Flags(&connection, &levin_header, data)
  176. case P2P_COMMAND_TIMED_SYNC:
  177. Handle_P2P_Timed_Sync(&connection, &levin_header, data)
  178. // crypto note core protocols commands related to blockchain
  179. // peer wants to syncronise his chain to ours
  180. case BC_NOTIFY_REQUEST_CHAIN:
  181. Handle_BC_Notify_Chain(&connection, &levin_header, data)
  182. // we want to syncronise our chain to peers
  183. case BC_NOTIFY_RESPONSE_CHAIN_ENTRY:
  184. Handle_BC_Notify_Response_Chain_Entry(&connection, &levin_header, data)
  185. case BC_NOTIFY_REQUEST_GET_OBJECTS: // peer requested some object
  186. Handle_BC_Notify_Request_GetObjects(&connection, &levin_header, data)
  187. case BC_NOTIFY_RESPONSE_GET_OBJECTS: // peer responded to our object requests
  188. Handle_BC_Notify_Response_GetObjects(&connection, &levin_header, data)
  189. case BC_NOTIFY_NEW_TRANSACTIONS:
  190. Handle_BC_Notify_New_Transactions(&connection, &levin_header, data)
  191. case BC_NOTIFY_NEW_BLOCK:
  192. Handle_BC_Notify_New_Block(&connection, &levin_header, data)
  193. }
  194. }
  195. }
  196. }
  197. /* this operation can never fail */
  198. func SerializeLevinHeader(header Levin_Header) []byte {
  199. packed_buffer := make([]byte, 33, 33)
  200. binary.LittleEndian.PutUint64(packed_buffer[0:8], LEVIN_SIGNATURE) // packed 8 bytes
  201. binary.LittleEndian.PutUint64(packed_buffer[8:16], header.CB) // packed 8 + 8 bytes
  202. if header.ReturnData {
  203. packed_buffer[16] = 1 // packed 8 + 8 + 1
  204. }
  205. binary.LittleEndian.PutUint32(packed_buffer[17:17+4], header.Command) // packed 8+8+1+4 bytes
  206. binary.LittleEndian.PutUint32(packed_buffer[21:21+4], uint32(header.ReturnCode)) // packed 8+8+1+4 bytes
  207. binary.LittleEndian.PutUint32(packed_buffer[25:25+4], header.Flags) // packed 8+8+1+4 bytes
  208. binary.LittleEndian.PutUint32(packed_buffer[29:29+4], LEVIN_PROTOCOL_VER_1) // packed 8+8+1+4 bytes
  209. return packed_buffer
  210. }
  211. func (header Levin_Header) Serialize() ([]byte, int) {
  212. packed_buffer := make([]byte, 33, 33)
  213. binary.LittleEndian.PutUint64(packed_buffer[0:8], LEVIN_SIGNATURE) // packed 8 bytes
  214. binary.LittleEndian.PutUint64(packed_buffer[8:16], header.CB) // packed 8 + 8 bytes
  215. if header.ReturnData {
  216. packed_buffer[16] = 1 // packed 8 + 8 + 1
  217. }
  218. binary.LittleEndian.PutUint32(packed_buffer[17:17+4], header.Command) // packed 8+8+1+4 bytes
  219. binary.LittleEndian.PutUint32(packed_buffer[21:21+4], uint32(header.ReturnCode)) // packed 8+8+1+4 bytes
  220. binary.LittleEndian.PutUint32(packed_buffer[25:25+4], header.Flags) // packed 8+8+1+4 bytes
  221. binary.LittleEndian.PutUint32(packed_buffer[29:29+4], LEVIN_PROTOCOL_VER_1) // packed 8+8+1+4 bytes
  222. return packed_buffer, len(packed_buffer)
  223. }
  224. // extract structure info from hardcoded node
  225. func (header *Levin_Header) DeSerialize(packed_buffer []byte) (err error) {
  226. if len(packed_buffer) != 33 {
  227. return fmt.Errorf("Insufficient header bytes")
  228. }
  229. header.Signature = binary.LittleEndian.Uint64(packed_buffer[0:8]) // packed 8 bytes
  230. if header.Signature != LEVIN_SIGNATURE {
  231. return fmt.Errorf("Incorrect Levin Signature")
  232. }
  233. header.CB = binary.LittleEndian.Uint64(packed_buffer[8:16]) // packed 8 + 8 bytes
  234. if packed_buffer[16] == 0 {
  235. header.ReturnData = false // packed 8 + 8 + 1
  236. } else {
  237. header.ReturnData = true // packed 8 + 8 + 1
  238. }
  239. header.Command = binary.LittleEndian.Uint32(packed_buffer[17 : 17+4]) // packed 8+8+1+4 bytes
  240. header.ReturnCode = (int32)(binary.LittleEndian.Uint32(packed_buffer[21 : 21+4])) // packed 8+8+1+4 bytes
  241. header.Flags = binary.LittleEndian.Uint32(packed_buffer[25 : 25+4]) // packed 8+8+1+4 bytes
  242. header.Protocol_Version = binary.LittleEndian.Uint32(packed_buffer[29 : 29+4]) // packed 8+8+1+4 bytes
  243. return nil
  244. }
  245. func (header Levin_Data_Header) Serialize() ([]byte, int) {
  246. var packed_buffer []byte
  247. // if nothing is to be placed
  248. if len(header.Data) == 0 {
  249. packed_buffer = make([]byte, 8+2, 8+2) // 10 bytes minimum heade
  250. binary.LittleEndian.PutUint64(packed_buffer[0:8], LEVIN_SIGNATURE_DATA) // packed 8 bytes
  251. packed_buffer[8] = 1
  252. packed_buffer[9] = 0
  253. return packed_buffer, len(packed_buffer)
  254. }
  255. packed_buffer = make([]byte, 8+2+len(header.Data), 8+2+len(header.Data))
  256. binary.LittleEndian.PutUint64(packed_buffer[0:8], LEVIN_SIGNATURE_DATA) // packed 8 bytes
  257. packed_buffer[8] = 1
  258. packed_buffer[9] = 8
  259. copy(packed_buffer[10:], header.Data)
  260. return packed_buffer, len(packed_buffer)
  261. }
  262. // extract structure info from hardcoded node
  263. func (header *Levin_Data_Header) DeSerialize(packed_buffer []byte) (err error) {
  264. if len(packed_buffer) < 10 {
  265. return fmt.Errorf("Insufficient header bytes")
  266. }
  267. header.Signature = binary.LittleEndian.Uint64(packed_buffer[0:8]) // packed 8 bytes
  268. if header.Signature != LEVIN_SIGNATURE_DATA {
  269. return fmt.Errorf("WRONG LEVIN_SIGNATURE_DATA")
  270. }
  271. if len(packed_buffer)-8 == 2 {
  272. return nil
  273. }
  274. header.Data = make([]byte, len(packed_buffer)-8+2, len(packed_buffer)-8+2)
  275. // ignore 2 bytes
  276. // packed_buffer[8]=1 // version
  277. // packed_buffer[9]=8 // boost 8 , this can be anything as per boost level
  278. copy(header.Data, packed_buffer[10:])
  279. return nil
  280. }
  281. func DeSerializeLevinHeader(packed_buffer []byte, header *Levin_Header) error {
  282. if len(packed_buffer) != 33 {
  283. return fmt.Errorf("Insufficient header bytes")
  284. }
  285. header.Signature = binary.LittleEndian.Uint64(packed_buffer[0:8]) // packed 8 bytes
  286. header.CB = binary.LittleEndian.Uint64(packed_buffer[8:16]) // packed 8 + 8 bytes
  287. if packed_buffer[16] == 0 {
  288. header.ReturnData = false // packed 8 + 8 + 1
  289. } else {
  290. header.ReturnData = true // packed 8 + 8 + 1
  291. }
  292. header.Command = binary.LittleEndian.Uint32(packed_buffer[17 : 17+4]) // packed 8+8+1+4 bytes
  293. header.ReturnCode = (int32)(binary.LittleEndian.Uint32(packed_buffer[21 : 21+4])) // packed 8+8+1+4 bytes
  294. header.Flags = binary.LittleEndian.Uint32(packed_buffer[25 : 25+4]) // packed 8+8+1+4 bytes
  295. header.Protocol_Version = binary.LittleEndian.Uint32(packed_buffer[29 : 29+4]) // packed 8+8+1+4 bytes
  296. return nil
  297. }
  298. func TestSerializeDeserialize(t *testing.T) {
  299. }