Browse Source

node rpc Start & Ping, add rpc-test (client), add ID marshalers

master
arnaucube 5 years ago
parent
commit
0e39e2facb
9 changed files with 192 additions and 3 deletions
  1. +4
    -1
      cmd/cmd.go
  2. +9
    -1
      config.yaml
  3. +25
    -0
      config/config.go
  4. +18
    -0
      kademlia/id.go
  5. +22
    -0
      kademlia/id_test.go
  6. +6
    -0
      kademlia/kademlia.go
  7. +2
    -0
      main.go
  8. +41
    -1
      node/node.go
  9. +65
    -0
      rpc-test/test.go

+ 4
- 1
cmd/cmd.go

@ -1,6 +1,7 @@
package cmd package cmd
import ( import (
"fmt"
"go-dht/config" "go-dht/config"
"go-dht/node" "go-dht/node"
@ -20,6 +21,7 @@ func cmdStart(c *cli.Context) error {
if err := config.MustRead(c); err != nil { if err := config.MustRead(c); err != nil {
return err return err
} }
fmt.Println(config.C)
var n node.Node var n node.Node
var err error var err error
@ -36,5 +38,6 @@ func cmdStart(c *cli.Context) error {
} }
log.Info("New node created with ID: ", n.ID()) log.Info("New node created with ID: ", n.ID())
} }
return nil
err = n.Start()
return err
} }

+ 9
- 1
config.yaml

@ -1,2 +1,10 @@
id: "1564a997c00e9b168705f9ca9d87f4ba5efefef7"
id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1
port: 5000 port: 5000
knownNodes:
- id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1
port: 5001
- id: "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b"
addr: 127.0.0.1
port: 5002

+ 25
- 0
config/config.go

@ -1,12 +1,23 @@
package config package config
import ( import (
"go-dht/kademlia"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/urfave/cli" "github.com/urfave/cli"
) )
type Config struct { type Config struct {
ID string
Addr string
Port string
KnownNodesStr []KnownNodeStr `mapstructure:"knownnodes"`
KnownNodes []kademlia.ListedNode `mapstructure:"-"`
}
type KnownNodeStr struct {
ID string ID string
Addr string
Port string Port string
} }
@ -27,5 +38,19 @@ func MustRead(c *cli.Context) error {
if err := viper.Unmarshal(&C); err != nil { if err := viper.Unmarshal(&C); err != nil {
return err return err
} }
for _, v := range C.KnownNodesStr {
id, err := kademlia.IDFromString(v.ID)
if err != nil {
return err
}
kn := kademlia.ListedNode{
ID: id,
Addr: v.Addr,
Port: v.Port,
}
C.KnownNodes = append(C.KnownNodes, kn)
}
C.KnownNodesStr = []KnownNodeStr{}
return nil return nil
} }

