You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

353 lines
8.8 KiB

  1. package swarm
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "fmt"
  6. "os"
  7. "os/user"
  8. "strings"
  9. "time"
  10. "github.com/ethereum/go-ethereum/common/hexutil"
  11. "github.com/ethereum/go-ethereum/node"
  12. "github.com/ethereum/go-ethereum/crypto"
  13. "github.com/ethereum/go-ethereum/p2p"
  14. "github.com/ethereum/go-ethereum/log"
  15. "github.com/ethereum/go-ethereum/p2p/enode"
  16. "github.com/ethereum/go-ethereum/swarm"
  17. swarmapi "github.com/ethereum/go-ethereum/swarm/api"
  18. "github.com/ethereum/go-ethereum/swarm/network"
  19. "github.com/ethereum/go-ethereum/swarm/pss"
  20. )
  21. const (
  22. // MaxPeers is the maximum number of p2p peer connections
  23. MaxPeers = 10
  24. )
  25. // SwarmBootnodes list of bootnodes for the SWARM network
  26. var SwarmBootnodes = []string{
  27. // EF Swarm Bootnode - AWS - eu-central-1
  28. "enode://4c113504601930bf2000c29bcd98d1716b6167749f58bad703bae338332fe93cc9d9204f08afb44100dc7bea479205f5d162df579f9a8f76f8b402d339709023@3.122.203.99:30301",
  29. // EF Swarm Bootnode - AWS - us-west-2
  30. "enode://89f2ede3371bff1ad9f2088f2012984e280287a4e2b68007c2a6ad994909c51886b4a8e9e2ecc97f9910aca538398e0a5804b0ee80a187fde1ba4f32626322ba@52.35.212.179:30301",
  31. }
  32. func newNode(key *ecdsa.PrivateKey, port int, httpport int, wsport int,
  33. datadir string, modules ...string) (*node.Node, *node.Config, error) {
  34. if port == 0 {
  35. port = 30100
  36. }
  37. cfg := &node.DefaultConfig
  38. if key != nil {
  39. cfg.P2P.PrivateKey = key
  40. }
  41. cfg.P2P.MaxPeers = MaxPeers
  42. cfg.P2P.ListenAddr = fmt.Sprintf("0.0.0.0:%d", port)
  43. cfg.P2P.EnableMsgEvents = true
  44. cfg.P2P.NoDiscovery = false
  45. cfg.P2P.DiscoveryV5 = true
  46. cfg.IPCPath = datadir + "/node.ipc"
  47. cfg.DataDir = datadir
  48. if httpport > 0 {
  49. cfg.HTTPHost = node.DefaultHTTPHost
  50. cfg.HTTPPort = httpport
  51. cfg.HTTPCors = []string{"*"}
  52. }
  53. if wsport > 0 {
  54. cfg.WSHost = node.DefaultWSHost
  55. cfg.WSPort = wsport
  56. cfg.WSOrigins = []string{"*"}
  57. for i := 0; i < len(modules); i++ {
  58. cfg.WSModules = append(cfg.WSModules, modules[i])
  59. }
  60. }
  61. stack, err := node.New(cfg)
  62. if err != nil {
  63. return nil, nil, fmt.Errorf("ServiceNode create fail: %v", err)
  64. }
  65. return stack, cfg, nil
  66. }
  67. func newSwarm(privkey *ecdsa.PrivateKey, datadir string, port int) (*swarm.Swarm, *swarmapi.Config, node.ServiceConstructor) {
  68. // create swarm service
  69. swarmCfg := swarmapi.NewConfig()
  70. swarmCfg.SyncEnabled = true
  71. swarmCfg.Port = fmt.Sprintf("%d", port)
  72. swarmCfg.Path = datadir
  73. swarmCfg.HiveParams.Discovery = true
  74. swarmCfg.Discovery = true
  75. swarmCfg.Pss.MsgTTL = time.Second * 10
  76. swarmCfg.Pss.CacheTTL = time.Second * 30
  77. swarmCfg.Pss.AllowRaw = true
  78. swarmCfg.Init(privkey)
  79. swarmNode, err := swarm.NewSwarm(swarmCfg, nil)
  80. if err != nil {
  81. log.Crit("cannot crate swarm node")
  82. }
  83. // register swarm service to the node
  84. var swarmService node.ServiceConstructor = func(ctx *node.ServiceContext) (node.Service, error) {
  85. return swarmNode, nil
  86. }
  87. return swarmNode, swarmCfg, swarmService
  88. }
  89. type swarmPorts struct {
  90. WebSockets int
  91. HTTPRPC int
  92. Bzz int
  93. P2P int
  94. }
  95. func NewSwarmPorts() *swarmPorts {
  96. sp := new(swarmPorts)
  97. sp.WebSockets = 8544
  98. sp.HTTPRPC = 8543
  99. sp.Bzz = 8542
  100. sp.P2P = 31000
  101. return sp
  102. }
  103. type pssSub struct {
  104. Unregister func()
  105. Delivery (chan []byte)
  106. Address string
  107. }
