You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

341 lines
8.6 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package swarm
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "fmt"
  6. "os"
  7. "os/user"
  8. "strings"
  9. "time"
  10. "github.com/ethereum/go-ethereum/common/hexutil"
  11. "github.com/ethereum/go-ethereum/node"
  12. "github.com/ethereum/go-ethereum/crypto"
  13. "github.com/ethereum/go-ethereum/p2p"
  14. "github.com/ethereum/go-ethereum/log"
  15. "github.com/ethereum/go-ethereum/p2p/enode"
  16. "github.com/ethereum/go-ethereum/swarm"
  17. swarmapi "github.com/ethereum/go-ethereum/swarm/api"
  18. "github.com/ethereum/go-ethereum/swarm/network"
  19. "github.com/ethereum/go-ethereum/swarm/pss"
  20. )
  21. const (
  22. // MaxPeers is the maximum number of p2p peer connections
  23. MaxPeers = 10
  24. )
  25. // SwarmBootnodes list of bootnodes for the SWARM network
  26. var SwarmBootnodes = []string{
  27. // EF Swarm Bootnode - AWS - eu-central-1
  28. "enode://4c113504601930bf2000c29bcd98d1716b6167749f58bad703bae338332fe93cc9d9204f08afb44100dc7bea479205f5d162df579f9a8f76f8b402d339709023@3.122.203.99:30301",
  29. // EF Swarm Bootnode - AWS - us-west-2
  30. "enode://89f2ede3371bff1ad9f2088f2012984e280287a4e2b68007c2a6ad994909c51886b4a8e9e2ecc97f9910aca538398e0a5804b0ee80a187fde1ba4f32626322ba@52.35.212.179:30301",
  31. }
  32. func newNode(key *ecdsa.PrivateKey, port int, httpport int, wsport int,
  33. datadir string, modules ...string) (*node.Node, *node.Config, error) {
  34. if port == 0 {
  35. port = 30100
  36. }
  37. cfg := &node.DefaultConfig
  38. if key != nil {
  39. cfg.P2P.PrivateKey = key
  40. }
  41. cfg.P2P.MaxPeers = MaxPeers
  42. cfg.P2P.ListenAddr = fmt.Sprintf("0.0.0.0:%d", port)
  43. cfg.P2P.EnableMsgEvents = true
  44. cfg.P2P.NoDiscovery = false
  45. cfg.P2P.DiscoveryV5 = true
  46. cfg.IPCPath = datadir + "/node.ipc"
  47. cfg.DataDir = datadir
  48. if httpport > 0 {
  49. cfg.HTTPHost = node.DefaultHTTPHost
  50. cfg.HTTPPort = httpport
  51. cfg.HTTPCors = []string{"*"}
  52. }
  53. if wsport > 0 {
  54. cfg.WSHost = node.DefaultWSHost
  55. cfg.WSPort = wsport
  56. cfg.WSOrigins = []string{"*"}
  57. for i := 0; i < len(modules); i++ {
  58. cfg.WSModules = append(cfg.WSModules, modules[i])
  59. }
  60. }
  61. stack, err := node.New(cfg)
  62. if err != nil {
  63. return nil, nil, fmt.Errorf("ServiceNode create fail: %v", err)
  64. }
  65. return stack, cfg, nil
  66. }
  67. func newSwarm(privkey *ecdsa.PrivateKey, datadir string, port int) (*swarm.Swarm, *swarmapi.Config, node.ServiceConstructor) {
  68. // create swarm service
  69. swarmCfg := swarmapi.NewConfig()
  70. swarmCfg.SyncEnabled = true
  71. swarmCfg.Port = fmt.Sprintf("%d", port)
  72. swarmCfg.Path = datadir
  73. swarmCfg.HiveParams.Discovery = true
  74. swarmCfg.Discovery = true
  75. swarmCfg.Pss.MsgTTL = time.Second * 10
  76. swarmCfg.Pss.CacheTTL = time.Second * 30
  77. swarmCfg.Pss.AllowRaw = true
  78. swarmCfg.Init(privkey)
  79. swarmNode, err := swarm.NewSwarm(swarmCfg, nil)
  80. if err != nil {
  81. log.Crit("cannot crate swarm node")
  82. }
  83. // register swarm service to the node
  84. var swarmService node.ServiceConstructor = func(ctx *node.ServiceContext) (node.Service, error) {
  85. return swarmNode, nil
  86. }
  87. return swarmNode, swarmCfg, swarmService
  88. }
  89. type swarmPorts struct {
  90. WebSockets int
  91. HTTPRPC int
  92. Bzz int
  93. P2P int
  94. }
  95. func NewSwarmPorts() *swarmPorts {
  96. sp := new(swarmPorts)
  97. sp.WebSockets = 8544
  98. sp.HTTPRPC = 8543
  99. sp.Bzz = 8542
  100. sp.P2P = 31000
  101. return sp
  102. }
  103. type pssMsg struct {
  104. Msg []byte
  105. Peer *p2p.Peer
  106. Asym bool
  107. Keyid string
  108. }
  109. type pssSub struct {
  110. Unregister func()
  111. Delivery (chan pssMsg)
  112. Address string
  113. }
  114. type SimplePss struct {
  115. Node *node.Node
  116. NodeConfig *node.Config
  117. EnodeID string
  118. Datadir string
  119. Key *ecdsa.PrivateKey
  120. Pss *pss.API
  121. PssPubKey string
  122. PssAddr pss.PssAddress
  123. PssTopics map[string]*pssSub
  124. Hive *network.Hive
  125. Ports *swarmPorts
  126. }
  127. func (sn *SimplePss) SetLog(level string) error {
  128. // ensure good log formats for terminal
  129. // handle verbosity flag
  130. loglevel, err := log.LvlFromString(level)
  131. if err != nil {
  132. return err
  133. }
  134. hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
  135. hf := log.LvlFilterHandler(loglevel, hs)
  136. h := log.CallerFileHandler(hf)
  137. log.Root().SetHandler(h)
  138. return nil
  139. }
  140. func (sn *SimplePss) PrintStats() {
  141. // statistics thread
  142. go func() {
  143. for {
  144. if sn.Node.Server() != nil && sn.Hive != nil {
  145. addr := fmt.Sprintf("%x", sn.PssAddr)
  146. var addrs [][]byte
  147. addrs = append(addrs, []byte(addr))
  148. peerCount := sn.Node.Server().PeerCount()
  149. log.Info(fmt.Sprintf("PeerCount:%d NeighDepth:%d", peerCount, sn.Hive.NeighbourhoodDepth))
  150. }
  151. time.Sleep(time.Second * 5)
  152. }
  153. }()
  154. }
  155. func (sn *SimplePss) SetDatadir(datadir string) {
  156. sn.Datadir = datadir
  157. }
  158. func (sn *SimplePss) SetKey(key *ecdsa.PrivateKey) {
  159. sn.Key = key
  160. }
  161. func (sn *SimplePss) Init() error {
  162. var err error
  163. if len(sn.Datadir) < 1 {
  164. usr, err := user.Current()
  165. if err != nil {
  166. return err
  167. }
  168. sn.Datadir = usr.HomeDir + "/.dvote/swarm"
  169. os.MkdirAll(sn.Datadir, 0755)
  170. }
  171. sn.SetLog("info")
  172. sn.Ports = NewSwarmPorts()
  173. // create node
  174. sn.Node, sn.NodeConfig, err = newNode(sn.Key, sn.Ports.P2P,
  175. sn.Ports.HTTPRPC, sn.Ports.WebSockets, sn.Datadir, "pss")
  176. if err != nil {
  177. return err
  178. }
  179. // set node key, if not set use the storage one or generate it
  180. if sn.Key == nil {
  181. sn.Key = sn.NodeConfig.NodeKey()
  182. }
  183. // create and register Swarm service
  184. swarmNode, _, swarmHandler := newSwarm(sn.Key, sn.Datadir, sn.Ports.Bzz)
  185. err = sn.Node.Register(swarmHandler)
  186. if err != nil {
  187. return fmt.Errorf("swarm register fail %v", err)
  188. }
  189. // start the node
  190. sn.Node.Start()
  191. for _, url := range SwarmBootnodes {
  192. log.Info("Add bootnode " + url)
  193. node, _ := enode.ParseV4(url)
  194. sn.Node.Server().AddPeer(node)
  195. }
  196. // wait to connect to the p2p network
  197. _, cancel := context.WithTimeout(context.Background(), time.Second)
  198. defer cancel()
  199. time.Sleep(time.Second * 5)
  200. // Get the services API
  201. for _, a := range swarmNode.APIs() {
  202. switch a.Service.(type) {
  203. case *network.Hive:
  204. sn.Hive = a.Service.(*network.Hive)
  205. case *pss.API:
  206. sn.Pss = a.Service.(*pss.API)
  207. }
  208. }
  209. // Create topics map
  210. sn.PssTopics = make(map[string]*pssSub)
  211. // Set some extra data
  212. sn.EnodeID = sn.Node.Server().NodeInfo().Enode
  213. sn.PssPubKey = hexutil.Encode(crypto.FromECDSAPub(sn.Pss.PublicKey()))
  214. sn.PssAddr, err = sn.Pss.BaseAddr()
  215. if err != nil {
  216. return fmt.Errorf("pss API fail %v", err)
  217. }
  218. // Print some information
  219. log.Info(fmt.Sprintf("My PSS pubkey is %s", sn.PssPubKey))
  220. log.Info(fmt.Sprintf("My PSS address is %x", sn.PssAddr))
  221. // Run statistics goroutine
  222. sn.PrintStats()
  223. return nil
  224. }
  225. func strTopic(topic string) pss.Topic {
  226. return pss.BytesToTopic([]byte(topic))
  227. }
  228. func strSymKey(key string) []byte {
  229. symKey := make([]byte, 32)
  230. copy(symKey, []byte(key))
  231. return symKey
  232. }
  233. func strAddress(addr string) pss.PssAddress {
  234. var pssAddress pss.PssAddress
  235. pssAddress = []byte(addr)
  236. return pssAddress
  237. }
  238. func (sn *SimplePss) PssSub(subType, key, topic, address string) error {
  239. pssTopic := strTopic(topic)
  240. pssAddress := strAddress(address)
  241. if subType == "sym" {
  242. _, err := sn.Pss.SetSymmetricKey(strSymKey(key), pssTopic, pssAddress, true)
  243. if err != nil {
  244. return err
  245. }
  246. }
  247. sn.PssTopics[topic] = new(pssSub)
  248. sn.PssTopics[topic].Address = address
  249. sn.PssTopics[topic].Delivery = make(chan pssMsg)
  250. var pssHandler pss.HandlerFunc = func(msg []byte, peer *p2p.Peer, asym bool, keyid string) error {
  251. log.Debug("pss received", "msg", fmt.Sprintf("%s", msg), "keyid", fmt.Sprintf("%s", keyid))
  252. sn.PssTopics[topic].Delivery <- pssMsg{Msg: msg, Peer: peer, Asym: asym, Keyid: keyid}
  253. return nil
  254. }
  255. topicHandler := pss.NewHandler(pssHandler)
  256. if subType == "raw" {
  257. topicHandler = topicHandler.WithProxBin().WithRaw()
  258. }
  259. sn.PssTopics[topic].Unregister = sn.Pss.Register(&pssTopic, topicHandler)
  260. log.Info(fmt.Sprintf("Subscribed to [%s] topic %s", subType, pssTopic.String()))
  261. return nil
  262. }
  263. func (sn *SimplePss) PssPub(subType, key, topic, msg, address string) error {
  264. var err error
  265. dstAddr := strAddress(address)
  266. dstTopic := strTopic(topic)
  267. if subType == "sym" {
  268. symKeyId, err := sn.Pss.SetSymmetricKey(strSymKey(key), dstTopic, dstAddr, false)
  269. if err != nil {
  270. return err
  271. }
  272. // send symetric message
  273. err = sn.Pss.SendSym(symKeyId, strTopic(topic), hexutil.Bytes(msg))
  274. }
  275. if subType == "raw" {
  276. // sed raw message
  277. err = sn.Pss.SendRaw(hexutil.Bytes(address), dstTopic, hexutil.Bytes(msg))
  278. }
  279. if subType == "asym" {
  280. // add 0x prefix if not present
  281. if hasHexPrefix := strings.HasPrefix(key, "0x"); !hasHexPrefix {
  282. key = "0x" + key
  283. }
  284. // check if topic+address is already set for a pubKey
  285. _, err := sn.Pss.GetPeerAddress(key, dstTopic)
  286. if err != nil {
  287. pubKeyBytes, err := hexutil.Decode(key)
  288. if err != nil {
  289. return err
  290. }
  291. err = sn.Pss.SetPeerPublicKey(pubKeyBytes, dstTopic, dstAddr)
  292. if err != nil {
  293. return err
  294. }
  295. }
  296. // send asymetric message
  297. err = sn.Pss.SendAsym(key, dstTopic, hexutil.Bytes(msg))
  298. }
  299. return err
  300. }