You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

352 lines
8.8 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package swarm
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "fmt"
  6. "os"
  7. "os/user"
  8. "strings"
  9. "time"
  10. "github.com/ethereum/go-ethereum/common/hexutil"
  11. "github.com/ethereum/go-ethereum/node"
  12. "github.com/ethereum/go-ethereum/crypto"
  13. "github.com/ethereum/go-ethereum/p2p"
  14. "github.com/ethereum/go-ethereum/log"
  15. "github.com/ethereum/go-ethereum/p2p/enode"
  16. "github.com/ethereum/go-ethereum/swarm"
  17. swarmapi "github.com/ethereum/go-ethereum/swarm/api"
  18. "github.com/ethereum/go-ethereum/swarm/network"
  19. "github.com/ethereum/go-ethereum/swarm/pss"
  20. )
  21. const (
  22. // MaxPeers is the maximum number of p2p peer connections
  23. MaxPeers = 10
  24. )
  25. // SwarmBootnodes list of bootnodes for the SWARM network
  26. var SwarmBootnodes = []string{
  27. // EF Swarm Bootnode - AWS - eu-central-1
  28. "enode://4c113504601930bf2000c29bcd98d1716b6167749f58bad703bae338332fe93cc9d9204f08afb44100dc7bea479205f5d162df579f9a8f76f8b402d339709023@3.122.203.99:30301",
  29. // EF Swarm Bootnode - AWS - us-west-2
  30. "enode://89f2ede3371bff1ad9f2088f2012984e280287a4e2b68007c2a6ad994909c51886b4a8e9e2ecc97f9910aca538398e0a5804b0ee80a187fde1ba4f32626322ba@52.35.212.179:30301",
  31. }
  32. func newNode(key *ecdsa.PrivateKey, port int, httpport int, wsport int,
  33. datadir string, modules ...string) (*node.Node, *node.Config, error) {
  34. if port == 0 {
  35. port = 30100
  36. }
  37. cfg := &node.DefaultConfig
  38. if key != nil {
  39. cfg.P2P.PrivateKey = key
  40. }
  41. cfg.P2P.MaxPeers = MaxPeers
  42. cfg.P2P.ListenAddr = fmt.Sprintf("0.0.0.0:%d", port)
  43. cfg.P2P.EnableMsgEvents = true
  44. cfg.P2P.NoDiscovery = false
  45. cfg.P2P.DiscoveryV5 = true
  46. cfg.IPCPath = datadir + "/node.ipc"
  47. cfg.DataDir = datadir
  48. if httpport > 0 {
  49. cfg.HTTPHost = node.DefaultHTTPHost
  50. cfg.HTTPPort = httpport
  51. cfg.HTTPCors = []string{"*"}
  52. }
  53. if wsport > 0 {
  54. cfg.WSHost = node.DefaultWSHost
  55. cfg.WSPort = wsport
  56. cfg.WSOrigins = []string{"*"}
  57. for i := 0; i < len(modules); i++ {
  58. cfg.WSModules = append(cfg.WSModules, modules[i])
  59. }
  60. }
  61. stack, err := node.New(cfg)
  62. if err != nil {
  63. return nil, nil, fmt.Errorf("ServiceNode create fail: %v", err)
  64. }
  65. return stack, cfg, nil
  66. }
  67. func newSwarm(privkey *ecdsa.PrivateKey, datadir string, port int) (*swarm.Swarm, *swarmapi.Config, node.ServiceConstructor) {
  68. // create swarm service
  69. swarmCfg := swarmapi.NewConfig()
  70. swarmCfg.SyncEnabled = true
  71. swarmCfg.Port = fmt.Sprintf("%d", port)
  72. swarmCfg.Path = datadir
  73. swarmCfg.HiveParams.Discovery = true
  74. swarmCfg.Discovery = true
  75. swarmCfg.Pss.MsgTTL = time.Second * 10
  76. swarmCfg.Pss.CacheTTL = time.Second * 30
  77. swarmCfg.Pss.AllowRaw = true
  78. swarmCfg.Init(privkey)
  79. swarmNode, err := swarm.NewSwarm(swarmCfg, nil)
  80. if err != nil {
  81. log.Crit("cannot crate swarm node")
  82. }
  83. // register swarm service to the node
  84. var swarmService node.ServiceConstructor = func(ctx *node.ServiceContext) (node.Service, error) {
  85. return swarmNode, nil
  86. }
  87. return swarmNode, swarmCfg, swarmService
  88. }
  89. type swarmPorts struct {
  90. WebSockets int
  91. HTTPRPC int
  92. Bzz int
  93. P2P int
  94. }
  95. func NewSwarmPorts() *swarmPorts {
  96. sp := new(swarmPorts)
  97. sp.WebSockets = 8544
  98. sp.HTTPRPC = 8543
  99. sp.Bzz = 8542
  100. sp.P2P = 31000
  101. return sp
  102. }
  103. type pssMsg struct {
  104. Msg []byte
  105. Peer *p2p.Peer
  106. Asym bool
  107. Keyid string
  108. }
  109. type pssSub struct {
  110. Unregister func()
  111. Delivery (chan pssMsg)
  112. Address string
  113. }
  114. type SimplePss struct {
  115. Node *node.Node
  116. NodeConfig *node.Config
  117. EnodeID string
  118. Datadir string
  119. Key *ecdsa.PrivateKey
  120. Pss *pss.API
  121. PssPubKey string
  122. PssAddr pss.PssAddress
  123. PssTopics map[string]*pssSub
  124. Hive *network.Hive
  125. Ports *swarmPorts
  126. LogLevel string
  127. }
  128. func (sn *SimplePss) SetLog(level string) error {
  129. // ensure good log formats for terminal
  130. // handle verbosity flag
  131. loglevel, err := log.LvlFromString(level)
  132. if err != nil {
  133. return err
  134. }
  135. hs := log.StreamHandler(os.Stderr, log.TerminalFormat(true))
  136. hf := log.LvlFilterHandler(loglevel, hs)
  137. h := log.CallerFileHandler(hf)
  138. log.Root().SetHandler(h)
  139. return nil
  140. }
  141. func (sn *SimplePss) PrintStats() {
  142. // statistics thread
  143. go func() {
  144. for {
  145. if sn.Node.Server() != nil && sn.Hive != nil {
  146. addr := fmt.Sprintf("%x", sn.PssAddr)
  147. var addrs [][]byte
  148. addrs = append(addrs, []byte(addr))
  149. peerCount := sn.Node.Server().PeerCount()
  150. log.Info(fmt.Sprintf("PeerCount:%d NeighDepth:%d", peerCount, sn.Hive.NeighbourhoodDepth()))
  151. }
  152. time.Sleep(time.Second * 5)
  153. }
  154. }()
  155. }
  156. func (sn *SimplePss) SetDatadir(datadir string) {
  157. sn.Datadir = datadir
  158. }
  159. func (sn *SimplePss) SetKey(key *ecdsa.PrivateKey) {
  160. sn.Key = key
  161. }
  162. func (sn *SimplePss) Init() error {
  163. var err error
  164. if len(sn.Datadir) < 1 {
  165. usr, err := user.Current()
  166. if err != nil {
  167. return err
  168. }
  169. sn.Datadir = usr.HomeDir + "/.dvote/swarm"
  170. os.MkdirAll(sn.Datadir, 0755)
  171. }
  172. sn.SetLog(sn.LogLevel)
  173. // if sn.Ports are not defined, set the default config
  174. if (sn.Ports.WebSockets == 0) && (sn.Ports.HTTPRPC == 0) && (sn.Ports.Bzz == 0) && (sn.Ports.P2P == 0) {
  175. sn.Ports = NewSwarmPorts()
  176. }
  177. // create node
  178. sn.Node, sn.NodeConfig, err = newNode(sn.Key, sn.Ports.P2P,
  179. sn.Ports.HTTPRPC, sn.Ports.WebSockets, sn.Datadir, "pss")
  180. if err != nil {
  181. return err
  182. }
  183. // set node key, if not set use the storage one or generate it
  184. if sn.Key == nil {
  185. sn.Key = sn.NodeConfig.NodeKey()
  186. }
  187. // create and register Swarm service
  188. swarmNode, _, swarmHandler := newSwarm(sn.Key, sn.Datadir, sn.Ports.Bzz)
  189. err = sn.Node.Register(swarmHandler)
  190. if err != nil {
  191. return fmt.Errorf("swarm register fail %v", err)
  192. }
  193. // start the node
  194. err = sn.Node.Start()
  195. if err != nil {
  196. return err
  197. }
  198. for _, url := range SwarmBootnodes {
  199. log.Info("Add bootnode " + url)
  200. node, err := enode.ParseV4(url)
  201. if err != nil {
  202. return err
  203. }
  204. sn.Node.Server().AddPeer(node)
  205. }
  206. // wait to connect to the p2p network
  207. _, cancel := context.WithTimeout(context.Background(), time.Second)
  208. defer cancel()
  209. time.Sleep(time.Second * 5)
  210. // Get the services API
  211. for _, a := range swarmNode.APIs() {
  212. switch a.Service.(type) {
  213. case *network.Hive:
  214. sn.Hive = a.Service.(*network.Hive)
  215. case *pss.API:
  216. sn.Pss = a.Service.(*pss.API)
  217. }
  218. }
  219. // Create topics map
  220. sn.PssTopics = make(map[string]*pssSub)
  221. // Set some extra data
  222. sn.EnodeID = sn.Node.Server().NodeInfo().Enode
  223. sn.PssPubKey = hexutil.Encode(crypto.FromECDSAPub(sn.Pss.PublicKey()))
  224. sn.PssAddr, err = sn.Pss.BaseAddr()
  225. if err != nil {
  226. return fmt.Errorf("pss API fail %v", err)
  227. }
  228. // Print some information
  229. log.Info(fmt.Sprintf("My PSS pubkey is %s", sn.PssPubKey))
  230. log.Info(fmt.Sprintf("My PSS address is %x", sn.PssAddr))
  231. // Run statistics goroutine
  232. sn.PrintStats()
  233. return nil
  234. }
  235. func strTopic(topic string) pss.Topic {
  236. return pss.BytesToTopic([]byte(topic))
  237. }
  238. func strSymKey(key string) []byte {
  239. symKey := make([]byte, 32)
  240. copy(symKey, []byte(key))
  241. return symKey
  242. }
  243. func strAddress(addr string) pss.PssAddress {
  244. var pssAddress pss.PssAddress
  245. pssAddress = []byte(addr)
  246. return pssAddress
  247. }
  248. func (sn *SimplePss) PssSub(subType, key, topic, address string) error {
  249. pssTopic := strTopic(topic)
  250. pssAddress := strAddress(address)
  251. if subType == "sym" {
  252. _, err := sn.Pss.SetSymmetricKey(strSymKey(key), pssTopic, pssAddress, true)
  253. if err != nil {
  254. return err
  255. }
  256. }
  257. sn.PssTopics[topic] = new(pssSub)
  258. sn.PssTopics[topic].Address = address
  259. sn.PssTopics[topic].Delivery = make(chan pssMsg)
  260. var pssHandler pss.HandlerFunc = func(msg []byte, peer *p2p.Peer, asym bool, keyid string) error {
  261. log.Debug("pss received", "msg", fmt.Sprintf("%s", msg), "keyid", fmt.Sprintf("%s", keyid))
  262. sn.PssTopics[topic].Delivery <- pssMsg{Msg: msg, Peer: peer, Asym: asym, Keyid: keyid}
  263. return nil
  264. }
  265. topicHandler := pss.NewHandler(pssHandler)
  266. if subType == "raw" {
  267. topicHandler = topicHandler.WithProxBin().WithRaw()
  268. }
  269. sn.PssTopics[topic].Unregister = sn.Pss.Register(&pssTopic, topicHandler)
  270. log.Info(fmt.Sprintf("Subscribed to [%s] topic %s", subType, pssTopic.String()))
  271. return nil
  272. }
  273. func (sn *SimplePss) PssPub(subType, key, topic, msg, address string) error {
  274. var err error
  275. dstAddr := strAddress(address)
  276. dstTopic := strTopic(topic)
  277. if subType == "sym" {
  278. symKeyId, err := sn.Pss.SetSymmetricKey(strSymKey(key), dstTopic, dstAddr, false)
  279. if err != nil {
  280. return err
  281. }
  282. // send symetric message
  283. err = sn.Pss.SendSym(symKeyId, strTopic(topic), hexutil.Bytes(msg))
  284. }
  285. if subType == "raw" {
  286. // sed raw message
  287. err = sn.Pss.SendRaw(hexutil.Bytes(address), dstTopic, hexutil.Bytes(msg))
  288. }
  289. if subType == "asym" {
  290. // add 0x prefix if not present
  291. if hasHexPrefix := strings.HasPrefix(key, "0x"); !hasHexPrefix {
  292. key = "0x" + key
  293. }
  294. // check if topic+address is already set for a pubKey
  295. _, err := sn.Pss.GetPeerAddress(key, dstTopic)
  296. if err != nil {
  297. pubKeyBytes, err := hexutil.Decode(key)
  298. if err != nil {
  299. return err
  300. }
  301. err = sn.Pss.SetPeerPublicKey(pubKeyBytes, dstTopic, dstAddr)
  302. if err != nil {
  303. return err
  304. }
  305. }
  306. // send asymetric message
  307. err = sn.Pss.SendAsym(key, dstTopic, hexutil.Bytes(msg))
  308. }
  309. return err
  310. }