// SwarmNet bundles a p2p node running the Swarm service together with the PSS
// state needed to subscribe and publish on topics. Datadir and Key may be set
// before Init; the remaining fields are populated by Init.
type SwarmNet struct {
	// Node is the node stack created by Init.
	Node *node.Node
	// NodeConfig is the configuration Node was created from.
	NodeConfig *node.Config
	// EnodeID is the enode URL reported by the running p2p server.
	EnodeID string
	// Datadir is the data directory shared by the node and the Swarm service.
	Datadir string
	// Key is the private key handed to the node and the Swarm service.
	Key *ecdsa.PrivateKey
	// Pss is the PSS API found among the Swarm service APIs.
	Pss *pss.API
	// PssPubKey is the hex-encoded PSS public key.
	PssPubKey string
	// PssAddr is the base address reported by the PSS API.
	PssAddr pss.PssAddress
	// PssTopics maps a topic string to its subscription state.
	PssTopics map[string]*pssSub
	// Hive is the Hive found among the Swarm service APIs.
	Hive *network.Hive
	// Ports holds the port numbers used by the node and its services.
	Ports *swarmPorts
}
  121. func (sn *SwarmNet) SetLog(level string) error {
  122. // ensure good log formats for terminal
  123. // handle verbosity flag
  124. loglevel, err := log.LvlFromString(level)
  125. if err != nil {
  126. return err
  127. }
  128. hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
  129. hf := log.LvlFilterHandler(loglevel, hs)
  130. h := log.CallerFileHandler(hf)
  131. log.Root().SetHandler(h)
  132. return nil
  133. }
  134. func (sn *SwarmNet) PrintStats() {
  135. // statistics thread
  136. go func() {
  137. for {
  138. if sn.Node.Server() != nil && sn.Hive != nil {
  139. addr := fmt.Sprintf("%x", sn.PssAddr)
  140. var addrs [][]byte
  141. addrs = append(addrs, []byte(addr))
  142. peerCount := sn.Node.Server().PeerCount()
  143. log.Info(fmt.Sprintf("PeerCount:%d NeighDepth:%d", peerCount, sn.Hive.NeighbourhoodDepth))
  144. }
  145. time.Sleep(time.Second * 5)
  146. }
  147. }()
  148. }