+ 18
- 0
kademlia/id.go

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"crypto/rand" "crypto/rand"
"encoding/hex" "encoding/hex"
"fmt"
) )
type ID [B]byte type ID [B]byte
@ -23,6 +24,23 @@ func (id ID) String() string {
return hex.EncodeToString(id[:]) return hex.EncodeToString(id[:])
} }
func (id ID) MarshalText() ([]byte, error) {
return []byte(hex.EncodeToString(id[:])), nil
}
func (id *ID) UnmarshalText(data []byte) error {
fmt.Println("UNMARSHAL")
fmt.Println("d", string(data))
var err error
var idFromStr ID
idFromStr, err = IDFromString(string(data))
if err != nil {
return err
}
copy(id[:], idFromStr[:])
return nil
}
func IDFromString(s string) (ID, error) { func IDFromString(s string) (ID, error) {
b, err := hex.DecodeString(s) b, err := hex.DecodeString(s)
if err != nil { if err != nil {

+ 22
- 0
kademlia/id_test.go

@ -1,6 +1,8 @@
package kademlia package kademlia
import ( import (
"encoding/json"
"fmt"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -19,6 +21,26 @@ func TestNewID(t *testing.T) {
assert.Equal(t, "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7", idA.String()) assert.Equal(t, "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7", idA.String())
} }
func TestIDMarshalers(t *testing.T) {
id, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d7")
assert.Nil(t, err)
idStr, err := json.Marshal(id)
assert.Nil(t, err)
assert.Equal(t, "\"0fd85ddddf15aeec2d5d8b01b013dbca030a18d7\"", string(idStr))
fmt.Println("idStr", string(idStr))
var idParsed ID
err = json.Unmarshal(idStr, &idParsed)
assert.Nil(t, err)
assert.Equal(t, id, idParsed)
var idParsed2 ID
err = json.Unmarshal([]byte("\"0fd85ddddf15aeec2d5d8b01b013dbca030a18d7\""), &idParsed2)
assert.Nil(t, err)
assert.Equal(t, id, idParsed2)
}
func TestIDCmp(t *testing.T) { func TestIDCmp(t *testing.T) {
idA, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d7") idA, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d7")
assert.Nil(t, err) assert.Nil(t, err)

+ 6
- 0
kademlia/kademlia.go

@ -3,6 +3,8 @@ package kademlia
import ( import (
"math/bits" "math/bits"
"strconv" "strconv"
log "github.com/sirupsen/logrus"
) )
const ( const (
@ -14,6 +16,7 @@ const (
type ListedNode struct { type ListedNode struct {
ID ID ID ID
Addr string Addr string
Port string
} }
type Kademlia struct { type Kademlia struct {
@ -60,6 +63,7 @@ func (kad *Kademlia) Update(o ListedNode) {
kb := kad.KBuckets[k] kb := kad.KBuckets[k]
if len(kb) >= KBucketSize { if len(kb) >= KBucketSize {
// if n.KBuckets[k] is alrady full, perform ping of the first element // if n.KBuckets[k] is alrady full, perform ping of the first element
log.Debug("node.KBuckets[k] already full, performing ping to node.KBuckets[0]")
kad.Ping(k, o) kad.Ping(k, o)
return return
} }
@ -68,10 +72,12 @@ func (kad *Kademlia) Update(o ListedNode) {
if exist { if exist {
// update position of o to the bottom // update position of o to the bottom
kad.KBuckets[k] = moveToBottom(kad.KBuckets[k], pos) kad.KBuckets[k] = moveToBottom(kad.KBuckets[k], pos)
log.Debug("ListedNode already exists, moved to bottom")
return return
} }
// not exists, add it to the kBucket // not exists, add it to the kBucket
kad.KBuckets[k] = append(kad.KBuckets[k], o) kad.KBuckets[k] = append(kad.KBuckets[k], o)
log.Debug("ListedNode not exists, added to the bottom")
return return
} }

+ 2
- 0
main.go

@ -10,6 +10,8 @@ import (
) )
func main() { func main() {
log.SetLevel(log.DebugLevel)
app := cli.NewApp() app := cli.NewApp()
app.Name = "go-dht" app.Name = "go-dht"
app.Version = "0.0.1-alpha" app.Version = "0.0.1-alpha"

+ 41
- 1
node/node.go

@ -1,7 +1,13 @@
package node package node
import ( import (
"go-dht/config"
"go-dht/kademlia" "go-dht/kademlia"
"net"
"net/http"
"net/rpc"
log "github.com/sirupsen/logrus"
) )
type Node struct { type Node struct {
@ -34,22 +40,56 @@ func (n Node) ID() kademlia.ID {
return n.kademlia.ID return n.kademlia.ID
} }
func (n *Node) Start() error {
err := rpc.Register(n)
if err != nil {
return err
}
rpc.HandleHTTP()
listener, err := net.Listen("tcp", ":"+config.C.Port)
if err != nil {
return err
}
err = http.Serve(listener, nil)
if err != nil {
return err
}
// TODO ping config.C.KnownNodes
return nil
}
// Exposed RPC calls: Ping, Store, FindNode, FindValue // Exposed RPC calls: Ping, Store, FindNode, FindValue
func (n *Node) Ping(ln kademlia.ListedNode, ack *bool) error {
func (n *Node) Ping(ln kademlia.ListedNode, thisLn *kademlia.ListedNode) error {
log.Info("[rpc] PING from ", ln.ID)
n.kademlia.Update(ln)
*thisLn = kademlia.ListedNode{
ID: n.ID(),
Addr: config.C.Addr,
Port: config.C.Port,
}
// TODO perform PONG call to the requester (maybe ping&pong can be unified)
return nil
}
func (n *Node) Pong(ln kademlia.ListedNode, ack *bool) error {
log.Info("[rpc] PONG")
n.kademlia.Update(ln) n.kademlia.Update(ln)
return nil return nil
} }
func (n *Node) Store(data []byte, ack *bool) error { func (n *Node) Store(data []byte, ack *bool) error {
log.Info("[rpc] STORE")
return nil return nil
} }
func (n *Node) FindNode() { func (n *Node) FindNode() {
log.Info("[rpc] FIND_NODE")
} }
func (n *Node) FindValue() { func (n *Node) FindValue() {
log.Info("[rpc] FIND_VALUE")
} }

+ 65
- 0
rpc-test/test.go

@ -0,0 +1,65 @@
package main
import (
"fmt"
"go-dht/kademlia"
"log"
"net/rpc"
)
// Utility to test the Node RPC methods
func prepareTestListedNodes() []kademlia.ListedNode {
lnIDs := []string{
"c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b",
"420bfebd44fc62615253224328f40f29c9b225fa",
"6272bb67b1661abfa07d1d32cd9b810e531d42cd",
"07db608db033384895e48098a1bbe25266387463",
"c19c3285ab9ada4b420050ae1a204640b2bed508",
"f8971d5da24517c8cc5a316fb0658de8906c4155",
"04122a5f2dec42284147b1847ec1bc41ecd78626",
"407a90870d7b482a271446c85fda940ce78a4c7a",
"5ebe4539e7a33721a8623f7ebfab62600aa503e7",
"fc44a42879ef3a74d6bd8303bc3e4e205a92acf9",
"fc44a42879ef3a74d6bd8303bc3e4e205a92acf9",
"10c86f96ebaa1685a46a6417e6faa2ef34a68976",
"243c81515196a5b0e2d4675e73f0da3eead12793",
"0fd85ddddf15aeec2d5d8b01b013dbca030a18d7",
"0fd85ddddf15aeec2d5d8b01b013dbca030a18d5",
"0fd85ddddf15aeec2d5d8b01b013dbca030a18d0",
"0fd85ddddf15aeec2d5d8b01b013dbca030a1800",
"0750931c40a52a2facd220a02851f7d34f95e1fa",
"ebfba615ac50bcd0f5c2420741d032e885abf576",
}
var lns []kademlia.ListedNode
for i := 0; i < len(lnIDs); i++ {
idI, err := kademlia.IDFromString(lnIDs[i])
if err != nil {
panic(err)
}
lnI := kademlia.ListedNode{
ID: idI,
Addr: "",
}
lns = append(lns, lnI)
}
return lns
}
func main() {
lns := prepareTestListedNodes()
client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000")
if err != nil {
log.Fatal("Connection error: ", err)
}
var reply kademlia.ListedNode
for _, ln := range lns {
err = client.Call("Node.Ping", ln, &reply)
if err != nil {
panic(err)
}
fmt.Println(reply)
}
}

Loading…
Cancel
Save