// SetDatadir sets the data directory used by Init. When it is left empty,
// Init falls back to a directory under the current user's home.
func (sn *SwarmNet) SetDatadir(datadir string) {
	sn.Datadir = datadir
}
// SetKey sets the private key used by Init. When it is left nil, Init takes
// the key from the node configuration instead.
func (sn *SwarmNet) SetKey(key *ecdsa.PrivateKey) {
	sn.Key = key
}
  155. func (sn *SwarmNet) Init() error {
  156. var err error
  157. if len(sn.Datadir) < 1 {
  158. usr, err := user.Current()
  159. if err != nil {
  160. return err
  161. }
  162. sn.Datadir = usr.HomeDir + "/.dvote/swarm"
  163. os.MkdirAll(sn.Datadir, 0755)
  164. }
  165. sn.SetLog("info")
  166. sn.Ports = NewSwarmPorts()
  167. // create node
  168. sn.Node, sn.NodeConfig, err = newNode(sn.Key, sn.Ports.P2P,
  169. sn.Ports.HTTPRPC, sn.Ports.WebSockets, sn.Datadir, "pss")
  170. if err != nil {
  171. return err
  172. }
  173. // set node key, if not set use the storage one or generate it
  174. if sn.Key == nil {
  175. sn.Key = sn.NodeConfig.NodeKey()
  176. }
  177. // create and register Swarm service
  178. swarmNode, _, swarmHandler := newSwarm(sn.Key, sn.Datadir, sn.Ports.Bzz)
  179. err = sn.Node.Register(swarmHandler)
  180. if err != nil {
  181. return fmt.Errorf("swarm register fail %v", err)
  182. }
  183. // start the node
  184. sn.Node.Start()
  185. for _, url := range SwarmBootnodes {
  186. log.Info("Add bootnode " + url)
  187. node, _ := enode.ParseV4(url)
  188. sn.Node.Server().AddPeer(node)
  189. }
  190. // wait to connect to the p2p network
  191. _, cancel := context.WithTimeout(context.Background(), time.Second)
  192. defer cancel()
  193. time.Sleep(time.Second * 5)
  194. // Get the services API
  195. for _, a := range swarmNode.APIs() {
  196. switch a.Service.(type) {
  197. case *network.Hive:
  198. sn.Hive = a.Service.(*network.Hive)
  199. case *pss.API:
  200. sn.Pss = a.Service.(*pss.API)
  201. }
  202. }
  203. // Create topics map
  204. sn.PssTopics = make(map[string]*pssSub)
  205. // Set some extra data
  206. sn.EnodeID = sn.Node.Server().NodeInfo().Enode
  207. sn.PssPubKey = hexutil.Encode(crypto.FromECDSAPub(sn.Pss.PublicKey()))
  208. sn.PssAddr, err = sn.Pss.BaseAddr()
  209. if err != nil {
  210. return fmt.Errorf("pss API fail %v", err)
  211. }
  212. // Print some information
  213. log.Info(fmt.Sprintf("My PSS pubkey is %s", sn.PssPubKey))
  214. log.Info(fmt.Sprintf("My PSS address is %x", sn.PssAddr))
  215. // Run statistics goroutine
  216. sn.PrintStats()
  217. return nil
  218. }
  219. func strTopic(topic string) pss.Topic {
  220. return pss.BytesToTopic([]byte(topic))
  221. }
  222. func strSymKey(key string) []byte {
  223. symKey := make([]byte, 32)
  224. copy(symKey, []byte(key))
  225. return symKey
  226. }
  227. func strAddress(addr string) pss.PssAddress {
  228. var pssAddress pss.PssAddress
  229. pssAddress = []byte(addr)
  230. return pssAddress
  231. }
  232. func (sn *SwarmNet) PssSub(subType, key, topic, address string) error {
  233. pssTopic := strTopic(topic)
  234. pssAddress := strAddress(address)
  235. switch subType {
  236. case "sym":
  237. _, err := sn.Pss.SetSymmetricKey(strSymKey(key), pssTopic, pssAddress, true)
  238. if err != nil {
  239. return err
  240. }
  241. }
  242. sn.PssTopics[topic] = new(pssSub)
  243. sn.PssTopics[topic].Address = address
  244. sn.PssTopics[topic].Delivery = make(chan []byte)
  245. var pssHandler pss.HandlerFunc = func(msg []byte, peer *p2p.Peer, asym bool, keyid string) error {
  246. //log.Info("pss received", "msg", fmt.Sprintf("%s", msg), "keyid", fmt.Sprintf("%s", keyid))
  247. sn.PssTopics[topic].Delivery <- msg
  248. return nil
  249. }
  250. topicHandler := pss.NewHandler(pssHandler)
  251. sn.PssTopics[topic].Unregister = sn.Pss.Register(&pssTopic, topicHandler)
  252. log.Info(fmt.Sprintf("Subscribed to topic %s", pssTopic.String()))
  253. return nil
  254. }
  255. func (sn *SwarmNet) PssPub(subType, key, topic, msg, address string) error {
  256. var err error
  257. dstAddr := strAddress(address)
  258. dstTopic := strTopic(topic)
  259. if subType == "sym" {
  260. symKeyId, err := sn.Pss.SetSymmetricKey(strSymKey(key), dstTopic, dstAddr, false)
  261. if err != nil {
  262. return err
  263. }
  264. err = sn.Pss.SendSym(symKeyId, strTopic(topic), hexutil.Bytes(msg))
  265. }
  266. if subType == "raw" {
  267. err = sn.Pss.SendRaw(hexutil.Bytes(dstAddr), dstTopic, hexutil.Bytes(msg))
  268. }
  269. if subType == "asym" {
  270. if hasHexPrefix := strings.HasPrefix(key, "0x"); !hasHexPrefix {
  271. key = "0x" + key
  272. }
  273. topics, addresses, err := sn.Pss.GetPublickeyPeers(key)
  274. if err != nil {
  275. return err
  276. }
  277. topicFound := false
  278. for i, t := range topics {
  279. if dstTopic == t && fmt.Sprintf("%x", addresses[i]) == fmt.Sprintf("%x", dstAddr) {
  280. topicFound = true
  281. break
  282. }
  283. }
  284. if !topicFound {
  285. pubKeyBytes, err := hexutil.Decode(key)
  286. if err != nil {
  287. return err
  288. }
  289. err = sn.Pss.SetPeerPublicKey(pubKeyBytes, dstTopic, dstAddr)
  290. if err != nil {
  291. return err
  292. }
  293. }
  294. err = sn.Pss.SendAsym(key, dstTopic, hexutil.Bytes(msg))
  295. }
  296. return err
  297. }
  298. func (sn *SwarmNet) Test() error {
  299. sn.PssSub("sym", "vocdoni", "vocdoni_test", "")
  300. go func() {
  301. for {
  302. msg := <-sn.PssTopics["vocdoni_test"].Delivery
  303. fmt.Printf("Pss received: %s\n", msg)
  304. }
  305. }()
  306. hostname, _ := os.Hostname()
  307. for {
  308. err := sn.PssPub("sym", "vocdoni", "vocdoni_test", fmt.Sprintf("Hello world from %s", hostname), "")
  309. log.Info("pss sent", "err", err)
  310. time.Sleep(10 * time.Second)
  311. }
  312. }